You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/04/19 01:35:08 UTC

[rocketmq] branch pop_consumer updated: [RIP-19] Pop Consuming (client)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch pop_consumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/pop_consumer by this push:
     new 9d8f4c2  [RIP-19] Pop Consuming (client)
     new cd44623  Merge pull request #2808 from hill007299/pop_consumer
9d8f4c2 is described below

commit 9d8f4c2cc0e30d8240fb325ed047634f1ad67275
Author: hill007299 <hi...@126.com>
AuthorDate: Tue Mar 9 11:17:55 2021 +0800

    [RIP-19] Pop Consuming (client)
---
 .../AckCallback.java}                              |  29 +-
 .../AckResult.java}                                |  45 +-
 .../AckStatus.java}                                |  38 +-
 .../client/consumer/DefaultMQPushConsumer.java     |  51 +++
 .../PopCallback.java}                              |  32 +-
 .../apache/rocketmq/client/consumer/PopResult.java |  82 ++++
 .../PopStatus.java}                                |  47 +-
 ...MessageService.java => BaseInvokeCallback.java} |  34 +-
 .../rocketmq/client/impl/MQClientAPIImpl.java      | 286 +++++++++++-
 .../ConsumeMessageConcurrentlyService.java         |   8 +-
 .../consumer/ConsumeMessageOrderlyService.java     |  14 +-
 ...a => ConsumeMessagePopConcurrentlyService.java} | 265 +++++------
 .../consumer/ConsumeMessagePopOrderlyService.java  | 408 +++++++++++++++++
 .../impl/consumer/ConsumeMessageService.java       |   5 +
 .../impl/consumer/DefaultLitePullConsumerImpl.java |   6 +
 .../impl/consumer/DefaultMQPushConsumerImpl.java   | 331 +++++++++++++-
 .../client/impl/consumer/MessageQueueLock.java     |  29 +-
 ...sumeMessageService.java => MessageRequest.java} |  27 +-
 .../client/impl/consumer/PopProcessQueue.java      |  84 ++++
 .../consumer/{PullRequest.java => PopRequest.java} |  61 ++-
 .../client/impl/consumer/PullAPIWrapper.java       |  55 ++-
 .../client/impl/consumer/PullMessageService.java   |  47 +-
 .../rocketmq/client/impl/consumer/PullRequest.java |   8 +-
 .../client/impl/consumer/RebalanceImpl.java        | 478 ++++++++++++++++++--
 .../impl/consumer/RebalanceLitePullImpl.java       |  25 +-
 .../client/impl/consumer/RebalancePullImpl.java    |  26 +-
 .../client/impl/consumer/RebalancePushImpl.java    |  56 ++-
 .../client/impl/factory/MQClientInstance.java      |  37 +-
 .../consumer/DefaultLitePullConsumerTest.java      |  27 +-
 .../client/consumer/DefaultMQPushConsumerTest.java |  36 +-
 .../rocketmq/client/impl/MQClientAPIImplTest.java  | 490 ++++++++++++++++++++-
 .../consumer/DefaultMQPushConsumerImplTest.java    |  92 ++++
 .../impl/consumer/RebalancePushImplTest.java       |  47 +-
 .../client/impl/factory/MQClientInstanceTest.java  |   1 +
 .../client/producer/DefaultMQProducerTest.java     |   5 +-
 .../trace/DefaultMQConsumerWithTraceTest.java      |   4 -
 .../powermock/extensions/configuration.properties  |  16 +
 37 files changed, 2883 insertions(+), 449 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/AckCallback.java
similarity index 50%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/AckCallback.java
index 5078c97..99a261c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/AckCallback.java
@@ -14,31 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.impl.consumer;
+package org.apache.rocketmq.client.consumer;
 
-import java.util.List;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+public interface AckCallback {
+    void onSuccess(final AckResult ackResult);
 
-public interface ConsumeMessageService {
-    void start();
-
-    void shutdown(long awaitTerminateMillis);
-
-    void updateCorePoolSize(int corePoolSize);
-
-    void incCorePoolSize();
-
-    void decCorePoolSize();
-
-    int getCorePoolSize();
-
-    ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
-
-    void submitConsumeRequest(
-        final List<MessageExt> msgs,
-        final ProcessQueue processQueue,
-        final MessageQueue messageQueue,
-        final boolean dispathToConsume);
+    void onException(final Throwable e);
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/AckResult.java
similarity index 50%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/AckResult.java
index 5078c97..06cb59a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/AckResult.java
@@ -14,31 +14,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.impl.consumer;
+package org.apache.rocketmq.client.consumer;
 
-import java.util.List;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 
-public interface ConsumeMessageService {
-    void start();
+public class AckResult {
+    private AckStatus status;
+    private String extraInfo;
+    private long popTime;
 
-    void shutdown(long awaitTerminateMillis);
+    public void setPopTime(long popTime) {
+        this.popTime = popTime;
+    }
 
-    void updateCorePoolSize(int corePoolSize);
+    public long getPopTime() {
+        return popTime;
+    }
 
-    void incCorePoolSize();
+    public AckStatus getStatus() {
+        return status;
+    }
 
-    void decCorePoolSize();
+    public void setStatus(AckStatus status) {
+        this.status = status;
+    }
 
-    int getCorePoolSize();
+    public void setExtraInfo(String extraInfo) {
+        this.extraInfo = extraInfo;
+    }
 
-    ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
+    public String getExtraInfo() {
+        return extraInfo;
+    }
 
-    void submitConsumeRequest(
-        final List<MessageExt> msgs,
-        final ProcessQueue processQueue,
-        final MessageQueue messageQueue,
-        final boolean dispathToConsume);
+    @Override
+    public String toString() {
+        return "AckResult [AckStatus=" + status + ",extraInfo=" + extraInfo + "]";
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/AckStatus.java
similarity index 50%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/AckStatus.java
index 5078c97..b144f8f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/AckStatus.java
@@ -14,31 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.impl.consumer;
-
-import java.util.List;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-
-public interface ConsumeMessageService {
-    void start();
-
-    void shutdown(long awaitTerminateMillis);
-
-    void updateCorePoolSize(int corePoolSize);
-
-    void incCorePoolSize();
-
-    void decCorePoolSize();
-
-    int getCorePoolSize();
-
-    ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
-
-    void submitConsumeRequest(
-        final List<MessageExt> msgs,
-        final ProcessQueue processQueue,
-        final MessageQueue messageQueue,
-        final boolean dispathToConsume);
+package org.apache.rocketmq.client.consumer;
+
+public enum AckStatus {
+    /**
+     * ack success
+     */
+    OK,
+    /**
+     * msg not exist
+     */
+    NO_EXIST,
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 9011117..d32dd15 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -180,6 +180,12 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
     private int pullThresholdForQueue = 1000;
 
     /**
+     * Flow control threshold on queue level, means max num of messages waiting to ack.
+     * in contrast with pull threshold, once a message is popped, it's considered the beginning of consumption.
+     */
+    private int popThresholdForQueue = 96;
+
+    /**
      * Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
      * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
      *
@@ -255,6 +261,16 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
     private long consumeTimeout = 15;
 
     /**
+     * Maximum amount of invisible time in millisecond of a message, rang is [5000, 300000]
+     */
+    private long popInvisibleTime = 60000;
+
+    /**
+     * Batch pop size. range is [1, 32]
+     */
+    private int popBatchNums = 32;
+
+    /**
      * Maximum time to await message consuming when shutdown consumer, 0 indicates no await.
      */
     private long awaitTerminationMillisWhenShutdown = 0;
@@ -264,6 +280,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
      */
     private TraceDispatcher traceDispatcher = null;
 
+    // force to use client rebalance
+    private boolean clientRebalance = false;
+
     /**
      * Default constructor.
      */
@@ -598,6 +617,14 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
         this.pullThresholdForQueue = pullThresholdForQueue;
     }
 
+    public int getPopThresholdForQueue() {
+        return popThresholdForQueue;
+    }
+
+    public void setPopThresholdForQueue(int popThresholdForQueue) {
+        this.popThresholdForQueue = popThresholdForQueue;
+    }
+
     public int getPullThresholdForTopic() {
         return pullThresholdForTopic;
     }
@@ -891,6 +918,14 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
         this.consumeTimeout = consumeTimeout;
     }
 
+    public long getPopInvisibleTime() {
+        return popInvisibleTime;
+    }
+
+    public void setPopInvisibleTime(long popInvisibleTime) {
+        this.popInvisibleTime = popInvisibleTime;
+    }
+
     public long getAwaitTerminationMillisWhenShutdown() {
         return awaitTerminationMillisWhenShutdown;
     }
@@ -902,4 +937,20 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
     public TraceDispatcher getTraceDispatcher() {
         return traceDispatcher;
     }
+
+    public int getPopBatchNums() {
+        return popBatchNums;
+    }
+
+    public void setPopBatchNums(int popBatchNums) {
+        this.popBatchNums = popBatchNums;
+    }
+
+    public boolean isClientRebalance() {
+        return clientRebalance;
+    }
+
+    public void setClientRebalance(boolean clientRebalance) {
+        this.clientRebalance = clientRebalance;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PopCallback.java
similarity index 50%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/PopCallback.java
index 5078c97..4932e74 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PopCallback.java
@@ -14,31 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.impl.consumer;
+package org.apache.rocketmq.client.consumer;
 
-import java.util.List;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-
-public interface ConsumeMessageService {
-    void start();
-
-    void shutdown(long awaitTerminateMillis);
-
-    void updateCorePoolSize(int corePoolSize);
-
-    void incCorePoolSize();
-
-    void decCorePoolSize();
-
-    int getCorePoolSize();
-
-    ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
+/**
+ * Async message pop interface
+ */
+public interface PopCallback {
+    void onSuccess(final PopResult popResult);
 
-    void submitConsumeRequest(
-        final List<MessageExt> msgs,
-        final ProcessQueue processQueue,
-        final MessageQueue messageQueue,
-        final boolean dispathToConsume);
+    void onException(final Throwable e);
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PopResult.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PopResult.java
new file mode 100644
index 0000000..6423e90
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PopResult.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import java.util.List;
+import org.apache.rocketmq.common.message.MessageExt;
+
+public class PopResult {
+    private List<MessageExt> msgFoundList;
+    private PopStatus popStatus;
+    private long popTime;
+    private long invisibleTime;
+    private long restNum;
+
+    public PopResult(PopStatus popStatus, List<MessageExt> msgFoundList) {
+        this.popStatus = popStatus;
+        this.msgFoundList = msgFoundList;
+    }
+
+    public long getPopTime() {
+        return popTime;
+    }
+
+
+    public void setPopTime(long popTime) {
+        this.popTime = popTime;
+    }
+
+    public long getRestNum() {
+        return restNum;
+    }
+
+    public void setRestNum(long restNum) {
+        this.restNum = restNum;
+    }
+
+    public long getInvisibleTime() {
+        return invisibleTime;
+    }
+
+
+    public void setInvisibleTime(long invisibleTime) {
+        this.invisibleTime = invisibleTime;
+    }
+
+
+    public void setPopStatus(PopStatus popStatus) {
+        this.popStatus = popStatus;
+    }
+
+    public PopStatus getPopStatus() {
+        return popStatus;
+    }
+
+    public List<MessageExt> getMsgFoundList() {
+        return msgFoundList;
+    }
+
+    public void setMsgFoundList(List<MessageExt> msgFoundList) {
+        this.msgFoundList = msgFoundList;
+    }
+
+    @Override
+    public String toString() {
+        return "PopResult [popStatus=" + popStatus + ",msgFoundList="
+            + (msgFoundList == null ? 0 : msgFoundList.size()) + ",restNum=" + restNum + "]";
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PopStatus.java
similarity index 50%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/PopStatus.java
index 5078c97..17dda9a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PopStatus.java
@@ -14,31 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.impl.consumer;
-
-import java.util.List;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-
-public interface ConsumeMessageService {
-    void start();
-
-    void shutdown(long awaitTerminateMillis);
-
-    void updateCorePoolSize(int corePoolSize);
-
-    void incCorePoolSize();
-
-    void decCorePoolSize();
-
-    int getCorePoolSize();
-
-    ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
-
-    void submitConsumeRequest(
-        final List<MessageExt> msgs,
-        final ProcessQueue processQueue,
-        final MessageQueue messageQueue,
-        final boolean dispathToConsume);
+package org.apache.rocketmq.client.consumer;
+
+public enum PopStatus {
+    /**
+     * Founded
+     */
+    FOUND,
+    /**
+     * No new message can be pull after polling time out
+     * delete after next realease
+     */
+    NO_NEW_MSG,
+    /**
+     * polling pool is full, do not try again immediately.
+     */
+    POLLING_FULL,
+    /**
+     * polling time out but no message find
+     */
+    POLLING_NOT_FOUND
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/BaseInvokeCallback.java
similarity index 50%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
copy to client/src/main/java/org/apache/rocketmq/client/impl/BaseInvokeCallback.java
index 5078c97..baf6f17 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/BaseInvokeCallback.java
@@ -14,31 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.impl.consumer;
 
-import java.util.List;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+package org.apache.rocketmq.client.impl;
 
-public interface ConsumeMessageService {
-    void start();
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
 
-    void shutdown(long awaitTerminateMillis);
+public abstract class BaseInvokeCallback implements InvokeCallback {
+    private final MQClientAPIImpl mqClientAPI;
 
-    void updateCorePoolSize(int corePoolSize);
+    public BaseInvokeCallback(MQClientAPIImpl mqClientAPI) {
+        this.mqClientAPI = mqClientAPI;
+    }
 
-    void incCorePoolSize();
+    @Override
+    public void operationComplete(final ResponseFuture responseFuture) {
+        onComplete(responseFuture);
+    }
 
-    void decCorePoolSize();
-
-    int getCorePoolSize();
-
-    ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
-
-    void submitConsumeRequest(
-        final List<MessageExt> msgs,
-        final ProcessQueue processQueue,
-        final MessageQueue messageQueue,
-        final boolean dispathToConsume);
+    public abstract void onComplete(final ResponseFuture responseFuture);
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 7a4d556..f3b4284 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.impl;
 
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -29,6 +30,12 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.AckCallback;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.AckStatus;
+import org.apache.rocketmq.client.consumer.PopCallback;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
@@ -59,6 +66,8 @@ import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.common.message.MessageRequestMode;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.RequestCode;
@@ -77,15 +86,21 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryAssignmentRequestBody;
+import org.apache.rocketmq.common.protocol.body.QueryAssignmentResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
 import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
 import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.common.protocol.body.SetMessageRequestModeRequestBody;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeResponseHeader;
 import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
@@ -95,6 +110,7 @@ import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeade
 import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
 import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
 import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
 import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
 import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
@@ -113,6 +129,8 @@ import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PopMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.header.QueryConsumeQueueRequestHeader;
@@ -144,10 +162,12 @@ import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHead
 import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingClient;
@@ -243,6 +263,34 @@ public class MQClientAPIImpl {
         this.remotingClient.shutdown();
     }
 
+    public Set<MessageQueueAssignment> queryAssignment(final String addr, final String topic,
+        final String consumerGroup, final String clientId, final String strategyName,
+        final MessageModel messageModel, final long timeoutMillis)
+        throws RemotingException, MQBrokerException, InterruptedException {
+        QueryAssignmentRequestBody requestBody = new QueryAssignmentRequestBody();
+        requestBody.setTopic(topic);
+        requestBody.setConsumerGroup(consumerGroup);
+        requestBody.setClientId(clientId);
+        requestBody.setMessageModel(messageModel);
+        requestBody.setStrategyName(strategyName);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_ASSIGNMENT, null);
+        request.setBody(requestBody.encode());
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+            request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                QueryAssignmentResponseBody queryAssignmentResponseBody = QueryAssignmentResponseBody.decode(response.getBody(), QueryAssignmentResponseBody.class);
+                return queryAssignmentResponseBody.getMessageQueueAssignments();
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
     public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config,
         final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
@@ -394,7 +442,8 @@ public class MQClientAPIImpl {
 
     }
 
-    public AclConfig getBrokerClusterConfig(final String addr, final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
+    public AclConfig getBrokerClusterConfig(final String addr,
+        final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
         RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG, null);
 
@@ -404,7 +453,7 @@ public class MQClientAPIImpl {
             case ResponseCode.SUCCESS: {
                 if (response.getBody() != null) {
                     GetBrokerClusterAclConfigResponseBody body =
-                            GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class);
+                        GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class);
                     AclConfig aclConfig = new AclConfig();
                     aclConfig.setGlobalWhiteAddrs(body.getGlobalWhiteAddrs());
                     aclConfig.setPlainAccessConfigs(body.getPlainAccessConfigs());
@@ -502,7 +551,7 @@ public class MQClientAPIImpl {
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
         assert response != null;
-        return this.processSendResponse(brokerName, msg, response,addr);
+        return this.processSendResponse(brokerName, msg, response, addr);
     }
 
     private void sendMessageAsync(
@@ -668,7 +717,7 @@ public class MQClientAPIImpl {
         }
 
         SendMessageResponseHeader responseHeader =
-                (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+            (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
 
         //If namespace not null , reset Topic without namespace.
         String topic = msg.getTopic();
@@ -687,8 +736,8 @@ public class MQClientAPIImpl {
             uniqMsgId = sb.toString();
         }
         SendResult sendResult = new SendResult(sendStatus,
-                uniqMsgId,
-                responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
+            uniqMsgId,
+            responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
         sendResult.setTransactionId(responseHeader.getTransactionId());
         String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
         String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
@@ -730,6 +779,123 @@ public class MQClientAPIImpl {
         return null;
     }
 
+    public void popMessageAsync(
+        final String brokerName, final String addr, final PopMessageRequestHeader requestHeader,
+        final long timeoutMillis, final PopCallback popCallback
+    ) throws RemotingException, InterruptedException {
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader);
+        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new BaseInvokeCallback(MQClientAPIImpl.this) {
+            @Override
+            public void onComplete(ResponseFuture responseFuture) {
+                RemotingCommand response = responseFuture.getResponseCommand();
+                if (response != null) {
+                    try {
+                        PopResult
+                            popResult = MQClientAPIImpl.this.processPopResponse(brokerName, response, requestHeader.getTopic(), requestHeader);
+                        assert popResult != null;
+                        popCallback.onSuccess(popResult);
+                    } catch (Exception e) {
+                        popCallback.onException(e);
+                    }
+                } else {
+                    if (!responseFuture.isSendRequestOK()) {
+                        popCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
+                    } else if (responseFuture.isTimeout()) {
+                        popCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
+                            responseFuture.getCause()));
+                    } else {
+                        popCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
+                    }
+                }
+            }
+        });
+    }
+
+    public void ackMessageAsync(
+        final String addr,
+        final long timeOut,
+        final AckCallback ackCallback,
+        final AckMessageRequestHeader requestHeader //
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+        this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) {
+
+            @Override
+            public void onComplete(ResponseFuture responseFuture) {
+                RemotingCommand response = responseFuture.getResponseCommand();
+                if (response != null) {
+                    try {
+                        AckResult ackResult = new AckResult();
+                        if (ResponseCode.SUCCESS == response.getCode()) {
+                            ackResult.setStatus(AckStatus.OK);
+                        } else {
+                            ackResult.setStatus(AckStatus.NO_EXIST);
+                        }
+                        assert ackResult != null;
+                        ackCallback.onSuccess(ackResult);
+                    } catch (Exception e) {
+                        ackCallback.onException(e);
+                    }
+                } else {
+                    if (!responseFuture.isSendRequestOK()) {
+                        ackCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
+                    } else if (responseFuture.isTimeout()) {
+                        ackCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
+                            responseFuture.getCause()));
+                    } else {
+                        ackCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeOut + ". Request: " + request, responseFuture.getCause()));
+                    }
+                }
+
+            }
+        });
+    }
+
+    public void changeInvisibleTimeAsync(//
+        final String brokerName,
+        final String addr, //
+        final ChangeInvisibleTimeRequestHeader requestHeader,//
+        final long timeoutMillis,
+        final AckCallback ackCallback
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader);
+        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new BaseInvokeCallback(MQClientAPIImpl.this) {
+            @Override
+            public void onComplete(ResponseFuture responseFuture) {
+                RemotingCommand response = responseFuture.getResponseCommand();
+                if (response != null) {
+                    try {
+                        ChangeInvisibleTimeResponseHeader responseHeader = (ChangeInvisibleTimeResponseHeader) response.decodeCommandCustomHeader(ChangeInvisibleTimeResponseHeader.class);
+                        AckResult ackResult = new AckResult();
+                        if (ResponseCode.SUCCESS == response.getCode()) {
+                            ackResult.setStatus(AckStatus.OK);
+                            ackResult.setPopTime(responseHeader.getPopTime());
+                            ackResult.setExtraInfo(ExtraInfoUtil
+                                .buildExtraInfo(requestHeader.getOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
+                                    responseHeader.getReviveQid(), requestHeader.getTopic(), brokerName, requestHeader.getQueueId()) + MessageConst.KEY_SEPARATOR
+                                + requestHeader.getOffset());
+                        } else {
+                            ackResult.setStatus(AckStatus.NO_EXIST);
+                        }
+                        assert ackResult != null;
+                        ackCallback.onSuccess(ackResult);
+                    } catch (Exception e) {
+                        ackCallback.onException(e);
+                    }
+                } else {
+                    if (!responseFuture.isSendRequestOK()) {
+                        ackCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
+                    } else if (responseFuture.isTimeout()) {
+                        ackCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
+                            responseFuture.getCause()));
+                    } else {
+                        ackCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
+                    }
+                }
+            }
+        });
+    }
+
     private void pullMessageAsync(
         final String addr,
         final RemotingCommand request,
@@ -801,6 +967,94 @@ public class MQClientAPIImpl {
             responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
     }
 
+    private PopResult processPopResponse(final String brokerName, final RemotingCommand response, String topic,
+        CommandCustomHeader requestHeader) throws MQBrokerException, RemotingCommandException {
+        PopStatus popStatus = PopStatus.NO_NEW_MSG;
+        List<MessageExt> msgFoundList = null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS:
+                popStatus = PopStatus.FOUND;
+                ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
+                msgFoundList = MessageDecoder.decodes(byteBuffer);
+                break;
+            case ResponseCode.POLLING_FULL:
+                popStatus = PopStatus.POLLING_FULL;
+                break;
+            case ResponseCode.POLLING_TIMEOUT:
+                popStatus = PopStatus.POLLING_NOT_FOUND;
+                break;
+            case ResponseCode.PULL_NOT_FOUND:
+                popStatus = PopStatus.POLLING_NOT_FOUND;
+                break;
+            default:
+                throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+
+        PopResult popResult = new PopResult(popStatus, msgFoundList);
+        PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.decodeCommandCustomHeader(PopMessageResponseHeader.class);
+        popResult.setRestNum(responseHeader.getRestNum());
+        // it is a pop command if pop time greater than 0, we should set the check point info to extraInfo field
+        if (popStatus == PopStatus.FOUND) {
+            Map<String, Long> startOffsetInfo = null;
+            Map<String, List<Long>> msgOffsetInfo = null;
+            Map<String, Integer> orderCountInfo = null;
+            if (requestHeader instanceof PopMessageRequestHeader) {
+                popResult.setInvisibleTime(responseHeader.getInvisibleTime());
+                popResult.setPopTime(responseHeader.getPopTime());
+                startOffsetInfo = ExtraInfoUtil.parseStartOffsetInfo(responseHeader.getStartOffsetInfo());
+                msgOffsetInfo = ExtraInfoUtil.parseMsgOffsetInfo(responseHeader.getMsgOffsetInfo());
+                orderCountInfo = ExtraInfoUtil.parseOrderCountInfo(responseHeader.getOrderCountInfo());
+            }
+            Map<String/*topicMark@queueId*/, List<Long>/*msg queueOffset*/> sortMap = new HashMap<String, List<Long>>(16);
+            for (MessageExt messageExt : msgFoundList) {
+                String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
+                if (!sortMap.containsKey(key)) {
+                    sortMap.put(key, new ArrayList<Long>(4));
+                }
+                sortMap.get(key).add(messageExt.getQueueOffset());
+            }
+            Map<String, String> map = new HashMap<String, String>(5);
+            for (MessageExt messageExt : msgFoundList) {
+                if (requestHeader instanceof PopMessageRequestHeader) {
+                    if (startOffsetInfo == null) {
+                        // we should set the check point info to extraInfo field , if the command is popMsg
+                        // find pop ck offset
+                        String key = messageExt.getTopic() + messageExt.getQueueId();
+                        if (!map.containsKey(messageExt.getTopic() + messageExt.getQueueId())) {
+                            map.put(key, ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(),
+                                messageExt.getTopic(), brokerName, messageExt.getQueueId()));
+
+                        }
+                        messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) + MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
+                    } else {
+                        String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
+                        int index = sortMap.get(key).indexOf(messageExt.getQueueOffset());
+                        Long msgQueueOffset = msgOffsetInfo.get(key).get(index);
+                        if (msgQueueOffset != messageExt.getQueueOffset()) {
+                            log.warn("Queue offset[%d] of msg is strange, not equal to the stored in msg, %s", msgQueueOffset, messageExt);
+                        }
+
+                        messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
+                            ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(key).longValue(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
+                                responseHeader.getReviveQid(), messageExt.getTopic(), brokerName, messageExt.getQueueId(), msgQueueOffset.longValue())
+                        );
+                        if (((PopMessageRequestHeader) requestHeader).isOrder() && orderCountInfo != null) {
+                            Integer count = orderCountInfo.get(key);
+                            if (count != null && count > 0) {
+                                messageExt.setReconsumeTimes(count);
+                            }
+                        }
+                    }
+                    if (messageExt.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
+                        messageExt.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(responseHeader.getPopTime()));
+                    }
+                }
+                messageExt.setTopic(NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace()));
+            }
+        }
+        return popResult;
+    }
+
     public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException {
         ViewMessageRequestHeader requestHeader = new ViewMessageRequestHeader();
@@ -2262,4 +2516,24 @@ public class MQClientAPIImpl {
                 return false;
         }
     }
+
+    public void setMessageRequestMode(final String brokerAddr, final String topic, final String consumerGroup,
+        final MessageRequestMode mode, final int popShareQueueNum, final long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException, MQClientException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SET_MESSAGE_REQUEST_MODE, null);
+
+        SetMessageRequestModeRequestBody requestBody = new SetMessageRequestModeRequestBody();
+        requestBody.setTopic(topic);
+        requestBody.setConsumerGroup(consumerGroup);
+        requestBody.setMode(mode);
+        requestBody.setPopShareQueueNum(popShareQueueNum);
+        request.setBody(requestBody.encode());
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
+        assert response != null;
+        if (ResponseCode.SUCCESS != response.getCode()) {
+            throw new MQClientException(response.getCode(), response.getRemark());
+        }
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index b37f8a6..35102b4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -29,7 +29,6 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -237,6 +236,12 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
         }
     }
 
+    @Override
+    public void submitPopConsumeRequest(final List<MessageExt> msgs,
+        final PopProcessQueue processQueue,
+        final MessageQueue messageQueue) {
+        throw new UnsupportedOperationException();
+    }
 
     private void cleanExpireMsg() {
         Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
@@ -386,6 +391,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
             MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
             ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
             ConsumeConcurrentlyStatus status = null;
+            defaultMQPushConsumerImpl.tryResetPopRetryTopic(msgs, consumerGroup);
             defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
 
             ConsumeMessageContext consumeMessageContext = null;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 130effa..ecb3017 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -26,7 +26,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
@@ -39,17 +38,17 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-import org.apache.rocketmq.common.utils.ThreadUtils;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.body.CMResult;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 
 public class ConsumeMessageOrderlyService implements ConsumeMessageService {
@@ -205,6 +204,13 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
         }
     }
 
+    @Override
+    public void submitPopConsumeRequest(final List<MessageExt> msgs,
+                                        final PopProcessQueue processQueue,
+                                        final MessageQueue messageQueue) {
+        throw new UnsupportedOperationException();
+    }
+
     public synchronized void lockMQPeriodically() {
         if (!this.stopped) {
             this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
similarity index 64%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
copy to client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
index b37f8a6..910f592 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
@@ -19,9 +19,7 @@ package org.apache.rocketmq.client.impl.consumer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -29,7 +27,8 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
+import org.apache.rocketmq.client.consumer.AckCallback;
+import org.apache.rocketmq.client.consumer.AckResult;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -41,15 +40,17 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.CMResult;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 
-public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
+public class ConsumeMessagePopConcurrentlyService implements ConsumeMessageService {
     private static final InternalLogger log = ClientLogger.getLog();
     private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
     private final DefaultMQPushConsumer defaultMQPushConsumer;
@@ -59,9 +60,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
     private final String consumerGroup;
 
     private final ScheduledExecutorService scheduledExecutorService;
-    private final ScheduledExecutorService cleanExpireMsgExecutors;
 
-    public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
+    public ConsumeMessagePopConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
         MessageListenerConcurrently messageListener) {
         this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
         this.messageListener = messageListener;
@@ -79,24 +79,14 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
             new ThreadFactoryImpl("ConsumeMessageThread_"));
 
         this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
-        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
     }
 
     public void start() {
-        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
-
-            @Override
-            public void run() {
-                cleanExpireMsg();
-            }
-
-        }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
     }
 
     public void shutdown(long awaitTerminateMillis) {
         this.scheduledExecutorService.shutdown();
         ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
-        this.cleanExpireMsgExecutors.shutdown();
     }
 
     @Override
@@ -110,32 +100,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
 
     @Override
     public void incCorePoolSize() {
-        // long corePoolSize = this.consumeExecutor.getCorePoolSize();
-        // if (corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax())
-        // {
-        // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
-        // + 1);
-        // }
-        // log.info("incCorePoolSize Concurrently from {} to {}, ConsumerGroup:
-        // {}",
-        // corePoolSize,
-        // this.consumeExecutor.getCorePoolSize(),
-        // this.consumerGroup);
     }
 
     @Override
     public void decCorePoolSize() {
-        // long corePoolSize = this.consumeExecutor.getCorePoolSize();
-        // if (corePoolSize > this.defaultMQPushConsumer.getConsumeThreadMin())
-        // {
-        // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
-        // - 1);
-        // }
-        // log.info("decCorePoolSize Concurrently from {} to {}, ConsumerGroup:
-        // {}",
-        // corePoolSize,
-        // this.consumeExecutor.getCorePoolSize(),
-        // this.consumerGroup);
     }
 
     @Override
@@ -143,6 +111,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
         return this.consumeExecutor.getCorePoolSize();
     }
 
+
     @Override
     public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
         ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();
@@ -186,7 +155,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
 
             log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
                 RemotingHelper.exceptionSimpleDesc(e),
-                ConsumeMessageConcurrentlyService.this.consumerGroup,
+                ConsumeMessagePopConcurrentlyService.this.consumerGroup,
                 msgs,
                 mq), e);
         }
@@ -199,11 +168,16 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
     }
 
     @Override
-    public void submitConsumeRequest(
+    public void submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue,
+                                     MessageQueue messageQueue, boolean dispathToConsume) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void submitPopConsumeRequest(
         final List<MessageExt> msgs,
-        final ProcessQueue processQueue,
-        final MessageQueue messageQueue,
-        final boolean dispatchToConsume) {
+        final PopProcessQueue processQueue,
+        final MessageQueue messageQueue) {
         final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
         if (msgs.size() <= consumeBatchSize) {
             ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
@@ -237,26 +211,17 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
         }
     }
 
-
-    private void cleanExpireMsg() {
-        Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
-            this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
-        while (it.hasNext()) {
-            Map.Entry<MessageQueue, ProcessQueue> next = it.next();
-            ProcessQueue pq = next.getValue();
-            pq.cleanExpiredMsg(this.defaultMQPushConsumer);
-        }
-    }
-
     public void processConsumeResult(
         final ConsumeConcurrentlyStatus status,
         final ConsumeConcurrentlyContext context,
-        final ConsumeRequest consumeRequest
-    ) {
-        int ackIndex = context.getAckIndex();
+        final ConsumeRequest consumeRequest) {
 
-        if (consumeRequest.getMsgs().isEmpty())
+        if (consumeRequest.getMsgs().isEmpty()) {
             return;
+        }
+
+        int ackIndex = context.getAckIndex();
+        String topic = consumeRequest.getMessageQueue().getTopic();
 
         switch (status) {
             case CONSUME_SUCCESS:
@@ -265,74 +230,94 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
                 }
                 int ok = ackIndex + 1;
                 int failed = consumeRequest.getMsgs().size() - ok;
-                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
-                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
+                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, topic, ok);
+                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, topic, failed);
                 break;
             case RECONSUME_LATER:
                 ackIndex = -1;
-                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
-                    consumeRequest.getMsgs().size());
+                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, topic,
+                        consumeRequest.getMsgs().size());
                 break;
             default:
                 break;
         }
 
-        switch (this.defaultMQPushConsumer.getMessageModel()) {
-            case BROADCASTING:
-                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
-                    MessageExt msg = consumeRequest.getMsgs().get(i);
-                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
-                }
-                break;
-            case CLUSTERING:
-                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
-                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
-                    MessageExt msg = consumeRequest.getMsgs().get(i);
-                    boolean result = this.sendMessageBack(msg, context);
-                    if (!result) {
-                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
-                        msgBackFailed.add(msg);
-                    }
-                }
+        //ack if consume success
+        for (int i = 0; i <= ackIndex; i++) {
+            this.defaultMQPushConsumerImpl.ackAsync(consumeRequest.getMsgs().get(i), consumerGroup);
+            consumeRequest.getPopProcessQueue().ack();
+        }
 
-                if (!msgBackFailed.isEmpty()) {
-                    consumeRequest.getMsgs().removeAll(msgBackFailed);
+        //consume later if consume fail
+        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
+            MessageExt msgExt = consumeRequest.getMsgs().get(i);
+            consumeRequest.getPopProcessQueue().ack();
+            if (msgExt.getReconsumeTimes() >= this.defaultMQPushConsumerImpl.getMaxReconsumeTimes()) {
+                checkNeedAckOrDelay(msgExt);
+                continue;
+            }
 
-                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
-                }
-                break;
-            default:
-                break;
+            int delayLevel = context.getDelayLevelWhenNextConsume();
+            changePopInvisibleTime(consumeRequest.getMsgs().get(i), consumerGroup, delayLevel);
         }
+    }
+
+    private void checkNeedAckOrDelay(MessageExt msgExt) {
+        int[] delayLevelTable = this.defaultMQPushConsumerImpl.getPopDelayLevel();
+
+        long msgDelaytime = System.currentTimeMillis() - msgExt.getBornTimestamp();
+        if (msgDelaytime > delayLevelTable[delayLevelTable.length - 1] * 1000 * 2) {
+            log.warn("Consume too many times, ack message async. message {}", msgExt.toString());
+            this.defaultMQPushConsumerImpl.ackAsync(msgExt, consumerGroup);
+        } else {
+            int delayLevel = delayLevelTable.length - 1;
+            for (; delayLevel >= 0; delayLevel--) {
+                if (msgDelaytime >= delayLevelTable[delayLevel] * 1000) {
+                    delayLevel++;
+                    break;
+                }
+            }
 
-        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
-        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
-            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
+            changePopInvisibleTime(msgExt, consumerGroup, delayLevel);
+            log.warn("Consume too many times, but delay time {} not enough. changePopInvisibleTime to delayLevel {} . message key:{}",
+                msgDelaytime, delayLevel, msgExt.getKeys());
         }
     }
 
-    public ConsumerStatsManager getConsumerStatsManager() {
-        return this.defaultMQPushConsumerImpl.getConsumerStatsManager();
-    }
+    private void changePopInvisibleTime(final MessageExt msg, String consumerGroup, int delayLevel) {
+        if (0 == delayLevel) {
+            delayLevel = 3 + msg.getReconsumeTimes();
+        }
 
-    public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
-        int delayLevel = context.getDelayLevelWhenNextConsume();
+        int[] delayLevelTable = this.defaultMQPushConsumerImpl.getPopDelayLevel();
+        int delaySecond = delayLevel >= delayLevelTable.length ? delayLevelTable[delayLevelTable.length - 1] : delayLevelTable[delayLevel];
+        String extraInfo = msg.getProperty(MessageConst.PROPERTY_POP_CK);
 
-        // Wrap topic with namespace before sending back message.
-        msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
         try {
-            this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
-            return true;
-        } catch (Exception e) {
-            log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
+            this.defaultMQPushConsumerImpl.changePopInvisibleTimeAsync(msg.getTopic(), consumerGroup, extraInfo,
+                    delaySecond * 1000L, new AckCallback() {
+                        @Override
+                        public void onSuccess(AckResult ackResult) {
+                        }
+
+
+                        @Override
+                        public void onException(Throwable e) {
+                            log.error("changePopInvisibleTimeAsync fail. msg:{} error info: {}", msg.toString(), e.toString());
+                        }
+                    });
+        } catch (Throwable t) {
+            log.error("changePopInvisibleTimeAsync fail, group:{} msg:{} errorInfo:{}", consumerGroup, msg.toString(), t.toString());
         }
+    }
 
-        return false;
+    public ConsumerStatsManager getConsumerStatsManager() {
+        return this.defaultMQPushConsumerImpl.getConsumerStatsManager();
     }
 
     private void submitConsumeRequestLater(
         final List<MessageExt> msgs,
-        final ProcessQueue processQueue,
+        final PopProcessQueue processQueue,
         final MessageQueue messageQueue
     ) {
 
@@ -340,7 +325,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
 
             @Override
             public void run() {
-                ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
+                ConsumeMessagePopConcurrentlyService.this.submitPopConsumeRequest(msgs, processQueue, messageQueue);
             }
         }, 5000, TimeUnit.MILLISECONDS);
     }
@@ -352,44 +337,71 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
 
             @Override
             public void run() {
-                ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
+                ConsumeMessagePopConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
             }
         }, 5000, TimeUnit.MILLISECONDS);
     }
 
     class ConsumeRequest implements Runnable {
         private final List<MessageExt> msgs;
-        private final ProcessQueue processQueue;
+        private final PopProcessQueue processQueue;
         private final MessageQueue messageQueue;
+        private long popTime = 0;
+        private long invisibleTime = 0;
 
-        public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
+        public ConsumeRequest(List<MessageExt> msgs, PopProcessQueue processQueue, MessageQueue messageQueue) {
             this.msgs = msgs;
             this.processQueue = processQueue;
             this.messageQueue = messageQueue;
+
+            try {
+                String extraInfo = msgs.get(0).getProperty(MessageConst.PROPERTY_POP_CK);
+                String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
+                popTime = ExtraInfoUtil.getPopTime(extraInfoStrs);
+                invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfoStrs);
+            } catch (Throwable t) {
+                log.error("parse extra info error. msg:" + msgs.get(0), t);
+            }
+        }
+
+        public boolean isPopTimeout() {
+            if (msgs.size() == 0 || popTime <= 0 || invisibleTime <= 0) {
+                return true;
+            }
+
+            long current = System.currentTimeMillis();
+            return current - popTime >= invisibleTime;
         }
 
         public List<MessageExt> getMsgs() {
             return msgs;
         }
 
-        public ProcessQueue getProcessQueue() {
+        public PopProcessQueue getPopProcessQueue() {
             return processQueue;
         }
 
         @Override
         public void run() {
             if (this.processQueue.isDropped()) {
-                log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
+                log.info("the message queue not be able to consume, because it's dropped(pop). group={} {}", ConsumeMessagePopConcurrentlyService.this.consumerGroup, this.messageQueue);
+                return;
+            }
+
+            if (isPopTimeout()) {
+                log.info("the pop message time out so abort consume. popTime={} invisibleTime={}, group={} {}",
+                        popTime, invisibleTime, ConsumeMessagePopConcurrentlyService.this.consumerGroup, this.messageQueue);
+                processQueue.decFoundMsg(-msgs.size());
                 return;
             }
 
-            MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
+            MessageListenerConcurrently listener = ConsumeMessagePopConcurrentlyService.this.messageListener;
             ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
             ConsumeConcurrentlyStatus status = null;
             defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
 
             ConsumeMessageContext consumeMessageContext = null;
-            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
+            if (ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                 consumeMessageContext = new ConsumeMessageContext();
                 consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
                 consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
@@ -397,7 +409,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
                 consumeMessageContext.setMq(messageQueue);
                 consumeMessageContext.setMsgList(msgs);
                 consumeMessageContext.setSuccess(false);
-                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
+                ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
             }
 
             long beginTimestamp = System.currentTimeMillis();
@@ -413,7 +425,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
             } catch (Throwable e) {
                 log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                     RemotingHelper.exceptionSimpleDesc(e),
-                    ConsumeMessageConcurrentlyService.this.consumerGroup,
+                    ConsumeMessagePopConcurrentlyService.this.consumerGroup,
                     msgs,
                     messageQueue);
                 hasException = true;
@@ -425,7 +437,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
                 } else {
                     returnType = ConsumeReturnType.RETURNNULL;
                 }
-            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
+            } else if (consumeRT >= invisibleTime * 1000) {
                 returnType = ConsumeReturnType.TIME_OUT;
             } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                 returnType = ConsumeReturnType.FAILED;
@@ -433,31 +445,30 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
                 returnType = ConsumeReturnType.SUCCESS;
             }
 
-            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
-                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
-            }
-
             if (null == status) {
                 log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
-                    ConsumeMessageConcurrentlyService.this.consumerGroup,
+                    ConsumeMessagePopConcurrentlyService.this.consumerGroup,
                     msgs,
                     messageQueue);
                 status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
             }
 
-            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
-                consumeMessageContext.setStatus(status.toString());
-                consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
-                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
+            if (ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
+                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
             }
 
-            ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
-                .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
+            ConsumeMessagePopConcurrentlyService.this.getConsumerStatsManager()
+                .incConsumeRT(ConsumeMessagePopConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
 
-            if (!processQueue.isDropped()) {
-                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
+            if (!processQueue.isDropped() && !isPopTimeout()) {
+                ConsumeMessagePopConcurrentlyService.this.processConsumeResult(status, context, this);
             } else {
-                log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
+                if (msgs != null) {
+                    processQueue.decFoundMsg(-msgs.size());
+                }
+
+                log.warn("processQueue invalid. isDropped={}, isPopTimeout={}, messageQueue={}, msgs={}",
+                        processQueue.isDropped(), isPopTimeout(), messageQueue, msgs);
             }
         }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java
new file mode 100644
index 0000000..48e0336
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import io.netty.util.internal.ConcurrentSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.stat.ConsumerStatsManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.body.CMResult;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class ConsumeMessagePopOrderlyService implements ConsumeMessageService {
+    private static final InternalLogger log = ClientLogger.getLog();
+    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
+    private final DefaultMQPushConsumer defaultMQPushConsumer;
+    private final MessageListenerOrderly messageListener;
+    private final BlockingQueue<Runnable> consumeRequestQueue;
+    private final ConcurrentSet<ConsumeRequest> consumeRequestSet = new ConcurrentSet<ConsumeRequest>();
+    private final ThreadPoolExecutor consumeExecutor;
+    private final String consumerGroup;
+    private final MessageQueueLock messageQueueLock = new MessageQueueLock();
+    private final MessageQueueLock consumeRequestLock = new MessageQueueLock();
+    private final ScheduledExecutorService scheduledExecutorService;
+    private volatile boolean stopped = false;
+
+    public ConsumeMessagePopOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
+        MessageListenerOrderly messageListener) {
+        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
+        this.messageListener = messageListener;
+
+        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
+        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
+        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
+
+        this.consumeExecutor = new ThreadPoolExecutor(
+            this.defaultMQPushConsumer.getConsumeThreadMin(),
+            this.defaultMQPushConsumer.getConsumeThreadMax(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            this.consumeRequestQueue,
+            new ThreadFactoryImpl("ConsumeMessageThread_"));
+
+        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
+    }
+
+    @Override
+    public void start() {
+        if (MessageModel.CLUSTERING.equals(ConsumeMessagePopOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    ConsumeMessagePopOrderlyService.this.lockMQPeriodically();
+                }
+            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public void shutdown(long awaitTerminateMillis) {
+        this.stopped = true;
+        this.scheduledExecutorService.shutdown();
+        ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
+        if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
+            this.unlockAllMessageQueues();
+        }
+    }
+
+    public synchronized void unlockAllMessageQueues() {
+        this.defaultMQPushConsumerImpl.getRebalanceImpl().unlockAll(false);
+    }
+
+    @Override
+    public void updateCorePoolSize(int corePoolSize) {
+        if (corePoolSize > 0
+            && corePoolSize <= Short.MAX_VALUE
+            && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
+            this.consumeExecutor.setCorePoolSize(corePoolSize);
+        }
+    }
+
+    @Override
+    public void incCorePoolSize() {
+    }
+
+    @Override
+    public void decCorePoolSize() {
+    }
+
+    @Override
+    public int getCorePoolSize() {
+        return this.consumeExecutor.getCorePoolSize();
+    }
+
+    @Override
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
+        ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();
+        result.setOrder(true);
+
+        List<MessageExt> msgs = new ArrayList<MessageExt>();
+        msgs.add(msg);
+        MessageQueue mq = new MessageQueue();
+        mq.setBrokerName(brokerName);
+        mq.setTopic(msg.getTopic());
+        mq.setQueueId(msg.getQueueId());
+
+        ConsumeOrderlyContext context = new ConsumeOrderlyContext(mq);
+
+        this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, this.consumerGroup);
+
+        final long beginTime = System.currentTimeMillis();
+
+        log.info("consumeMessageDirectly receive new message: {}", msg);
+
+        try {
+            ConsumeOrderlyStatus status = this.messageListener.consumeMessage(msgs, context);
+            if (status != null) {
+                switch (status) {
+                    case COMMIT:
+                        result.setConsumeResult(CMResult.CR_COMMIT);
+                        break;
+                    case ROLLBACK:
+                        result.setConsumeResult(CMResult.CR_ROLLBACK);
+                        break;
+                    case SUCCESS:
+                        result.setConsumeResult(CMResult.CR_SUCCESS);
+                        break;
+                    case SUSPEND_CURRENT_QUEUE_A_MOMENT:
+                        result.setConsumeResult(CMResult.CR_LATER);
+                        break;
+                    default:
+                        break;
+                }
+            } else {
+                result.setConsumeResult(CMResult.CR_RETURN_NULL);
+            }
+        } catch (Throwable e) {
+            result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
+            result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
+
+            log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
+                RemotingHelper.exceptionSimpleDesc(e),
+                ConsumeMessagePopOrderlyService.this.consumerGroup,
+                msgs,
+                mq), e);
+        }
+
+        result.setAutoCommit(context.isAutoCommit());
+        result.setSpentTimeMills(System.currentTimeMillis() - beginTime);
+
+        log.info("consumeMessageDirectly Result: {}", result);
+
+        return result;
+    }
+
+    @Override
+    public void submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue,
+                                     MessageQueue messageQueue, boolean dispathToConsume) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void submitPopConsumeRequest(final List<MessageExt> msgs,
+                                     final PopProcessQueue processQueue,
+                                     final MessageQueue messageQueue) {
+        ConsumeRequest req = new ConsumeRequest(processQueue, messageQueue);
+        submitConsumeRequest(req, false);
+    }
+
+    public synchronized void lockMQPeriodically() {
+        if (!this.stopped) {
+            this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
+        }
+    }
+
+    private void removeConsumeRequest(final ConsumeRequest consumeRequest) {
+        consumeRequestSet.remove(consumeRequest);
+    }
+
+    private void submitConsumeRequest(final ConsumeRequest consumeRequest, boolean force) {
+        Object lock = consumeRequestLock.fetchLockObject(consumeRequest.getMessageQueue(), consumeRequest.shardingKeyIndex);
+        synchronized (lock) {
+            boolean isNewReq = consumeRequestSet.add(consumeRequest);
+            if (force || isNewReq) {
+                try {
+                    consumeExecutor.submit(consumeRequest);
+                } catch (Exception e) {
+                    log.error("error submit consume request: {}, mq: {}, shardingKeyIndex: {}",
+                        e.toString(), consumeRequest.getMessageQueue(), consumeRequest.getShardingKeyIndex());
+                }
+            }
+        }
+    }
+
+    private void submitConsumeRequestLater(final ConsumeRequest consumeRequest, final long suspendTimeMillis) {
+        long timeMillis = suspendTimeMillis;
+        if (timeMillis == -1) {
+            timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();
+        }
+
+        if (timeMillis < 10) {
+            timeMillis = 10;
+        } else if (timeMillis > 30000) {
+            timeMillis = 30000;
+        }
+
+        this.scheduledExecutorService.schedule(new Runnable() {
+
+            @Override
+            public void run() {
+                submitConsumeRequest(consumeRequest, true);
+            }
+        }, timeMillis, TimeUnit.MILLISECONDS);
+    }
+
+    public boolean processConsumeResult(
+        final List<MessageExt> msgs,
+        final ConsumeOrderlyStatus status,
+        final ConsumeOrderlyContext context,
+        final ConsumeRequest consumeRequest
+    ) {
+        return true;
+    }
+
+    public ConsumerStatsManager getConsumerStatsManager() {
+        return this.defaultMQPushConsumerImpl.getConsumerStatsManager();
+    }
+
+    private int getMaxReconsumeTimes() {
+        // default reconsume times: Integer.MAX_VALUE
+        if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
+            return Integer.MAX_VALUE;
+        } else {
+            return this.defaultMQPushConsumer.getMaxReconsumeTimes();
+        }
+    }
+
+    private boolean checkReconsumeTimes(List<MessageExt> msgs) {
+        boolean suspend = false;
+        if (msgs != null && !msgs.isEmpty()) {
+            for (MessageExt msg : msgs) {
+                if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
+                    MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
+                    if (!sendMessageBack(msg)) {
+                        suspend = true;
+                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
+                    }
+                } else {
+                    suspend = true;
+                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
+                }
+            }
+        }
+        return suspend;
+    }
+
+    public boolean sendMessageBack(final MessageExt msg) {
+        try {
+            // max reconsume times exceeded then send to dead letter queue.
+            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
+            String originMsgId = MessageAccessor.getOriginMessageId(msg);
+            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
+            newMsg.setFlag(msg.getFlag());
+            MessageAccessor.setProperties(newMsg, msg.getProperties());
+            MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
+            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
+            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
+            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
+
+            this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
+            return true;
+        } catch (Exception e) {
+            log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
+        }
+
+        return false;
+    }
+
+    public void resetNamespace(final List<MessageExt> msgs) {
+        for (MessageExt msg : msgs) {
+            if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
+                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
+            }
+        }
+    }
+
+    class ConsumeRequest implements Runnable {
+        private final PopProcessQueue processQueue;
+        private final MessageQueue messageQueue;
+        private int shardingKeyIndex = 0;
+
+        public ConsumeRequest(PopProcessQueue processQueue, MessageQueue messageQueue) {
+            this.processQueue = processQueue;
+            this.messageQueue = messageQueue;
+            this.shardingKeyIndex = 0;
+        }
+
+        public ConsumeRequest(PopProcessQueue processQueue, MessageQueue messageQueue, int shardingKeyIndex) {
+            this.processQueue = processQueue;
+            this.messageQueue = messageQueue;
+            this.shardingKeyIndex = shardingKeyIndex;
+        }
+
+        public PopProcessQueue getProcessQueue() {
+            return processQueue;
+        }
+
+        public MessageQueue getMessageQueue() {
+            return messageQueue;
+        }
+
+        public int getShardingKeyIndex() {
+            return shardingKeyIndex;
+        }
+
+        @Override
+        public void run() {
+            if (this.processQueue.isDropped()) {
+                log.warn("run, message queue not be able to consume, because it's dropped. {}", this.messageQueue);
+                ConsumeMessagePopOrderlyService.this.removeConsumeRequest(this);
+                return;
+            }
+
+            // lock on sharding key index
+            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue, shardingKeyIndex);
+        }
+
+        @Override
+        public int hashCode() {
+            int hash = shardingKeyIndex;
+            if (processQueue != null) {
+                hash += processQueue.hashCode() * 31;
+            }
+            if (messageQueue != null) {
+                hash += messageQueue.hashCode() * 31;
+            }
+            return hash;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+
+            ConsumeRequest other = (ConsumeRequest) obj;
+            if (shardingKeyIndex != other.shardingKeyIndex) {
+                return false;
+            }
+
+            if (processQueue != other.processQueue) {
+                return false;
+            }
+
+            if (messageQueue == other.messageQueue) {
+                return true;
+            }
+            if (messageQueue != null && messageQueue.equals(other.messageQueue)) {
+                return true;
+            }
+            return false;
+        }
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
index 5078c97..bdde6ff 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
@@ -41,4 +41,9 @@ public interface ConsumeMessageService {
         final ProcessQueue processQueue,
         final MessageQueue messageQueue,
         final boolean dispathToConsume);
+
+    void submitPopConsumeRequest(
+        final List<MessageExt> msgs,
+        final PopProcessQueue processQueue,
+        final MessageQueue messageQueue);
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index f832370..5cf814b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -142,6 +142,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
     private final MessageQueueLock messageQueueLock = new MessageQueueLock();
 
+    // only for test purpose, will be modified by reflection in unit test.
+    @SuppressWarnings("FieldMayBeFinal") private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;
+
     public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
         this.defaultLitePullConsumer = defaultLitePullConsumer;
         this.rpcHook = rpcHook;
@@ -417,6 +420,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     }
 
     private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
+        if (doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged) {
+            return;
+        }
         Map<String, SubscriptionData> subTable = rebalanceImpl.getSubscriptionInner();
         if (subTable != null) {
             for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index e08f780..64126b0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -27,12 +27,17 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
+import org.apache.rocketmq.client.consumer.AckCallback;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.AckStatus;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.client.consumer.PopCallback;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.listener.MessageListener;
@@ -46,39 +51,47 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.hook.ConsumeMessageContext;
 import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.hook.FilterMessageContext;
 import org.apache.rocketmq.client.hook.FilterMessageHook;
 import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.client.stat.ConsumerStatsManager;
+import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.PopProcessQueueInfo;
 import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
 import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
+
 public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     /**
      * Delay some time when exception occur
@@ -109,9 +122,20 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     private MessageListener messageListenerInner;
     private OffsetStore offsetStore;
     private ConsumeMessageService consumeMessageService;
+    private ConsumeMessageService consumeMessagePopService;
     private long queueFlowControlTimes = 0;
     private long queueMaxSpanFlowControlTimes = 0;
 
+    //10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
+    private int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
+
+    private static final int MAX_POP_INVISIBLE_TIME = 300000;
+    private static final int MIN_POP_INVISIBLE_TIME = 5000;
+    private static final int ASYNC_TIMEOUT = 3000;
+
+    // only for test purpose, will be modified by reflection in unit test.
+    @SuppressWarnings("FieldMayBeFinal") private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;
+
     public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
         this.defaultMQPushConsumer = defaultMQPushConsumer;
         this.rpcHook = rpcHook;
@@ -443,6 +467,169 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         }
     }
 
+    void popMessage(final PopRequest popRequest) {
+        final PopProcessQueue processQueue = popRequest.getPopProcessQueue();
+        if (processQueue.isDropped()) {
+            log.info("the pop request[{}] is dropped.", popRequest.toString());
+            return;
+        }
+
+        processQueue.setLastPopTimestamp(System.currentTimeMillis());
+
+        try {
+            this.makeSureStateOK();
+        } catch (MQClientException e) {
+            log.warn("pullMessage exception, consumer state not ok", e);
+            this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
+            return;
+        }
+
+        if (this.isPause()) {
+            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
+            this.executePopPullRequestLater(popRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
+            return;
+        }
+
+        if (processQueue.getWaiAckMsgCount() > this.defaultMQPushConsumer.getPopThresholdForQueue()) {
+            this.executePopPullRequestLater(popRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+            if ((queueFlowControlTimes++ % 1000) == 0) {
+                log.warn("the messages waiting to ack exceeds the threshold {}, so do flow control, popRequest={}, flowControlTimes={}, wait count={}",
+                    this.defaultMQPushConsumer.getPopThresholdForQueue(), popRequest, queueFlowControlTimes, processQueue.getWaiAckMsgCount());
+            }
+            return;
+        }
+
+        //POPTODO think of pop mode orderly implementation later.
+        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(popRequest.getMessageQueue().getTopic());
+        if (null == subscriptionData) {
+            this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
+            log.warn("find the consumer's subscription failed, {}", popRequest);
+            return;
+        }
+
+        final long beginTimestamp = System.currentTimeMillis();
+
+        PopCallback popCallback = new PopCallback() {
+            @Override
+            public void onSuccess(PopResult popResult) {
+                if (popResult == null) {
+                    log.error("pop callback popResult is null");
+                    DefaultMQPushConsumerImpl.this.executePopPullRequestImmediately(popRequest);
+                    return;
+                }
+
+                processPopResult(popResult, subscriptionData);
+
+                switch (popResult.getPopStatus()) {
+                    case FOUND:
+                        long pullRT = System.currentTimeMillis() - beginTimestamp;
+                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(popRequest.getConsumerGroup(),
+                            popRequest.getMessageQueue().getTopic(), pullRT);
+                        if (popResult.getMsgFoundList() == null || popResult.getMsgFoundList().isEmpty()) {
+                            DefaultMQPushConsumerImpl.this.executePopPullRequestImmediately(popRequest);
+                        } else {
+                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(popRequest.getConsumerGroup(),
+                                popRequest.getMessageQueue().getTopic(), popResult.getMsgFoundList().size());
+                            popRequest.getPopProcessQueue().incFoundMsg(popResult.getMsgFoundList().size());
+
+                            DefaultMQPushConsumerImpl.this.consumeMessagePopService.submitPopConsumeRequest(
+                                popResult.getMsgFoundList(),
+                                processQueue,
+                                popRequest.getMessageQueue());
+
+                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
+                                DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest,
+                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
+                            } else {
+                                DefaultMQPushConsumerImpl.this.executePopPullRequestImmediately(popRequest);
+                            }
+                        }
+                        break;
+                    case NO_NEW_MSG:
+                    case POLLING_NOT_FOUND:
+                        DefaultMQPushConsumerImpl.this.executePopPullRequestImmediately(popRequest);
+                        break;
+                    case POLLING_FULL:
+                        DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
+                        break;
+                    default:
+                        DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
+                        break;
+                }
+
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                if (!popRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                    log.warn("execute the pull request exception: {}", e);
+                }
+
+                DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
+            }
+        };
+
+
+        try {
+
+            long invisibleTime = this.defaultMQPushConsumer.getPopInvisibleTime();
+            if (invisibleTime < MIN_POP_INVISIBLE_TIME || invisibleTime > MAX_POP_INVISIBLE_TIME) {
+                invisibleTime = 60000;
+            }
+            this.pullAPIWrapper.popAsync(popRequest.getMessageQueue(), invisibleTime, this.defaultMQPushConsumer.getPopBatchNums(),
+                    popRequest.getConsumerGroup(), BROKER_SUSPEND_MAX_TIME_MILLIS, popCallback, true, popRequest.getInitMode(),
+                    false, subscriptionData.getExpressionType(), subscriptionData.getSubString());
+        } catch (Exception e) {
+            log.error("popAsync exception", e);
+            this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
+        }
+    }
+
+    private PopResult processPopResult(final PopResult popResult, final SubscriptionData subscriptionData) {
+        if (PopStatus.FOUND == popResult.getPopStatus()) {
+            List<MessageExt> msgFoundList = popResult.getMsgFoundList();
+            List<MessageExt> msgListFilterAgain = msgFoundList;
+            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()
+                    && popResult.getMsgFoundList().size() > 0) {
+                msgListFilterAgain = new ArrayList<MessageExt>(popResult.getMsgFoundList().size());
+                for (MessageExt msg : popResult.getMsgFoundList()) {
+                    if (msg.getTags() != null) {
+                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
+                            msgListFilterAgain.add(msg);
+                        }
+                    }
+                }
+            }
+
+            if (!this.filterMessageHookList.isEmpty()) {
+                FilterMessageContext filterMessageContext = new FilterMessageContext();
+                filterMessageContext.setUnitMode(this.defaultMQPushConsumer.isUnitMode());
+                filterMessageContext.setMsgList(msgListFilterAgain);
+                if (!this.filterMessageHookList.isEmpty()) {
+                    for (FilterMessageHook hook : this.filterMessageHookList) {
+                        try {
+                            hook.filterMessage(filterMessageContext);
+                        } catch (Throwable e) {
+                            log.error("execute hook error. hookName={}", hook.hookName());
+                        }
+                    }
+                }
+            }
+
+            if (msgFoundList.size() != msgListFilterAgain.size()) {
+                for (MessageExt msg : msgFoundList) {
+                    if (!msgListFilterAgain.contains(msg)) {
+                        ackAsync(msg, this.groupName());
+                    }
+                }
+            }
+
+            popResult.setMsgFoundList(msgListFilterAgain);
+        }
+
+        return popResult;
+    }
+
     private void makeSureStateOK() throws MQClientException {
         if (this.serviceState != ServiceState.RUNNING) {
             throw new MQClientException("The consumer service state not OK, "
@@ -452,7 +639,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         }
     }
 
-    private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
+    void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
         this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
     }
 
@@ -472,6 +659,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
     }
 
+    void executePopPullRequestLater(final PopRequest pullRequest, final long timeDelay) {
+        this.mQClientFactory.getPullMessageService().executePopPullRequestLater(pullRequest, timeDelay);
+    }
+
+    void executePopPullRequestImmediately(final PopRequest pullRequest) {
+        this.mQClientFactory.getPullMessageService().executePopPullRequestImmediately(pullRequest);
+    }
+
     private void correctTagsOffset(final PullRequest pullRequest) {
         if (0L == pullRequest.getProcessQueue().getMsgCount().get()) {
             this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
@@ -531,7 +726,78 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         }
     }
 
-    private int getMaxReconsumeTimes() {
+    void ackAsync(MessageExt message, String consumerGroup) {
+        final String extraInfo = message.getProperty(MessageConst.PROPERTY_POP_CK);
+
+        try {
+            String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
+            String brokerName = ExtraInfoUtil.getBrokerName(extraInfoStrs);
+            int queueId = ExtraInfoUtil.getQueueId(extraInfoStrs);
+            long queueOffset = ExtraInfoUtil.getQueueOffset(extraInfoStrs);
+            String topic = message.getTopic();
+
+            FindBrokerResult
+                findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+            if (null == findBrokerResult) {
+                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+                findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+            }
+
+            if (findBrokerResult == null) {
+                log.error("The broker[" + brokerName + "] not exist");
+                return;
+            }
+
+            AckMessageRequestHeader requestHeader = new AckMessageRequestHeader();
+            requestHeader.setTopic(ExtraInfoUtil.getRealTopic(extraInfoStrs, topic, consumerGroup));
+            requestHeader.setQueueId(queueId);
+            requestHeader.setOffset(queueOffset);
+            requestHeader.setConsumerGroup(consumerGroup);
+            requestHeader.setExtraInfo(extraInfo);
+            this.mQClientFactory.getMQClientAPIImpl().ackMessageAsync(findBrokerResult.getBrokerAddr(), ASYNC_TIMEOUT, new AckCallback() {
+                @Override
+                public void onSuccess(AckResult ackResult) {
+                    if (ackResult != null && !AckStatus.OK.equals(ackResult.getStatus())) {
+                        log.info("Ack message fail. ackResult: {}, extraInfo: {}", ackResult, extraInfo);
+                    }
+                }
+                @Override
+                public void onException(Throwable e) {
+                    log.info("Ack message fail. extraInfo: {}  error message: {}", extraInfo, e.toString());
+                }
+            }, requestHeader);
+
+        } catch (Throwable t) {
+            log.error("ack async error.", t);
+        }
+    }
+
+    void changePopInvisibleTimeAsync(String topic, String consumerGroup, String extraInfo, long invisibleTime, AckCallback callback)
+            throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+        String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
+        String brokerName = ExtraInfoUtil.getBrokerName(extraInfoStrs);
+        int queueId = ExtraInfoUtil.getQueueId(extraInfoStrs);
+        FindBrokerResult
+                findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+        if (null == findBrokerResult) {
+            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+        }
+        if (findBrokerResult != null) {
+            ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader();
+            requestHeader.setTopic(ExtraInfoUtil.getRealTopic(extraInfoStrs, topic, consumerGroup));
+            requestHeader.setQueueId(queueId);
+            requestHeader.setOffset(ExtraInfoUtil.getQueueOffset(extraInfoStrs));
+            requestHeader.setConsumerGroup(consumerGroup);
+            requestHeader.setExtraInfo(extraInfo);
+            requestHeader.setInvisibleTime(invisibleTime);
+            this.mQClientFactory.getMQClientAPIImpl().changeInvisibleTimeAsync(brokerName, findBrokerResult.getBrokerAddr(), requestHeader, ASYNC_TIMEOUT, callback);
+            return;
+        }
+        throw new MQClientException("The broker[" + brokerName + "] not exist", null);
+    }
+
+    public int getMaxReconsumeTimes() {
         // default reconsume times: 16
         if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
             return 16;
@@ -612,13 +878,20 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                     this.consumeOrderly = true;
                     this.consumeMessageService =
                         new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
+                    //POPTODO reuse Executor ?
+                    this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                     this.consumeOrderly = false;
                     this.consumeMessageService =
                         new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
+                    //POPTODO reuse Executor ?
+                    this.consumeMessagePopService =
+                        new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                 }
 
                 this.consumeMessageService.start();
+                // POPTODO
+                this.consumeMessagePopService.start();
 
                 boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                 if (!registerOK) {
@@ -818,6 +1091,23 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                     + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                 null);
         }
+
+        // popInvisibleTime
+        if (this.defaultMQPushConsumer.getPopInvisibleTime() < MIN_POP_INVISIBLE_TIME
+                || this.defaultMQPushConsumer.getPopInvisibleTime() > MAX_POP_INVISIBLE_TIME) {
+            throw new MQClientException(
+                    "popInvisibleTime Out of range [" + MIN_POP_INVISIBLE_TIME + ", " + MAX_POP_INVISIBLE_TIME + "]"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        // popBatchNums
+        if (this.defaultMQPushConsumer.getPopBatchNums() <= 0 || this.defaultMQPushConsumer.getPopBatchNums() > 32) {
+            throw new MQClientException(
+                    "popBatchNums Out of range [1, 32]"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
     }
 
     private void copySubscription() throws MQClientException {
@@ -857,6 +1147,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     }
 
     private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
+        if (doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged) {
+            return;
+        }
         Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
         if (subTable != null) {
             for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
@@ -1074,6 +1367,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
             info.getMqTable().put(mq, pqinfo);
         }
 
+        Iterator<Entry<MessageQueue, PopProcessQueue>> popIt = this.rebalanceImpl.getPopProcessQueueTable().entrySet().iterator();
+        while (popIt.hasNext()) {
+            Entry<MessageQueue, PopProcessQueue> next = popIt.next();
+            MessageQueue mq = next.getKey();
+            PopProcessQueue pq = next.getValue();
+
+            PopProcessQueueInfo pqinfo = new PopProcessQueueInfo();
+            pq.fillPopProcessQueueInfo(pqinfo);
+            info.getMqPopTable().put(mq, pqinfo);
+        }
+
         for (SubscriptionData sd : subSet) {
             ConsumeStatus consumeStatus = this.mQClientFactory.getConsumerStatsManager().consumeStatus(this.groupName(), sd.getTopic());
             info.getStatusTable().put(sd.getTopic(), consumeStatus);
@@ -1142,6 +1446,19 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         return queueTimeSpan;
     }
 
+    public void tryResetPopRetryTopic(final List<MessageExt> msgs, String consumerGroup) {
+        String popRetryPrefix = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup + "_";
+        for (MessageExt msg : msgs) {
+            if (msg.getTopic().startsWith(popRetryPrefix)) {
+                String normalTopic = KeyBuilder.parseNormalTopic(msg.getTopic(), consumerGroup);
+                if (normalTopic != null && !normalTopic.isEmpty()) {
+                    msg.setTopic(normalTopic);
+                }
+            }
+        }
+    }
+
+
     public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
         final String groupTopic = MixAll.getRetryTopic(consumerGroup);
         for (MessageExt msg : msgs) {
@@ -1168,4 +1485,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
         this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
     }
+
+    int[] getPopDelayLevel() {
+        return popDelayLevel;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
index a02f1b6..73453b0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
@@ -24,19 +24,32 @@ import org.apache.rocketmq.common.message.MessageQueue;
  * Message lock,strictly ensure the single queue only one thread at a time consuming
  */
 public class MessageQueueLock {
-    private ConcurrentMap<MessageQueue, Object> mqLockTable =
-        new ConcurrentHashMap<MessageQueue, Object>();
+    private ConcurrentMap<MessageQueue, ConcurrentMap<Integer, Object>> mqLockTable =
+        new ConcurrentHashMap<MessageQueue, ConcurrentMap<Integer, Object>>(32);
 
     public Object fetchLockObject(final MessageQueue mq) {
-        Object objLock = this.mqLockTable.get(mq);
-        if (null == objLock) {
-            objLock = new Object();
-            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
+        return fetchLockObject(mq, -1);
+    }
+
+    public Object fetchLockObject(final MessageQueue mq, final int shardingKeyIndex) {
+        ConcurrentMap<Integer, Object> objMap = this.mqLockTable.get(mq);
+        if (null == objMap) {
+            objMap = new ConcurrentHashMap<Integer, Object>(32);
+            ConcurrentMap<Integer, Object> prevObjMap = this.mqLockTable.putIfAbsent(mq, objMap);
+            if (prevObjMap != null) {
+                objMap = prevObjMap;
+            }
+        }
+
+        Object lock = objMap.get(shardingKeyIndex);
+        if (null == lock) {
+            lock = new Object();
+            Object prevLock = objMap.putIfAbsent(shardingKeyIndex, lock);
             if (prevLock != null) {
-                objLock = prevLock;
+                lock = prevLock;
             }
         }
 
-        return objLock;
+        return lock;
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageRequest.java
similarity index 53%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
copy to client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageRequest.java
index 5078c97..a808538 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageRequest.java
@@ -16,29 +16,8 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
-import java.util.List;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.message.MessageRequestMode;
 
-public interface ConsumeMessageService {
-    void start();
-
-    void shutdown(long awaitTerminateMillis);
-
-    void updateCorePoolSize(int corePoolSize);
-
-    void incCorePoolSize();
-
-    void decCorePoolSize();
-
-    int getCorePoolSize();
-
-    ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
-
-    void submitConsumeRequest(
-        final List<MessageExt> msgs,
-        final ProcessQueue processQueue,
-        final MessageQueue messageQueue,
-        final boolean dispathToConsume);
+public interface MessageRequest {
+    MessageRequestMode getMessageRequestMode();
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java
new file mode 100644
index 0000000..0883a77
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.protocol.body.PopProcessQueueInfo;
+
+/**
+ * Queue consumption snapshot
+ */
+public class PopProcessQueue {
+
+    private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
+
+    private long lastPopTimestamp;
+    private AtomicInteger waitAckCounter = new AtomicInteger(0);
+    private volatile boolean dropped = false;
+
+    public long getLastPopTimestamp() {
+        return lastPopTimestamp;
+    }
+
+    public void setLastPopTimestamp(long lastPopTimestamp) {
+        this.lastPopTimestamp = lastPopTimestamp;
+    }
+
+    public void incFoundMsg(int count) {
+        this.waitAckCounter.getAndAdd(count);
+    }
+
+    /**
+     * @return the value before decrement.
+     */
+    public int ack() {
+        return this.waitAckCounter.getAndDecrement();
+    }
+
+    public void decFoundMsg(int count) {
+        this.waitAckCounter.addAndGet(count);
+    }
+
+    public int getWaiAckMsgCount() {
+        return this.waitAckCounter.get();
+    }
+
+    public boolean isDropped() {
+        return dropped;
+    }
+
+    public void setDropped(boolean dropped) {
+        this.dropped = dropped;
+    }
+
+    public void fillPopProcessQueueInfo(final PopProcessQueueInfo info) {
+        info.setWaitAckCount(getWaiAckMsgCount());
+        info.setDroped(isDropped());
+        info.setLastPopTimestamp(getLastPopTimestamp());
+    }
+
+    public boolean isPullExpired() {
+        return (System.currentTimeMillis() - this.lastPopTimestamp) > PULL_MAX_IDLE_TIME;
+    }
+
+    @Override
+    public String toString() {
+        return "PopProcessQueue[waitAckCounter:" + this.waitAckCounter.get()
+                + ", lastPopTimestamp:" + getLastPopTimestamp()
+                + ", drop:" + dropped +  "]";
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopRequest.java
similarity index 64%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
copy to client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopRequest.java
index 10aded0..c47f2d0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopRequest.java
@@ -16,14 +16,17 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageRequestMode;
 
-public class PullRequest {
+public class PopRequest implements MessageRequest {
+    private String topic;
     private String consumerGroup;
     private MessageQueue messageQueue;
-    private ProcessQueue processQueue;
-    private long nextOffset;
+    private PopProcessQueue popProcessQueue;
     private boolean lockedFirst = false;
+    private int initMode = ConsumeInitMode.MAX;
 
     public boolean isLockedFirst() {
         return lockedFirst;
@@ -49,18 +52,35 @@ public class PullRequest {
         this.messageQueue = messageQueue;
     }
 
-    public long getNextOffset() {
-        return nextOffset;
+    public String getTopic() {
+        return topic;
     }
 
-    public void setNextOffset(long nextOffset) {
-        this.nextOffset = nextOffset;
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public PopProcessQueue getPopProcessQueue() {
+        return popProcessQueue;
+    }
+
+    public void setPopProcessQueue(PopProcessQueue popProcessQueue) {
+        this.popProcessQueue = popProcessQueue;
+    }
+
+    public int getInitMode() {
+        return initMode;
+    }
+
+    public void setInitMode(int initMode) {
+        this.initMode = initMode;
     }
 
     @Override
     public int hashCode() {
         final int prime = 31;
         int result = 1;
+        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
         result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode());
         result = prime * result + ((messageQueue == null) ? 0 : messageQueue.hashCode());
         return result;
@@ -74,31 +94,38 @@ public class PullRequest {
             return false;
         if (getClass() != obj.getClass())
             return false;
-        PullRequest other = (PullRequest) obj;
+
+        PopRequest other = (PopRequest) obj;
+
+        if (topic == null) {
+            if (other.topic != null)
+                return false;
+        } else if (!topic.equals(other.topic)) {
+            return false;
+        }
+
         if (consumerGroup == null) {
             if (other.consumerGroup != null)
                 return false;
         } else if (!consumerGroup.equals(other.consumerGroup))
             return false;
+
         if (messageQueue == null) {
             if (other.messageQueue != null)
                 return false;
-        } else if (!messageQueue.equals(other.messageQueue))
+        } else if (!messageQueue.equals(other.messageQueue)) {
             return false;
+        }
         return true;
     }
 
     @Override
     public String toString() {
-        return "PullRequest [consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue
-            + ", nextOffset=" + nextOffset + "]";
-    }
-
-    public ProcessQueue getProcessQueue() {
-        return processQueue;
+        return "PopRequest [topic=" + topic + ", consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue + "]";
     }
 
-    public void setProcessQueue(ProcessQueue processQueue) {
-        this.processQueue = processQueue;
+    @Override
+    public MessageRequestMode getMessageRequestMode() {
+        return MessageRequestMode.POP;
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index cc42a9e..95b609e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -23,6 +23,7 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.consumer.PopCallback;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
@@ -37,16 +38,17 @@ import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.filter.ExpressionType;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
 public class PullAPIWrapper {
@@ -269,4 +271,55 @@ public class PullAPIWrapper {
     public void setDefaultBrokerId(long defaultBrokerId) {
         this.defaultBrokerId = defaultBrokerId;
     }
+
+
+    /**
+     *
+     * @param mq
+     * @param invisibleTime
+     * @param maxNums
+     * @param consumerGroup
+     * @param timeout
+     * @param popCallback
+     * @param poll
+     * @param initMode
+    //     * @param expressionType
+    //     * @param expression
+     * @param order
+     * @throws MQClientException
+     * @throws RemotingException
+     * @throws InterruptedException
+     */
+    public void popAsync(MessageQueue mq, long invisibleTime, int maxNums, String consumerGroup,
+                         long timeout, PopCallback popCallback, boolean poll, int initMode, boolean order, String expressionType, String expression)
+        throws MQClientException, RemotingException, InterruptedException {
+        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+        if (null == findBrokerResult) {
+            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+        }
+        if (findBrokerResult != null) {
+            PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
+            requestHeader.setConsumerGroup(consumerGroup);
+            requestHeader.setTopic(mq.getTopic());
+            requestHeader.setQueueId(mq.getQueueId());
+            requestHeader.setMaxMsgNums(maxNums);
+            requestHeader.setInvisibleTime(invisibleTime);
+            requestHeader.setInitMode(initMode);
+            requestHeader.setExpType(expressionType);
+            requestHeader.setExp(expression);
+            requestHeader.setOrder(order);
+            //give 1000 ms for server response
+            if (poll) {
+                requestHeader.setPollTime(timeout);
+                requestHeader.setBornTime(System.currentTimeMillis());
+                // timeout + 10s, fix the too earlier timeout of client when long polling.
+                timeout += 10 * 1000;
+            }
+            String brokerAddr = findBrokerResult.getBrokerAddr();
+            this.mQClientFactory.getMQClientAPIImpl().popMessageAsync(mq.getBrokerName(), brokerAddr, requestHeader, timeout, popCallback);
+            return;
+        }
+        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
index bd46a58..9665c6d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
@@ -24,12 +24,14 @@ import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.ServiceThread;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.message.MessageRequestMode;
 import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.logging.InternalLogger;
 
 public class PullMessageService extends ServiceThread {
     private final InternalLogger log = ClientLogger.getLog();
-    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
+    private final LinkedBlockingQueue<MessageRequest> messageRequestQueue = new LinkedBlockingQueue<MessageRequest>();
+
     private final MQClientInstance mQClientFactory;
     private final ScheduledExecutorService scheduledExecutorService = Executors
         .newSingleThreadScheduledExecutor(new ThreadFactory() {
@@ -58,7 +60,28 @@ public class PullMessageService extends ServiceThread {
 
     public void executePullRequestImmediately(final PullRequest pullRequest) {
         try {
-            this.pullRequestQueue.put(pullRequest);
+            this.messageRequestQueue.put(pullRequest);
+        } catch (InterruptedException e) {
+            log.error("executePullRequestImmediately pullRequestQueue.put", e);
+        }
+    }
+
+    public void executePopPullRequestLater(final PopRequest pullRequest, final long timeDelay) {
+        if (!isStopped()) {
+            this.scheduledExecutorService.schedule(new Runnable() {
+                @Override
+                public void run() {
+                    PullMessageService.this.executePopPullRequestImmediately(pullRequest);
+                }
+            }, timeDelay, TimeUnit.MILLISECONDS);
+        } else {
+            log.warn("PullMessageServiceScheduledThread has shutdown");
+        }
+    }
+
+    public void executePopPullRequestImmediately(final PopRequest pullRequest) {
+        try {
+            this.messageRequestQueue.put(pullRequest);
         } catch (InterruptedException e) {
             log.error("executePullRequestImmediately pullRequestQueue.put", e);
         }
@@ -86,14 +109,28 @@ public class PullMessageService extends ServiceThread {
         }
     }
 
+    private void popMessage(final PopRequest popRequest) {
+        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(popRequest.getConsumerGroup());
+        if (consumer != null) {
+            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
+            impl.popMessage(popRequest);
+        } else {
+            log.warn("No matched consumer for the PopRequest {}, drop it", popRequest);
+        }
+    }
+
     @Override
     public void run() {
         log.info(this.getServiceName() + " service started");
 
         while (!this.isStopped()) {
             try {
-                PullRequest pullRequest = this.pullRequestQueue.take();
-                this.pullMessage(pullRequest);
+                MessageRequest messageRequest = this.messageRequestQueue.take();
+                if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP) {
+                    this.popMessage((PopRequest)messageRequest);
+                } else {
+                    this.pullMessage((PullRequest)messageRequest);
+                }
             } catch (InterruptedException ignored) {
             } catch (Exception e) {
                 log.error("Pull Message Service Run Method exception", e);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
index 10aded0..71d1fdb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
@@ -17,8 +17,9 @@
 package org.apache.rocketmq.client.impl.consumer;
 
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageRequestMode;
 
-public class PullRequest {
+public class PullRequest implements MessageRequest {
     private String consumerGroup;
     private MessageQueue messageQueue;
     private ProcessQueue processQueue;
@@ -101,4 +102,9 @@ public class PullRequest {
     public void setProcessQueue(ProcessQueue processQueue) {
         this.processQueue = processQueue;
     }
+
+    @Override
+    public MessageRequestMode getMessageRequestMode() {
+        return MessageRequestMode.PULL;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index b8972a9..7e78c9b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -31,18 +31,26 @@ import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.common.message.MessageRequestMode;
 import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 
 public abstract class RebalanceImpl {
     protected static final InternalLogger log = ClientLogger.getLog();
+
     protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
+    protected final ConcurrentMap<MessageQueue, PopProcessQueue> popProcessQueueTable = new ConcurrentHashMap<MessageQueue, PopProcessQueue>(64);
+
     protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
         new ConcurrentHashMap<String, Set<MessageQueue>>();
     protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
@@ -51,6 +59,11 @@ public abstract class RebalanceImpl {
     protected MessageModel messageModel;
     protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
     protected MQClientInstance mQClientFactory;
+    private static final int TIMEOUT_CHECK_TIMES = 3;
+    private static final int QUERY_ASSIGNMENT_TIMEOUT = 3000;
+
+    private Map<String, String> topicBrokerRebalance = new ConcurrentHashMap<String, String>();
+    private Map<String, String> topicClientRebalance = new ConcurrentHashMap<String, String>();
 
     public RebalanceImpl(String consumerGroup, MessageModel messageModel,
         AllocateMessageQueueStrategy allocateMessageQueueStrategy,
@@ -88,8 +101,9 @@ public abstract class RebalanceImpl {
             final String brokerName = entry.getKey();
             final Set<MessageQueue> mqs = entry.getValue();
 
-            if (mqs.isEmpty())
+            if (mqs.isEmpty()) {
                 continue;
+            }
 
             FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
             if (findBrokerResult != null) {
@@ -117,7 +131,15 @@ public abstract class RebalanceImpl {
 
     private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
         HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>();
-        for (MessageQueue mq : this.processQueueTable.keySet()) {
+
+        for (Map.Entry<MessageQueue, ProcessQueue> entry : this.processQueueTable.entrySet()) {
+            MessageQueue mq = entry.getKey();
+            ProcessQueue pq = entry.getValue();
+
+            if (pq.isDropped()) {
+                continue;
+            }
+
             Set<MessageQueue> mqs = result.get(mq.getBrokerName());
             if (null == mqs) {
                 mqs = new HashSet<MessageQueue>();
@@ -150,10 +172,7 @@ public abstract class RebalanceImpl {
                 }
 
                 boolean lockOK = lockedMq.contains(mq);
-                log.info("the message queue lock {}, {} {}",
-                    lockOK ? "OK" : "Failed",
-                    this.consumerGroup,
-                    mq);
+                log.info("message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq);
                 return lockOK;
             } catch (Exception e) {
                 log.error("lockBatchMQ exception, " + mq, e);
@@ -172,8 +191,9 @@ public abstract class RebalanceImpl {
             final String brokerName = entry.getKey();
             final Set<MessageQueue> mqs = entry.getValue();
 
-            if (mqs.isEmpty())
+            if (mqs.isEmpty()) {
                 continue;
+            }
 
             FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
             if (findBrokerResult != null) {
@@ -213,29 +233,86 @@ public abstract class RebalanceImpl {
         }
     }
 
-    public void doRebalance(final boolean isOrder) {
+    public boolean clientRebalance(String topic) {
+        return true;
+    }
+
+    public boolean doRebalance(final boolean isOrder) {
+        boolean balanced = true;
         Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
         if (subTable != null) {
             for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                 final String topic = entry.getKey();
                 try {
-                    this.rebalanceByTopic(topic, isOrder);
+                    if (!clientRebalance(topic) && tryQueryAssignment(topic)) {
+                        balanced = this.getRebalanceResultFromBroker(topic, isOrder);
+                    } else {
+                        balanced = this.rebalanceByTopic(topic, isOrder);
+                    }
                 } catch (Throwable e) {
                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-                        log.warn("rebalanceByTopic Exception", e);
+                        log.warn("rebalance Exception", e);
+                        balanced = false;
                     }
                 }
             }
         }
 
         this.truncateMessageQueueNotMyTopic();
+
+        return balanced;
+    }
+
+    private boolean tryQueryAssignment(String topic) {
+        if (topicClientRebalance.containsKey(topic)) {
+            return false;
+        }
+
+        if (topicBrokerRebalance.containsKey(topic)) {
+            return true;
+        }
+
+        String strategyName = allocateMessageQueueStrategy != null ? allocateMessageQueueStrategy.getName() : null;
+
+        boolean success = false;
+        int i = 0;
+        int timeOut = 0;
+        while (i++ < TIMEOUT_CHECK_TIMES) {
+            try {
+                Set<MessageQueueAssignment> resultSet = mQClientFactory.queryAssignment(topic, consumerGroup,
+                    strategyName, messageModel, QUERY_ASSIGNMENT_TIMEOUT / TIMEOUT_CHECK_TIMES * i);
+                success = true;
+                break;
+            } catch (Throwable t) {
+                if (t instanceof RemotingTimeoutException) {
+                    timeOut++;
+                } else {
+                    log.error("tryQueryAssignment error.", t);
+                    break;
+                }
+            }
+        }
+
+        if (success) {
+            topicBrokerRebalance.put(topic, topic);
+            return true;
+        } else {
+            if (timeOut >= TIMEOUT_CHECK_TIMES) {
+                // if never success before and timeout exceed TIMEOUT_CHECK_TIMES, force client rebalance
+                topicClientRebalance.put(topic, topic);
+                return false;
+            } else {
+                return true;
+            }
+        }
     }
 
     public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
         return subscriptionInner;
     }
 
-    private void rebalanceByTopic(final String topic, final boolean isOrder) {
+    private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
+        boolean balanced = true;
         switch (messageModel) {
             case BROADCASTING: {
                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
@@ -243,13 +320,12 @@ public abstract class RebalanceImpl {
                     boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                     if (changed) {
                         this.messageQueueChanged(topic, mqSet, mqSet);
-                        log.info("messageQueueChanged {} {} {} {}",
-                            consumerGroup,
-                            topic,
-                            mqSet,
-                            mqSet);
+                        log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet);
                     }
+
+                    balanced = mqSet.equals(getWorkingMessageQueue(topic));
                 } else {
+                    this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());
                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                 }
                 break;
@@ -259,6 +335,7 @@ public abstract class RebalanceImpl {
                 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                 if (null == mqSet) {
                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                        this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());
                         log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                     }
                 }
@@ -284,9 +361,8 @@ public abstract class RebalanceImpl {
                             mqAll,
                             cidAll);
                     } catch (Throwable e) {
-                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
-                            e);
-                        return;
+                        log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);
+                        return false;
                     }
 
                     Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
@@ -297,17 +373,89 @@ public abstract class RebalanceImpl {
                     boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                     if (changed) {
                         log.info(
-                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
+                            "client rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                             strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                             allocateResultSet.size(), allocateResultSet);
                         this.messageQueueChanged(topic, mqSet, allocateResultSet);
                     }
+
+                    balanced = allocateResultSet.equals(getWorkingMessageQueue(topic));
                 }
                 break;
             }
             default:
                 break;
         }
+
+        return balanced;
+    }
+
+    private boolean getRebalanceResultFromBroker(final String topic, final boolean isOrder) {
+        String strategyName;
+        switch (messageModel) {
+            case BROADCASTING:
+                strategyName = null;
+                break;
+            case CLUSTERING:
+                strategyName = this.allocateMessageQueueStrategy.getName();
+                break;
+            default:
+                return true;
+        }
+        Set<MessageQueueAssignment> messageQueueAssignments;
+        try {
+            messageQueueAssignments = this.mQClientFactory.queryAssignment(topic, consumerGroup,
+                strategyName, messageModel, QUERY_ASSIGNMENT_TIMEOUT);
+        } catch (Exception e) {
+            log.error("allocate message queue exception. strategy name: {}, ex: {}", strategyName, e);
+            return false;
+        }
+
+        // null means invalid result, we should skip the update logic
+        if (messageQueueAssignments == null) {
+            return false;
+        }
+        Set<MessageQueue> mqSet = new HashSet<MessageQueue>();
+        for (MessageQueueAssignment messageQueueAssignment : messageQueueAssignments) {
+            if (messageQueueAssignment.getMessageQueue() != null) {
+                mqSet.add(messageQueueAssignment.getMessageQueue());
+            }
+        }
+        Set<MessageQueue> mqAll = null;
+        if (messageModel == MessageModel.BROADCASTING) {
+            mqAll = mqSet;
+        }
+        boolean changed = this.updateMessageQueueAssignment(topic, messageQueueAssignments, isOrder);
+        if (changed) {
+            log.info("broker rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, assignmentSet={}",
+                strategyName, consumerGroup, topic, this.mQClientFactory.getClientId(), messageQueueAssignments);
+            this.messageQueueChanged(topic, mqAll, mqSet);
+        }
+
+        return mqSet.equals(getWorkingMessageQueue(topic));
+    }
+
+    private Set<MessageQueue> getWorkingMessageQueue(String topic) {
+        Set<MessageQueue> queueSet = new HashSet<MessageQueue>();
+        for (Entry<MessageQueue, ProcessQueue> entry : this.processQueueTable.entrySet()) {
+            MessageQueue mq = entry.getKey();
+            ProcessQueue pq = entry.getValue();
+
+            if (mq.getTopic().equals(topic) && !pq.isDropped()) {
+                queueSet.add(mq);
+            }
+        }
+
+        for (Entry<MessageQueue, PopProcessQueue> entry : this.popProcessQueueTable.entrySet()) {
+            MessageQueue mq = entry.getKey();
+            PopProcessQueue pq = entry.getValue();
+
+            if (mq.getTopic().equals(topic) && !pq.isDropped()) {
+                queueSet.add(mq);
+            }
+        }
+
+        return queueSet;
     }
 
     private void truncateMessageQueueNotMyTopic() {
@@ -323,12 +471,40 @@ public abstract class RebalanceImpl {
                 }
             }
         }
+
+        for (MessageQueue mq : this.popProcessQueueTable.keySet()) {
+            if (!subTable.containsKey(mq.getTopic())) {
+
+                PopProcessQueue pq = this.popProcessQueueTable.remove(mq);
+                if (pq != null) {
+                    pq.setDropped(true);
+                    log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary pop mq, {}", consumerGroup, mq);
+                }
+            }
+        }
+
+        Iterator<Map.Entry<String, String>> clientIter = topicClientRebalance.entrySet().iterator();
+        while (clientIter.hasNext()) {
+            if (!subTable.containsKey(clientIter.next().getKey())) {
+                clientIter.remove();
+            }
+        }
+
+        Iterator<Map.Entry<String, String>> brokerIter = topicBrokerRebalance.entrySet().iterator();
+        while (brokerIter.hasNext()) {
+            if (!subTable.containsKey(brokerIter.next().getKey())) {
+                brokerIter.remove();
+            }
+        }
     }
 
     private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
         final boolean isOrder) {
         boolean changed = false;
 
+        Map<MessageQueue, MessageQueue> upgradeMqTable = new HashMap<MessageQueue, MessageQueue>();
+        // drop process queues no longer belong me
+        HashMap<MessageQueue, ProcessQueue> removeQueueMap = new HashMap<MessageQueue, ProcessQueue>(this.processQueueTable.size());
         Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
         while (it.hasNext()) {
             Entry<MessageQueue, ProcessQueue> next = it.next();
@@ -338,41 +514,42 @@ public abstract class RebalanceImpl {
             if (mq.getTopic().equals(topic)) {
                 if (!mqSet.contains(mq)) {
                     pq.setDropped(true);
-                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
-                        it.remove();
-                        changed = true;
-                        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
-                    }
-                } else if (pq.isPullExpired()) {
-                    switch (this.consumeType()) {
-                        case CONSUME_ACTIVELY:
-                            break;
-                        case CONSUME_PASSIVELY:
-                            pq.setDropped(true);
-                            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
-                                it.remove();
-                                changed = true;
-                                log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
-                                    consumerGroup, mq);
-                            }
-                            break;
-                        default:
-                            break;
-                    }
+                    removeQueueMap.put(mq, pq);
+                } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
+                    pq.setDropped(true);
+                    removeQueueMap.put(mq, pq);
+                    log.error("[BUG]doRebalance, {}, try remove unnecessary mq, {}, because pull is pause, so try to fixed it",
+                        consumerGroup, mq);
                 }
             }
         }
 
+        // remove message queues no longer belong me
+        for (Entry<MessageQueue, ProcessQueue> entry : removeQueueMap.entrySet()) {
+            MessageQueue mq = entry.getKey();
+            ProcessQueue pq = entry.getValue();
+
+            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
+                this.processQueueTable.remove(mq);
+                changed = true;
+                log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
+            }
+        }
+
+        // add new message queue
+        boolean allMQLocked = true;
         List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
         for (MessageQueue mq : mqSet) {
             if (!this.processQueueTable.containsKey(mq)) {
                 if (isOrder && !this.lock(mq)) {
                     log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
+                    allMQLocked = false;
                     continue;
                 }
 
                 this.removeDirtyOffset(mq);
-                ProcessQueue pq = new ProcessQueue();
+                ProcessQueue pq = createProcessQueue(topic);
+                pq.setLocked(true);
                 long nextOffset = this.computePullFromWhere(mq);
                 if (nextOffset >= 0) {
                     ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
@@ -392,9 +569,194 @@ public abstract class RebalanceImpl {
                     log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                 }
             }
+
+        }
+
+        if (!allMQLocked) {
+            mQClientFactory.rebalanceLater(500);
+        }
+
+        this.dispatchPullRequest(pullRequestList, 500);
+
+        return changed;
+    }
+
+    private boolean updateMessageQueueAssignment(final String topic, final Set<MessageQueueAssignment> assignments,
+        final boolean isOrder) {
+        boolean changed = false;
+
+        Map<MessageQueue, MessageQueueAssignment> mq2PushAssignment = new HashMap<MessageQueue, MessageQueueAssignment>();
+        Map<MessageQueue, MessageQueueAssignment> mq2PopAssignment = new HashMap<MessageQueue, MessageQueueAssignment>();
+        for (MessageQueueAssignment assignment : assignments) {
+            MessageQueue messageQueue = assignment.getMessageQueue();
+            if (messageQueue == null) {
+                continue;
+            }
+            if (MessageRequestMode.POP == assignment.getMode()) {
+                mq2PopAssignment.put(messageQueue, assignment);
+            } else {
+                mq2PushAssignment.put(messageQueue, assignment);
+            }
+        }
+
+        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            if (mq2PopAssignment.isEmpty() && !mq2PushAssignment.isEmpty()) {
+                //pop switch to push
+                //subscribe pop retry topic
+                try {
+                    final String retryTopic = KeyBuilder.buildPopRetryTopic(topic, getConsumerGroup());
+                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
+                    getSubscriptionInner().put(retryTopic, subscriptionData);
+                } catch (Exception ignored) {
+                }
+
+            } else if (!mq2PopAssignment.isEmpty() && mq2PushAssignment.isEmpty()) {
+                //push switch to pop
+                //unsubscribe pop retry topic
+                try {
+                    final String retryTopic = KeyBuilder.buildPopRetryTopic(topic, getConsumerGroup());
+                    getSubscriptionInner().remove(retryTopic);
+                } catch (Exception ignored) {
+                }
+
+            }
+        }
+
+        {
+            // drop process queues no longer belong me
+            HashMap<MessageQueue, ProcessQueue> removeQueueMap = new HashMap<MessageQueue, ProcessQueue>(this.processQueueTable.size());
+            Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<MessageQueue, ProcessQueue> next = it.next();
+                MessageQueue mq = next.getKey();
+                ProcessQueue pq = next.getValue();
+
+                if (mq.getTopic().equals(topic)) {
+                    if (!mq2PushAssignment.containsKey(mq)) {
+                        pq.setDropped(true);
+                        removeQueueMap.put(mq, pq);
+                    } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
+                        pq.setDropped(true);
+                        removeQueueMap.put(mq, pq);
+                        log.error("[BUG]doRebalance, {}, try remove unnecessary mq, {}, because pull is pause, so try to fixed it",
+                            consumerGroup, mq);
+                    }
+                }
+            }
+            // remove message queues no longer belong me
+            for (Entry<MessageQueue, ProcessQueue> entry : removeQueueMap.entrySet()) {
+                MessageQueue mq = entry.getKey();
+                ProcessQueue pq = entry.getValue();
+
+                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
+                    this.processQueueTable.remove(mq);
+                    changed = true;
+                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
+                }
+            }
         }
 
-        this.dispatchPullRequest(pullRequestList);
+        {
+            HashMap<MessageQueue, PopProcessQueue> removeQueueMap = new HashMap<MessageQueue, PopProcessQueue>(this.popProcessQueueTable.size());
+            Iterator<Entry<MessageQueue, PopProcessQueue>> it = this.popProcessQueueTable.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<MessageQueue, PopProcessQueue> next = it.next();
+                MessageQueue mq = next.getKey();
+                PopProcessQueue pq = next.getValue();
+
+                if (mq.getTopic().equals(topic)) {
+                    if (!mq2PopAssignment.containsKey(mq)) {
+                        //the queue is no longer your assignment
+                        pq.setDropped(true);
+                        removeQueueMap.put(mq, pq);
+                    } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
+                        pq.setDropped(true);
+                        removeQueueMap.put(mq, pq);
+                        log.error("[BUG]doRebalance, {}, try remove unnecessary pop mq, {}, because pop is pause, so try to fixed it",
+                            consumerGroup, mq);
+                    }
+                }
+            }
+            // remove message queues no longer belong me
+            for (Entry<MessageQueue, PopProcessQueue> entry : removeQueueMap.entrySet()) {
+                MessageQueue mq = entry.getKey();
+                PopProcessQueue pq = entry.getValue();
+
+                if (this.removeUnnecessaryPopMessageQueue(mq, pq)) {
+                    this.popProcessQueueTable.remove(mq);
+                    changed = true;
+                    log.info("doRebalance, {}, remove unnecessary pop mq, {}", consumerGroup, mq);
+                }
+            }
+        }
+
+        {
+            // add new message queue
+            boolean allMQLocked = true;
+            List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
+            for (MessageQueue mq : mq2PushAssignment.keySet()) {
+                if (!this.processQueueTable.containsKey(mq)) {
+                    if (isOrder && !this.lock(mq)) {
+                        log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
+                        allMQLocked = false;
+                        continue;
+                    }
+
+                    this.removeDirtyOffset(mq);
+                    ProcessQueue pq = createProcessQueue();
+                    pq.setLocked(true);
+                    long nextOffset = this.computePullFromWhere(mq);
+                    if (nextOffset >= 0) {
+                        ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
+                        if (pre != null) {
+                            log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
+                        } else {
+                            log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
+                            PullRequest pullRequest = new PullRequest();
+                            pullRequest.setConsumerGroup(consumerGroup);
+                            pullRequest.setNextOffset(nextOffset);
+                            pullRequest.setMessageQueue(mq);
+                            pullRequest.setProcessQueue(pq);
+                            pullRequestList.add(pullRequest);
+                            changed = true;
+                        }
+                    } else {
+                        log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
+                    }
+                }
+            }
+
+            if (!allMQLocked) {
+                mQClientFactory.rebalanceLater(500);
+            }
+            this.dispatchPullRequest(pullRequestList, 500);
+        }
+
+        {
+            // add new message queue
+            List<PopRequest> popRequestList = new ArrayList<PopRequest>();
+            for (MessageQueue mq : mq2PopAssignment.keySet()) {
+                if (!this.popProcessQueueTable.containsKey(mq)) {
+                    PopProcessQueue pq = createPopProcessQueue();
+                    PopProcessQueue pre = this.popProcessQueueTable.putIfAbsent(mq, pq);
+                    if (pre != null) {
+                        log.info("doRebalance, {}, mq pop already exists, {}", consumerGroup, mq);
+                    } else {
+                        log.info("doRebalance, {}, add a new pop mq, {}", consumerGroup, mq);
+                        PopRequest popRequest = new PopRequest();
+                        popRequest.setTopic(topic);
+                        popRequest.setConsumerGroup(consumerGroup);
+                        popRequest.setMessageQueue(mq);
+                        popRequest.setPopProcessQueue(pq);
+                        popRequest.setInitMode(getConsumeInitMode());
+                        popRequestList.add(popRequest);
+                        changed = true;
+                    }
+                }
+            }
+
+            this.dispatchPopPullRequest(popRequestList, 500);
+        }
 
         return changed;
     }
@@ -404,13 +766,27 @@ public abstract class RebalanceImpl {
 
     public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq);
 
+    public boolean removeUnnecessaryPopMessageQueue(final MessageQueue mq, final PopProcessQueue pq) {
+        return true;
+    }
+
     public abstract ConsumeType consumeType();
 
     public abstract void removeDirtyOffset(final MessageQueue mq);
 
     public abstract long computePullFromWhere(final MessageQueue mq);
 
-    public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
+    public abstract int getConsumeInitMode();
+
+    public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList, final long delay);
+
+    public abstract void dispatchPopPullRequest(final List<PopRequest> pullRequestList, final long delay);
+
+    public abstract ProcessQueue createProcessQueue();
+
+    public abstract PopProcessQueue createPopProcessQueue();
+
+    public abstract ProcessQueue createProcessQueue(String topicName);
 
     public void removeProcessQueue(final MessageQueue mq) {
         ProcessQueue prev = this.processQueueTable.remove(mq);
@@ -426,6 +802,10 @@ public abstract class RebalanceImpl {
         return processQueueTable;
     }
 
+    public ConcurrentMap<MessageQueue, PopProcessQueue> getPopProcessQueueTable() {
+        return popProcessQueueTable;
+    }
+
     public ConcurrentMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
         return topicSubscribeInfoTable;
     }
@@ -470,5 +850,13 @@ public abstract class RebalanceImpl {
         }
 
         this.processQueueTable.clear();
+
+        Iterator<Entry<MessageQueue, PopProcessQueue>> popIt = this.popProcessQueueTable.entrySet().iterator();
+        while (popIt.hasNext()) {
+            Entry<MessageQueue, PopProcessQueue> next = popIt.next();
+            next.getValue().setDropped(true);
+        }
+        this.popProcessQueueTable.clear();
     }
+
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
index 9d1ea74..c8f5cf1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -139,7 +139,30 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
     }
 
     @Override
-    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
+    public int getConsumeInitMode() {
+        throw new UnsupportedOperationException("no initMode for Pull");
     }
 
+    @Override
+    public void dispatchPullRequest(final List<PullRequest> pullRequestList, final long delay) {
+    }
+
+    @Override
+    public void dispatchPopPullRequest(List<PopRequest> pullRequestList, long delay) {
+
+    }
+
+    @Override
+    public ProcessQueue createProcessQueue() {
+        return new ProcessQueue();
+    }
+
+    @Override
+    public PopProcessQueue createPopProcessQueue() {
+        return null;
+    }
+
+    public ProcessQueue createProcessQueue(String topicName) {
+        return createProcessQueue();
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
index 9dd408c..a68fb04 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
@@ -74,6 +74,30 @@ public class RebalancePullImpl extends RebalanceImpl {
     }
 
     @Override
-    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
+    public int getConsumeInitMode() {
+        throw new UnsupportedOperationException("no initMode for Pull");
     }
+
+    @Override
+    public void dispatchPullRequest(final List<PullRequest> pullRequestList, final long delay) {
+    }
+
+    @Override
+    public void dispatchPopPullRequest(final List<PopRequest> pullRequestList, final long delay) {
+    }
+
+    @Override
+    public ProcessQueue createProcessQueue() {
+        return new ProcessQueue();
+    }
+
+    @Override
+    public PopProcessQueue createPopProcessQueue() {
+        return null;
+    }
+
+    public ProcessQueue createProcessQueue(String topicName) {
+        return createProcessQueue();
+    }
+
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 9582391..88c7c1e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -36,6 +37,7 @@ public class RebalancePushImpl extends RebalanceImpl {
     private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
     private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
 
+
     public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
         this(null, null, null, null, defaultMQPushConsumerImpl);
     }
@@ -110,6 +112,16 @@ public class RebalancePushImpl extends RebalanceImpl {
         return true;
     }
 
+    @Override
+    public boolean clientRebalance(String topic) {
+        // POPTODO order pop consume not implement yet
+        return defaultMQPushConsumerImpl.getDefaultMQPushConsumer().isClientRebalance() || defaultMQPushConsumerImpl.isConsumeOrderly();
+    }
+
+    public boolean removeUnnecessaryPopMessageQueue(final MessageQueue mq, final PopProcessQueue pq) {
+        return true;
+    }
+
     private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
 
         if (pq.hasTempMessage()) {
@@ -212,10 +224,48 @@ public class RebalancePushImpl extends RebalanceImpl {
     }
 
     @Override
-    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
+    public int getConsumeInitMode() {
+        final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
+        if (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET == consumeFromWhere) {
+            return ConsumeInitMode.MIN;
+        } else {
+            return ConsumeInitMode.MAX;
+        }
+    }
+
+    @Override
+    public void dispatchPullRequest(final List<PullRequest> pullRequestList, final long delay) {
         for (PullRequest pullRequest : pullRequestList) {
-            this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
-            log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
+            if (delay <= 0) {
+                this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
+            } else {
+                this.defaultMQPushConsumerImpl.executePullRequestLater(pullRequest, delay);
+            }
         }
     }
+
+    @Override
+    public void dispatchPopPullRequest(final List<PopRequest> pullRequestList, final long delay) {
+        for (PopRequest pullRequest : pullRequestList) {
+            if (delay <= 0) {
+                this.defaultMQPushConsumerImpl.executePopPullRequestImmediately(pullRequest);
+            } else {
+                this.defaultMQPushConsumerImpl.executePopPullRequestLater(pullRequest, delay);
+            }
+        }
+    }
+
+    @Override
+    public ProcessQueue createProcessQueue() {
+        return new ProcessQueue();
+    }
+
+    @Override public ProcessQueue createProcessQueue(String topicName) {
+        return createProcessQueue();
+    }
+
+    @Override
+    public PopProcessQueue createPopProcessQueue() {
+        return new PopProcessQueue();
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 81e6d84..619cfc2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
@@ -64,26 +63,29 @@ import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.filter.ExpressionType;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
 import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+
 public class MQClientInstance {
     private final static long LOCK_TIMEOUT_MILLIS = 3000;
     private final InternalLogger log = ClientLogger.getLog();
@@ -964,6 +966,19 @@ public class MQClientInstance {
         this.adminExtTable.remove(group);
     }
 
+    public void rebalanceLater(long delayMillis) {
+        if (delayMillis <= 0) {
+            this.rebalanceService.wakeup();
+        } else {
+            this.scheduledExecutorService.schedule(new Runnable() {
+                @Override
+                public void run() {
+                    MQClientInstance.this.rebalanceService.wakeup();
+                }
+            }, delayMillis, TimeUnit.MILLISECONDS);
+        }
+    }
+
     public void rebalanceImmediately() {
         this.rebalanceService.wakeup();
     }
@@ -1091,6 +1106,22 @@ public class MQClientInstance {
         return null;
     }
 
+    public Set<MessageQueueAssignment> queryAssignment(final String topic, final String consumerGroup, final String strategyName, final MessageModel messageModel, int timeout)
+        throws RemotingException, InterruptedException, MQBrokerException {
+        String brokerAddr = this.findBrokerAddrByTopic(topic);
+        if (null == brokerAddr) {
+            this.updateTopicRouteInfoFromNameServer(topic);
+            brokerAddr = this.findBrokerAddrByTopic(topic);
+        }
+
+        if (null != brokerAddr) {
+            return this.mQClientAPIImpl.queryAssignment(brokerAddr, topic, consumerGroup, clientId,  strategyName,
+                messageModel, timeout);
+        }
+
+        return null;
+    }
+
     public String findBrokerAddrByTopic(final String topic) {
         TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
         if (topicRouteData != null) {
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 5b7e635..eee8726 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -20,8 +20,10 @@ package org.apache.rocketmq.client.consumer;
 import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
-import java.util.*;
-
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
@@ -52,11 +54,8 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
@@ -70,9 +69,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(DefaultLitePullConsumerImpl.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.class)
 public class DefaultLitePullConsumerTest {
     @Spy
     private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@@ -94,13 +91,16 @@ public class DefaultLitePullConsumerTest {
 
     @Before
     public void init() throws Exception {
-        PowerMockito.suppress(PowerMockito.method(DefaultLitePullConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
         Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
         field.setAccessible(true);
         RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory);
         field = RebalanceService.class.getDeclaredField("waitInterval");
         field.setAccessible(true);
         field.set(rebalanceService, 100);
+
+        field = DefaultLitePullConsumerImpl.class.getDeclaredField("doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged");
+        field.setAccessible(true);
+        field.set(null, true);
     }
 
     @Test
@@ -183,6 +183,7 @@ public class DefaultLitePullConsumerTest {
         when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
         MessageQueue messageQueue = createMessageQueue();
         litePullConsumer.assign(Collections.singletonList(messageQueue));
+        litePullConsumer.pause(Collections.singletonList(messageQueue));
         long offset = litePullConsumer.committed(messageQueue);
         litePullConsumer.seek(messageQueue, offset);
         Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
@@ -199,6 +200,7 @@ public class DefaultLitePullConsumerTest {
         when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
         MessageQueue messageQueue = createMessageQueue();
         litePullConsumer.assign(Collections.singletonList(messageQueue));
+        litePullConsumer.pause(Collections.singletonList(messageQueue));
         litePullConsumer.seekToBegin(messageQueue);
         Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
         field.setAccessible(true);
@@ -214,6 +216,7 @@ public class DefaultLitePullConsumerTest {
         when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
         MessageQueue messageQueue = createMessageQueue();
         litePullConsumer.assign(Collections.singletonList(messageQueue));
+        litePullConsumer.pause(Collections.singletonList(messageQueue));
         litePullConsumer.seekToEnd(messageQueue);
         Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
         field.setAccessible(true);
@@ -229,6 +232,7 @@ public class DefaultLitePullConsumerTest {
         when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
         MessageQueue messageQueue = createMessageQueue();
         litePullConsumer.assign(Collections.singletonList(messageQueue));
+        litePullConsumer.pause(Collections.singletonList(messageQueue));
         try {
             litePullConsumer.seek(messageQueue, -1);
             failBecauseExceptionWasNotThrown(MQClientException.class);
@@ -517,9 +521,6 @@ public class DefaultLitePullConsumerTest {
     public void testConsumerAfterShutdown() throws Exception {
         DefaultLitePullConsumer defaultLitePullConsumer = createSubscribeLitePullConsumer();
 
-        DefaultLitePullConsumer mockConsumer = spy(defaultLitePullConsumer);
-        when(mockConsumer.poll(anyLong())).thenReturn(new ArrayList<>());
-
         new AsyncConsumer().executeAsync(defaultLitePullConsumer);
 
         Thread.sleep(100);
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index 924ee12..93a4ae2 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -24,8 +24,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
@@ -60,11 +60,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
@@ -77,20 +74,20 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(DefaultMQPushConsumerImpl.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.class)
 public class DefaultMQPushConsumerTest {
     private String consumerGroup;
     private String topic = "FooBar";
     private String brokerName = "BrokerA";
     private MQClientInstance mQClientFactory;
+    private final byte[] msgBody = Long.toString(System.currentTimeMillis()).getBytes();
 
     @Mock
     private MQClientAPIImpl mQClientAPIImpl;
     private PullAPIWrapper pullAPIWrapper;
     private RebalancePushImpl rebalancePushImpl;
     private DefaultMQPushConsumer pushConsumer;
+    private AtomicLong queueOffset = new AtomicLong(1024);;
 
     @Before
     public void init() throws Exception {
@@ -108,12 +105,15 @@ public class DefaultMQPushConsumerTest {
         });
 
         DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
-        PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
         rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
         Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
         field.setAccessible(true);
         field.set(pushConsumerImpl, rebalancePushImpl);
 
+        field = DefaultMQPushConsumerImpl.class.getDeclaredField("doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged");
+        field.setAccessible(true);
+        field.set(null, true);
+
         pushConsumer.subscribe(topic, "*");
         pushConsumer.start();
 
@@ -143,8 +143,9 @@ public class DefaultMQPushConsumerTest {
                     MessageClientExt messageClientExt = new MessageClientExt();
                     messageClientExt.setTopic(topic);
                     messageClientExt.setQueueId(0);
-                    messageClientExt.setMsgId("123");
-                    messageClientExt.setBody(new byte[] {'a'});
+                    messageClientExt.setQueueOffset(queueOffset.getAndIncrement());
+                    messageClientExt.setMsgId("1024");
+                    messageClientExt.setBody(msgBody);
                     messageClientExt.setOffsetMsgId("234");
                     messageClientExt.setBornHost(new InetSocketAddress(8080));
                     messageClientExt.setStoreHost(new InetSocketAddress(8080));
@@ -190,10 +191,10 @@ public class DefaultMQPushConsumerTest {
         pullMessageService.executePullRequestImmediately(createPullRequest());
         countDownLatch.await();
         assertThat(messageExts[0].getTopic()).isEqualTo(topic);
-        assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
+        assertThat(messageExts[0].getBody()).isEqualTo(msgBody);
     }
 
-    @Test
+    @Test(timeout = 20000)
     public void testPullMessage_SuccessWithOrderlyService() throws Exception {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         final MessageExt[] messageExts = new MessageExt[1];
@@ -213,9 +214,9 @@ public class DefaultMQPushConsumerTest {
         PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
         pullMessageService.executePullRequestLater(createPullRequest(), 100);
 
-        countDownLatch.await(10, TimeUnit.SECONDS);
+        countDownLatch.await();
         assertThat(messageExts[0].getTopic()).isEqualTo(topic);
-        assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
+        assertThat(messageExts[0].getBody()).isEqualTo(msgBody);
     }
 
     @Test
@@ -259,7 +260,7 @@ public class DefaultMQPushConsumerTest {
         }
     }
 
-    @Test
+    @Test(timeout = 20000)
     public void testGracefulShutdown() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         pushConsumer.setAwaitTerminationMillisWhenShutdown(2000);
@@ -268,6 +269,7 @@ public class DefaultMQPushConsumerTest {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                             ConsumeConcurrentlyContext context) {
+                assertThat(msgs.get(0).getBody()).isEqualTo(msgBody);
                 countDownLatch.countDown();
                 try {
                     Thread.sleep(1000);
@@ -302,7 +304,7 @@ public class DefaultMQPushConsumerTest {
     private PullRequest createPullRequest() {
         PullRequest pullRequest = new PullRequest();
         pullRequest.setConsumerGroup(consumerGroup);
-        pullRequest.setNextOffset(1024);
+        pullRequest.setNextOffset(queueOffset.get());
 
         MessageQueue messageQueue = new MessageQueue();
         messageQueue.setBrokerName(brokerName);
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index 3f00d9e..c91d55a 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -17,7 +17,18 @@
 package org.apache.rocketmq.client.impl;
 
 import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.AckCallback;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.AckStatus;
+import org.apache.rocketmq.client.consumer.PopCallback;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.hook.SendMessageContext;
@@ -26,12 +37,42 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.common.message.MessageRequestMode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
+import org.apache.rocketmq.common.protocol.body.QueryAssignmentResponseBody;
+import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
+import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PopMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingClient;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -39,6 +80,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.assertj.core.api.Assertions;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -67,9 +109,11 @@ public class MQClientAPIImplTest {
 
     private String brokerAddr = "127.0.0.1";
     private String brokerName = "DefaultBroker";
+    private String clusterName = "DefaultCluster";
     private static String group = "FooBarGroup";
     private static String topic = "FooBar";
     private Message msg = new Message("FooBar", new byte[] {});
+    private static String clientId = "127.0.0.2@UnitTest";
 
     @Before
     public void init() throws Exception {
@@ -113,7 +157,7 @@ public class MQClientAPIImplTest {
             @Override
             public Object answer(InvocationOnMock mock) throws Throwable {
                 RemotingCommand request = mock.getArgument(1);
-                return createSuccessResponse(request);
+                return createSendMessageSuccessResponse(request);
             }
         }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
 
@@ -166,7 +210,7 @@ public class MQClientAPIImplTest {
                 InvokeCallback callback = mock.getArgument(3);
                 RemotingCommand request = mock.getArgument(1);
                 ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
-                responseFuture.setResponseCommand(createSuccessResponse(request));
+                responseFuture.setResponseCommand(createSendMessageSuccessResponse(request));
                 callback.operationComplete(responseFuture);
                 return null;
             }
@@ -332,7 +376,7 @@ public class MQClientAPIImplTest {
                 InvokeCallback callback = mock.getArgument(3);
                 RemotingCommand request = mock.getArgument(1);
                 ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
-                responseFuture.setResponseCommand(createSuccessResponse(request));
+                responseFuture.setResponseCommand(createSendMessageSuccessResponse(request));
                 callback.operationComplete(responseFuture);
                 return null;
             }
@@ -356,6 +400,444 @@ public class MQClientAPIImplTest {
             }, null, null, 0, sendMessageContext, defaultMQProducerImpl);
     }
 
+    @Test
+    public void testQueryAssignment_Success() throws Exception {
+        doAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock mock) {
+                RemotingCommand request = mock.getArgument(1);
+
+                RemotingCommand response = RemotingCommand.createResponseCommand(null);
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                QueryAssignmentResponseBody b = new QueryAssignmentResponseBody();
+                b.setMessageQueueAssignments(Collections.singleton(new MessageQueueAssignment()));
+                response.setBody(b.encode());
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+        Set<MessageQueueAssignment> assignments = mqClientAPI.queryAssignment(brokerAddr, topic, group, clientId, null, MessageModel.CLUSTERING, 10 * 1000);
+        assertThat(assignments).size().isEqualTo(1);
+    }
+
+    @Test
+    public void testPopMessageAsync_Success() throws Exception {
+        final long popTime = System.currentTimeMillis();
+        final int invisibleTime = 10 * 1000;
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock mock) throws Throwable {
+                InvokeCallback callback = mock.getArgument(3);
+                RemotingCommand request = mock.getArgument(1);
+                ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
+                RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+
+                PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
+                responseHeader.setInvisibleTime(invisibleTime);
+                responseHeader.setPopTime(popTime);
+                responseHeader.setReviveQid(0);
+                responseHeader.setRestNum(1);
+                StringBuilder startOffsetInfo = new StringBuilder(64);
+                ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, false, 0, 0L);
+                responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
+                StringBuilder msgOffsetInfo = new StringBuilder(64);
+                ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, false, 0, Collections.singletonList(0L));
+                responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
+                response.setRemark("FOUND");
+                response.makeCustomHeaderToNet();
+
+                MessageExt message = new MessageExt();
+                message.setQueueId(0);
+                message.setFlag(12);
+                message.setQueueOffset(0L);
+                message.setCommitLogOffset(100L);
+                message.setSysFlag(0);
+                message.setBornTimestamp(System.currentTimeMillis());
+                message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
+                message.setStoreTimestamp(System.currentTimeMillis());
+                message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
+                message.setBody("body".getBytes());
+                message.setTopic(topic);
+                message.putUserProperty("key", "value");
+                response.setBody(MessageDecoder.encode(message, false));
+                responseFuture.setResponseCommand(response);
+                callback.operationComplete(responseFuture);
+                return null;
+            }
+        }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+        final CountDownLatch done = new CountDownLatch(1);
+        mqClientAPI.popMessageAsync(brokerName, brokerAddr, new PopMessageRequestHeader(), 10 * 1000, new PopCallback() {
+            @Override public void onSuccess(PopResult popResult) {
+                assertThat(popResult.getPopStatus()).isEqualTo(PopStatus.FOUND);
+                assertThat(popResult.getRestNum()).isEqualTo(1);
+                assertThat(popResult.getInvisibleTime()).isEqualTo(invisibleTime);
+                assertThat(popResult.getPopTime()).isEqualTo(popTime);
+                assertThat(popResult.getMsgFoundList()).size().isEqualTo(1);
+                done.countDown();
+            }
+
+            @Override public void onException(Throwable e) {
+                Assertions.fail("want no exception but got one", e);
+                done.countDown();
+            }
+        });
+        done.await();
+    }
+
+    @Test
+    public void testAckMessageAsync_Success() throws Exception {
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock mock) throws Throwable {
+                InvokeCallback callback = mock.getArgument(3);
+                RemotingCommand request = mock.getArgument(1);
+                ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
+                RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
+                response.setOpaque(request.getOpaque());
+                response.setCode(ResponseCode.SUCCESS);
+                responseFuture.setResponseCommand(response);
+                callback.operationComplete(responseFuture);
+                return null;
+            }
+        }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+
+        final CountDownLatch done = new CountDownLatch(1);
+        mqClientAPI.ackMessageAsync(brokerAddr, 10 * 1000, new AckCallback() {
+            @Override public void onSuccess(AckResult ackResult) {
+                assertThat(ackResult.getStatus()).isEqualTo(AckStatus.OK);
+                done.countDown();
+            }
+
+            @Override public void onException(Throwable e) {
+                Assertions.fail("want no exception but got one", e);
+                done.countDown();
+            }
+        }, new AckMessageRequestHeader());
+        done.await();
+    }
+
+    @Test
+    public void testChangeInvisibleTimeAsync_Success() throws Exception {
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock mock) throws Throwable {
+                InvokeCallback callback = mock.getArgument(3);
+                RemotingCommand request = mock.getArgument(1);
+                ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
+                RemotingCommand response = RemotingCommand.createResponseCommand(ChangeInvisibleTimeResponseHeader.class);
+                response.setOpaque(request.getOpaque());
+                response.setCode(ResponseCode.SUCCESS);
+                ChangeInvisibleTimeResponseHeader responseHeader = (ChangeInvisibleTimeResponseHeader) response.readCustomHeader();
+                responseHeader.setPopTime(System.currentTimeMillis());
+                responseHeader.setInvisibleTime(10 * 1000L);
+                responseFuture.setResponseCommand(response);
+                callback.operationComplete(responseFuture);
+                return null;
+            }
+        }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+
+        final CountDownLatch done = new CountDownLatch(1);
+        ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(0);
+        requestHeader.setOffset(0L);
+        requestHeader.setInvisibleTime(10 * 1000L);
+        mqClientAPI.changeInvisibleTimeAsync(brokerName, brokerAddr, requestHeader, 10 * 1000, new AckCallback() {
+            @Override public void onSuccess(AckResult ackResult) {
+                assertThat(ackResult.getStatus()).isEqualTo(AckStatus.OK);
+                done.countDown();
+            }
+
+            @Override public void onException(Throwable e) {
+                Assertions.fail("want no exception but got one", e);
+                done.countDown();
+            }
+        });
+        done.await();
+    }
+
+    @Test
+    public void testSetMessageRequestMode_Success() throws Exception {
+        doAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock mock) {
+                RemotingCommand request = mock.getArgument(1);
+
+                RemotingCommand response = RemotingCommand.createResponseCommand(null);
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        mqClientAPI.setMessageRequestMode(brokerAddr, topic, group, MessageRequestMode.POP, 8, 10 * 1000L);
+    }
+
+    @Test
+    public void testCreateSubscriptionGroup_Success() throws Exception {
+        doAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock mock) {
+                RemotingCommand request = mock.getArgument(1);
+
+                RemotingCommand response = RemotingCommand.createResponseCommand(null);
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        mqClientAPI.createSubscriptionGroup(brokerAddr, new SubscriptionGroupConfig(), 10000);
+    }
+
+    @Test
+    public void testCreateTopic_Success() throws Exception {
+        doAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock mock) {
+                RemotingCommand request = mock.getArgument(1);
+
+                RemotingCommand response = RemotingCommand.createResponseCommand(null);
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        mqClientAPI.createTopic(brokerAddr, topic, new TopicConfig(), 10000);
+    }
+
+    @Test
+    public void testGetBrokerClusterAclInfo() throws Exception {
+        doAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock mock) {
+                RemotingCommand request = mock.getArgument(1);
+
+                RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerAclConfigResponseHeader.class);
+                GetBrokerAclConfigResponseHeader responseHeader = (GetBrokerAclConfigResponseHeader) response.readCustomHeader();
+                responseHeader.setVersion(new DataVersion().toJson());
+                responseHeader.setBrokerAddr(brokerAddr);
+                responseHeader.setBrokerName(brokerName);
+                responseHeader.setClusterName(clusterName);
+                response.makeCustomHeaderToNet();
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        ClusterAclVersionInfo info = mqClientAPI.getBrokerClusterAclInfo(brokerAddr, 10000);
+        assertThat(info.getAclConfigDataVersion().getTimestamp()).isGreaterThan(0);
+    }
+
+    @Test
+    public void testGetBrokerClusterConfig() throws Exception {
+        doAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock mock) {
+                RemotingCommand request = mock.getArgument(1);
+
+                RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerClusterAclConfigResponseHeader.class);
+                GetBrokerClusterAclConfigResponseBody body = new GetBrokerClusterAclConfigResponseBody();
+                body.setGlobalWhiteAddrs(Collections.singletonList("1.1.1.1"));
+                body.setPlainAccessConfigs(Collections.singletonList(new PlainAccessConfig()));
+                response.setBody(body.encode());
+                response.makeCustomHeaderToNet();
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        AclConfig aclConfig = mqClientAPI.getBrokerClusterConfig(brokerAddr, 10000);
+        assertThat(aclConfig.getPlainAccessConfigs()).size().isGreaterThan(0);
+        assertThat(aclConfig.getGlobalWhiteAddrs()).size().isGreaterThan(0);
+    }
+
+    @Test
+    public void testViewMessage() throws Exception {
+        doAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock mock) throws Exception {
+                RemotingCommand request = mock.getArgument(1);
+
+                RemotingCommand response = RemotingCommand.createResponseCommand(null);
+                MessageExt message = new MessageExt();
+                message.setQueueId(0);
+                message.setFlag(12);
+                message.setQueueOffset(0L);
+                message.setCommitLogOffset(100L);
+                message.setSysFlag(0);
+                message.setBornTimestamp(System.currentTimeMillis());
+                message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
+                message.setStoreTimestamp(System.currentTimeMillis());
+                message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
+                message.setBody("body".getBytes());
+                message.setTopic(topic);
+                message.putUserProperty("key", "value");
+                response.setBody(MessageDecoder.encode(message, false));
+                response.makeCustomHeaderToNet();
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        MessageExt messageExt = mqClientAPI.viewMessage(brokerAddr, 100L, 10000);
+        assertThat(messageExt.getTopic()).isEqualTo(topic);
+    }
+
+    @Test
+    public void testSearchOffset() throws Exception {
+        doAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock mock) {
+                RemotingCommand request = mock.getArgument(1);
+
+                final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
+                final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader();
+                responseHeader.setOffset(100L);
+                response.makeCustomHeaderToNet();
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        long offset = mqClientAPI.searchOffset(brokerAddr, topic, 0, System.currentTimeMillis() - 1000, 10000);
+        assertThat(offset).isEqualTo(100L);
+    }
+
+    @Test
+    public void testGetMaxOffset() throws Exception {
+        doAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock mock) {
+                RemotingCommand request = mock.getArgument(1);
+
+                final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
+                final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
+                responseHeader.setOffset(100L);
+                response.makeCustomHeaderToNet();
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        long offset = mqClientAPI.getMaxOffset(brokerAddr, topic, 0, 10000);
+        assertThat(offset).isEqualTo(100L);
+    }
+
+    @Test
+    public void testGetMinOffset() throws Exception {
+        doAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock mock) {
+                RemotingCommand request = mock.getArgument(1);
+
+                final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
+                final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
+                responseHeader.setOffset(100L);
+                response.makeCustomHeaderToNet();
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        long offset = mqClientAPI.getMinOffset(brokerAddr, topic, 0, 10000);
+        assertThat(offset).isEqualTo(100L);
+    }
+
+    @Test
+    public void testGetEarliestMsgStoretime() throws Exception {
+        doAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock mock) {
+                RemotingCommand request = mock.getArgument(1);
+
+                final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
+                final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader();
+                responseHeader.setTimestamp(100L);
+                response.makeCustomHeaderToNet();
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        long t = mqClientAPI.getEarliestMsgStoretime(brokerAddr, topic, 0, 10000);
+        assertThat(t).isEqualTo(100L);
+    }
+
+    @Test
+    public void testQueryConsumerOffset() throws Exception {
+        doAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock mock) {
+                RemotingCommand request = mock.getArgument(1);
+
+                final RemotingCommand response =
+                    RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
+                final QueryConsumerOffsetResponseHeader responseHeader =
+                    (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
+                responseHeader.setOffset(100L);
+                response.makeCustomHeaderToNet();
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        long t = mqClientAPI.queryConsumerOffset(brokerAddr, new QueryConsumerOffsetRequestHeader(), 1000);
+        assertThat(t).isEqualTo(100L);
+    }
+
+    @Test
+    public void testUpdateConsumerOffset() throws Exception {
+        doAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock mock) {
+                RemotingCommand request = mock.getArgument(1);
+
+                final RemotingCommand response =
+                    RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
+                response.makeCustomHeaderToNet();
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        mqClientAPI.updateConsumerOffset(brokerAddr, new UpdateConsumerOffsetRequestHeader(), 1000);
+    }
+
+    @Test
+    public void testGetConsumerIdListByGroup() throws Exception {
+        doAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock mock) {
+                RemotingCommand request = mock.getArgument(1);
+
+                final RemotingCommand response =
+                    RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
+                GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
+                body.setConsumerIdList(Collections.singletonList("consumer1"));
+                response.setBody(body.encode());
+                response.makeCustomHeaderToNet();
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+        List<String> consumerIdList = mqClientAPI.getConsumerIdListByGroup(brokerAddr, group, 10000);
+        assertThat(consumerIdList).size().isGreaterThan(0);
+    }
+
     private RemotingCommand createResumeSuccessResponse(RemotingCommand request) {
         RemotingCommand response = RemotingCommand.createResponseCommand(null);
         response.setCode(ResponseCode.SUCCESS);
@@ -363,7 +845,7 @@ public class MQClientAPIImplTest {
         return response;
     }
 
-    private RemotingCommand createSuccessResponse(RemotingCommand request) {
+    private RemotingCommand createSendMessageSuccessResponse(RemotingCommand request) {
         RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
         response.setCode(ResponseCode.SUCCESS);
         response.setOpaque(request.getOpaque());
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
index d4f5812..702f7b1 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
@@ -18,21 +18,66 @@
 package org.apache.rocketmq.client.impl.consumer;
 
 import java.util.List;
+import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.hook.FilterMessageContext;
+import org.apache.rocketmq.client.hook.FilterMessageHook;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
 public class DefaultMQPushConsumerImplTest {
+    @Mock
+    private DefaultMQPushConsumer defaultMQPushConsumer;
 
     @Rule
     public ExpectedException thrown = ExpectedException.none();
 
+    @Before
+    public void setUp() throws Exception {
+        when(defaultMQPushConsumer.getConsumerGroup()).thenReturn("test_group");
+        when(defaultMQPushConsumer.getMessageModel()).thenReturn(MessageModel.CLUSTERING);
+        when(defaultMQPushConsumer.getConsumeFromWhere()).thenReturn(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        when(defaultMQPushConsumer.getConsumeTimestamp()).thenReturn(UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30)));
+        when(defaultMQPushConsumer.getAllocateMessageQueueStrategy()).thenReturn(new AllocateMessageQueueAveragely());
+        when(defaultMQPushConsumer.getConsumeThreadMin()).thenReturn(20);
+        when(defaultMQPushConsumer.getConsumeThreadMax()).thenReturn(30);
+        when(defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()).thenReturn(2000);
+        when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(1000);
+        when(defaultMQPushConsumer.getPullThresholdForTopic()).thenReturn(-1);
+        when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(100);
+        when(defaultMQPushConsumer.getPullThresholdSizeForTopic()).thenReturn(-1);
+        when(defaultMQPushConsumer.getConsumeMessageBatchMaxSize()).thenReturn(1);
+        when(defaultMQPushConsumer.getPullBatchSize()).thenReturn(32);
+        when(defaultMQPushConsumer.getPopInvisibleTime()).thenReturn(60000L);
+        when(defaultMQPushConsumer.getPopBatchNums()).thenReturn(32);
+        when(defaultMQPushConsumer.getClientIP()).thenReturn("127.0.0.1");
+        when(defaultMQPushConsumer.getInstanceName()).thenReturn("test_instance");
+        when(defaultMQPushConsumer.buildMQClientId()).thenCallRealMethod();
+        ClientConfig clientConfig = new ClientConfig();
+        when(defaultMQPushConsumer.cloneClientConfig()).thenReturn(clientConfig);
+        when(defaultMQPushConsumer.getConsumeTimeout()).thenReturn(15L);
+    }
+
     @Test
     public void checkConfigTest() throws MQClientException {
 
@@ -58,4 +103,51 @@ public class DefaultMQPushConsumerImplTest {
         DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(consumer, null);
         defaultMQPushConsumerImpl.start();
     }
+
+    @Test
+    public void testHook() throws Exception {
+        DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(defaultMQPushConsumer, null);
+        defaultMQPushConsumerImpl.registerConsumeMessageHook(new ConsumeMessageHook() {
+            @Override public String hookName() {
+                return "consumerHook";
+            }
+
+            @Override public void consumeMessageBefore(ConsumeMessageContext context) {
+                assertThat(context).isNotNull();
+            }
+
+            @Override public void consumeMessageAfter(ConsumeMessageContext context) {
+                assertThat(context).isNotNull();
+            }
+        });
+        defaultMQPushConsumerImpl.registerFilterMessageHook(new FilterMessageHook() {
+            @Override public String hookName() {
+                return "filterHook";
+            }
+
+            @Override public void filterMessage(FilterMessageContext context) {
+                assertThat(context).isNotNull();
+            }
+        });
+        defaultMQPushConsumerImpl.executeHookBefore(new ConsumeMessageContext());
+        defaultMQPushConsumerImpl.executeHookAfter(new ConsumeMessageContext());
+    }
+
+    @Test
+    public void testPush() throws Exception {
+        when(defaultMQPushConsumer.getMessageListener()).thenReturn(new MessageListenerConcurrently() {
+            @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                ConsumeConcurrentlyContext context) {
+                assertThat(msgs).size().isGreaterThan(0);
+                assertThat(context).isNotNull();
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(defaultMQPushConsumer, null);
+        try {
+            defaultMQPushConsumerImpl.start();
+        } finally {
+            defaultMQPushConsumerImpl.shutdown();
+        }
+    }
 }
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
index 796a394..b7a42e1 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
@@ -22,23 +22,28 @@ import java.util.Set;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.common.message.MessageRequestMode;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.Spy;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -51,6 +56,7 @@ public class RebalancePushImplTest {
     private OffsetStore offsetStore;
     private String consumerGroup = "CID_RebalancePushImplTest";
     private String topic = "TopicA";
+    private final String brokerName = "BrokerA";
 
     @Test
     public void testMessageQueueChanged_CountThreshold() {
@@ -88,16 +94,15 @@ public class RebalancePushImplTest {
 
         rebalancePush.subscriptionInner.putIfAbsent(topic, new SubscriptionData());
 
+        try {
+            when(mqClientInstance.queryAssignment(anyString(), anyString(), anyString(), any(MessageModel.class), anyInt())).thenThrow(new RemotingTimeoutException("unsupported"));
+        } catch (RemotingException ignored) {
+        } catch (InterruptedException ignored) {
+        } catch (MQBrokerException ignored) {
+        }
         when(mqClientInstance.findConsumerIdList(anyString(), anyString())).thenReturn(Collections.singletonList(consumerGroup));
         when(mqClientInstance.getClientId()).thenReturn(consumerGroup);
         when(defaultMQPushConsumer.getOffsetStore()).thenReturn(offsetStore);
-
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(final InvocationOnMock invocation) throws Throwable {
-                return null;
-            }
-        }).when(defaultMQPushConsumer).executePullRequestImmediately(any(PullRequest.class));
     }
 
     @Test
@@ -134,8 +139,8 @@ public class RebalancePushImplTest {
         defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(1024);
         defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForQueue(1024);
         Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
-        allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
-        allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
+        allocateResultSet.add(new MessageQueue(topic, brokerName, 0));
+        allocateResultSet.add(new MessageQueue(topic, brokerName, 1));
         doRebalanceForcibly(rebalancePush, allocateResultSet);
 
         defaultMQPushConsumer.setConsumeMessageService(new ConsumeMessageConcurrentlyService(defaultMQPushConsumer, null));
@@ -160,4 +165,22 @@ public class RebalancePushImplTest {
         assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("1024");
         assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("1024");
     }
+
+    @Test
+    public void testDoRebalancePull() throws Exception {
+        RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
+            new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
+        rebalancePush.getSubscriptionInner().putIfAbsent(topic, new SubscriptionData());
+        rebalancePush.subscriptionInner.putIfAbsent(topic, new SubscriptionData());
+
+        when(mqClientInstance.getClientId()).thenReturn(consumerGroup);
+        when(defaultMQPushConsumer.getOffsetStore()).thenReturn(offsetStore);
+        doNothing().when(defaultMQPushConsumer).executePullRequestLater(any(PullRequest.class), anyLong());
+        MessageQueueAssignment queueAssignment = new MessageQueueAssignment();
+        queueAssignment.setMode(MessageRequestMode.PULL);
+        queueAssignment.setMessageQueue(new MessageQueue(topic, brokerName, 0));
+        when(mqClientInstance.queryAssignment(anyString(), anyString(), anyString(), any(MessageModel.class), anyInt())).thenReturn(Collections.singleton(queueAssignment));
+
+        assertThat(rebalancePush.doRebalance(false)).isTrue();
+    }
 }
\ No newline at end of file
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index a3457e1..95582c5 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -180,4 +180,5 @@ public class MQClientInstanceTest {
         flag = mqClientInstance.registerAdminExt(group, mock(MQAdminExtInner.class));
         assertThat(flag).isTrue();
     }
+
 }
\ No newline at end of file
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index e1771b9..8902396 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -48,7 +48,6 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.junit.After;
 import org.junit.Before;
@@ -56,9 +55,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.Spy;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
@@ -260,7 +257,7 @@ public class DefaultMQProducerTest {
             }
         };
 
-        List<Message> msgs = new ArrayList<>();
+        List<Message> msgs = new ArrayList<Message>();
         for (int i = 0; i < 5; i++) {
             Message message = new Message();
             message.setTopic("test");
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
index 5ea517c..7b75e6b 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -53,13 +53,11 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -73,7 +71,6 @@ import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -89,7 +86,6 @@ import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(DefaultMQPushConsumerImpl.class)
-@PowerMockIgnore("javax.management.*")
 public class DefaultMQConsumerWithTraceTest {
     private String consumerGroup;
     private String consumerGroupNormal;
diff --git a/client/src/test/resources/org/powermock/extensions/configuration.properties b/client/src/test/resources/org/powermock/extensions/configuration.properties
new file mode 100644
index 0000000..6389eff
--- /dev/null
+++ b/client/src/test/resources/org/powermock/extensions/configuration.properties
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+powermock.global-ignore=javax.management.*
\ No newline at end of file