You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/02/22 11:37:42 UTC

[rocketmq] branch snode updated: Polish the push consumer to support 4.x

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new fbe64ff  Polish the push consumer to support 4.x
     new 509bce4  Merge pull request #864 from ShannonDing/snode
fbe64ff is described below

commit fbe64ffbe58b026d783254dcb5b4b468b2901c52
Author: ShannonDing <li...@163.com>
AuthorDate: Fri Feb 22 19:28:23 2019 +0800

    Polish the push consumer to support 4.x
---
 .../client/consumer/DefaultMQPushConsumer.java     |  22 +--
 ...onsumer.java => DefaultMQRealPushConsumer.java} |  36 ++---
 .../rocketmq/client/consumer/MQPushConsumer.java   |  12 +-
 .../client/consumer/MQRealPushConsumer.java        |  68 +++++++++
 .../ConsumeMessageConcurrentlyService.java         |   7 +-
 .../consumer/ConsumeMessageOrderlyService.java     |   7 +-
 .../impl/consumer/DefaultMQPushConsumerImpl.java   | 154 ++-------------------
 ...mpl.java => DefaultMQRealPushConsumerImpl.java} |  61 ++++----
 .../client/impl/consumer/MQPushConsumerInner.java  |  63 +++++++++
 .../client/impl/consumer/ProcessQueue.java         |   4 +-
 .../client/impl/consumer/PullMessageService.java   |   7 +-
 .../client/impl/consumer/RebalancePushImpl.java    |   7 +-
 .../impl/consumer/RebalanceRealPushImpl.java       |   2 +-
 .../client/impl/factory/MQClientInstance.java      |   3 +-
 .../client/trace/AsyncTraceDispatcher.java         |   7 +-
 .../rocketmq/example/quickstart/Consumer.java      |   7 +-
 .../rocketmq/example/quickstart/Producer.java      |   3 +-
 .../rocketmq/example/simple/PushConsumer.java      |   4 +-
 18 files changed, 231 insertions(+), 243 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 91e9f88..dc08ac1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -59,7 +59,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
  * <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe.
  * </p>
  */
-public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
+public class DefaultMQPushConsumer extends ClientConfig implements MQRealPushConsumer {
 
     private final InternalLogger log = ClientLogger.getLog();
 
@@ -223,7 +223,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
     /**
      * Whether update subscription relationship when every pull
      */
-    private boolean postSubscriptionWhenPull = true;
+    private boolean postSubscriptionWhenPull = false;
 
     /**
      * Whether the unit of subscription group
@@ -276,24 +276,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
     }
 
     /**
-     * Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
-     *
-     * @param consumerGroup Consume queue.
-     * @param rpcHook RPC hook to execute before each remoting command.
-     * @param allocateMessageQueueStrategy Message queue allocating algorithm.
-     */
-    public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
-        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean realPushModel) {
-        this.consumerGroup = consumerGroup;
-        if (allocateMessageQueueStrategy == null) {
-            this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
-        } else {
-            this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
-        }
-        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook, realPushModel);
-    }
-
-    /**
      * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and
      * customized trace topic name.
      *
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQRealPushConsumer.java
similarity index 95%
copy from client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQRealPushConsumer.java
index 91e9f88..b4ca6f7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQRealPushConsumer.java
@@ -29,6 +29,8 @@ import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQRealPushConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
 import org.apache.rocketmq.client.trace.TraceDispatcher;
@@ -59,14 +61,14 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
  * <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe.
  * </p>
  */
-public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
+public class DefaultMQRealPushConsumer extends ClientConfig implements MQRealPushConsumer {
 
     private final InternalLogger log = ClientLogger.getLog();
 
     /**
      * Internal implementation. Most of the functions herein are delegated to it.
      */
-    protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
+    protected final transient DefaultMQRealPushConsumerImpl defaultMQPushConsumerImpl;
 
     /**
      * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
@@ -257,7 +259,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
     /**
      * Default constructor.
      */
-    public DefaultMQPushConsumer() {
+    public DefaultMQRealPushConsumer() {
         this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
     }
 
@@ -268,11 +270,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
      * @param rpcHook RPC hook to execute before each remoting command.
      * @param allocateMessageQueueStrategy Message queue allocating algorithm.
      */
-    public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
+    public DefaultMQRealPushConsumer(final String consumerGroup, RPCHook rpcHook,
         AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
         this.consumerGroup = consumerGroup;
         this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
-        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
+        defaultMQPushConsumerImpl = new DefaultMQRealPushConsumerImpl(this, rpcHook);
     }
 
     /**
@@ -282,7 +284,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
      * @param rpcHook RPC hook to execute before each remoting command.
      * @param allocateMessageQueueStrategy Message queue allocating algorithm.
      */
-    public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
+    public DefaultMQRealPushConsumer(final String consumerGroup, RPCHook rpcHook,
         AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean realPushModel) {
         this.consumerGroup = consumerGroup;
         if (allocateMessageQueueStrategy == null) {
@@ -290,7 +292,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
         } else {
             this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
         }
-        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook, realPushModel);
+        defaultMQPushConsumerImpl = new DefaultMQRealPushConsumerImpl(this, rpcHook, realPushModel);
     }
 
     /**
@@ -304,12 +306,12 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
      * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
      * trace topic name.
      */
-    public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
+    public DefaultMQRealPushConsumer(final String consumerGroup, RPCHook rpcHook,
         AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
         final String customizedTraceTopic) {
         this.consumerGroup = consumerGroup;
         this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
-        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
+        defaultMQPushConsumerImpl = new DefaultMQRealPushConsumerImpl(this, rpcHook);
         if (enableMsgTrace) {
             try {
                 AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
@@ -328,7 +330,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
      *
      * @param rpcHook RPC hook to execute before each remoting command.
      */
-    public DefaultMQPushConsumer(RPCHook rpcHook) {
+    public DefaultMQRealPushConsumer(RPCHook rpcHook) {
         this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
     }
 
@@ -338,7 +340,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
      * @param consumerGroup Consumer group.
      * @param enableMsgTrace Switch flag instance for message trace.
      */
-    public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace) {
+    public DefaultMQRealPushConsumer(final String consumerGroup, boolean enableMsgTrace) {
         this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null);
     }
 
@@ -350,7 +352,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
      * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
      * trace topic name.
      */
-    public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace,
+    public DefaultMQRealPushConsumer(final String consumerGroup, boolean enableMsgTrace,
         final String customizedTraceTopic) {
         this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
     }
@@ -360,7 +362,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
      *
      * @param consumerGroup Consumer group.
      */
-    public DefaultMQPushConsumer(final String consumerGroup) {
+    public DefaultMQRealPushConsumer(final String consumerGroup) {
         this(consumerGroup, null, new AllocateMessageQueueAveragely());
     }
 
@@ -474,7 +476,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
         this.consumeThreadMin = consumeThreadMin;
     }
 
-    public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() {
+    public MQPushConsumerInner getDefaultMQPushConsumerImpl() {
         return defaultMQPushConsumerImpl;
     }
 
@@ -675,9 +677,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
      * Subscribe a topic by message selector.
      *
      * @param topic topic to consume.
-     * @param messageSelector {@link org.apache.rocketmq.client.consumer.MessageSelector}
-     * @see org.apache.rocketmq.client.consumer.MessageSelector#bySql
-     * @see org.apache.rocketmq.client.consumer.MessageSelector#byTag
+     * @param messageSelector {@link MessageSelector}
+     * @see MessageSelector#bySql
+     * @see MessageSelector#byTag
      */
     @Override
     public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
index bc6d328..e60085f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
@@ -20,6 +20,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListener;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
 
 /**
  * Push consumer
@@ -49,14 +50,13 @@ public interface MQPushConsumer extends MQConsumer {
      * Subscribe some topic
      *
      * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
-     * null or * expression,meaning subscribe
-     * all
+     * null or * expression,meaning subscribe all
      */
     void subscribe(final String topic, final String subExpression) throws MQClientException;
 
     /**
-     * This method will be removed in the version 5.0.0,because filterServer was removed,and method <code>subscribe(final String topic, final MessageSelector messageSelector)</code>
-     * is recommended.
+     * This method will be removed in the version 5.0.0,because filterServer was removed,and method
+     * <code>subscribe(final String topic, final MessageSelector messageSelector)</code> is recommended.
      *
      * Subscribe some topic
      *
@@ -70,8 +70,8 @@ public interface MQPushConsumer extends MQConsumer {
     /**
      * Subscribe some topic with selector.
      * <p>
-     * This interface also has the ability of {@link #subscribe(String, String)},
-     * and, support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}.
+     * This interface also has the ability of {@link #subscribe(String, String)}, and, support other message selection,
+     * such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}.
      * </p>
      * <p/>
      * <p>
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQRealPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQRealPushConsumer.java
new file mode 100644
index 0000000..80e4e34
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQRealPushConsumer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+
+/**
+ * Real Push consumer
+ */
+public interface MQRealPushConsumer extends MQPushConsumer {
+
+    MQPushConsumerInner getDefaultMQPushConsumerImpl();
+
+    long getConsumeTimeout();
+
+    int getPullThresholdForTopic();
+
+    int getPullThresholdForQueue();
+
+    void setPullThresholdForQueue(int pullThresholdForQueue);
+
+    int getPullThresholdSizeForTopic();
+
+    void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic);
+
+    int getPullThresholdSizeForQueue();
+
+    void setPullThresholdSizeForQueue(final int pullThresholdSizeForQueue);
+
+    ConsumeFromWhere getConsumeFromWhere();
+
+    String getConsumeTimestamp();
+
+    String getConsumerGroup();
+
+    int getConsumeThreadMin();
+
+    void setConsumeThreadMin(int consumeThreadMin);
+
+    int getConsumeThreadMax();
+
+    long getSuspendCurrentQueueTimeMillis();
+
+    int getMaxReconsumeTimes();
+
+    void setMaxReconsumeTimes(final int maxReconsumeTimes);
+
+    int getConsumeMessageBatchMaxSize();
+
+    MessageModel getMessageModel();
+
+}
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 58b985f..c6c3f2d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
@@ -50,8 +51,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
 
 public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
     private static final InternalLogger log = ClientLogger.getLog();
-    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
-    private final DefaultMQPushConsumer defaultMQPushConsumer;
+    private final MQPushConsumerInner defaultMQPushConsumerImpl;
+    private final MQRealPushConsumer defaultMQPushConsumer;
     private final MessageListenerConcurrently messageListener;
     private final BlockingQueue<Runnable> consumeRequestQueue;
     private final ThreadPoolExecutor consumeExecutor;
@@ -60,7 +61,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
     private final ScheduledExecutorService scheduledExecutorService;
     private final ScheduledExecutorService cleanExpireMsgExecutors;
 
-    public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
+    public ConsumeMessageConcurrentlyService(MQPushConsumerInner defaultMQPushConsumerImpl,
         MessageListenerConcurrently messageListener) {
         this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
         this.messageListener = messageListener;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index d4a9953..e1717f8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
@@ -52,8 +53,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
     private static final InternalLogger log = ClientLogger.getLog();
     private final static long MAX_TIME_CONSUME_CONTINUOUSLY =
         Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000"));
-    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
-    private final DefaultMQPushConsumer defaultMQPushConsumer;
+    private final MQPushConsumerInner defaultMQPushConsumerImpl;
+    private final MQRealPushConsumer defaultMQPushConsumer;
     private final MessageListenerOrderly messageListener;
     private final BlockingQueue<Runnable> consumeRequestQueue;
     private final ThreadPoolExecutor consumeExecutor;
@@ -62,7 +63,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
     private final ScheduledExecutorService scheduledExecutorService;
     private volatile boolean stopped = false;
 
-    public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
+    public ConsumeMessageOrderlyService(MQPushConsumerInner defaultMQPushConsumerImpl,
         MessageListenerOrderly messageListener) {
         this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
         this.messageListener = messageListener;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index ff5f418..6108530 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -26,10 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -76,9 +73,10 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
-public class DefaultMQPushConsumerImpl implements MQConsumerInner {
+public class DefaultMQPushConsumerImpl implements MQPushConsumerInner {
     /**
      * Delay some time when exception occur
      */
@@ -95,8 +93,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
     private final InternalLogger log = ClientLogger.getLog();
     private final DefaultMQPushConsumer defaultMQPushConsumer;
-    //private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
-    private final RebalanceImpl rebalanceImpl;
+    private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
     private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
     private final long consumerStartTimestamp = System.currentTimeMillis();
     private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
@@ -111,26 +108,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     private ConsumeMessageService consumeMessageService;
     private long queueFlowControlTimes = 0;
     private long queueMaxSpanFlowControlTimes = 0;
-    private boolean realPushModel = true;
-    private final ConcurrentHashMap<String, AtomicLong> localConsumerOffset = new ConcurrentHashMap<String, AtomicLong>();
-    private final ConcurrentHashMap<String, AtomicBoolean> pullStopped = new ConcurrentHashMap<String, AtomicBoolean>();
-    private final ConcurrentHashMap<String, ProcessQueue> processQueues = new ConcurrentHashMap<String, ProcessQueue>();
 
     public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
-        this(defaultMQPushConsumer, rpcHook, true);
-    }
-
-    public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook,
-        boolean realPushModel) {
         this.defaultMQPushConsumer = defaultMQPushConsumer;
         this.rpcHook = rpcHook;
-        this.realPushModel = realPushModel;
-        if (realPushModel) {
-            log.info("Open Real Push Model for {}", defaultMQPushConsumer.getConsumerGroup());
-            rebalanceImpl = new RebalanceRealPushImpl(this);
-        } else {
-            rebalanceImpl = new RebalancePushImpl(this);
-        }
     }
 
     public void registerFilterMessageHook(final FilterMessageHook hook) {
@@ -307,17 +288,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
             @Override
             public void onSuccess(PullResult pullResult) {
                 if (pullResult != null) {
-                    //Update local offset according remote offset
-                    String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
-                        pullRequest.getMessageQueue().getTopic(),
-                        pullRequest.getMessageQueue().getBrokerName(),
-                        pullRequest.getMessageQueue().getQueueId());
-                    AtomicLong localOffset = localConsumerOffset.get(localOffsetKey);
-                    if (localOffset == null) {
-                        localConsumerOffset.putIfAbsent(localOffsetKey, new AtomicLong(-1));
-                    }
-                    localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset());
-
                     pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                         subscriptionData);
 
@@ -475,15 +445,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     }
 
     private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
-        String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
-            pullRequest.getMessageQueue().getTopic(),
-            pullRequest.getMessageQueue().getBrokerName(),
-            pullRequest.getMessageQueue().getQueueId());
-        if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
-            //Stop pull request
-            log.info("Stop pull request, {}", localOffsetKey);
-            return;
-        }
         this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
     }
 
@@ -500,15 +461,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     }
 
     public void executePullRequestImmediately(final PullRequest pullRequest) {
-        String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
-            pullRequest.getMessageQueue().getTopic(),
-            pullRequest.getMessageQueue().getBrokerName(),
-            pullRequest.getMessageQueue().getQueueId());
-        if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
-            //Stop pull request
-            log.info("Stop pull request, {}", localOffsetKey);
-            return;
-        }
         this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
     }
 
@@ -545,8 +497,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
         throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         try {
-            String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
-            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, snodeAddr, msg,
+            String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
+                : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
+            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(null, brokerAddr,msg,
                 this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
         } catch (Exception e) {
             log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
@@ -675,10 +628,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
             default:
                 break;
         }
-        this.tryToFindSnodePublishInfo();
+
         this.updateTopicSubscribeInfoWhenSubscriptionChanged();
         this.mQClientFactory.checkClientInBroker();
-        this.mQClientFactory.sendHeartbeatToAllSnodeWithLock();
+        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
         this.mQClientFactory.rebalanceImmediately();
     }
 
@@ -896,8 +849,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
             for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                 final String topic = entry.getKey();
                 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
-                this.mQClientFactory.createRetryTopic(topic, this.defaultMQPushConsumer.getConsumerGroup());
-
             }
         }
     }
@@ -1020,11 +971,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
 
     @Override
     public ConsumeType consumeType() {
-        if (realPushModel) {
-            return ConsumeType.CONSUME_PUSH;
-        } else {
-            return ConsumeType.CONSUME_PASSIVELY;
-        }
+        return ConsumeType.CONSUME_PASSIVELY;
     }
 
     @Override
@@ -1193,90 +1140,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
 
     }
 
-    private void tryToFindSnodePublishInfo() {
-        this.mQClientFactory.updateSnodeInfoFromNameServer();
-    }
-
-    public boolean processPushMessage(final MessageExt msg,
-        final String consumerGroup,
-        final String topic,
-        final String brokerName,
-        final int queueID,
-        final long offset) {
-        String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
-        AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey);
-        if (localOffset == null) {
-            log.info("Current Local offset have not set, initiallized to -1.");
-            this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
-            return false;
-        }
-        if (localOffset.get() + 1 < offset) {
-            //should start pull message process
-            log.debug("#####Current Local key:{} and  offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
-            return false;
-        } else {
-            //Stop pull request
-            log.debug("#####Process Push : Current Local key:{} and  offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
-            AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
-            if (pullStop == null) {
-                this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
-                log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
-            }
-            pullStop = this.pullStopped.get(localOffsetKey);
-            if (!pullStop.get()) {
-                pullStop.set(true);
-                log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
-            }
-            //update local offset
-            localOffset.set(offset);
-            //submit to process queue
-            List<MessageExt> messageExtList = new ArrayList<MessageExt>();
-            messageExtList.add(msg);
-            ProcessQueue processQueue = processQueues.get(localOffsetKey);
-            if (processQueue == null) {
-                processQueues.put(localOffsetKey, new ProcessQueue());
-                processQueue = processQueues.get(localOffsetKey);
-            }
-            processQueue.putMessage(messageExtList);
-            MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID);
-            this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true);
-        }
-        return true;
-    }
-
-    private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) {
-        return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID;
-    }
-
     public boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
-        String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
-        ProcessQueue processQueue = processQueues.get(localOffsetKey);
-        if (processQueue != null) {
-            log.info("Clear local expire message for {} in processQueue.", localOffsetKey);
-            processQueue.cleanExpiredMsg(this.defaultMQPushConsumer);
-        }
-        AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
-        if (pullStop != null) {
-            if (pullStop.get()) {
-                pullStop.set(false);
-                log.info("Resume Pull Request of {} is set to TRUE, and then the pull request will start by rebalance again...", localOffsetKey);
-            }
-        }
-        return true;
-    }
-
-    public boolean pausePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
-        String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
-        AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
-        if (pullStop == null) {
-            this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
-            log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
-            return true;
-        }
-        if (!pullStop.get()) {
-            pullStop.set(true);
-            log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
-        }
         return true;
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java
similarity index 95%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
copy to client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java
index ff5f418..785e5b8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java
@@ -33,6 +33,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
 import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -78,7 +81,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
-public class DefaultMQPushConsumerImpl implements MQConsumerInner {
+public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
     /**
      * Delay some time when exception occur
      */
@@ -94,7 +97,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
     private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
     private final InternalLogger log = ClientLogger.getLog();
-    private final DefaultMQPushConsumer defaultMQPushConsumer;
+    private final DefaultMQRealPushConsumer defaultMQPushConsumer;
     //private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
     private final RebalanceImpl rebalanceImpl;
     private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
@@ -116,21 +119,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     private final ConcurrentHashMap<String, AtomicBoolean> pullStopped = new ConcurrentHashMap<String, AtomicBoolean>();
     private final ConcurrentHashMap<String, ProcessQueue> processQueues = new ConcurrentHashMap<String, ProcessQueue>();
 
-    public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
+    public DefaultMQRealPushConsumerImpl(DefaultMQRealPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
         this(defaultMQPushConsumer, rpcHook, true);
     }
 
-    public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook,
+    public DefaultMQRealPushConsumerImpl(DefaultMQRealPushConsumer defaultMQPushConsumer, RPCHook rpcHook,
         boolean realPushModel) {
         this.defaultMQPushConsumer = defaultMQPushConsumer;
         this.rpcHook = rpcHook;
         this.realPushModel = realPushModel;
-        if (realPushModel) {
-            log.info("Open Real Push Model for {}", defaultMQPushConsumer.getConsumerGroup());
-            rebalanceImpl = new RebalanceRealPushImpl(this);
-        } else {
-            rebalanceImpl = new RebalancePushImpl(this);
-        }
+        log.info("Open Real Push Model for {}", defaultMQPushConsumer.getConsumerGroup());
+        rebalanceImpl = new RebalanceRealPushImpl(this);
     }
 
     public void registerFilterMessageHook(final FilterMessageHook hook) {
@@ -191,7 +190,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         return result;
     }
 
-    public DefaultMQPushConsumer getDefaultMQPushConsumer() {
+    public MQRealPushConsumer getDefaultMQPushConsumer() {
         return defaultMQPushConsumer;
     }
 
@@ -318,7 +317,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                     }
                     localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset());
 
-                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
+                    pullResult = DefaultMQRealPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                         subscriptionData);
 
                     switch (pullResult.getPullStatus()) {
@@ -326,30 +325,30 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                             long prevRequestOffset = pullRequest.getNextOffset();
                             pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                             long pullRT = System.currentTimeMillis() - beginTimestamp;
-                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
+                            DefaultMQRealPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                 pullRequest.getMessageQueue().getTopic(), pullRT);
 
                             long firstMsgOffset = Long.MAX_VALUE;
                             if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
-                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+                                DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                             } else {
                                 firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
 
-                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
+                                DefaultMQRealPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                     pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
 
                                 boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
-                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
+                                DefaultMQRealPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                     pullResult.getMsgFoundList(),
                                     processQueue,
                                     pullRequest.getMessageQueue(),
                                     dispatchToConsume);
 
-                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
-                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
-                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
+                                if (DefaultMQRealPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
+                                    DefaultMQRealPushConsumerImpl.this.executePullRequestLater(pullRequest,
+                                        DefaultMQRealPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                 } else {
-                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+                                    DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                 }
                             }
 
@@ -366,16 +365,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                         case NO_NEW_MSG:
                             pullRequest.setNextOffset(pullResult.getNextBeginOffset());
 
-                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
+                            DefaultMQRealPushConsumerImpl.this.correctTagsOffset(pullRequest);
 
-                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+                            DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                             break;
                         case NO_MATCHED_MSG:
                             pullRequest.setNextOffset(pullResult.getNextBeginOffset());
 
-                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
+                            DefaultMQRealPushConsumerImpl.this.correctTagsOffset(pullRequest);
 
-                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+                            DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                             break;
                         case OFFSET_ILLEGAL:
                             log.warn("the pull request offset illegal, {} {}",
@@ -383,17 +382,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                             pullRequest.setNextOffset(pullResult.getNextBeginOffset());
 
                             pullRequest.getProcessQueue().setDropped(true);
-                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
+                            DefaultMQRealPushConsumerImpl.this.executeTaskLater(new Runnable() {
 
                                 @Override
                                 public void run() {
                                     try {
-                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
+                                        DefaultMQRealPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                             pullRequest.getNextOffset(), false);
 
-                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
+                                        DefaultMQRealPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
 
-                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
+                                        DefaultMQRealPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
 
                                         log.warn("fix the pull request offset, {}", pullRequest);
                                     } catch (Throwable e) {
@@ -414,7 +413,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                     log.warn("execute the pull request exception", e);
                 }
 
-                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+                DefaultMQRealPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
             }
         };
 
@@ -856,7 +855,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         try {
             Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
             if (sub != null) {
-                for (final Map.Entry<String, String> entry : sub.entrySet()) {
+                for (final Entry<String, String> entry : sub.entrySet()) {
                     final String topic = entry.getKey();
                     final String subString = entry.getValue();
                     SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
@@ -893,7 +892,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
         Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
         if (subTable != null) {
-            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
+            for (final Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                 final String topic = entry.getKey();
                 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
                 this.mQClientFactory.createRetryTopic(topic, this.defaultMQPushConsumer.getConsumerGroup());
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQPushConsumerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQPushConsumerInner.java
new file mode 100644
index 0000000..d29667d
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQPushConsumerInner.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.stat.ConsumerStatsManager;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+/**
+ * Push Consumer inner interface
+ */
+public interface MQPushConsumerInner extends MQConsumerInner {
+
+    MQRealPushConsumer getDefaultMQPushConsumer();
+
+    OffsetStore getOffsetStore();
+
+    boolean isConsumeOrderly();
+
+    MQClientInstance getmQClientFactory();
+
+    boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID);
+
+    void executePullRequestImmediately(final PullRequest pullRequest);
+
+    RebalanceImpl getRebalanceImpl();
+
+    ConsumerStatsManager getConsumerStatsManager();
+
+    void sendMessageBack(MessageExt msg, int delayLevel,
+        final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+
+    boolean hasHook();
+
+    void registerConsumeMessageHook(final ConsumeMessageHook hook);
+
+    void executeHookBefore(final ConsumeMessageContext context);
+
+    void executeHookAfter(final ConsumeMessageContext context);
+
+    void pullMessage(final PullRequest pullRequest);
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 0a52817..661bf03 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -27,6 +27,8 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageAccessor;
@@ -73,7 +75,7 @@ public class ProcessQueue {
     /**
      * @param pushConsumer
      */
-    public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
+    public void cleanExpiredMsg(MQRealPushConsumer pushConsumer) {
         if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
             return;
         }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
index bd46a58..d58eed3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
@@ -79,7 +79,12 @@ public class PullMessageService extends ServiceThread {
     private void pullMessage(final PullRequest pullRequest) {
         final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
         if (consumer != null) {
-            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
+            MQPushConsumerInner impl;
+            if (consumer instanceof DefaultMQRealPushConsumerImpl) {
+                impl = (DefaultMQRealPushConsumerImpl) consumer;
+            } else {
+                impl = (DefaultMQPushConsumerImpl) consumer;
+            }
             impl.pullMessage(pullRequest);
         } else {
             log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 2e622be..8768ffa 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -34,15 +35,15 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 
 public class RebalancePushImpl extends RebalanceImpl {
     private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
-    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
+    private final MQPushConsumerInner defaultMQPushConsumerImpl;
 
-    public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
+    public RebalancePushImpl(MQPushConsumerInner defaultMQPushConsumerImpl) {
         this(null, null, null, null, defaultMQPushConsumerImpl);
     }
 
     public RebalancePushImpl(String consumerGroup, MessageModel messageModel,
         AllocateMessageQueueStrategy allocateMessageQueueStrategy,
-        MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
+        MQClientInstance mQClientFactory, MQPushConsumerInner defaultMQPushConsumerImpl) {
         super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
         this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceRealPushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceRealPushImpl.java
index 8bd2701..8ee30a1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceRealPushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceRealPushImpl.java
@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 
 public class RebalanceRealPushImpl extends RebalancePushImpl {
 
-    public RebalanceRealPushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
+    public RebalanceRealPushImpl(MQPushConsumerInner defaultMQPushConsumerImpl) {
         super(defaultMQPushConsumerImpl);
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 0953fdc..d5dcb39 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQRealPushConsumerImpl;
 import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
 import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
 import org.apache.rocketmq.client.impl.consumer.PullMessageService;
@@ -1387,7 +1388,7 @@ public class MQClientInstance {
             consumerGroup, topic, brokerName, queueID, offset);
         MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
         if (null != mqConsumerInner) {
-            DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
+            DefaultMQRealPushConsumerImpl consumer = (DefaultMQRealPushConsumerImpl) mqConsumerInner;
             consumer.processPushMessage(msg, consumerGroup, topic, brokerName, queueID, offset);
             return true;
         }
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 87a795e..dd795cd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.trace;
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 import org.apache.rocketmq.client.log.ClientLogger;
@@ -67,7 +68,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     private volatile Thread shutDownHook;
     private volatile boolean stopped = false;
     private DefaultMQProducerImpl hostProducer;
-    private DefaultMQPushConsumerImpl hostConsumer;
+    private MQPushConsumerInner hostConsumer;
     private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
     private String dispatcherId = UUID.randomUUID().toString();
     private String traceTopicName;
@@ -117,11 +118,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
         this.hostProducer = hostProducer;
     }
 
-    public DefaultMQPushConsumerImpl getHostConsumer() {
+    public MQPushConsumerInner getHostConsumer() {
         return hostConsumer;
     }
 
-    public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) {
+    public void setHostConsumer(MQPushConsumerInner hostConsumer) {
         this.hostConsumer = hostConsumer;
     }
 
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
index df3b3b0..233b002 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
@@ -17,7 +17,7 @@
 package org.apache.rocketmq.example.quickstart;
 
 import java.util.List;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
@@ -26,7 +26,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageExt;
 
 /**
- * This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}.
+ * This example shows how to subscribe and consume messages using providing {@link DefaultMQRealPushConsumer}.
  */
 public class Consumer {
 
@@ -35,7 +35,7 @@ public class Consumer {
         /*
          * Instantiate with specified consumer group name.
          */
-        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello");
+        DefaultMQRealPushConsumer consumer = new DefaultMQRealPushConsumer("hello");
 
         /*
          * Specify name server addresses.
@@ -61,7 +61,6 @@ public class Consumer {
         /*
          *  Register callback to execute on arrival of messages fetched from brokers.
          */
-//        consumer.setNamesrvAddr("47.102.149.193:9876");
 
         consumer.registerMessageListener(new MessageListenerConcurrently() {
 
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
index 98dcd61..80c27f4 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
@@ -47,7 +47,6 @@ public class Producer {
         /*
          * Launch the instance.
          */
-//        producer.setNamesrvAddr("47.102.149.193:9876");
         producer.start();
 
         for (int i = 0; i < 10; i++) {
@@ -76,7 +75,7 @@ public class Producer {
         /*
          * Shut down once the producer instance is not longer in use.
          */
-        Thread.sleep(30000L);
+        Thread.sleep(3000L);
         producer.shutdown();
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
index abbfbdf..29a51ac 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
@@ -17,7 +17,7 @@
 package org.apache.rocketmq.example.simple;
 
 import java.util.List;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
@@ -28,7 +28,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 public class PushConsumer {
 
     public static void main(String[] args) throws InterruptedException, MQClientException {
-        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
+        DefaultMQRealPushConsumer consumer = new DefaultMQRealPushConsumer("CID_JODIE_1");
         consumer.subscribe("TopicTest", "*");
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
         //wrong time format 2017_0422_221800