You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/08/02 02:34:13 UTC
[rocketmq] branch litePullConsumer updated: Polish lite pull
consumer (#1359)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/litePullConsumer by this push:
new 9c3b26c Polish lite pull consumer (#1359)
9c3b26c is described below
commit 9c3b26cfd3a7b5c7b87bb13c4ab38f249107e349
Author: King <79...@qq.com>
AuthorDate: Fri Aug 2 10:34:06 2019 +0800
Polish lite pull consumer (#1359)
* fix unsubscribe code
* fix commit consumed offset
* fix commit consumed offset
* fix commit consumed offset
* fix commit consumed offset
* polish commit consumed offset
* pass checkstyle
* pass checkstyle
* polish LiteMQPullConsumer
* add flow control and polish commit logic
* fix bug
* polish code
* fix commit consumed offset back
* refactor litePullConsumer
* development save
* development save
* Refactor DefaultLitePullConsumer and DefaultLitePullConsumerImpl.
* Polish lite pull consumer
* polish lite pull consumer
* polish lite pull consumer
* fix seek
* fix seek function
* polish lite pull consumer
* add apache header
* add test
* polish test
---
.../client/consumer/DefaultLitePullConsumer.java | 27 ++-
.../client/impl/consumer/AssignedMessageQueue.java | 41 +++-
.../impl/consumer/DefaultLitePullConsumerImpl.java | 180 ++++++++------
.../impl/consumer/RebalanceLitePullImpl.java | 24 +-
.../consumer/DefaultLitePullConsumerTest.java | 261 +++++++++++++++++++++
.../example/simple/LitePullConsumerTest.java | 22 +-
6 files changed, 448 insertions(+), 107 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 757c966..7f65713 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -32,10 +32,9 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
-
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
- private DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
+ private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
/**
* Do the same thing for the same Group, the application must be set,and guarantee Globally unique
@@ -47,7 +46,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
*/
private long brokerSuspendMaxTimeMillis = 1000 * 20;
-
/**
* Long polling mode, the Consumer connection timeout(must greater than brokerSuspendMaxTimeMillis), it is not
* recommended to modify
@@ -134,10 +132,15 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
private int pullThresholdSizeForQueue = 100;
/**
- * The socket timeout in milliseconds
+ * The poll timeout in milliseconds
*/
private long pollTimeoutMillis = 1000 * 5;
+ /**
+ * Message pull delay in milliseconds
+ */
+ private long pullDelayTimeMills = 0;
+
public DefaultLitePullConsumer() {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
}
@@ -163,7 +166,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
- defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this,rpcHook);
+ defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
@Override
@@ -217,13 +220,13 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
}
@Override
- public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException{
+ public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
}
@Override
- public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException{
- return this.defaultLitePullConsumerImpl.searchOffset(messageQueue,timestamp);
+ public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException {
+ return this.defaultLitePullConsumerImpl.searchOffset(messageQueue, timestamp);
}
@Override
@@ -393,4 +396,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
}
+ public long getPullDelayTimeMills() {
+ return pullDelayTimeMills;
+ }
+
+ public void setPullDelayTimeMills(long pullDelayTimeMills) {
+ this.pullDelayTimeMills = pullDelayTimeMills;
+ }
+
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index a3c5da1..aa8379e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -20,6 +20,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.MessageQueue;
public class AssignedMessageQueue {
@@ -36,7 +37,7 @@ public class AssignedMessageQueue {
this.rebalanceImpl = rebalanceImpl;
}
- public Collection<MessageQueue> messageQueues(){
+ public Collection<MessageQueue> messageQueues() {
return assignedMessageQueueState.keySet();
}
@@ -52,6 +53,7 @@ public class AssignedMessageQueue {
for (MessageQueue messageQueue : messageQueues) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) {
+ messageQueueStat.getPausedLatch().reset();
messageQueueStat.setPaused(true);
}
}
@@ -62,6 +64,7 @@ public class AssignedMessageQueue {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) {
messageQueueStat.setPaused(false);
+ messageQueueStat.getPausedLatch().reset();
}
}
}
@@ -74,18 +77,18 @@ public class AssignedMessageQueue {
return null;
}
- public long getNextOffset(MessageQueue messageQueue) {
+ public long getPullOffset(MessageQueue messageQueue) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) {
- return messageQueueStat.getNextOffset();
+ return messageQueueStat.getPullOffset();
}
return -1;
}
- public void updateNextOffset(MessageQueue messageQueue, long offset) {
+ public void updatePullOffset(MessageQueue messageQueue, long offset) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) {
- messageQueueStat.setNextOffset(offset);
+ messageQueueStat.setPullOffset(offset);
}
}
@@ -119,12 +122,21 @@ public class AssignedMessageQueue {
return -1;
}
+ public CountDownLatch2 getPausedLatch(MessageQueue messageQueue) {
+ MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+ if (messageQueueStat != null) {
+ return messageQueueStat.getPausedLatch();
+ }
+ return null;
+ }
+
public void updateAssignedMessageQueue(Collection<MessageQueue> assigned) {
synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, MessageQueueStat> next = it.next();
if (!assigned.contains(next.getKey())) {
+ next.getValue().getProcessQueue().setDropped(true);
it.remove();
}
}
@@ -159,10 +171,11 @@ public class AssignedMessageQueue {
public class MessageQueueStat {
private MessageQueue messageQueue;
private ProcessQueue processQueue;
- private boolean paused = false;
- private long nextOffset = -1;
- private long consumeOffset = -1;
+ private volatile boolean paused = false;
+ private volatile long pullOffset = -1;
+ private volatile long consumeOffset = -1;
private volatile long seekOffset = -1;
+ private CountDownLatch2 pausedLatch = new CountDownLatch2(1);
public MessageQueueStat(MessageQueue messageQueue, ProcessQueue processQueue) {
this.messageQueue = messageQueue;
@@ -185,12 +198,12 @@ public class AssignedMessageQueue {
this.paused = paused;
}
- public long getNextOffset() {
- return nextOffset;
+ public long getPullOffset() {
+ return pullOffset;
}
- public void setNextOffset(long nextOffset) {
- this.nextOffset = nextOffset;
+ public void setPullOffset(long pullOffset) {
+ this.pullOffset = pullOffset;
}
public ProcessQueue getProcessQueue() {
@@ -216,5 +229,9 @@ public class AssignedMessageQueue {
public void setSeekOffset(long seekOffset) {
this.seekOffset = seekOffset;
}
+
+ public CountDownLatch2 getPausedLatch() {
+ return pausedLatch;
+ }
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 95e218f..74cf644 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -16,27 +16,22 @@
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Collection;
-import java.util.Collections;
-import java.util.TreeMap;
import java.util.HashSet;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReadWriteLock;
-import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
@@ -56,6 +51,7 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -65,7 +61,11 @@ import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -150,7 +150,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
private void checkServiceState() {
- if (!(this.serviceState == ServiceState.RUNNING))
+ if (this.serviceState != ServiceState.RUNNING)
throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
}
@@ -347,6 +347,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
+
+ }
+
+ public PullAPIWrapper getPullAPIWrapper() {
+ return pullAPIWrapper;
}
private void copySubscription() throws MQClientException {
@@ -440,16 +445,24 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public List<MessageExt> poll(long timeout) {
try {
checkServiceState();
+ if (timeout < 0)
+ throw new IllegalArgumentException("Timeout must not be negative");
+
if (defaultLitePullConsumer.isAutoCommit()) {
maybeAutoCommit();
}
long endTime = System.currentTimeMillis() + timeout;
+
ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
- while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
- consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
- if ((endTime - System.currentTimeMillis()) <= 0)
- break;
+
+ if (endTime - System.currentTimeMillis() > 0) {
+ while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
+ consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ if (endTime - System.currentTimeMillis() <= 0)
+ break;
+ }
}
+
if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
List<MessageExt> messages = consumeRequest.getMessageExts();
long offset = consumeRequest.getProcessQueue().removeMessage(messages);
@@ -471,14 +484,33 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
public synchronized void seek(MessageQueue messageQueue, long offset) throws MQClientException {
- if (offset < minOffset(messageQueue) || offset > maxOffset(messageQueue))
- throw new MQClientException("Seek offset illegal", null);
+ if (!assignedMessageQueue.messageQueues().contains(messageQueue))
+ throw new MQClientException("The message queue is not in assigned list, message queue: " + messageQueue, null);
+ long minOffset = minOffset(messageQueue);
+ long maxOffset = maxOffset(messageQueue);
+ if (offset < minOffset || offset > maxOffset)
+ throw new MQClientException("Seek offset illegal, seek offset = " + offset + ", min offset = " + minOffset + ", max offset = " + maxOffset, null);
try {
+ assignedMessageQueue.pause(Collections.singletonList(messageQueue));
+ CountDownLatch2 pausedLatch = assignedMessageQueue.getPausedLatch(messageQueue);
+ if (pausedLatch != null) {
+ pausedLatch.await(2, TimeUnit.SECONDS);
+ }
+ ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
+ if (processQueue != null) {
+ processQueue.clear();
+ }
+ Iterator<ConsumeRequest> iter = consumeRequestCache.iterator();
+ while (iter.hasNext()) {
+ if (iter.next().getMessageQueue().equals(messageQueue))
+ iter.remove();
+ }
assignedMessageQueue.setSeekOffset(messageQueue, offset);
- updateConsumeOffset(messageQueue, offset);
- updateConsumeOffsetToBroker(messageQueue, offset, false);
+ assignedMessageQueue.updateConsumeOffset(messageQueue, offset);
} catch (Exception e) {
log.error("Seek offset failed.", e);
+ } finally {
+ assignedMessageQueue.resume(Collections.singletonList(messageQueue));
}
}
@@ -545,7 +577,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
if (assignedMessageQueue.getSeekOffset(remoteQueue) == -1) {
- assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset);
+ assignedMessageQueue.updatePullOffset(remoteQueue, nextPullOffset);
}
}
@@ -568,12 +600,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
if (seekOffset != -1) {
offset = seekOffset;
assignedMessageQueue.setSeekOffset(remoteQueue, -1);
- assignedMessageQueue.updateNextOffset(remoteQueue,offset);
+ assignedMessageQueue.updatePullOffset(remoteQueue, offset);
} else {
- offset = assignedMessageQueue.getNextOffset(remoteQueue);
+ offset = assignedMessageQueue.getPullOffset(remoteQueue);
if (offset == -1) {
offset = fetchConsumeOffset(remoteQueue, false);
- assignedMessageQueue.updateNextOffset(remoteQueue, offset);
+ assignedMessageQueue.updatePullOffset(remoteQueue, offset);
assignedMessageQueue.updateConsumeOffset(remoteQueue, offset);
}
}
@@ -596,78 +628,82 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
@Override
public void run() {
- ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
- if (processQueue == null && processQueue.isDropped()) {
- log.info("the message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
- return;
- }
+ if (!this.isCancelled()) {
- if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchNums() > defaultLitePullConsumer.getPullThresholdForAll()) {
- scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
- if ((consumeRequestFlowControlTimes++ % 1000) == 0)
- log.warn("the consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
- return;
- }
+ if (assignedMessageQueue.isPaused(messageQueue)) {
+ CountDownLatch2 pasuedLatch = assignedMessageQueue.getPausedLatch(messageQueue);
+ if (pasuedLatch != null)
+ pasuedLatch.countDown();
+ scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
+ log.debug("Message Queue: {} has been paused!", messageQueue);
+ return;
+ }
- long cachedMessageCount = processQueue.getMsgCount().get();
- long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
+ ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
- if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
- scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
- if ((queueFlowControlTimes++ % 1000) == 0) {
- log.warn(
- "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
- defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
+ if (processQueue == null && processQueue.isDropped()) {
+ log.info("the message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
+ return;
}
- return;
- }
- if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
- scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
- if ((queueFlowControlTimes++ % 1000) == 0) {
- log.warn(
- "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
- defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
+ if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchNums() > defaultLitePullConsumer.getPullThresholdForAll()) {
+ scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ if ((consumeRequestFlowControlTimes++ % 1000) == 0)
+ log.warn("the consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
+ return;
}
- return;
- }
- if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
- scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
- if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
- log.warn(
- "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
- processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);
+ long cachedMessageCount = processQueue.getMsgCount().get();
+ long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
+
+ if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
+ scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ if ((queueFlowControlTimes++ % 1000) == 0) {
+ log.warn(
+ "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
+ defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
+ }
+ return;
}
- return;
- }
- if (!this.isCancelled()) {
- if (assignedMessageQueue.isPaused(messageQueue)) {
- scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
- log.debug("Message Queue: {} has been paused!", messageQueue);
+ if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
+ scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ if ((queueFlowControlTimes++ % 1000) == 0) {
+ log.warn(
+ "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
+ defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
+ }
return;
}
+
+ if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
+ scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
+ log.warn(
+ "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
+ processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);
+ }
+ return;
+ }
+
String subExpression = null;
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
String topic = this.messageQueue.getTopic();
subExpression = rebalanceImpl.getSubscriptionInner().get(topic).getSubString();
}
long offset = nextPullOffset(messageQueue);
- long pullDelayTimeMills = 0;
+ long pullDelayTimeMills = defaultLitePullConsumer.getPullDelayTimeMills();
try {
PullResult pullResult = pull(messageQueue, subExpression, offset, nextPullBatchNums());
switch (pullResult.getPullStatus()) {
case FOUND:
- processQueue.putMessage(pullResult.getMsgFoundList());
- submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
- pullDelayTimeMills = 0;
+ if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty()) {
+ processQueue.putMessage(pullResult.getMsgFoundList());
+ submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
+ }
break;
- case NO_NEW_MSG:
- pullDelayTimeMills = 100;
case OFFSET_ILLEGAL:
- //TODO
log.warn("the pull request offset illegal, {}", pullResult.toString());
break;
default:
@@ -1037,7 +1073,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private final List<MessageExt> messageExts;
private final MessageQueue messageQueue;
private final ProcessQueue processQueue;
- private long startConsumeTimeMillis;
public ConsumeRequest(final List<MessageExt> messageExts, final MessageQueue messageQueue,
final ProcessQueue processQueue) {
@@ -1058,12 +1093,5 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return processQueue;
}
- public long getStartConsumeTimeMillis() {
- return startConsumeTimeMillis;
- }
-
- public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
- this.startConsumeTimeMillis = startConsumeTimeMillis;
- }
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
index 8148c7d..0b8ec67 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
@@ -10,7 +26,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
import java.util.Set;
-public class RebalanceLitePullImpl extends RebalanceImpl {
+public class RebalanceLitePullImpl extends RebalanceImpl {
private final DefaultLitePullConsumerImpl litePullConsumerImpl;
@@ -19,8 +35,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
}
public RebalanceLitePullImpl(String consumerGroup, MessageModel messageModel,
- AllocateMessageQueueStrategy allocateMessageQueueStrategy,
- MQClientInstance mQClientFactory, DefaultLitePullConsumerImpl litePullConsumerImpl) {
+ AllocateMessageQueueStrategy allocateMessageQueueStrategy,
+ MQClientInstance mQClientFactory, DefaultLitePullConsumerImpl litePullConsumerImpl) {
super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
this.litePullConsumerImpl = litePullConsumerImpl;
}
@@ -37,7 +53,6 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
}
}
-
@Override
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
this.litePullConsumerImpl.getOffsetStore().persist(mq);
@@ -64,5 +79,4 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
}
-
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
new file mode 100644
index 0000000..68144c7
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.consumer;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQAdminImpl;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
+import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
+import org.apache.rocketmq.client.impl.consumer.RebalanceService;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultLitePullConsumerTest {
+ @Spy
+ private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+
+ @Mock
+ private MQClientAPIImpl mQClientAPIImpl;
+ @Mock
+ private MQAdminImpl mQAdminImpl;
+
+ private RebalanceImpl rebalanceImpl;
+ private OffsetStore offsetStore;
+ private DefaultLitePullConsumer litePullConsumer;
+ private DefaultLitePullConsumerImpl litePullConsumerImpl;
+ private String consumerGroup = "LitePullConsumerGroup";
+ private String topic = "LitePullConsumerTest";
+ private String brokerName = "BrokerA";
+
+ @Before
+ public void init() throws Exception {
+ String groupName = consumerGroup + System.currentTimeMillis();
+ litePullConsumer = new DefaultLitePullConsumer(groupName);
+ litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
+
+ Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
+ field.setAccessible(true);
+ RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory);
+ field = RebalanceService.class.getDeclaredField("waitInterval");
+ field.setAccessible(true);
+ field.set(rebalanceService, 100);
+
+ litePullConsumer.start();
+
+ field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
+ field.setAccessible(true);
+ litePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer);
+ field = DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory");
+ field.setAccessible(true);
+ field.set(litePullConsumerImpl, mQClientFactory);
+
+ PullAPIWrapper pullAPIWrapper = litePullConsumerImpl.getPullAPIWrapper();
+ field = PullAPIWrapper.class.getDeclaredField("mQClientFactory");
+ field.setAccessible(true);
+ field.set(pullAPIWrapper, mQClientFactory);
+
+ field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+ field.setAccessible(true);
+ field.set(mQClientFactory, mQClientAPIImpl);
+
+ field = MQClientInstance.class.getDeclaredField("mQAdminImpl");
+ field.setAccessible(true);
+ field.set(mQClientFactory, mQAdminImpl);
+
+ field = DefaultLitePullConsumerImpl.class.getDeclaredField("rebalanceImpl");
+ field.setAccessible(true);
+ rebalanceImpl = (RebalanceImpl) field.get(litePullConsumerImpl);
+ field = RebalanceImpl.class.getDeclaredField("mQClientFactory");
+ field.setAccessible(true);
+ field.set(rebalanceImpl, mQClientFactory);
+
+ offsetStore = spy(litePullConsumerImpl.getOffsetStore());
+ field = DefaultLitePullConsumerImpl.class.getDeclaredField("offsetStore");
+ field.setAccessible(true);
+ field.set(litePullConsumerImpl, offsetStore);
+
+ when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
+ anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
+ .thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock mock) throws Throwable {
+ PullMessageRequestHeader requestHeader = mock.getArgument(1);
+ MessageClientExt messageClientExt = new MessageClientExt();
+ messageClientExt.setTopic(topic);
+ messageClientExt.setQueueId(0);
+ messageClientExt.setMsgId("123");
+ messageClientExt.setBody(new byte[] {'a'});
+ messageClientExt.setOffsetMsgId("234");
+ messageClientExt.setBornHost(new InetSocketAddress(8080));
+ messageClientExt.setStoreHost(new InetSocketAddress(8080));
+ PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
+ return pullResult;
+ }
+ });
+
+ when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false));
+
+ doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString());
+
+ doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), any(ReadOffsetType.class));
+ }
+
+ @After
+ public void terminate() {
+ litePullConsumer.shutdown();
+ }
+
+ @Test
+ public void testAssign_PollMessageSuccess() {
+ MessageQueue messageQueue = createMessageQueue();
+ litePullConsumer.setPullDelayTimeMills(60 * 1000);
+ litePullConsumer.assign(Collections.singletonList(messageQueue));
+ List<MessageExt> result = litePullConsumer.poll();
+ assertThat(result.get(0).getTopic()).isEqualTo(topic);
+ assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
+ }
+
+ @Test
+ public void testSubscribe_PollMessageSuccess() throws MQClientException {
+ litePullConsumer.setPullDelayTimeMills(60 * 1000);
+ litePullConsumer.subscribe(topic, "*");
+ Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
+ messageQueueSet.add(createMessageQueue());
+ litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
+ litePullConsumer.setPollTimeoutMillis(20 * 1000);
+ List<MessageExt> result = litePullConsumer.poll();
+ assertThat(result.get(0).getTopic()).isEqualTo(topic);
+ assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
+ }
+
+ @Test
+ public void testSubscriptionType_AssignAndSubscribeExclusive() throws MQClientException {
+ try {
+ litePullConsumer.subscribe(topic, "*");
+ litePullConsumer.assign(Collections.singletonList(createMessageQueue()));
+ failBecauseExceptionWasNotThrown(IllegalStateException.class);
+ } catch (IllegalStateException e) {
+ assertThat(e).hasMessageContaining("Cannot select two subscription types at the same time.");
+ }
+ }
+
+ @Test
+ public void testFetchMesseageQueues_FetchMessageQueuesBeforeStart() throws MQClientException {
+ try {
+ DefaultLitePullConsumer litePullConsumer = createLitePullConsumer();
+ litePullConsumer.fetchMessageQueues(topic);
+ failBecauseExceptionWasNotThrown(IllegalStateException.class);
+ } catch (IllegalStateException e) {
+ assertThat(e).hasMessageContaining("The consumer not running.");
+ }
+ }
+
+ @Test
+ public void testSeek_SeekOffsetIllegal() throws MQClientException {
+ when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
+ when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
+ MessageQueue messageQueue = createMessageQueue();
+ litePullConsumer.assign(Collections.singletonList(messageQueue));
+ try {
+ litePullConsumer.seek(messageQueue, -1);
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("min offset = 0");
+ }
+
+ try {
+ litePullConsumer.seek(messageQueue, 1000);
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("max offset = 100");
+ }
+ }
+
+ @Test
+ public void testSeek_MessageQueueNotInAssignList() {
+ try {
+ litePullConsumer.seek(createMessageQueue(), 0);
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("The message queue is not in assigned list");
+ }
+ }
+
+ private MessageQueue createMessageQueue() {
+ MessageQueue messageQueue = new MessageQueue();
+ messageQueue.setBrokerName(brokerName);
+ messageQueue.setQueueId(0);
+ messageQueue.setTopic(topic);
+ return messageQueue;
+ }
+
+ private DefaultLitePullConsumer createLitePullConsumer() {
+ DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
+ return litePullConsumer;
+ }
+
+ private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
+ List<MessageExt> messageExtList) throws Exception {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ for (MessageExt messageExt : messageExtList) {
+ outputStream.write(MessageDecoder.encode(messageExt, false));
+ }
+ return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
+ }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
index 488a499..0430465 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
@@ -16,23 +16,33 @@
*/
package org.apache.rocketmq.example.simple;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
-
+import org.apache.rocketmq.common.message.MessageQueue;
public class LitePullConsumerTest {
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("test");
- litePullConsumer.setNamesrvAddr("localhost:9876");
- litePullConsumer.setAutoCommit(true);
- litePullConsumer.subscribe("test41","TagA" );
+ litePullConsumer.setAutoCommit(false);
litePullConsumer.start();
+ Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("test400");
+ List<MessageQueue> list = new ArrayList<>(mqSet);
+ Collection<MessageQueue> assginMq = Collections.singletonList(list.get(0));
+ litePullConsumer.assign(assginMq);
+ int size = 0;
+ litePullConsumer.seek(list.get(0), 26);
- int i = 0;
while (true) {
List<MessageExt> messageExts = litePullConsumer.poll();
- System.out.printf("%s%n", messageExts);
+ if (messageExts != null) {
+ size += messageExts.size();
+ }
+ litePullConsumer.commitSync();
+ System.out.printf("%s %d %n", messageExts, size);
}
}