You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/25 07:34:23 UTC
[rocketmq] branch snode updated: Fix the subscription data type error issue
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new 044222b Fix the subscription data type error issue
044222b is described below
commit 044222b5a1ecd19584a05d5d27ad76aab4508437
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri Jan 25 15:33:46 2019 +0800
Fix the subscription data type error issue
---
.../org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java | 4 +---
.../rocketmq/common/flowcontrol/AbstractFlowControlService.java | 2 ++
.../apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java | 2 +-
.../org/apache/rocketmq/snode/processor/HeartbeatProcessor.java | 6 +-----
4 files changed, 5 insertions(+), 9 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index b772b84..b92b6be 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -31,7 +31,6 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.common.protocol.heartbeat.PushSubscriptionData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
public class RebalancePushImpl extends RebalanceImpl {
@@ -55,8 +54,7 @@ public class RebalancePushImpl extends RebalanceImpl {
* When rebalance result changed, should update subscription's version to notify broker.
* Fix: inconsistency subscription may lead to consumer miss messages.
*/
- SubscriptionData sub = this.subscriptionInner.get(topic);
- PushSubscriptionData subscriptionData = (PushSubscriptionData) sub;
+ SubscriptionData subscriptionData = this.subscriptionInner.get(topic);
long newVersion = System.currentTimeMillis();
log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
subscriptionData.setSubVersion(newVersion);
diff --git a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java
index 8713a5d..e4267f4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java
+++ b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java
@@ -74,6 +74,7 @@ public abstract class AbstractFlowControlService implements Interceptor {
Boolean acquired = this.acquiredThreadLocal.get();
if (acquired != null && acquired) {
SphO.exit();
+ this.acquiredThreadLocal.remove();
}
}
@@ -82,6 +83,7 @@ public abstract class AbstractFlowControlService implements Interceptor {
Boolean acquired = this.acquiredThreadLocal.get();
if (acquired != null && acquired) {
SphO.exit();
+ this.acquiredThreadLocal.remove();
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
index be337da..72cfd6a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
@@ -56,7 +56,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet);
clientSet = prev != null ? prev : clientSet;
}
- log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
+ log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
clientSet.add(remotingChannel);
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
index f7a5d38..36e27fc 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
@@ -20,7 +20,6 @@ import io.netty.channel.Channel;
import io.netty.util.Attribute;
import java.util.HashSet;
import java.util.Set;
-import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -75,15 +74,12 @@ public class HeartbeatProcessor implements RequestProcessor {
Client client = new Client();
client.setClientId(heartbeatData.getClientID());
client.setRemotingChannel(remotingChannel);
- Set<String> groupSet = new HashSet<>();
for (ProducerData producerData : heartbeatData.getProducerDataSet()) {
client.setClientRole(ClientRole.Producer);
- if (!MixAll.CLIENT_INNER_PRODUCER_GROUP.equals(producerData.getGroupName())) {
- groupSet.add(producerData.getGroupName());
- }
this.snodeController.getProducerManager().register(producerData.getGroupName(), client);
}
+ Set<String> groupSet = new HashSet<>();
for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
client.setClientRole(ClientRole.Consumer);
groupSet.add(consumerData.getGroupName());