You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/20 08:41:59 UTC
[rocketmq] branch snode updated: [RIP-11] Recover pull request for old message queue that is allocated on rebalance.(#827)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new d7a2603 [RIP-11] Recover pull request for old message queue that is allocated on rebalance.(#827)
d7a2603 is described below
commit d7a2603c17eb04f49b936beea0900f04cc7bb96a
Author: dinglei <li...@163.com>
AuthorDate: Wed Feb 20 16:41:54 2019 +0800
[RIP-11] Recover pull request for old message queue that is allocated on rebalance.(#827)
[RIP-11] Recover pull request for old message queue that is allocated on rebalance.
---
.../impl/consumer/DefaultMQPushConsumerImpl.java | 36 ++++++++++++++++++++--
.../client/impl/consumer/RebalanceImpl.java | 12 ++++++++
.../client/impl/consumer/RebalancePullImpl.java | 4 +++
.../client/impl/consumer/RebalancePushImpl.java | 6 ++++
4 files changed, 56 insertions(+), 2 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index f5251f5..ff5f418 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -1212,11 +1212,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
if (localOffset.get() + 1 < offset) {
//should start pull message process
- log.debug("Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
+ log.debug("#####Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
return false;
} else {
//Stop pull request
- log.debug("Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
+ log.debug("#####Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
@@ -1247,4 +1247,36 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) {
return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID;
}
+
+ public boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
+ String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
+ ProcessQueue processQueue = processQueues.get(localOffsetKey);
+ if (processQueue != null) {
+ log.info("Clear local expire message for {} in processQueue.", localOffsetKey);
+ processQueue.cleanExpiredMsg(this.defaultMQPushConsumer);
+ }
+ AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
+ if (pullStop != null) {
+ if (pullStop.get()) {
+ pullStop.set(false);
+ log.info("Resume Pull Request of {} is set to TRUE, and then the pull request will start by rebalance again...", localOffsetKey);
+ }
+ }
+ return true;
+ }
+
+ public boolean pausePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
+ String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
+ AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
+ if (pullStop == null) {
+ this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
+ log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
+ return true;
+ }
+ if (!pullStop.get()) {
+ pullStop.set(true);
+ log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
+ }
+ return true;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 9ad07c7..ba90f09 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -359,6 +359,15 @@ public abstract class RebalanceImpl {
consumerGroup, mq);
}
break;
+ case CONSUME_PUSH:
+ pq.setDropped(true);
+ if (this.removeUnnecessaryMessageQueue(mq, pq)) {
+ it.remove();
+ changed = true;
+ log.info("doRebalance, {}, remove unnecessary mq, {}, because pull is pause by Push model, so try to fixed it",
+ consumerGroup, mq);
+ }
+ break;
default:
break;
}
@@ -375,6 +384,7 @@ public abstract class RebalanceImpl {
}
this.removeDirtyOffset(mq);
+ this.removeLocalDirtyPushOffset(topic, mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
@@ -411,6 +421,8 @@ public abstract class RebalanceImpl {
public abstract void removeDirtyOffset(final MessageQueue mq);
+ public abstract void removeLocalDirtyPushOffset(final String topic, final MessageQueue mq);
+
public abstract long computePullFromWhere(final MessageQueue mq);
public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
index 9dd408c..9dc8f4a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
@@ -69,6 +69,10 @@ public class RebalancePullImpl extends RebalanceImpl {
}
@Override
+ public void removeLocalDirtyPushOffset(final String topic, final MessageQueue mq) {
+ }
+
+ @Override
public long computePullFromWhere(MessageQueue mq) {
return 0;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 2c2f014..2e622be 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -141,6 +141,12 @@ public class RebalancePushImpl extends RebalanceImpl {
}
@Override
+ public void removeLocalDirtyPushOffset(final String topic, final MessageQueue mq) {
+ log.info("removeLocalDirtyPushOffset:consumergroup={},topic={},mq={}", consumerGroup, topic, mq);
+ this.defaultMQPushConsumerImpl.resumePullRequest(consumerGroup, topic, mq.getBrokerName(), mq.getQueueId());
+ }
+
+ @Override
public long computePullFromWhere(MessageQueue mq) {
long result = -1;
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();