You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/02/19 08:26:59 UTC

[rocketmq] branch snode updated: Fix the push subscription not update issue

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new 28a8d64  Fix the push subscription not update issue
     new 8e12495  Merge branch 'snode' of github.com:apache/rocketmq into snode
28a8d64 is described below

commit 28a8d649fa7a734831d28e01407fc37c535f08df
Author: duhenglucky <du...@gmail.com>
AuthorDate: Tue Feb 19 16:26:02 2019 +0800

    Fix the push subscription not update issue
---
 .../common/protocol/header/PushMessageHeader.java   | 21 +++++++++++++++++++++
 .../snode/client/impl/SubscriptionManagerImpl.java  |  7 ++-----
 .../snode/service/impl/PushServiceImpl.java         |  7 ++++---
 3 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PushMessageHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PushMessageHeader.java
index 40277eb..92285e1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PushMessageHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PushMessageHeader.java
@@ -36,6 +36,8 @@ public class PushMessageHeader implements CommandCustomHeader {
     @CFNotNull
     private String consumerGroup;
 
+    private String enodeName;
+
     @Override
     public void checkFields() throws RemotingCommandException {
 
@@ -80,4 +82,23 @@ public class PushMessageHeader implements CommandCustomHeader {
     public void setConsumerGroup(String consumerGroup) {
         this.consumerGroup = consumerGroup;
     }
+
+    public String getEnodeName() {
+        return enodeName;
+    }
+
+    public void setEnodeName(String enodeName) {
+        this.enodeName = enodeName;
+    }
+
+    @Override public String toString() {
+        return "PushMessageHeader{" +
+            "queueOffset=" + queueOffset +
+            ", messageId='" + messageId + '\'' +
+            ", queueId=" + queueId +
+            ", topic='" + topic + '\'' +
+            ", consumerGroup='" + consumerGroup + '\'' +
+            ", enodeName='" + enodeName + '\'' +
+            '}';
+    }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
index 22e245a..c75c224 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
@@ -44,7 +44,6 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
     @Override
     public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
         String groupId) {
-        log.info("Register push session subscriptionDataSet: {}", subscriptionDataSet);
         Set<MessageQueue> prevSubSet = this.clientSubscriptionTable.get(remotingChannel);
         Set<MessageQueue> keySet = new HashSet<>();
         for (SubscriptionData subscriptionData : subscriptionDataSet) {
@@ -57,15 +56,14 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
                         Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet);
                         clientSet = prev != null ? prev : clientSet;
                     }
-                    log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
+                    log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
                     clientSet.add(remotingChannel);
                 }
             }
         }
         if (keySet.size() > 0) {
-            this.clientSubscriptionTable.putIfAbsent(remotingChannel, keySet);
+            this.clientSubscriptionTable.put(remotingChannel, keySet);
         }
-        log.info("Register push session clientSubscriptionTable: {}", clientSubscriptionTable);
         if (prevSubSet != null) {
             for (MessageQueue messageQueue : prevSubSet) {
                 if (!keySet.contains(messageQueue)) {
@@ -77,7 +75,6 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
                 }
             }
         }
-        log.info("Register push session clientSubscriptionTable: {}", clientSubscriptionTable);
     }
 
     @Override
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
index 8a81c4c..72f658e 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
@@ -95,7 +95,7 @@ public class PushServiceImpl implements PushService {
             messageExt.setFlag(sendMessageRequestHeader.getFlag());
             messageExt.setBody(message);
             messageExt.setBodyCRC(UtilAll.crc32(message));
-            log.info("MessageExt:{}", messageExt);
+            log.debug("MessageExt:{}", messageExt);
             return messageExt;
         }
 
@@ -103,9 +103,9 @@ public class PushServiceImpl implements PushService {
         public void run() {
             if (!canceled.get()) {
                 try {
-                    log.info("sendMessageResponse:{}", sendMessageResponse);
+                    log.debug("sendMessageResponse: {}", sendMessageResponse);
                     SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) sendMessageResponse.decodeCommandCustomHeader(SendMessageResponseHeader.class);
-                    log.info("sendMessageResponseHeader:{}", sendMessageResponseHeader);
+                    log.debug("sendMessageResponseHeader: {}", sendMessageResponseHeader);
                     MessageQueue messageQueue = new MessageQueue(sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getEnodeName(), sendMessageRequestHeader.getQueueId());
                     Set<RemotingChannel> consumerTable = snodeController.getSubscriptionManager().getPushableChannel(messageQueue);
                     if (consumerTable != null) {
@@ -113,6 +113,7 @@ public class PushServiceImpl implements PushService {
                         pushMessageHeader.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
                         pushMessageHeader.setTopic(sendMessageRequestHeader.getTopic());
                         pushMessageHeader.setQueueId(sendMessageResponseHeader.getQueueId());
+                        pushMessageHeader.setEnodeName(sendMessageRequestHeader.getEnodeName());
                         RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader);
                         MessageExt messageExt = buildMessageExt(sendMessageResponseHeader, message, sendMessageRequestHeader);
                         pushMessage.setBody(MessageDecoder.encode(messageExt, false));