You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/28 09:41:42 UTC

[rocketmq] branch snode updated: Polish Push consumer

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new 4dbf416  Polish Push consumer
4dbf416 is described below

commit 4dbf4160d952b3d2ee8b91c7c6fa26e662f5c496
Author: ShannonDing <li...@163.com>
AuthorDate: Thu Feb 28 17:41:07 2019 +0800

    Polish Push consumer
---
 .../consumer/DefaultMQRealPushConsumerImpl.java    | 48 ++++++++++++++--------
 .../client/impl/consumer/RebalanceImpl.java        |  3 ++
 2 files changed, 34 insertions(+), 17 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java
index 0c323ca..755ed58 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java
@@ -224,13 +224,13 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
         try {
             this.makeSureStateOK();
         } catch (MQClientException e) {
-            log.warn("pullMessage exception, consumer state not ok", e);
+            log.warn("pullMessage exception, real push consumer state not ok", e);
             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
             return;
         }
 
         if (this.isPause()) {
-            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
+            log.warn("push consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
             return;
         }
@@ -242,7 +242,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
             if ((queueFlowControlTimes++ % 1000) == 0) {
                 log.warn(
-                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
+                    "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                     this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
             }
             return;
@@ -252,7 +252,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
             if ((queueFlowControlTimes++ % 1000) == 0) {
                 log.warn(
-                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
+                    "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                     this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
             }
             return;
@@ -263,7 +263,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
                 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                 if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                     log.warn(
-                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
+                        "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                         processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                         pullRequest, queueMaxSpanFlowControlTimes);
                 }
@@ -274,7 +274,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
                 if (!pullRequest.isLockedFirst()) {
                     final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                     boolean brokerBusy = offset < pullRequest.getNextOffset();
-                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
+                    log.info("the first time to pull message, so fix offset from snode. pullRequest: {} NewOffset: {} brokerBusy: {}",
                         pullRequest, offset, brokerBusy);
                     if (brokerBusy) {
                         log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
@@ -582,7 +582,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
                 this.persistConsumerOffset();
                 this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
                 this.mQClientFactory.shutdown();
-                log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
+                log.info("the real push consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
                 this.rebalanceImpl.destroy();
                 this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                 break;
@@ -596,6 +596,11 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
     public synchronized void start() throws MQClientException {
         switch (this.serviceState) {
             case CREATE_JUST:
+                if (this.defaultMQPushConsumer.isRealPush()) {
+                    log.info("============================================");
+                    log.info(" Notice: Start Real Push Consumer Model.");
+                    log.info("============================================\n");
+                }
                 log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                     this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                 this.serviceState = ServiceState.START_FAILED;
@@ -1207,6 +1212,16 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
             this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
             return false;
         }
+        AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
+        if (pullStop == null) {
+            this.pullStopped.put(localOffsetKey, new AtomicBoolean(false));
+            log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
+        }
+        ProcessQueue processQueue = processQueues.get(localOffsetKey);
+        if (processQueue == null) {
+            processQueues.put(localOffsetKey, new ProcessQueue());
+            processQueue = processQueues.get(localOffsetKey);
+        }
         if (localOffset.get() + 1 < offset) {
             //should start pull message process
             log.debug("#####Current Local key:{} and  offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
@@ -1214,11 +1229,6 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
         } else {
             //Stop pull request
             log.debug("#####Process Push : Current Local key:{} and  offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
-            AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
-            if (pullStop == null) {
-                this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
-                log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
-            }
             pullStop = this.pullStopped.get(localOffsetKey);
             if (!pullStop.get()) {
                 pullStop.set(true);
@@ -1229,13 +1239,17 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
             //submit to process queue
             List<MessageExt> messageExtList = new ArrayList<MessageExt>();
             messageExtList.add(msg);
-            ProcessQueue processQueue = processQueues.get(localOffsetKey);
-            if (processQueue == null) {
-                processQueues.put(localOffsetKey, new ProcessQueue());
-                processQueue = processQueues.get(localOffsetKey);
-            }
+
             processQueue.putMessage(messageExtList);
+            if (this.consumeOrderly) {
+                processQueue.setLocked(true);
+                processQueue.setLastLockTimestamp(System.currentTimeMillis());
+            }
             MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID);
+            //update process Queue pull time
+            if (this.rebalanceImpl.processQueueTable.get(messageQueue) != null) {
+                this.rebalanceImpl.processQueueTable.get(messageQueue).setLastPullTimestamp(System.currentTimeMillis());
+            }
             this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true);
         }
         return true;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index ba90f09..763472a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -360,6 +360,9 @@ public abstract class RebalanceImpl {
                             }
                             break;
                         case CONSUME_PUSH:
+                            if (isOrder) {
+                                break;
+                            }
                             pq.setDropped(true);
                             if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                 it.remove();