You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/25 07:34:23 UTC

[rocketmq] branch snode updated: Fix the subscription data type error issue

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new 044222b  Fix the subscription data type error issue
044222b is described below

commit 044222b5a1ecd19584a05d5d27ad76aab4508437
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri Jan 25 15:33:46 2019 +0800

    Fix the subscription data type error issue
---
 .../org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java | 4 +---
 .../rocketmq/common/flowcontrol/AbstractFlowControlService.java     | 2 ++
 .../apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java  | 2 +-
 .../org/apache/rocketmq/snode/processor/HeartbeatProcessor.java     | 6 +-----
 4 files changed, 5 insertions(+), 9 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index b772b84..b92b6be 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -31,7 +31,6 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.common.protocol.heartbeat.PushSubscriptionData;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 
 public class RebalancePushImpl extends RebalanceImpl {
@@ -55,8 +54,7 @@ public class RebalancePushImpl extends RebalanceImpl {
          * When rebalance result changed, should update subscription's version to notify broker.
          * Fix: inconsistency subscription may lead to consumer miss messages.
          */
-        SubscriptionData sub = this.subscriptionInner.get(topic);
-        PushSubscriptionData subscriptionData = (PushSubscriptionData) sub;
+        SubscriptionData subscriptionData = this.subscriptionInner.get(topic);
         long newVersion = System.currentTimeMillis();
         log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
         subscriptionData.setSubVersion(newVersion);
diff --git a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java
index 8713a5d..e4267f4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java
+++ b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java
@@ -74,6 +74,7 @@ public abstract class AbstractFlowControlService implements Interceptor {
         Boolean acquired = this.acquiredThreadLocal.get();
         if (acquired != null && acquired) {
             SphO.exit();
+            this.acquiredThreadLocal.remove();
         }
     }
 
@@ -82,6 +83,7 @@ public abstract class AbstractFlowControlService implements Interceptor {
         Boolean acquired = this.acquiredThreadLocal.get();
         if (acquired != null && acquired) {
             SphO.exit();
+            this.acquiredThreadLocal.remove();
         }
     }
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
index be337da..72cfd6a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
@@ -56,7 +56,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
                         Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet);
                         clientSet = prev != null ? prev : clientSet;
                     }
-                    log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
+                    log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
                     clientSet.add(remotingChannel);
                 }
             }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
index f7a5d38..36e27fc 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
@@ -20,7 +20,6 @@ import io.netty.channel.Channel;
 import io.netty.util.Attribute;
 import java.util.HashSet;
 import java.util.Set;
-import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -75,15 +74,12 @@ public class HeartbeatProcessor implements RequestProcessor {
         Client client = new Client();
         client.setClientId(heartbeatData.getClientID());
         client.setRemotingChannel(remotingChannel);
-        Set<String> groupSet = new HashSet<>();
         for (ProducerData producerData : heartbeatData.getProducerDataSet()) {
             client.setClientRole(ClientRole.Producer);
-            if (!MixAll.CLIENT_INNER_PRODUCER_GROUP.equals(producerData.getGroupName())) {
-                groupSet.add(producerData.getGroupName());
-            }
             this.snodeController.getProducerManager().register(producerData.getGroupName(), client);
         }
 
+        Set<String> groupSet = new HashSet<>();
         for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
             client.setClientRole(ClientRole.Consumer);
             groupSet.add(consumerData.getGroupName());