You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/02/20 10:33:42 UTC

[GitHub] [rocketmq] Cczzzz opened a new issue #1781: about async send retry

Cczzzz opened a new issue #1781: about async send retry
URL: https://github.com/apache/rocketmq/issues/1781
 
 
   retryTimesWhenSendAsyncFailed  this field in DefaultMQProducer is  no effective。
   ```
        producer.send(msg, new SendCallback() {
                       @Override
                       public void onSuccess(SendResult sendResult) {
                           countDownLatch.countDown();
                           System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                       }
   
                       @Override
                       public void onException(Throwable e) {
                           countDownLatch.countDown();
                           System.out.printf("%-10d Exception %s %n", index, e);
                           e.printStackTrace();
                       }
                   });
   ```
   Finally enter MQClientAPIImpl#sendMessageAsync ,If something goes Exception。
   Will call  onExceptionImpl(), but timeoutMillis always is 0.
   
   onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                               retryTimesWhenSendFailed, times, ex, context, true, producer);
   ```
       private void onExceptionImpl(final String brokerName,
                                    final Message msg,
                                    final long timeoutMillis,
                                    final RemotingCommand request,
                                    final SendCallback sendCallback,
                                    final TopicPublishInfo topicPublishInfo,
                                    final MQClientInstance instance,
                                    final int timesTotal,
                                    final AtomicInteger curTimes,
                                    final Exception e,
                                    final SendMessageContext context,
                                    final boolean needRetry,
                                    final DefaultMQProducerImpl producer
       )
   ```
   Will cause the send to fail in this ,invokeAsync timeoutMillis is 0.
   ```
   NettyRemotingClient#invokeAsync 
    @Override
       public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
           throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
           RemotingSendRequestException {
           long beginStartTime = System.currentTimeMillis();
           final Channel channel = this.getAndCreateChannel(addr);
           if (channel != null && channel.isActive()) {
               try {
                   doBeforeRpcHooks(addr, request);
                   long costTime = System.currentTimeMillis() - beginStartTime;
                   if (timeoutMillis < costTime) {
                       throw new RemotingTooMuchRequestException("invokeAsync call timeout");
                   }
                   this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
               } catch (RemotingSendRequestException e) {
                   log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                   this.closeChannel(addr, channel);
                   throw e;
               }
           } else {
               this.closeChannel(addr, channel);
               throw new RemotingConnectException(addr);
           }
       }
   ```
   And the requestId will be reset when retrying, but there will be duplicate requestId and the callback function will not be executed
   ```.
    private volatile int opaque = requestId.getAndIncrement();
      public static int createNewRequestId() {
           return requestId.incrementAndGet();
       }
   ```
   incrementAndGet and getAndIncrement  will cause duplicate id.
    I try to fix it if this is a bug。
   ····································
   关于异步发送重试的问题
   异步发送的重试次数并没有生效,因为在 方法中  MQClientAPIImpl#sendMessageAsync 发送失败后调用时 timeoutMillis 是0 ,导致后面超时判断直接判定超时,第一次重试就失败。
   onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                               retryTimesWhenSendFailed, times, ex, context, true, producer);
   然后,在重试时会重置请求id,但是重置的方法和创建request时分配请求id的方法会导致出现反复id。incrementAndGet 和 getAndIncrement   一起使用了,导致其他请求的响应影响的异步发送的响应,导致回调函数没有执行,重试中断。
   不知道这是不是一个bug,如果是,我修复了它,希望可以提交rp
   
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [rocketmq] Cczzzz closed issue #1781: about async send retry

Posted by GitBox <gi...@apache.org>.
Cczzzz closed issue #1781: about async send retry
URL: https://github.com/apache/rocketmq/issues/1781
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [rocketmq] duhenglucky edited a comment on issue #1781: about async send retry

Posted by GitBox <gi...@apache.org>.
duhenglucky edited a comment on issue #1781: about async send retry
URL: https://github.com/apache/rocketmq/issues/1781#issuecomment-591470583
 
 
   @Cczzzz The implementation of asynchronous sending is a bit messy, including not only the timeout parameter of 0, the acquisition of the requestId, but also the retry policy is not very appropriate. Improvements are welcome, but it seems to require more than one PR to do the job :), so it would be nice if you can keep making improvements in this place.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [rocketmq] duhenglucky commented on issue #1781: about async send retry

Posted by GitBox <gi...@apache.org>.
duhenglucky commented on issue #1781: about async send retry
URL: https://github.com/apache/rocketmq/issues/1781#issuecomment-591470583
 
 
   @Cczzzz This is not a very well designed place, including not only the timeout parameter of 0, the acquisition of the requestId, but also the retry policy is not very appropriate. Improvements are welcome, but it seems to require more than one PR to do the job :), so it would be nice if you keep making improvements in this place.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services