You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/11/30 09:14:55 UTC

[GitHub] [rocketmq] lwclover opened a new issue #3556: When broker is down, rocketmq client can not retry under Async send model

lwclover opened a new issue #3556:
URL: https://github.com/apache/rocketmq/issues/3556


   RocketMQ client updates topic route infomations every 30 seconds,
   When broker is down,Rocketmq client can not connect to the broker. Sending async message can not retry,Throwing a RemotingConnectException.
   
   The RemotingConnectException below:
   ```
   org.apache.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [5]ms, Topic: SSP_VC_COMMAND_CFG_PULL_PUSH_RESPOND_RECORD, BrokersSent: [broker-ssp-11]
   See http://rocketmq.apache.org/docs/faq/ for further details.
   	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:610)
   	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.access$300(DefaultMQProducerImpl.java:87)
   	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl$2.run(DefaultMQProducerImpl.java:467)
   	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <x.x.x.x10911> failed
   	at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:540)
   	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:375)
   	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage$original$hWukCcmf(MQClientAPIImpl.java:332)
   	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage$original$hWukCcmf$accessor$wRKVBfSV(MQClientAPIImpl.java)
   	at org.apache.rocketmq.client.impl.MQClientAPIImpl$auxiliary$5onkBFUr.call(Unknown Source)
   	at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86)
   	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java)
   	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:765)
   	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:529)
   	... 7 common frames omitted
   ```
   
   The RemotingConnectException caused by:  throw new RemotingConnectException(addr);
   ```
       @Override
       public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
           throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
           RemotingSendRequestException {
           long beginStartTime = System.currentTimeMillis();
           final Channel channel = this.getAndCreateChannel(addr);
           if (channel != null && channel.isActive()) {
               try {
                   doBeforeRpcHooks(addr, request);
                   long costTime = System.currentTimeMillis() - beginStartTime;
                   if (timeoutMillis < costTime) {
                       throw new RemotingTooMuchRequestException("invokeAsync call timeout");
                   }
                   this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
               } catch (RemotingSendRequestException e) {
                   log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                   this.closeChannel(addr, channel);
                   throw e;
               }
           } else {
               this.closeChannel(addr, channel);
               throw new RemotingConnectException(addr);
           }
       }
   ```
    CommunicationMode.Async retry times is 1, when throws RemotingConnectException can not retry.
   ```
   private SendResult sendDefaultImpl(
           Message msg,
           final CommunicationMode communicationMode,
           final SendCallback sendCallback,
           final long timeout
       ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
           this.makeSureStateOK();
           Validators.checkMessage(msg, this.defaultMQProducer);
           final long invokeID = random.nextLong();
           long beginTimestampFirst = System.currentTimeMillis();
           long beginTimestampPrev = beginTimestampFirst;
           long endTimestamp = beginTimestampFirst;
           TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
           if (topicPublishInfo != null && topicPublishInfo.ok()) {
               boolean callTimeout = false;
               MessageQueue mq = null;
               Exception exception = null;
               SendResult sendResult = null;
               int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
               int times = 0;
               String[] brokersSent = new String[timesTotal];
               for (; times < timesTotal; times++) {
                   String lastBrokerName = null == mq ? null : mq.getBrokerName();
                   MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                   if (mqSelected != null) {
                       mq = mqSelected;
                       brokersSent[times] = mq.getBrokerName();
                       try {
                           beginTimestampPrev = System.currentTimeMillis();
                           if (times > 0) {
                               //Reset topic with namespace during resend.
                               msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                           }
                           long costTime = beginTimestampPrev - beginTimestampFirst;
                           if (timeout < costTime) {
                               callTimeout = true;
                               break;
                           }
   
                           sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                           endTimestamp = System.currentTimeMillis();
                           this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                           switch (communicationMode) {
                               case ASYNC:
                                   return null;
                               case ONEWAY:
                                   return null;
                               case SYNC:
                                   if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                       if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                           continue;
                                       }
                                   }
   
                                   return sendResult;
                               default:
                                   break;
                           }
                       } catch (RemotingException | MQClientException e) {
                           endTimestamp = System.currentTimeMillis();
                           this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                           log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                           log.warn(msg.toString());
                           exception = e;
                           continue;
                       } catch (MQBrokerException e) {
                           endTimestamp = System.currentTimeMillis();
                           this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                           log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                           log.warn(msg.toString());
                           exception = e;
                           if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                               continue;
                           } else {
                               if (sendResult != null) {
                                   return sendResult;
                               }
   
                               throw e;
                           }
                       } catch (InterruptedException e) {
                           endTimestamp = System.currentTimeMillis();
                           this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                           log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                           log.warn(msg.toString());
   
                           log.warn("sendKernelImpl exception", e);
                           log.warn(msg.toString());
                           throw e;
                       }
                   } else {
                       break;
                   }
               }
   
               if (sendResult != null) {
                   return sendResult;
               }
   
               String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                   times,
                   System.currentTimeMillis() - beginTimestampFirst,
                   msg.getTopic(),
                   Arrays.toString(brokersSent));
   
               info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
   
               MQClientException mqClientException = new MQClientException(info, exception);
               if (callTimeout) {
                   throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
               }
   
               if (exception instanceof MQBrokerException) {
                   mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
               } else if (exception instanceof RemotingConnectException) {
                   mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
               } else if (exception instanceof RemotingTimeoutException) {
                   mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
               } else if (exception instanceof MQClientException) {
                   mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
               }
   
               throw mqClientException;
           }
   
           validateNameServerSetting();
   
           throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
               null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
       }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] odbozhou closed issue #3556: When broker is down, rocketmq client can not retry under Async send model

Posted by GitBox <gi...@apache.org>.
odbozhou closed issue #3556:
URL: https://github.com/apache/rocketmq/issues/3556


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] odbozhou commented on issue #3556: When broker is down, rocketmq client can not retry under Async send model

Posted by GitBox <gi...@apache.org>.
odbozhou commented on issue #3556:
URL: https://github.com/apache/rocketmq/issues/3556#issuecomment-1012796923


   merged


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org