You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/08/02 02:34:13 UTC

[rocketmq] branch litePullConsumer updated: Polish lite pull consumer (#1359)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/litePullConsumer by this push:
     new 9c3b26c  Polish lite pull consumer (#1359)
9c3b26c is described below

commit 9c3b26cfd3a7b5c7b87bb13c4ab38f249107e349
Author: King <79...@qq.com>
AuthorDate: Fri Aug 2 10:34:06 2019 +0800

    Polish lite pull consumer (#1359)
    
    * fix unsubscribe code
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * polish commit consumed offset
    
    * pass checkstyle
    
    * pass checkstyle
    
    * polish LiteMQPullConsumer
    
    * add flow control and polish commit logic
    
    * fix bug
    
    * polish code
    
    * fix commit consumed offset back
    
    * refactor litePullConsumer
    
    * development save
    
    * development save
    
    * Refactor DefaultLitePullConsumer and DefaultLitePullConsumerImpl.
    
    * Polish lite pull consumer
    
    * polish lite pull consumer
    
    * polish lite pull consumer
    
    * fix seek
    
    * fix seek function
    
    * polish lite pull consumer
    
    * add apache header
    
    * add test
    
    * polish test
---
 .../client/consumer/DefaultLitePullConsumer.java   |  27 ++-
 .../client/impl/consumer/AssignedMessageQueue.java |  41 +++-
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 180 ++++++++------
 .../impl/consumer/RebalanceLitePullImpl.java       |  24 +-
 .../consumer/DefaultLitePullConsumerTest.java      | 261 +++++++++++++++++++++
 .../example/simple/LitePullConsumerTest.java       |  22 +-
 6 files changed, 448 insertions(+), 107 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 757c966..7f65713 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -32,10 +32,9 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.remoting.RPCHook;
 
-
 public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
 
-    private DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
+    private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
 
     /**
      * Do the same thing for the same Group, the application must be set,and guarantee Globally unique
@@ -47,7 +46,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
      */
     private long brokerSuspendMaxTimeMillis = 1000 * 20;
 
-
     /**
      * Long polling mode, the Consumer connection timeout(must greater than brokerSuspendMaxTimeMillis), it is not
      * recommended to modify
@@ -134,10 +132,15 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
     private int pullThresholdSizeForQueue = 100;
 
     /**
-     * The socket timeout in milliseconds
+     * The poll timeout in milliseconds
      */
     private long pollTimeoutMillis = 1000 * 5;
 
+    /**
+     * Message pull delay in milliseconds
+     */
+    private long pullDelayTimeMills = 0;
+
     public DefaultLitePullConsumer() {
         this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
     }
@@ -163,7 +166,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
     public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
         this.namespace = namespace;
         this.consumerGroup = consumerGroup;
-        defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this,rpcHook);
+        defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
     }
 
     @Override
@@ -217,13 +220,13 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
     }
 
     @Override
-    public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException{
+    public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
         return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
     }
 
     @Override
-    public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException{
-        return this.defaultLitePullConsumerImpl.searchOffset(messageQueue,timestamp);
+    public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException {
+        return this.defaultLitePullConsumerImpl.searchOffset(messageQueue, timestamp);
     }
 
     @Override
@@ -393,4 +396,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
         this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
     }
 
+    public long getPullDelayTimeMills() {
+        return pullDelayTimeMills;
+    }
+
+    public void setPullDelayTimeMills(long pullDelayTimeMills) {
+        this.pullDelayTimeMills = pullDelayTimeMills;
+    }
+
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index a3c5da1..aa8379e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -20,6 +20,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.CountDownLatch2;
 import org.apache.rocketmq.common.message.MessageQueue;
 
 public class AssignedMessageQueue {
@@ -36,7 +37,7 @@ public class AssignedMessageQueue {
         this.rebalanceImpl = rebalanceImpl;
     }
 
-    public Collection<MessageQueue> messageQueues(){
+    public Collection<MessageQueue> messageQueues() {
         return assignedMessageQueueState.keySet();
     }
 
@@ -52,6 +53,7 @@ public class AssignedMessageQueue {
         for (MessageQueue messageQueue : messageQueues) {
             MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
             if (assignedMessageQueueState.get(messageQueue) != null) {
+                messageQueueStat.getPausedLatch().reset();
                 messageQueueStat.setPaused(true);
             }
         }
@@ -62,6 +64,7 @@ public class AssignedMessageQueue {
             MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
             if (assignedMessageQueueState.get(messageQueue) != null) {
                 messageQueueStat.setPaused(false);
+                messageQueueStat.getPausedLatch().reset();
             }
         }
     }
@@ -74,18 +77,18 @@ public class AssignedMessageQueue {
         return null;
     }
 
-    public long getNextOffset(MessageQueue messageQueue) {
+    public long getPullOffset(MessageQueue messageQueue) {
         MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
         if (messageQueueStat != null) {
-            return messageQueueStat.getNextOffset();
+            return messageQueueStat.getPullOffset();
         }
         return -1;
     }
 
-    public void updateNextOffset(MessageQueue messageQueue, long offset) {
+    public void updatePullOffset(MessageQueue messageQueue, long offset) {
         MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
         if (messageQueueStat != null) {
-            messageQueueStat.setNextOffset(offset);
+            messageQueueStat.setPullOffset(offset);
         }
     }
 
@@ -119,12 +122,21 @@ public class AssignedMessageQueue {
         return -1;
     }
 
+    public CountDownLatch2 getPausedLatch(MessageQueue messageQueue) {
+        MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+        if (messageQueueStat != null) {
+            return messageQueueStat.getPausedLatch();
+        }
+        return null;
+    }
+
     public void updateAssignedMessageQueue(Collection<MessageQueue> assigned) {
         synchronized (this.assignedMessageQueueState) {
             Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = this.assignedMessageQueueState.entrySet().iterator();
             while (it.hasNext()) {
                 Map.Entry<MessageQueue, MessageQueueStat> next = it.next();
                 if (!assigned.contains(next.getKey())) {
+                    next.getValue().getProcessQueue().setDropped(true);
                     it.remove();
                 }
             }
@@ -159,10 +171,11 @@ public class AssignedMessageQueue {
     public class MessageQueueStat {
         private MessageQueue messageQueue;
         private ProcessQueue processQueue;
-        private boolean paused = false;
-        private long nextOffset = -1;
-        private long consumeOffset = -1;
+        private volatile boolean paused = false;
+        private volatile long pullOffset = -1;
+        private volatile long consumeOffset = -1;
         private volatile long seekOffset = -1;
+        private CountDownLatch2 pausedLatch = new CountDownLatch2(1);
 
         public MessageQueueStat(MessageQueue messageQueue, ProcessQueue processQueue) {
             this.messageQueue = messageQueue;
@@ -185,12 +198,12 @@ public class AssignedMessageQueue {
             this.paused = paused;
         }
 
-        public long getNextOffset() {
-            return nextOffset;
+        public long getPullOffset() {
+            return pullOffset;
         }
 
-        public void setNextOffset(long nextOffset) {
-            this.nextOffset = nextOffset;
+        public void setPullOffset(long pullOffset) {
+            this.pullOffset = pullOffset;
         }
 
         public ProcessQueue getProcessQueue() {
@@ -216,5 +229,9 @@ public class AssignedMessageQueue {
         public void setSeekOffset(long seekOffset) {
             this.seekOffset = seekOffset;
         }
+
+        public CountDownLatch2 getPausedLatch() {
+            return pausedLatch;
+        }
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 95e218f..74cf644 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -16,27 +16,22 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.TreeMap;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReadWriteLock;
 
-import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.consumer.MessageSelector;
@@ -56,6 +51,7 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.CountDownLatch2;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -65,7 +61,11 @@ import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.common.help.FAQUrl;
 
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -150,7 +150,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     }
 
     private void checkServiceState() {
-        if (!(this.serviceState == ServiceState.RUNNING))
+        if (this.serviceState != ServiceState.RUNNING)
             throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
     }
 
@@ -347,6 +347,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                     + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                 null);
         }
+
+    }
+
+    public PullAPIWrapper getPullAPIWrapper() {
+        return pullAPIWrapper;
     }
 
     private void copySubscription() throws MQClientException {
@@ -440,16 +445,24 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     public List<MessageExt> poll(long timeout) {
         try {
             checkServiceState();
+            if (timeout < 0)
+                throw new IllegalArgumentException("Timeout must not be negative");
+
             if (defaultLitePullConsumer.isAutoCommit()) {
                 maybeAutoCommit();
             }
             long endTime = System.currentTimeMillis() + timeout;
+
             ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-            while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
-                consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-                if ((endTime - System.currentTimeMillis()) <= 0)
-                    break;
+
+            if (endTime - System.currentTimeMillis() > 0) {
+                while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
+                    consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+                    if (endTime - System.currentTimeMillis() <= 0)
+                        break;
+                }
             }
+
             if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
                 List<MessageExt> messages = consumeRequest.getMessageExts();
                 long offset = consumeRequest.getProcessQueue().removeMessage(messages);
@@ -471,14 +484,33 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     }
 
     public synchronized void seek(MessageQueue messageQueue, long offset) throws MQClientException {
-        if (offset < minOffset(messageQueue) || offset > maxOffset(messageQueue))
-            throw new MQClientException("Seek offset illegal", null);
+        if (!assignedMessageQueue.messageQueues().contains(messageQueue))
+            throw new MQClientException("The message queue is not in assigned list, message queue: " + messageQueue, null);
+        long minOffset = minOffset(messageQueue);
+        long maxOffset = maxOffset(messageQueue);
+        if (offset < minOffset || offset > maxOffset)
+            throw new MQClientException("Seek offset illegal, seek offset = " + offset + ", min offset = " + minOffset + ", max offset = " + maxOffset, null);
         try {
+            assignedMessageQueue.pause(Collections.singletonList(messageQueue));
+            CountDownLatch2 pausedLatch = assignedMessageQueue.getPausedLatch(messageQueue);
+            if (pausedLatch != null) {
+                pausedLatch.await(2, TimeUnit.SECONDS);
+            }
+            ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
+            if (processQueue != null) {
+                processQueue.clear();
+            }
+            Iterator<ConsumeRequest> iter = consumeRequestCache.iterator();
+            while (iter.hasNext()) {
+                if (iter.next().getMessageQueue().equals(messageQueue))
+                    iter.remove();
+            }
             assignedMessageQueue.setSeekOffset(messageQueue, offset);
-            updateConsumeOffset(messageQueue, offset);
-            updateConsumeOffsetToBroker(messageQueue, offset, false);
+            assignedMessageQueue.updateConsumeOffset(messageQueue, offset);
         } catch (Exception e) {
             log.error("Seek offset failed.", e);
+        } finally {
+            assignedMessageQueue.resume(Collections.singletonList(messageQueue));
         }
     }
 
@@ -545,7 +577,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
     private void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
         if (assignedMessageQueue.getSeekOffset(remoteQueue) == -1) {
-            assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset);
+            assignedMessageQueue.updatePullOffset(remoteQueue, nextPullOffset);
         }
     }
 
@@ -568,12 +600,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         if (seekOffset != -1) {
             offset = seekOffset;
             assignedMessageQueue.setSeekOffset(remoteQueue, -1);
-            assignedMessageQueue.updateNextOffset(remoteQueue,offset);
+            assignedMessageQueue.updatePullOffset(remoteQueue, offset);
         } else {
-            offset = assignedMessageQueue.getNextOffset(remoteQueue);
+            offset = assignedMessageQueue.getPullOffset(remoteQueue);
             if (offset == -1) {
                 offset = fetchConsumeOffset(remoteQueue, false);
-                assignedMessageQueue.updateNextOffset(remoteQueue, offset);
+                assignedMessageQueue.updatePullOffset(remoteQueue, offset);
                 assignedMessageQueue.updateConsumeOffset(remoteQueue, offset);
             }
         }
@@ -596,78 +628,82 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
         @Override
         public void run() {
-            ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
 
-            if (processQueue == null && processQueue.isDropped()) {
-                log.info("the message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
-                return;
-            }
+            if (!this.isCancelled()) {
 
-            if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchNums() > defaultLitePullConsumer.getPullThresholdForAll()) {
-                scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
-                if ((consumeRequestFlowControlTimes++ % 1000) == 0)
-                    log.warn("the consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
-                return;
-            }
+                if (assignedMessageQueue.isPaused(messageQueue)) {
+                    CountDownLatch2 pasuedLatch = assignedMessageQueue.getPausedLatch(messageQueue);
+                    if (pasuedLatch != null)
+                        pasuedLatch.countDown();
+                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
+                    log.debug("Message Queue: {} has been paused!", messageQueue);
+                    return;
+                }
 
-            long cachedMessageCount = processQueue.getMsgCount().get();
-            long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
+                ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
 
-            if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
-                scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
-                if ((queueFlowControlTimes++ % 1000) == 0) {
-                    log.warn(
-                        "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
-                        defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
+                if (processQueue == null && processQueue.isDropped()) {
+                    log.info("the message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
+                    return;
                 }
-                return;
-            }
 
-            if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
-                scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
-                if ((queueFlowControlTimes++ % 1000) == 0) {
-                    log.warn(
-                        "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
-                        defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
+                if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchNums() > defaultLitePullConsumer.getPullThresholdForAll()) {
+                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    if ((consumeRequestFlowControlTimes++ % 1000) == 0)
+                        log.warn("the consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
+                    return;
                 }
-                return;
-            }
 
-            if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
-                scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
-                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
-                    log.warn(
-                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
-                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);
+                long cachedMessageCount = processQueue.getMsgCount().get();
+                long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
+
+                if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
+                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    if ((queueFlowControlTimes++ % 1000) == 0) {
+                        log.warn(
+                            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
+                            defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
+                    }
+                    return;
                 }
-                return;
-            }
 
-            if (!this.isCancelled()) {
-                if (assignedMessageQueue.isPaused(messageQueue)) {
-                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
-                    log.debug("Message Queue: {} has been paused!", messageQueue);
+                if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
+                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    if ((queueFlowControlTimes++ % 1000) == 0) {
+                        log.warn(
+                            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
+                            defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
+                    }
                     return;
                 }
+
+                if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
+                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
+                        log.warn(
+                            "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
+                            processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);
+                    }
+                    return;
+                }
+
                 String subExpression = null;
                 if (subscriptionType == SubscriptionType.SUBSCRIBE) {
                     String topic = this.messageQueue.getTopic();
                     subExpression = rebalanceImpl.getSubscriptionInner().get(topic).getSubString();
                 }
                 long offset = nextPullOffset(messageQueue);
-                long pullDelayTimeMills = 0;
+                long pullDelayTimeMills = defaultLitePullConsumer.getPullDelayTimeMills();
                 try {
                     PullResult pullResult = pull(messageQueue, subExpression, offset, nextPullBatchNums());
                     switch (pullResult.getPullStatus()) {
                         case FOUND:
-                            processQueue.putMessage(pullResult.getMsgFoundList());
-                            submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
-                            pullDelayTimeMills = 0;
+                            if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty()) {
+                                processQueue.putMessage(pullResult.getMsgFoundList());
+                                submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
+                            }
                             break;
-                        case NO_NEW_MSG:
-                            pullDelayTimeMills = 100;
                         case OFFSET_ILLEGAL:
-                            //TODO
                             log.warn("the pull request offset illegal, {}", pullResult.toString());
                             break;
                         default:
@@ -1037,7 +1073,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         private final List<MessageExt> messageExts;
         private final MessageQueue messageQueue;
         private final ProcessQueue processQueue;
-        private long startConsumeTimeMillis;
 
         public ConsumeRequest(final List<MessageExt> messageExts, final MessageQueue messageQueue,
             final ProcessQueue processQueue) {
@@ -1058,12 +1093,5 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
             return processQueue;
         }
 
-        public long getStartConsumeTimeMillis() {
-            return startConsumeTimeMillis;
-        }
-
-        public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
-            this.startConsumeTimeMillis = startConsumeTimeMillis;
-        }
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
index 8148c7d..0b8ec67 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.client.impl.consumer;
 
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
@@ -10,7 +26,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import java.util.List;
 import java.util.Set;
 
-public class RebalanceLitePullImpl extends RebalanceImpl  {
+public class RebalanceLitePullImpl extends RebalanceImpl {
 
     private final DefaultLitePullConsumerImpl litePullConsumerImpl;
 
@@ -19,8 +35,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl  {
     }
 
     public RebalanceLitePullImpl(String consumerGroup, MessageModel messageModel,
-                                 AllocateMessageQueueStrategy allocateMessageQueueStrategy,
-                                 MQClientInstance mQClientFactory, DefaultLitePullConsumerImpl litePullConsumerImpl) {
+        AllocateMessageQueueStrategy allocateMessageQueueStrategy,
+        MQClientInstance mQClientFactory, DefaultLitePullConsumerImpl litePullConsumerImpl) {
         super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
         this.litePullConsumerImpl = litePullConsumerImpl;
     }
@@ -37,7 +53,6 @@ public class RebalanceLitePullImpl extends RebalanceImpl  {
         }
     }
 
-
     @Override
     public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
         this.litePullConsumerImpl.getOffsetStore().persist(mq);
@@ -64,5 +79,4 @@ public class RebalanceLitePullImpl extends RebalanceImpl  {
     public void dispatchPullRequest(List<PullRequest> pullRequestList) {
     }
 
-
 }
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
new file mode 100644
index 0000000..68144c7
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.consumer;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQAdminImpl;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
+import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
+import org.apache.rocketmq.client.impl.consumer.RebalanceService;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultLitePullConsumerTest {
+    @Spy
+    private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
+    @Mock
+    private MQAdminImpl mQAdminImpl;
+
+    private RebalanceImpl rebalanceImpl;
+    private OffsetStore offsetStore;
+    private DefaultLitePullConsumer litePullConsumer;
+    private DefaultLitePullConsumerImpl litePullConsumerImpl;
+    private String consumerGroup = "LitePullConsumerGroup";
+    private String topic = "LitePullConsumerTest";
+    private String brokerName = "BrokerA";
+
+    @Before
+    public void init() throws Exception {
+        String groupName = consumerGroup + System.currentTimeMillis();
+        litePullConsumer = new DefaultLitePullConsumer(groupName);
+        litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
+
+        Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
+        field.setAccessible(true);
+        RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory);
+        field = RebalanceService.class.getDeclaredField("waitInterval");
+        field.setAccessible(true);
+        field.set(rebalanceService, 100);
+
+        litePullConsumer.start();
+
+        field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
+        field.setAccessible(true);
+        litePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer);
+        field = DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(litePullConsumerImpl, mQClientFactory);
+
+        PullAPIWrapper pullAPIWrapper = litePullConsumerImpl.getPullAPIWrapper();
+        field = PullAPIWrapper.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(pullAPIWrapper, mQClientFactory);
+
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQClientAPIImpl);
+
+        field = MQClientInstance.class.getDeclaredField("mQAdminImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQAdminImpl);
+
+        field = DefaultLitePullConsumerImpl.class.getDeclaredField("rebalanceImpl");
+        field.setAccessible(true);
+        rebalanceImpl = (RebalanceImpl) field.get(litePullConsumerImpl);
+        field = RebalanceImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(rebalanceImpl, mQClientFactory);
+
+        offsetStore = spy(litePullConsumerImpl.getOffsetStore());
+        field = DefaultLitePullConsumerImpl.class.getDeclaredField("offsetStore");
+        field.setAccessible(true);
+        field.set(litePullConsumerImpl, offsetStore);
+
+        when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
+            anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
+            .thenAnswer(new Answer<Object>() {
+                @Override
+                public Object answer(InvocationOnMock mock) throws Throwable {
+                    PullMessageRequestHeader requestHeader = mock.getArgument(1);
+                    MessageClientExt messageClientExt = new MessageClientExt();
+                    messageClientExt.setTopic(topic);
+                    messageClientExt.setQueueId(0);
+                    messageClientExt.setMsgId("123");
+                    messageClientExt.setBody(new byte[] {'a'});
+                    messageClientExt.setOffsetMsgId("234");
+                    messageClientExt.setBornHost(new InetSocketAddress(8080));
+                    messageClientExt.setStoreHost(new InetSocketAddress(8080));
+                    PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
+                    return pullResult;
+                }
+            });
+
+        when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false));
+
+        doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString());
+
+        doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), any(ReadOffsetType.class));
+    }
+
+    @After
+    public void terminate() {
+        litePullConsumer.shutdown();
+    }
+
+    @Test
+    public void testAssign_PollMessageSuccess() {
+        MessageQueue messageQueue = createMessageQueue();
+        litePullConsumer.setPullDelayTimeMills(60 * 1000);
+        litePullConsumer.assign(Collections.singletonList(messageQueue));
+        List<MessageExt> result = litePullConsumer.poll();
+        assertThat(result.get(0).getTopic()).isEqualTo(topic);
+        assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
+    }
+
+    @Test
+    public void testSubscribe_PollMessageSuccess() throws MQClientException {
+        litePullConsumer.setPullDelayTimeMills(60 * 1000);
+        litePullConsumer.subscribe(topic, "*");
+        Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
+        messageQueueSet.add(createMessageQueue());
+        litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
+        litePullConsumer.setPollTimeoutMillis(20 * 1000);
+        List<MessageExt> result = litePullConsumer.poll();
+        assertThat(result.get(0).getTopic()).isEqualTo(topic);
+        assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
+    }
+
+    @Test
+    public void testSubscriptionType_AssignAndSubscribeExclusive() throws MQClientException {
+        try {
+            litePullConsumer.subscribe(topic, "*");
+            litePullConsumer.assign(Collections.singletonList(createMessageQueue()));
+            failBecauseExceptionWasNotThrown(IllegalStateException.class);
+        } catch (IllegalStateException e) {
+            assertThat(e).hasMessageContaining("Cannot select two subscription types at the same time.");
+        }
+    }
+
+    @Test
+    public void testFetchMesseageQueues_FetchMessageQueuesBeforeStart() throws MQClientException {
+        try {
+            DefaultLitePullConsumer litePullConsumer = createLitePullConsumer();
+            litePullConsumer.fetchMessageQueues(topic);
+            failBecauseExceptionWasNotThrown(IllegalStateException.class);
+        } catch (IllegalStateException e) {
+            assertThat(e).hasMessageContaining("The consumer not running.");
+        }
+    }
+
+    @Test
+    public void testSeek_SeekOffsetIllegal() throws MQClientException {
+        when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
+        when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
+        MessageQueue messageQueue = createMessageQueue();
+        litePullConsumer.assign(Collections.singletonList(messageQueue));
+        try {
+            litePullConsumer.seek(messageQueue, -1);
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("min offset = 0");
+        }
+
+        try {
+            litePullConsumer.seek(messageQueue, 1000);
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("max offset = 100");
+        }
+    }
+
+    @Test
+    public void testSeek_MessageQueueNotInAssignList() {
+        try {
+            litePullConsumer.seek(createMessageQueue(), 0);
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("The message queue is not in assigned list");
+        }
+    }
+
+    private MessageQueue createMessageQueue() {
+        MessageQueue messageQueue = new MessageQueue();
+        messageQueue.setBrokerName(brokerName);
+        messageQueue.setQueueId(0);
+        messageQueue.setTopic(topic);
+        return messageQueue;
+    }
+
+    private DefaultLitePullConsumer createLitePullConsumer() {
+        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
+        return litePullConsumer;
+    }
+
+    private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
+        List<MessageExt> messageExtList) throws Exception {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        for (MessageExt messageExt : messageExtList) {
+            outputStream.write(MessageDecoder.encode(messageExt, false));
+        }
+        return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
+    }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
index 488a499..0430465 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
@@ -16,23 +16,33 @@
  */
 package org.apache.rocketmq.example.simple;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.common.message.MessageExt;
-
+import org.apache.rocketmq.common.message.MessageQueue;
 
 public class LitePullConsumerTest {
     public static void main(String[] args) throws Exception {
         DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("test");
-        litePullConsumer.setNamesrvAddr("localhost:9876");
-        litePullConsumer.setAutoCommit(true);
-        litePullConsumer.subscribe("test41","TagA" );
+        litePullConsumer.setAutoCommit(false);
         litePullConsumer.start();
+        Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("test400");
+        List<MessageQueue> list = new ArrayList<>(mqSet);
+        Collection<MessageQueue> assginMq = Collections.singletonList(list.get(0));
+        litePullConsumer.assign(assginMq);
+        int size = 0;
+        litePullConsumer.seek(list.get(0), 26);
 
-        int i = 0;
         while (true) {
             List<MessageExt> messageExts = litePullConsumer.poll();
-            System.out.printf("%s%n", messageExts);
+            if (messageExts != null) {
+                size += messageExts.size();
+            }
+            litePullConsumer.commitSync();
+            System.out.printf("%s %d %n", messageExts, size);
         }
 
     }