You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/02/19 13:40:42 UTC

[rocketmq] branch snode updated: Polish rebalance process in real push mode

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new 1dde4fe  Polish rebalance process in real push mode
1dde4fe is described below

commit 1dde4fe7f297474e5cf6be945ad5fac82f99ed04
Author: duhenglucky <du...@gmail.com>
AuthorDate: Tue Feb 19 21:40:23 2019 +0800

    Polish rebalance process in real push mode
---
 .../apache/rocketmq/client/impl/consumer/RebalancePushImpl.java    | 7 +------
 .../apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java | 5 ++++-
 .../org/apache/rocketmq/snode/processor/HeartbeatProcessor.java    | 3 +--
 .../org/apache/rocketmq/snode/processor/SendMessageProcessor.java  | 1 -
 4 files changed, 6 insertions(+), 10 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index a8bbaa5..2c2f014 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -59,11 +58,7 @@ public class RebalancePushImpl extends RebalanceImpl {
         log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
         subscriptionData.setSubVersion(newVersion);
 
-        Set<MessageQueue> queueIdSet = new HashSet<MessageQueue>();
-        for (MessageQueue messageQueue : mqAll) {
-            queueIdSet.add(messageQueue);
-        }
-        subscriptionData.setMessageQueueSet(queueIdSet);
+        subscriptionData.setMessageQueueSet(mqDivided);
         int currentQueueCount = this.processQueueTable.size();
         if (currentQueueCount != 0) {
             int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
index c75c224..1f5ecd8 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
@@ -44,6 +44,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
     @Override
     public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
         String groupId) {
+        log.debug("Before ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), subscriptionDataSet);
         Set<MessageQueue> prevSubSet = this.clientSubscriptionTable.get(remotingChannel);
         Set<MessageQueue> keySet = new HashSet<>();
         for (SubscriptionData subscriptionData : subscriptionDataSet) {
@@ -56,7 +57,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
                         Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet);
                         clientSet = prev != null ? prev : clientSet;
                     }
-                    log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
+                    log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
                     clientSet.add(remotingChannel);
                 }
             }
@@ -64,6 +65,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
         if (keySet.size() > 0) {
             this.clientSubscriptionTable.put(remotingChannel, keySet);
         }
+
         if (prevSubSet != null) {
             for (MessageQueue messageQueue : prevSubSet) {
                 if (!keySet.contains(messageQueue)) {
@@ -75,6 +77,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
                 }
             }
         }
+        log.debug("After ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), this.clientSubscriptionTable.get(remotingChannel));
     }
 
     @Override
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
index 08e342e..3a7d822 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
@@ -71,7 +71,7 @@ public class HeartbeatProcessor implements RequestProcessor {
 
     private RemotingCommand register(RemotingChannel remotingChannel, RemotingCommand request) {
         HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
-        log.info("heartbeatData: {}", heartbeatData);
+        log.debug("heartbeatData: {}", heartbeatData);
         Channel channel = null;
         Attribute<Client> clientAttribute = null;
         if (remotingChannel instanceof NettyChannelHandlerContextImpl) {
@@ -85,7 +85,6 @@ public class HeartbeatProcessor implements RequestProcessor {
             client.setClientRole(ClientRole.Producer);
             this.snodeController.getProducerManager().register(producerData.getGroupName(), client);
         }
-
         Set<String> groupSet = new HashSet<>();
         for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
             client.setClientRole(ClientRole.Consumer);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index cadc4d7..c3fd2fe 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -99,7 +99,6 @@ public class SendMessageProcessor implements RequestProcessor {
                 remotingChannel.reply(data);
                 this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(), request.getBody().length);
                 if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) {
-                    log.info("Send message response: {}", data);
                     this.snodeController.getPushService().pushMessage(sendMessageRequestHeader, message, data);
                 }
             } else {