You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/20 08:41:59 UTC

[rocketmq] branch snode updated: [RIP-11] Recover pull request for old message queue that is allocated on rebalance.(#827)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new d7a2603  [RIP-11] Recover pull request for old message queue that is allocated on rebalance.(#827)
d7a2603 is described below

commit d7a2603c17eb04f49b936beea0900f04cc7bb96a
Author: dinglei <li...@163.com>
AuthorDate: Wed Feb 20 16:41:54 2019 +0800

    [RIP-11] Recover pull request for old message queue that is allocated on rebalance.(#827)
    
    [RIP-11] Recover pull request for old message queue that is allocated on rebalance.
---
 .../impl/consumer/DefaultMQPushConsumerImpl.java   | 36 ++++++++++++++++++++--
 .../client/impl/consumer/RebalanceImpl.java        | 12 ++++++++
 .../client/impl/consumer/RebalancePullImpl.java    |  4 +++
 .../client/impl/consumer/RebalancePushImpl.java    |  6 ++++
 4 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index f5251f5..ff5f418 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -1212,11 +1212,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         }
         if (localOffset.get() + 1 < offset) {
             //should start pull message process
-            log.debug("Current Local key:{} and  offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
+            log.debug("#####Current Local key:{} and  offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
             return false;
         } else {
             //Stop pull request
-            log.debug("Process Push : Current Local key:{} and  offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
+            log.debug("#####Process Push : Current Local key:{} and  offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
             AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
             if (pullStop == null) {
                 this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
@@ -1247,4 +1247,36 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) {
         return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID;
     }
+
+    public boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
+        String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
+        ProcessQueue processQueue = processQueues.get(localOffsetKey);
+        if (processQueue != null) {
+            log.info("Clear local expire message for {} in processQueue.", localOffsetKey);
+            processQueue.cleanExpiredMsg(this.defaultMQPushConsumer);
+        }
+        AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
+        if (pullStop != null) {
+            if (pullStop.get()) {
+                pullStop.set(false);
+                log.info("Resume Pull Request of {} is set to TRUE, and then the pull request will start by rebalance again...", localOffsetKey);
+            }
+        }
+        return true;
+    }
+
+    public boolean pausePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
+        String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
+        AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
+        if (pullStop == null) {
+            this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
+            log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
+            return true;
+        }
+        if (!pullStop.get()) {
+            pullStop.set(true);
+            log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
+        }
+        return true;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 9ad07c7..ba90f09 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -359,6 +359,15 @@ public abstract class RebalanceImpl {
                                     consumerGroup, mq);
                             }
                             break;
+                        case CONSUME_PUSH:
+                            pq.setDropped(true);
+                            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
+                                it.remove();
+                                changed = true;
+                                log.info("doRebalance, {}, remove unnecessary mq, {}, because pull is pause by Push model, so try to fixed it",
+                                    consumerGroup, mq);
+                            }
+                            break;
                         default:
                             break;
                     }
@@ -375,6 +384,7 @@ public abstract class RebalanceImpl {
                 }
 
                 this.removeDirtyOffset(mq);
+                this.removeLocalDirtyPushOffset(topic, mq);
                 ProcessQueue pq = new ProcessQueue();
                 long nextOffset = this.computePullFromWhere(mq);
                 if (nextOffset >= 0) {
@@ -411,6 +421,8 @@ public abstract class RebalanceImpl {
 
     public abstract void removeDirtyOffset(final MessageQueue mq);
 
+    public abstract void removeLocalDirtyPushOffset(final String topic, final MessageQueue mq);
+
     public abstract long computePullFromWhere(final MessageQueue mq);
 
     public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
index 9dd408c..9dc8f4a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
@@ -69,6 +69,10 @@ public class RebalancePullImpl extends RebalanceImpl {
     }
 
     @Override
+    public void removeLocalDirtyPushOffset(final String topic, final MessageQueue mq) {
+    }
+
+    @Override
     public long computePullFromWhere(MessageQueue mq) {
         return 0;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 2c2f014..2e622be 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -141,6 +141,12 @@ public class RebalancePushImpl extends RebalanceImpl {
     }
 
     @Override
+    public void removeLocalDirtyPushOffset(final String topic, final MessageQueue mq) {
+        log.info("removeLocalDirtyPushOffset:consumergroup={},topic={},mq={}", consumerGroup, topic, mq);
+        this.defaultMQPushConsumerImpl.resumePullRequest(consumerGroup, topic, mq.getBrokerName(), mq.getQueueId());
+    }
+
+    @Override
     public long computePullFromWhere(MessageQueue mq) {
         long result = -1;
         final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();