You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2017/06/06 03:38:23 UTC

[03/51] [abbrv] incubator-rocketmq git commit: Fix possible NullPointerException when retry in send Async way

Fix possible NullPointerException when retry in send Async way


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/02acf1a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/02acf1a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/02acf1a0

Branch: refs/heads/master
Commit: 02acf1a074289cb46909f00e88c86c52d356523b
Parents: 47fad3c
Author: Jaskey <li...@gmail.com>
Authored: Wed Feb 15 22:17:47 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Tue Jun 6 11:37:29 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/client/impl/MQClientAPIImpl.java   | 22 +++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/02acf1a0/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index bdce883..6119e24 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -438,34 +438,40 @@ public class MQClientAPIImpl {
     ) {
         int tmp = curTimes.incrementAndGet();
         if (needRetry && tmp <= timesTotal) {
-            MessageQueue tmpmq = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
-            String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName());
+            String retryBrokerName = brokerName;//by default, it will send to the same broker
+            if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
+                MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
+                retryBrokerName = mqChosen.getBrokerName();
+            }
+            String addr = instance.findBrokerAddressInPublish(retryBrokerName);
             log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
-                tmpmq.getBrokerName());
+                retryBrokerName);
             try {
                 request.setOpaque(RemotingCommand.createNewRequestId());
-                sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
+                sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                     timesTotal, curTimes, context, producer);
             } catch (InterruptedException e1) {
-                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                     context, false, producer);
             } catch (RemotingConnectException e1) {
                 producer.updateFaultItem(brokerName, 3000, true);
-                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                     context, true, producer);
             } catch (RemotingTooMuchRequestException e1) {
-                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                     context, false, producer);
             } catch (RemotingException e1) {
                 producer.updateFaultItem(brokerName, 3000, true);
-                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                     context, true, producer);
             }
         } else {
+
             if (context != null) {
                 context.setException(e);
                 context.getProducer().executeSendMessageHookAfter(context);
             }
+
             try {
                 sendCallback.onException(e);
             } catch (Exception ignored) {