You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/28 09:41:42 UTC
[rocketmq] branch snode updated: Polish Push consumer
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new 4dbf416 Polish Push consumer
4dbf416 is described below
commit 4dbf4160d952b3d2ee8b91c7c6fa26e662f5c496
Author: ShannonDing <li...@163.com>
AuthorDate: Thu Feb 28 17:41:07 2019 +0800
Polish Push consumer
---
.../consumer/DefaultMQRealPushConsumerImpl.java | 48 ++++++++++++++--------
.../client/impl/consumer/RebalanceImpl.java | 3 ++
2 files changed, 34 insertions(+), 17 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java
index 0c323ca..755ed58 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java
@@ -224,13 +224,13 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
try {
this.makeSureStateOK();
} catch (MQClientException e) {
- log.warn("pullMessage exception, consumer state not ok", e);
+ log.warn("pullMessage exception, real push consumer state not ok", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;
}
if (this.isPause()) {
- log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
+ log.warn("push consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
@@ -242,7 +242,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
- "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
+ "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
@@ -252,7 +252,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
- "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
+ "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
@@ -263,7 +263,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
- "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
+ "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
@@ -274,7 +274,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
- log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
+ log.info("the first time to pull message, so fix offset from snode. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
@@ -582,7 +582,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
this.persistConsumerOffset();
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
- log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
+ log.info("the real push consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.destroy();
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break;
@@ -596,6 +596,11 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
+ if (this.defaultMQPushConsumer.isRealPush()) {
+ log.info("============================================");
+ log.info(" Notice: Start Real Push Consumer Model.");
+ log.info("============================================\n");
+ }
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
@@ -1207,6 +1212,16 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
return false;
}
+ AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
+ if (pullStop == null) {
+ this.pullStopped.put(localOffsetKey, new AtomicBoolean(false));
+ log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
+ }
+ ProcessQueue processQueue = processQueues.get(localOffsetKey);
+ if (processQueue == null) {
+ processQueues.put(localOffsetKey, new ProcessQueue());
+ processQueue = processQueues.get(localOffsetKey);
+ }
if (localOffset.get() + 1 < offset) {
//should start pull message process
log.debug("#####Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
@@ -1214,11 +1229,6 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
} else {
//Stop pull request
log.debug("#####Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
- AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
- if (pullStop == null) {
- this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
- log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
- }
pullStop = this.pullStopped.get(localOffsetKey);
if (!pullStop.get()) {
pullStop.set(true);
@@ -1229,13 +1239,17 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
//submit to process queue
List<MessageExt> messageExtList = new ArrayList<MessageExt>();
messageExtList.add(msg);
- ProcessQueue processQueue = processQueues.get(localOffsetKey);
- if (processQueue == null) {
- processQueues.put(localOffsetKey, new ProcessQueue());
- processQueue = processQueues.get(localOffsetKey);
- }
+
processQueue.putMessage(messageExtList);
+ if (this.consumeOrderly) {
+ processQueue.setLocked(true);
+ processQueue.setLastLockTimestamp(System.currentTimeMillis());
+ }
MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID);
+ //update process Queue pull time
+ if (this.rebalanceImpl.processQueueTable.get(messageQueue) != null) {
+ this.rebalanceImpl.processQueueTable.get(messageQueue).setLastPullTimestamp(System.currentTimeMillis());
+ }
this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true);
}
return true;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index ba90f09..763472a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -360,6 +360,9 @@ public abstract class RebalanceImpl {
}
break;
case CONSUME_PUSH:
+ if (isOrder) {
+ break;
+ }
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();