You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:26:38 UTC
[13/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-175] Consumer may miss messages because of inconsistent sub… closes apache/incubator-rocketmq#92
[ROCKETMQ-175] Consumer may miss messages because of inconsistent sub… closes apache/incubator-rocketmq#92
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/82803889
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/82803889
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/82803889
Branch: refs/heads/master
Commit: 8280388917c466d030ffb774a2474ca8e4144811
Parents: 42826c4
Author: vsair <li...@gmail.com>
Authored: Fri May 26 15:13:29 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Fri May 26 15:13:29 2017 +0800
----------------------------------------------------------------------
.../rocketmq/client/impl/consumer/RebalancePushImpl.java | 11 +++++++++++
1 file changed, 11 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/82803889/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 1730c99..509c9a4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
public class RebalancePushImpl extends RebalanceImpl {
private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
@@ -47,6 +48,16 @@ public class RebalancePushImpl extends RebalanceImpl {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+ /**
+ * When rebalance result changed, should update subscription's version to notify broker.
+ * Fix: inconsistency subscription may lead to consumer miss messages.
+ */
+ SubscriptionData subscriptionData = this.subscriptionInner.get(topic);
+ long newVersion = System.currentTimeMillis();
+ log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
+ subscriptionData.setSubVersion(newVersion);
+ // notify broker
+ this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
}
@Override