You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/02/19 13:40:42 UTC
[rocketmq] branch snode updated: Polish rebalance process in real
push mode
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new 1dde4fe Polish rebalance process in real push mode
1dde4fe is described below
commit 1dde4fe7f297474e5cf6be945ad5fac82f99ed04
Author: duhenglucky <du...@gmail.com>
AuthorDate: Tue Feb 19 21:40:23 2019 +0800
Polish rebalance process in real push mode
---
.../apache/rocketmq/client/impl/consumer/RebalancePushImpl.java | 7 +------
.../apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java | 5 ++++-
.../org/apache/rocketmq/snode/processor/HeartbeatProcessor.java | 3 +--
.../org/apache/rocketmq/snode/processor/SendMessageProcessor.java | 1 -
4 files changed, 6 insertions(+), 10 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index a8bbaa5..2c2f014 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.client.impl.consumer;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -59,11 +58,7 @@ public class RebalancePushImpl extends RebalanceImpl {
log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
subscriptionData.setSubVersion(newVersion);
- Set<MessageQueue> queueIdSet = new HashSet<MessageQueue>();
- for (MessageQueue messageQueue : mqAll) {
- queueIdSet.add(messageQueue);
- }
- subscriptionData.setMessageQueueSet(queueIdSet);
+ subscriptionData.setMessageQueueSet(mqDivided);
int currentQueueCount = this.processQueueTable.size();
if (currentQueueCount != 0) {
int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
index c75c224..1f5ecd8 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
@@ -44,6 +44,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
@Override
public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
String groupId) {
+ log.debug("Before ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), subscriptionDataSet);
Set<MessageQueue> prevSubSet = this.clientSubscriptionTable.get(remotingChannel);
Set<MessageQueue> keySet = new HashSet<>();
for (SubscriptionData subscriptionData : subscriptionDataSet) {
@@ -56,7 +57,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet);
clientSet = prev != null ? prev : clientSet;
}
- log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
+ log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
clientSet.add(remotingChannel);
}
}
@@ -64,6 +65,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
if (keySet.size() > 0) {
this.clientSubscriptionTable.put(remotingChannel, keySet);
}
+
if (prevSubSet != null) {
for (MessageQueue messageQueue : prevSubSet) {
if (!keySet.contains(messageQueue)) {
@@ -75,6 +77,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
}
}
}
+ log.debug("After ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), this.clientSubscriptionTable.get(remotingChannel));
}
@Override
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
index 08e342e..3a7d822 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
@@ -71,7 +71,7 @@ public class HeartbeatProcessor implements RequestProcessor {
private RemotingCommand register(RemotingChannel remotingChannel, RemotingCommand request) {
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
- log.info("heartbeatData: {}", heartbeatData);
+ log.debug("heartbeatData: {}", heartbeatData);
Channel channel = null;
Attribute<Client> clientAttribute = null;
if (remotingChannel instanceof NettyChannelHandlerContextImpl) {
@@ -85,7 +85,6 @@ public class HeartbeatProcessor implements RequestProcessor {
client.setClientRole(ClientRole.Producer);
this.snodeController.getProducerManager().register(producerData.getGroupName(), client);
}
-
Set<String> groupSet = new HashSet<>();
for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
client.setClientRole(ClientRole.Consumer);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index cadc4d7..c3fd2fe 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -99,7 +99,6 @@ public class SendMessageProcessor implements RequestProcessor {
remotingChannel.reply(data);
this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(), request.getBody().length);
if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) {
- log.info("Send message response: {}", data);
this.snodeController.getPushService().pushMessage(sendMessageRequestHeader, message, data);
}
} else {