You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/19 09:30:47 UTC

[rocketmq] branch snode updated: Polish push message process in client(add broker name)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new 3c264c8  Polish push message process in client(add broker name)
3c264c8 is described below

commit 3c264c8f2cf9c9672a26e5b9967dcd10f2f23fba
Author: ShannonDing <li...@163.com>
AuthorDate: Tue Feb 19 17:30:24 2019 +0800

    Polish push message process in client(add broker name)
---
 .../client/impl/ClientRemotingProcessor.java       |  2 +-
 .../impl/consumer/DefaultMQPushConsumerImpl.java   | 36 +++++++++++++---------
 .../client/impl/factory/MQClientInstance.java      |  6 +++-
 3 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
index c8c3919..f3cfff7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
@@ -218,12 +218,12 @@ public class ClientRemotingProcessor implements RequestProcessor {
         final PushMessageHeader requestHeader =
             (PushMessageHeader) request
                 .decodeCommandCustomHeader(PushMessageHeader.class);
-
         final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));
         boolean result =
             this.mqClientFactory.processSnodePushMessage(msg,
                 requestHeader.getConsumerGroup(),
                 requestHeader.getTopic(),
+                requestHeader.getEnodeName(),
                 requestHeader.getQueueId(),
                 requestHeader.getQueueOffset());
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 000d569..f5251f5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -308,12 +308,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
             public void onSuccess(PullResult pullResult) {
                 if (pullResult != null) {
                     //Update local offset according remote offset
-                    String localOffsetKey = pullRequest.getConsumerGroup()
-                        + "@" + pullRequest.getMessageQueue().getTopic()
-                        + "@" + pullRequest.getMessageQueue().getQueueId();
+                    String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
+                        pullRequest.getMessageQueue().getTopic(),
+                        pullRequest.getMessageQueue().getBrokerName(),
+                        pullRequest.getMessageQueue().getQueueId());
                     AtomicLong localOffset = localConsumerOffset.get(localOffsetKey);
                     if (localOffset == null) {
-                        localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
+                        localConsumerOffset.putIfAbsent(localOffsetKey, new AtomicLong(-1));
                     }
                     localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset());
 
@@ -474,9 +475,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     }
 
     private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
-        String localOffsetKey = pullRequest.getConsumerGroup()
-            + "@" + pullRequest.getMessageQueue().getTopic()
-            + "@" + pullRequest.getMessageQueue().getQueueId();
+        String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
+            pullRequest.getMessageQueue().getTopic(),
+            pullRequest.getMessageQueue().getBrokerName(),
+            pullRequest.getMessageQueue().getQueueId());
         if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
             //Stop pull request
             log.info("Stop pull request, {}", localOffsetKey);
@@ -498,9 +500,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     }
 
     public void executePullRequestImmediately(final PullRequest pullRequest) {
-        String localOffsetKey = pullRequest.getConsumerGroup()
-            + "@" + pullRequest.getMessageQueue().getTopic()
-            + "@" + pullRequest.getMessageQueue().getQueueId();
+        String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
+            pullRequest.getMessageQueue().getTopic(),
+            pullRequest.getMessageQueue().getBrokerName(),
+            pullRequest.getMessageQueue().getQueueId());
         if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
             //Stop pull request
             log.info("Stop pull request, {}", localOffsetKey);
@@ -1197,21 +1200,23 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     public boolean processPushMessage(final MessageExt msg,
         final String consumerGroup,
         final String topic,
+        final String brokerName,
         final int queueID,
         final long offset) {
-        String localOffsetKey = consumerGroup + "@" + topic + "@" + queueID;
+        String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
         AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey);
         if (localOffset == null) {
             log.info("Current Local offset have not set, initiallized to -1.");
             this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
             return false;
         }
-        if (localOffset.get() < offset) {
+        if (localOffset.get() + 1 < offset) {
             //should start pull message process
             log.debug("Current Local key:{} and  offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
             return false;
         } else {
             //Stop pull request
+            log.debug("Process Push : Current Local key:{} and  offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
             AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
             if (pullStop == null) {
                 this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
@@ -1233,10 +1238,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                 processQueue = processQueues.get(localOffsetKey);
             }
             processQueue.putMessage(messageExtList);
-            MessageQueue messageQueue = new MessageQueue(topic, "", queueID);
+            MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID);
             this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true);
-            log.info(".......submitConsumeRequest:{},Offset:{}...", localOffsetKey, offset);
         }
         return true;
     }
+
+    private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) {
+        return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 4a81c3b..0953fdc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1376,15 +1376,19 @@ public class MQClientInstance {
     public ClientConfig getNettyClientConfig() {
         return nettyClientConfig;
     }
+
     public boolean processSnodePushMessage(final MessageExt msg,
         final String consumerGroup,
         final String topic,
+        final String brokerName,
         final int queueID,
         final long offset) {
+        log.debug("Recieve:processSnodePushMessage :{}-{}-{}-{}-{}",
+            consumerGroup, topic, brokerName, queueID, offset);
         MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
         if (null != mqConsumerInner) {
             DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
-            consumer.processPushMessage(msg,consumerGroup,topic,queueID,offset);
+            consumer.processPushMessage(msg, consumerGroup, topic, brokerName, queueID, offset);
             return true;
         }