You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:15 UTC

[24/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
new file mode 100644
index 0000000..664b9fb
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -0,0 +1,1071 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.Validators;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.listener.MessageListener;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.hook.FilterMessageHook;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.stat.ConsumerStatsManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultMQPushConsumerImpl implements MQConsumerInner {
+    /**
+     * Delay some time when exception occur
+     */
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
+    /**
+     * Flow control interval
+     */
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
+    /**
+     * Delay some time when suspend pull service
+     */
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;
+    private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
+    private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
+    private final Logger log = ClientLogger.getLog();
+    private final DefaultMQPushConsumer defaultMQPushConsumer;
+    private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
+    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
+    private final long consumerStartTimestamp = System.currentTimeMillis();
+    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
+    private final RPCHook rpcHook;
+    private ServiceState serviceState = ServiceState.CREATE_JUST;
+    private MQClientInstance mQClientFactory;
+    private PullAPIWrapper pullAPIWrapper;
+    private volatile boolean pause = false;
+    private boolean consumeOrderly = false;
+    private MessageListener messageListenerInner;
+    private OffsetStore offsetStore;
+    private ConsumeMessageService consumeMessageService;
+    private long flowControlTimes1 = 0;
+    private long flowControlTimes2 = 0;
+
+
+    public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
+        this.defaultMQPushConsumer = defaultMQPushConsumer;
+        this.rpcHook = rpcHook;
+    }
+
+    public void registerFilterMessageHook(final FilterMessageHook hook) {
+        this.filterMessageHookList.add(hook);
+        log.info("register FilterMessageHook Hook, {}", hook.hookName());
+    }
+
+    public boolean hasHook() {
+        return !this.consumeMessageHookList.isEmpty();
+    }
+
+    public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
+        this.consumeMessageHookList.add(hook);
+        log.info("register consumeMessageHook Hook, {}", hook.hookName());
+    }
+
+    public void executeHookBefore(final ConsumeMessageContext context) {
+        if (!this.consumeMessageHookList.isEmpty()) {
+            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+                try {
+                    hook.consumeMessageBefore(context);
+                } catch (Throwable e) {
+                }
+            }
+        }
+    }
+
+    public void executeHookAfter(final ConsumeMessageContext context) {
+        if (!this.consumeMessageHookList.isEmpty()) {
+            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+                try {
+                    hook.consumeMessageAfter(context);
+                } catch (Throwable e) {
+                }
+            }
+        }
+    }
+
+    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+        createTopic(key, newTopic, queueNum, 0);
+    }
+
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+        this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
+    }
+
+    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
+        Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
+        if (null == result) {
+            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+            result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
+        }
+
+        if (null == result) {
+            throw new MQClientException("The topic[" + topic + "] not exist", null);
+        }
+
+        return result;
+    }
+
+    public DefaultMQPushConsumer getDefaultMQPushConsumer() {
+        return defaultMQPushConsumer;
+    }
+
+    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+        return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
+    }
+
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
+    }
+
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
+    }
+
+    public OffsetStore getOffsetStore() {
+        return offsetStore;
+    }
+
+    public void setOffsetStore(OffsetStore offsetStore) {
+        this.offsetStore = offsetStore;
+    }
+
+    public void pullMessage(final PullRequest pullRequest) {
+        final ProcessQueue processQueue = pullRequest.getProcessQueue();
+        if (processQueue.isDropped()) {
+            log.info("the pull request[{}] is dropped.", pullRequest.toString());
+            return;
+        }
+
+        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
+
+        try {
+            this.makeSureStateOK();
+        } catch (MQClientException e) {
+            log.warn("pullMessage exception, consumer state not ok", e);
+            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+            return;
+        }
+
+        if (this.isPause()) {
+            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
+            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
+            return;
+        }
+
+        long size = processQueue.getMsgCount().get();
+        if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
+            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+            if ((flowControlTimes1++ % 1000) == 0) {
+                log.warn(
+                        "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
+                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
+            }
+            return;
+        }
+
+        if (!this.consumeOrderly) {
+            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
+                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+                if ((flowControlTimes2++ % 1000) == 0) {
+                    log.warn(
+                            "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
+                            processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
+                            pullRequest, flowControlTimes2);
+                }
+                return;
+            }
+        } else {
+            if (processQueue.isLocked()) {
+                if (!pullRequest.isLockedFirst()) {
+                    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
+                    boolean brokerBusy = offset < pullRequest.getNextOffset();
+                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
+                            pullRequest, offset, brokerBusy);
+                    if (brokerBusy) {
+                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
+                                pullRequest, offset);
+                    }
+
+                    pullRequest.setLockedFirst(true);
+                    pullRequest.setNextOffset(offset);
+                }
+            } else {
+                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+                log.info("pull message later because not locked in broker, {}", pullRequest);
+                return;
+            }
+        }
+
+        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
+        if (null == subscriptionData) {
+            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+            log.warn("find the consumer's subscription failed, {}", pullRequest);
+            return;
+        }
+
+        final long beginTimestamp = System.currentTimeMillis();
+
+        PullCallback pullCallback = new PullCallback() {
+            @Override
+            public void onSuccess(PullResult pullResult) {
+                if (pullResult != null) {
+                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
+                            subscriptionData);
+
+                    switch (pullResult.getPullStatus()) {
+                        case FOUND:
+                            long prevRequestOffset = pullRequest.getNextOffset();
+                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
+                            long pullRT = System.currentTimeMillis() - beginTimestamp;
+                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
+                                    pullRequest.getMessageQueue().getTopic(), pullRT);
+
+                            long firstMsgOffset = Long.MAX_VALUE;
+                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
+                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+                            } else {
+                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
+
+                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
+                                        pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
+
+                                boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
+                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
+                                        pullResult.getMsgFoundList(), //
+                                        processQueue, //
+                                        pullRequest.getMessageQueue(), //
+                                        dispathToConsume);
+
+                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
+                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
+                                            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
+                                } else {
+                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+                                }
+                            }
+
+                            if (pullResult.getNextBeginOffset() < prevRequestOffset//
+                                    || firstMsgOffset < prevRequestOffset) {
+                                log.warn(
+                                        "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
+                                        pullResult.getNextBeginOffset(), //
+                                        firstMsgOffset, //
+                                        prevRequestOffset);
+                            }
+
+                            break;
+                        case NO_NEW_MSG:
+                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
+
+                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
+
+                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+                            break;
+                        case NO_MATCHED_MSG:
+                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
+
+                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
+
+                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+                            break;
+                        case OFFSET_ILLEGAL:
+                            log.warn("the pull request offset illegal, {} {}", //
+                                    pullRequest.toString(), pullResult.toString());
+                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
+
+                            pullRequest.getProcessQueue().setDropped(true);
+                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
+
+                                @Override
+                                public void run() {
+                                    try {
+                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
+                                                pullRequest.getNextOffset(), false);
+
+                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
+
+                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
+
+                                        log.warn("fix the pull request offset, {}", pullRequest);
+                                    } catch (Throwable e) {
+                                        log.error("executeTaskLater Exception", e);
+                                    }
+                                }
+                            }, 10000);
+                            break;
+                        default:
+                            break;
+                    }
+                }
+            }
+
+
+            @Override
+            public void onException(Throwable e) {
+                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                    log.warn("execute the pull request exception", e);
+                }
+
+                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+            }
+        };
+
+        boolean commitOffsetEnable = false;
+        long commitOffsetValue = 0L;
+        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
+            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
+            if (commitOffsetValue > 0) {
+                commitOffsetEnable = true;
+            }
+        }
+
+        String subExpression = null;
+        boolean classFilter = false;
+        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
+        if (sd != null) {
+            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
+                subExpression = sd.getSubString();
+            }
+
+            classFilter = sd.isClassFilterMode();
+        }
+
+        int sysFlag = PullSysFlag.buildSysFlag(//
+                commitOffsetEnable, // commitOffset
+                true, // suspend
+                subExpression != null, // subscription
+                classFilter // class filter
+        );
+        try {
+            this.pullAPIWrapper.pullKernelImpl(//
+                    pullRequest.getMessageQueue(), // 1
+                    subExpression, // 2
+                    subscriptionData.getSubVersion(), // 3
+                    pullRequest.getNextOffset(), // 4
+                    this.defaultMQPushConsumer.getPullBatchSize(), // 5
+                    sysFlag, // 6
+                    commitOffsetValue, // 7
+                    BROKER_SUSPEND_MAX_TIME_MILLIS, // 8
+                    CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9
+                    CommunicationMode.ASYNC, // 10
+                    pullCallback// 11
+            );
+        } catch (Exception e) {
+            log.error("pullKernelImpl exception", e);
+            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+        }
+    }
+
+    private void makeSureStateOK() throws MQClientException {
+        if (this.serviceState != ServiceState.RUNNING) {
+            throw new MQClientException("The consumer service state not OK, "//
+                    + this.serviceState//
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+                    null);
+        }
+    }
+
+    private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
+        this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
+    }
+
+    public boolean isPause() {
+        return pause;
+    }
+
+    public void setPause(boolean pause) {
+        this.pause = pause;
+    }
+
+    public ConsumerStatsManager getConsumerStatsManager() {
+        return this.mQClientFactory.getConsumerStatsManager();
+    }
+
+    public void executePullRequestImmediately(final PullRequest pullRequest) {
+        this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
+    }
+
+    private void correctTagsOffset(final PullRequest pullRequest) {
+        if (0L == pullRequest.getProcessQueue().getMsgCount().get()) {
+            this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
+        }
+    }
+
+    public void executeTaskLater(final Runnable r, final long timeDelay) {
+        this.mQClientFactory.getPullMessageService().executeTaskLater(r, timeDelay);
+    }
+
+    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
+            throws MQClientException, InterruptedException {
+        return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
+    }
+
+    public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws MQClientException,
+            InterruptedException {
+        return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
+    }
+
+
+    public void registerMessageListener(MessageListener messageListener) {
+        this.messageListenerInner = messageListener;
+    }
+
+    public void resume() {
+        this.pause = false;
+        doRebalance();
+        log.info("resume this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup());
+    }
+
+    public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        try {
+            String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
+                    : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
+            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
+                    this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
+        } catch (Exception e) {
+            log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
+
+            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
+
+            String originMsgId = MessageAccessor.getOriginMessageId(msg);
+            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
+
+            newMsg.setFlag(msg.getFlag());
+            MessageAccessor.setProperties(newMsg, msg.getProperties());
+            MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
+            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
+            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
+            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
+
+            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
+        }
+    }
+
+    private int getMaxReconsumeTimes() {
+        // default reconsume times: 16
+        if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
+            return 16;
+        } else {
+            return this.defaultMQPushConsumer.getMaxReconsumeTimes();
+        }
+    }
+
+    public void shutdown() {
+        switch (this.serviceState) {
+            case CREATE_JUST:
+                break;
+            case RUNNING:
+                this.consumeMessageService.shutdown();
+                this.persistConsumerOffset();
+                this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
+                this.mQClientFactory.shutdown();
+                log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
+                this.rebalanceImpl.destroy();
+                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
+                break;
+            case SHUTDOWN_ALREADY:
+                break;
+            default:
+                break;
+        }
+    }
+
+    public void start() throws MQClientException {
+        switch (this.serviceState) {
+            case CREATE_JUST:
+                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
+                        this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
+                this.serviceState = ServiceState.START_FAILED;
+
+                this.checkConfig();
+
+                this.copySubscription();
+
+                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
+                    this.defaultMQPushConsumer.changeInstanceNameToPID();
+                }
+
+                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
+
+                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
+                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
+                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
+                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
+
+                this.pullAPIWrapper = new PullAPIWrapper(
+                        mQClientFactory,
+                        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
+                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
+
+                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
+                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
+                } else {
+                    switch (this.defaultMQPushConsumer.getMessageModel()) {
+                        case BROADCASTING:
+                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
+                            break;
+                        case CLUSTERING:
+                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
+                            break;
+                        default:
+                            break;
+                    }
+                }
+                this.offsetStore.load();
+
+                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
+                    this.consumeOrderly = true;
+                    this.consumeMessageService =
+                            new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
+                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
+                    this.consumeOrderly = false;
+                    this.consumeMessageService =
+                            new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
+                }
+
+                this.consumeMessageService.start();
+
+                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
+                if (!registerOK) {
+                    this.serviceState = ServiceState.CREATE_JUST;
+                    this.consumeMessageService.shutdown();
+                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
+                            null);
+                }
+
+                mQClientFactory.start();
+                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
+                this.serviceState = ServiceState.RUNNING;
+                break;
+            case RUNNING:
+            case START_FAILED:
+            case SHUTDOWN_ALREADY:
+                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
+                        + this.serviceState//
+                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+                        null);
+            default:
+                break;
+        }
+
+        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
+
+        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+
+        this.mQClientFactory.rebalanceImmediately();
+    }
+
+    private void checkConfig() throws MQClientException {
+        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());
+
+        if (null == this.defaultMQPushConsumer.getConsumerGroup()) {
+            throw new MQClientException(
+                    "consumerGroup is null"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
+            throw new MQClientException(
+                    "consumerGroup can not equal "
+                            + MixAll.DEFAULT_CONSUMER_GROUP
+                            + ", please specify another one."
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        if (null == this.defaultMQPushConsumer.getMessageModel()) {
+            throw new MQClientException(
+                    "messageModel is null"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {
+            throw new MQClientException(
+                    "consumeFromWhere is null"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        Date dt = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYY_MMDD_HHMMSS);
+        if (null == dt) {
+            throw new MQClientException(
+                    "consumeTimestamp is invalid, YYYY_MMDD_HHMMSS"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        // allocateMessageQueueStrategy
+        if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) {
+            throw new MQClientException(
+                    "allocateMessageQueueStrategy is null"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        // subscription
+        if (null == this.defaultMQPushConsumer.getSubscription()) {
+            throw new MQClientException(
+                    "subscription is null"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        // messageListener
+        if (null == this.defaultMQPushConsumer.getMessageListener()) {
+            throw new MQClientException(
+                    "messageListener is null"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly;
+        boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently;
+        if (!orderly && !concurrently) {
+            throw new MQClientException(
+                    "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        // consumeThreadMin
+        if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
+                || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000
+                || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
+            throw new MQClientException(
+                    "consumeThreadMin Out of range [1, 1000]"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        // consumeThreadMax
+        if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
+            throw new MQClientException(
+                    "consumeThreadMax Out of range [1, 1000]"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        // consumeConcurrentlyMaxSpan
+        if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
+                || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
+            throw new MQClientException(
+                    "consumeConcurrentlyMaxSpan Out of range [1, 65535]"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        // pullThresholdForQueue
+        if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) {
+            throw new MQClientException(
+                    "pullThresholdForQueue Out of range [1, 65535]"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        // pullInterval
+        if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {
+            throw new MQClientException(
+                    "pullInterval Out of range [0, 65535]"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        // consumeMessageBatchMaxSize
+        if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1
+                || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
+            throw new MQClientException(
+                    "consumeMessageBatchMaxSize Out of range [1, 1024]"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+
+        // pullBatchSize
+        if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) {
+            throw new MQClientException(
+                    "pullBatchSize Out of range [1, 1024]"
+                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+        }
+    }
+
+    private void copySubscription() throws MQClientException {
+        try {
+            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
+            if (sub != null) {
+                for (final Map.Entry<String, String> entry : sub.entrySet()) {
+                    final String topic = entry.getKey();
+                    final String subString = entry.getValue();
+                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
+                            topic, subString);
+                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
+                }
+            }
+
+            if (null == this.messageListenerInner) {
+                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
+            }
+
+            switch (this.defaultMQPushConsumer.getMessageModel()) {
+                case BROADCASTING:
+                    break;
+                case CLUSTERING:
+                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
+                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
+                            retryTopic, SubscriptionData.SUB_ALL);
+                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
+                    break;
+                default:
+                    break;
+            }
+        } catch (Exception e) {
+            throw new MQClientException("subscription exception", e);
+        }
+    }
+
+    public MessageListener getMessageListenerInner() {
+        return messageListenerInner;
+    }
+
+    private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
+        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
+        if (subTable != null) {
+            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
+                final String topic = entry.getKey();
+                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+            }
+        }
+    }
+
+    public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() {
+        return this.rebalanceImpl.getSubscriptionInner();
+    }
+
+    public void subscribe(String topic, String subExpression) throws MQClientException {
+        try {
+            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
+                    topic, subExpression);
+            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
+            if (this.mQClientFactory != null) {
+                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+            }
+        } catch (Exception e) {
+            throw new MQClientException("subscription exception", e);
+        }
+    }
+
+    public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
+        try {
+            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
+                    topic, "*");
+            subscriptionData.setSubString(fullClassName);
+            subscriptionData.setClassFilterMode(true);
+            subscriptionData.setFilterClassSource(filterClassSource);
+            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
+            if (this.mQClientFactory != null) {
+                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+            }
+
+        } catch (Exception e) {
+            throw new MQClientException("subscription exception", e);
+        }
+    }
+
+    public void suspend() {
+        this.pause = true;
+        log.info("suspend this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup());
+    }
+
+    public void unsubscribe(String topic) {
+        this.rebalanceImpl.getSubscriptionInner().remove(topic);
+    }
+
+    public void updateConsumeOffset(MessageQueue mq, long offset) {
+        this.offsetStore.updateOffset(mq, offset, false);
+    }
+
+    public void updateCorePoolSize(int corePoolSize) {
+        this.consumeMessageService.updateCorePoolSize(corePoolSize);
+    }
+
+    public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
+    }
+
+    public RebalanceImpl getRebalanceImpl() {
+        return rebalanceImpl;
+    }
+
+    public boolean isConsumeOrderly() {
+        return consumeOrderly;
+    }
+
+    public void setConsumeOrderly(boolean consumeOrderly) {
+        this.consumeOrderly = consumeOrderly;
+    }
+
+    public void resetOffsetByTimeStamp(long timeStamp)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        for (String topic : rebalanceImpl.getSubscriptionInner().keySet()) {
+            Set<MessageQueue> mqs = rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
+            Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
+            if (mqs != null) {
+                for (MessageQueue mq : mqs) {
+                    long offset = searchOffset(mq, timeStamp);
+                    offsetTable.put(mq, offset);
+                }
+                this.mQClientFactory.resetOffset(topic, groupName(), offsetTable);
+            }
+        }
+    }
+
+    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+        return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
+    }
+
+    @Override
+    public String groupName() {
+        return this.defaultMQPushConsumer.getConsumerGroup();
+    }
+
+    @Override
+    public MessageModel messageModel() {
+        return this.defaultMQPushConsumer.getMessageModel();
+    }
+
+    @Override
+    public ConsumeType consumeType() {
+        return ConsumeType.CONSUME_PASSIVELY;
+    }
+
+    @Override
+    public ConsumeFromWhere consumeFromWhere() {
+        return this.defaultMQPushConsumer.getConsumeFromWhere();
+    }
+
+    @Override
+    public Set<SubscriptionData> subscriptions() {
+        Set<SubscriptionData> subSet = new HashSet<SubscriptionData>();
+
+        subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());
+
+        return subSet;
+    }
+
+    @Override
+    public void doRebalance() {
+        if (this.rebalanceImpl != null && !this.pause) {
+            this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
+        }
+    }
+
+    @Override
+    public void persistConsumerOffset() {
+        try {
+            this.makeSureStateOK();
+            Set<MessageQueue> mqs = new HashSet<MessageQueue>();
+            Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
+            if (allocateMq != null) {
+                mqs.addAll(allocateMq);
+            }
+
+            this.offsetStore.persistAll(mqs);
+        } catch (Exception e) {
+            log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
+        }
+    }
+
+    @Override
+    public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
+        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
+        if (subTable != null) {
+            if (subTable.containsKey(topic)) {
+                this.rebalanceImpl.topicSubscribeInfoTable.put(topic, info);
+            }
+        }
+    }
+
+    @Override
+    public boolean isSubscribeTopicNeedUpdate(String topic) {
+        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
+        if (subTable != null) {
+            if (subTable.containsKey(topic)) {
+                return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public boolean isUnitMode() {
+        return this.defaultMQPushConsumer.isUnitMode();
+    }
+
+    @Override
+    public ConsumerRunningInfo consumerRunningInfo() {
+        ConsumerRunningInfo info = new ConsumerRunningInfo();
+
+        Properties prop = MixAll.object2Properties(this.defaultMQPushConsumer);
+
+        prop.put(ConsumerRunningInfo.PROP_CONSUME_ORDERLY, String.valueOf(this.consumeOrderly));
+        prop.put(ConsumerRunningInfo.PROP_THREADPOOL_CORE_SIZE, String.valueOf(this.consumeMessageService.getCorePoolSize()));
+        prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp));
+
+        info.setProperties(prop);
+
+        Set<SubscriptionData> subSet = this.subscriptions();
+        info.getSubscriptionSet().addAll(subSet);
+
+        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.rebalanceImpl.getProcessQueueTable().entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<MessageQueue, ProcessQueue> next = it.next();
+            MessageQueue mq = next.getKey();
+            ProcessQueue pq = next.getValue();
+
+            ProcessQueueInfo pqinfo = new ProcessQueueInfo();
+            pqinfo.setCommitOffset(this.offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE));
+            pq.fillProcessQueueInfo(pqinfo);
+            info.getMqTable().put(mq, pqinfo);
+        }
+
+        for (SubscriptionData sd : subSet) {
+            ConsumeStatus consumeStatus = this.mQClientFactory.getConsumerStatsManager().consumeStatus(this.groupName(), sd.getTopic());
+            info.getStatusTable().put(sd.getTopic(), consumeStatus);
+        }
+
+        return info;
+    }
+
+    public MQClientInstance getmQClientFactory() {
+        return mQClientFactory;
+    }
+
+    public void setmQClientFactory(MQClientInstance mQClientFactory) {
+        this.mQClientFactory = mQClientFactory;
+    }
+
+    public ServiceState getServiceState() {
+        return serviceState;
+    }
+
+    public void setServiceState(ServiceState serviceState) {
+        this.serviceState = serviceState;
+    }
+
+    public void adjustThreadPool() {
+        long computeAccTotal = this.computeAccumulationTotal();
+        long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();
+
+        long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0);
+
+        long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8);
+
+        if (computeAccTotal >= incThreshold) {
+            this.consumeMessageService.incCorePoolSize();
+        }
+
+        if (computeAccTotal < decThreshold) {
+            this.consumeMessageService.decCorePoolSize();
+        }
+    }
+
+    private long computeAccumulationTotal() {
+        long msgAccTotal = 0;
+        ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable();
+        Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<MessageQueue, ProcessQueue> next = it.next();
+            ProcessQueue value = next.getValue();
+            msgAccTotal += value.getMsgAccCnt();
+        }
+
+        return msgAccTotal;
+    }
+
+    public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic)
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        List<QueueTimeSpan> queueTimeSpan = new ArrayList<QueueTimeSpan>();
+        TopicRouteData routeData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, 3000);
+        for (BrokerData brokerData : routeData.getBrokerDatas()) {
+            String addr = brokerData.selectBrokerAddr();
+            queueTimeSpan.addAll(this.mQClientFactory.getMQClientAPIImpl().queryConsumeTimeSpan(addr, topic, groupName(), 3000));
+        }
+
+        return queueTimeSpan;
+    }
+
+
+    public ConsumeMessageService getConsumeMessageService() {
+        return consumeMessageService;
+    }
+
+
+    public void setConsumeMessageService(ConsumeMessageService consumeMessageService) {
+        this.consumeMessageService = consumeMessageService;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
new file mode 100644
index 0000000..c1abd2f
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+
+import java.util.Set;
+
+
+/**
+ * Consumer inner interface
+ *
+ * @author shijia.wxr
+ */
+public interface MQConsumerInner {
+    String groupName();
+
+
+    MessageModel messageModel();
+
+
+    ConsumeType consumeType();
+
+
+    ConsumeFromWhere consumeFromWhere();
+
+
+    Set<SubscriptionData> subscriptions();
+
+
+    void doRebalance();
+
+
+    void persistConsumerOffset();
+
+
+    void updateTopicSubscribeInfo(final String topic, final Set<MessageQueue> info);
+
+
+    boolean isSubscribeTopicNeedUpdate(final String topic);
+
+
+    boolean isUnitMode();
+
+
+    ConsumerRunningInfo consumerRunningInfo();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
new file mode 100644
index 0000000..0849b5e
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Message lock,strictly ensure the single queue only one thread at a time consuming
+ *
+ * @author shijia.wxr
+ */
+public class MessageQueueLock {
+    private ConcurrentHashMap<MessageQueue, Object> mqLockTable =
+            new ConcurrentHashMap<MessageQueue, Object>();
+
+
+    public Object fetchLockObject(final MessageQueue mq) {
+        Object objLock = this.mqLockTable.get(mq);
+        if (null == objLock) {
+            objLock = new Object();
+            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
+            if (prevLock != null) {
+                objLock = prevLock;
+            }
+        }
+
+        return objLock;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
new file mode 100644
index 0000000..adca859
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+
+/**
+ * Queue consumption snapshot
+ *
+ * @author shijia.wxr
+ */
+public class ProcessQueue {
+    public final static long REBALANCE_LOCK_MAX_LIVE_TIME =
+            Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
+    public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
+    private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
+    private final Logger log = ClientLogger.getLog();
+    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
+    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
+    private final AtomicLong msgCount = new AtomicLong();
+    private final Lock lockConsume = new ReentrantLock();
+    private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
+    private final AtomicLong tryUnlockTimes = new AtomicLong(0);
+    private volatile long queueOffsetMax = 0L;
+    private volatile boolean dropped = false;
+    private volatile long lastPullTimestamp = System.currentTimeMillis();
+    private volatile long lastConsumeTimestamp = System.currentTimeMillis();
+    private volatile boolean locked = false;
+    private volatile long lastLockTimestamp = System.currentTimeMillis();
+    private volatile boolean consuming = false;
+    private volatile long msgAccCnt = 0;
+
+    public boolean isLockExpired() {
+        boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
+        return result;
+    }
+
+
+    public boolean isPullExpired() {
+        boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME;
+        return result;
+    }
+
+    /**
+
+     *
+     * @param pushConsumer
+     */
+    public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
+        if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
+            return;
+        }
+        
+        int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
+        for (int i = 0; i < loop; i++) {
+            MessageExt msg = null;
+            try {
+                this.lockTreeMap.readLock().lockInterruptibly();
+                try {
+                    if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
+                        msg = msgTreeMap.firstEntry().getValue();
+                    } else {
+
+                        break;
+                    }
+                } finally {
+                    this.lockTreeMap.readLock().unlock();
+                }
+            } catch (InterruptedException e) {
+                log.error("getExpiredMsg exception", e);
+            }
+
+            try {
+
+                pushConsumer.sendMessageBack(msg, 3);
+                log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
+                try {
+                    this.lockTreeMap.writeLock().lockInterruptibly();
+                    try {
+                        if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
+                            try {
+                                msgTreeMap.remove(msgTreeMap.firstKey());
+                            } catch (Exception e) {
+                                log.error("send expired msg exception", e);
+                            }
+                        }
+                    } finally {
+                        this.lockTreeMap.writeLock().unlock();
+                    }
+                } catch (InterruptedException e) {
+                    log.error("getExpiredMsg exception", e);
+                }
+            } catch (Exception e) {
+                log.error("send expired msg exception", e);
+            }
+        }
+    }
+
+
+    public boolean putMessage(final List<MessageExt> msgs) {
+        boolean dispatchToConsume = false;
+        try {
+            this.lockTreeMap.writeLock().lockInterruptibly();
+            try {
+                int validMsgCnt = 0;
+                for (MessageExt msg : msgs) {
+                    MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
+                    if (null == old) {
+                        validMsgCnt++;
+                        this.queueOffsetMax = msg.getQueueOffset();
+                    }
+                }
+                msgCount.addAndGet(validMsgCnt);
+
+                if (!msgTreeMap.isEmpty() && !this.consuming) {
+                    dispatchToConsume = true;
+                    this.consuming = true;
+                }
+
+                if (!msgs.isEmpty()) {
+                    MessageExt messageExt = msgs.get(msgs.size() - 1);
+                    String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
+                    if (property != null) {
+                        long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
+                        if (accTotal > 0) {
+                            this.msgAccCnt = accTotal;
+                        }
+                    }
+                }
+            } finally {
+                this.lockTreeMap.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("putMessage exception", e);
+        }
+
+        return dispatchToConsume;
+    }
+
+
+    public long getMaxSpan() {
+        try {
+            this.lockTreeMap.readLock().lockInterruptibly();
+            try {
+                if (!this.msgTreeMap.isEmpty()) {
+                    return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
+                }
+            } finally {
+                this.lockTreeMap.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("getMaxSpan exception", e);
+        }
+
+        return 0;
+    }
+
+
+    public long removeMessage(final List<MessageExt> msgs) {
+        long result = -1;
+        final long now = System.currentTimeMillis();
+        try {
+            this.lockTreeMap.writeLock().lockInterruptibly();
+            this.lastConsumeTimestamp = now;
+            try {
+                if (!msgTreeMap.isEmpty()) {
+                    result = this.queueOffsetMax + 1;
+                    int removedCnt = 0;
+                    for (MessageExt msg : msgs) {
+                        MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
+                        if (prev != null) {
+                            removedCnt--;
+                        }
+                    }
+                    msgCount.addAndGet(removedCnt);
+
+                    if (!msgTreeMap.isEmpty()) {
+                        result = msgTreeMap.firstKey();
+                    }
+                }
+            } finally {
+                this.lockTreeMap.writeLock().unlock();
+            }
+        } catch (Throwable t) {
+            log.error("removeMessage exception", t);
+        }
+
+        return result;
+    }
+
+
+    public TreeMap<Long, MessageExt> getMsgTreeMap() {
+        return msgTreeMap;
+    }
+
+
+    public AtomicLong getMsgCount() {
+        return msgCount;
+    }
+
+
+    public boolean isDropped() {
+        return dropped;
+    }
+
+
+    public void setDropped(boolean dropped) {
+        this.dropped = dropped;
+    }
+
+    public boolean isLocked() {
+        return locked;
+    }
+
+    public void setLocked(boolean locked) {
+        this.locked = locked;
+    }
+
+    public void rollback() {
+        try {
+            this.lockTreeMap.writeLock().lockInterruptibly();
+            try {
+                this.msgTreeMap.putAll(this.msgTreeMapTemp);
+                this.msgTreeMapTemp.clear();
+            } finally {
+                this.lockTreeMap.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("rollback exception", e);
+        }
+    }
+
+
+    public long commit() {
+        try {
+            this.lockTreeMap.writeLock().lockInterruptibly();
+            try {
+                Long offset = this.msgTreeMapTemp.lastKey();
+                msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
+                this.msgTreeMapTemp.clear();
+                if (offset != null) {
+                    return offset + 1;
+                }
+            } finally {
+                this.lockTreeMap.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("commit exception", e);
+        }
+
+        return -1;
+    }
+
+
+    public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
+        try {
+            this.lockTreeMap.writeLock().lockInterruptibly();
+            try {
+                for (MessageExt msg : msgs) {
+                    this.msgTreeMapTemp.remove(msg.getQueueOffset());
+                    this.msgTreeMap.put(msg.getQueueOffset(), msg);
+                }
+            } finally {
+                this.lockTreeMap.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("makeMessageToCosumeAgain exception", e);
+        }
+    }
+
+
+    public List<MessageExt> takeMessags(final int batchSize) {
+        List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
+        final long now = System.currentTimeMillis();
+        try {
+            this.lockTreeMap.writeLock().lockInterruptibly();
+            this.lastConsumeTimestamp = now;
+            try {
+                if (!this.msgTreeMap.isEmpty()) {
+                    for (int i = 0; i < batchSize; i++) {
+                        Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
+                        if (entry != null) {
+                            result.add(entry.getValue());
+                            msgTreeMapTemp.put(entry.getKey(), entry.getValue());
+                        } else {
+                            break;
+                        }
+                    }
+                }
+
+                if (result.isEmpty()) {
+                    consuming = false;
+                }
+            } finally {
+                this.lockTreeMap.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("take Messages exception", e);
+        }
+
+        return result;
+    }
+
+
+    public boolean hasTempMessage() {
+        try {
+            this.lockTreeMap.readLock().lockInterruptibly();
+            try {
+                return !this.msgTreeMap.isEmpty();
+            } finally {
+                this.lockTreeMap.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+        }
+
+        return true;
+    }
+
+
+    public void clear() {
+        try {
+            this.lockTreeMap.writeLock().lockInterruptibly();
+            try {
+                this.msgTreeMap.clear();
+                this.msgTreeMapTemp.clear();
+                this.msgCount.set(0);
+                this.queueOffsetMax = 0L;
+            } finally {
+                this.lockTreeMap.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("rollback exception", e);
+        }
+    }
+
+
+    public long getLastLockTimestamp() {
+        return lastLockTimestamp;
+    }
+
+
+    public void setLastLockTimestamp(long lastLockTimestamp) {
+        this.lastLockTimestamp = lastLockTimestamp;
+    }
+
+
+    public Lock getLockConsume() {
+        return lockConsume;
+    }
+
+
+    public long getLastPullTimestamp() {
+        return lastPullTimestamp;
+    }
+
+
+    public void setLastPullTimestamp(long lastPullTimestamp) {
+        this.lastPullTimestamp = lastPullTimestamp;
+    }
+
+
+    public long getMsgAccCnt() {
+        return msgAccCnt;
+    }
+
+
+    public void setMsgAccCnt(long msgAccCnt) {
+        this.msgAccCnt = msgAccCnt;
+    }
+
+
+    public long getTryUnlockTimes() {
+        return this.tryUnlockTimes.get();
+    }
+
+
+    public void incTryUnlockTimes() {
+        this.tryUnlockTimes.incrementAndGet();
+    }
+
+
+    public void fillProcessQueueInfo(final ProcessQueueInfo info) {
+        try {
+            this.lockTreeMap.readLock().lockInterruptibly();
+
+            if (!this.msgTreeMap.isEmpty()) {
+                info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
+                info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
+                info.setCachedMsgCount(this.msgTreeMap.size());
+            }
+
+            if (!this.msgTreeMapTemp.isEmpty()) {
+                info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey());
+                info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey());
+                info.setTransactionMsgCount(this.msgTreeMapTemp.size());
+            }
+
+            info.setLocked(this.locked);
+            info.setTryUnlockTimes(this.tryUnlockTimes.get());
+            info.setLastLockTimestamp(this.lastLockTimestamp);
+
+            info.setDroped(this.dropped);
+            info.setLastPullTimestamp(this.lastPullTimestamp);
+            info.setLastConsumeTimestamp(this.lastConsumeTimestamp);
+        } catch (Exception e) {
+        } finally {
+            this.lockTreeMap.readLock().unlock();
+        }
+    }
+
+
+    public long getLastConsumeTimestamp() {
+        return lastConsumeTimestamp;
+    }
+
+
+    public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
+        this.lastConsumeTimestamp = lastConsumeTimestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
new file mode 100644
index 0000000..05aa8d1
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.FilterMessageContext;
+import org.apache.rocketmq.client.hook.FilterMessageHook;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class PullAPIWrapper {
+    private final Logger log = ClientLogger.getLog();
+    private final MQClientInstance mQClientFactory;
+    private final String consumerGroup;
+    private final boolean unitMode;
+    private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
+            new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
+    private volatile boolean connectBrokerByUser = false;
+    private volatile long defaultBrokerId = MixAll.MASTER_ID;
+    private Random random = new Random(System.currentTimeMillis());
+    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
+
+    public PullAPIWrapper(MQClientInstance mQClientFactory, String consumerGroup, boolean unitMode) {
+        this.mQClientFactory = mQClientFactory;
+        this.consumerGroup = consumerGroup;
+        this.unitMode = unitMode;
+    }
+
+    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
+                                        final SubscriptionData subscriptionData) {
+        PullResultExt pullResultExt = (PullResultExt) pullResult;
+
+        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
+        if (PullStatus.FOUND == pullResult.getPullStatus()) {
+            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
+            List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
+
+            List<MessageExt> msgListFilterAgain = msgList;
+            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
+                msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
+                for (MessageExt msg : msgList) {
+                    if (msg.getTags() != null) {
+                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
+                            msgListFilterAgain.add(msg);
+                        }
+                    }
+                }
+            }
+
+            if (this.hasHook()) {
+                FilterMessageContext filterMessageContext = new FilterMessageContext();
+                filterMessageContext.setUnitMode(unitMode);
+                filterMessageContext.setMsgList(msgListFilterAgain);
+                this.executeHook(filterMessageContext);
+            }
+
+            for (MessageExt msg : msgListFilterAgain) {
+                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
+                        Long.toString(pullResult.getMinOffset()));
+                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
+                        Long.toString(pullResult.getMaxOffset()));
+            }
+
+            pullResultExt.setMsgFoundList(msgListFilterAgain);
+        }
+
+        pullResultExt.setMessageBinary(null);
+
+        return pullResult;
+    }
+
+    public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
+        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
+        if (null == suggest) {
+            this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
+        } else {
+            suggest.set(brokerId);
+        }
+    }
+
+    public boolean hasHook() {
+        return !this.filterMessageHookList.isEmpty();
+    }
+
+    public void executeHook(final FilterMessageContext context) {
+        if (!this.filterMessageHookList.isEmpty()) {
+            for (FilterMessageHook hook : this.filterMessageHookList) {
+                try {
+                    hook.filterMessage(context);
+                } catch (Throwable e) {
+                    log.error("execute hook error. hookName={}", hook.hookName());
+                }
+            }
+        }
+    }
+
+    public PullResult pullKernelImpl(
+            final MessageQueue mq,
+            final String subExpression,
+            final long subVersion,
+            final long offset,
+            final int maxNums,
+            final int sysFlag,
+            final long commitOffset,
+            final long brokerSuspendMaxTimeMillis,
+            final long timeoutMillis,
+            final CommunicationMode communicationMode,
+            final PullCallback pullCallback
+    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        FindBrokerResult findBrokerResult =
+                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
+                        this.recalculatePullFromWhichNode(mq), false);
+        if (null == findBrokerResult) {
+            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+            findBrokerResult =
+                    this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
+                            this.recalculatePullFromWhichNode(mq), false);
+        }
+
+        if (findBrokerResult != null) {
+            int sysFlagInner = sysFlag;
+
+            if (findBrokerResult.isSlave()) {
+                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
+            }
+
+            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
+            requestHeader.setConsumerGroup(this.consumerGroup);
+            requestHeader.setTopic(mq.getTopic());
+            requestHeader.setQueueId(mq.getQueueId());
+            requestHeader.setQueueOffset(offset);
+            requestHeader.setMaxMsgNums(maxNums);
+            requestHeader.setSysFlag(sysFlagInner);
+            requestHeader.setCommitOffset(commitOffset);
+            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
+            requestHeader.setSubscription(subExpression);
+            requestHeader.setSubVersion(subVersion);
+
+            String brokerAddr = findBrokerResult.getBrokerAddr();
+            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
+                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
+            }
+
+            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
+                    brokerAddr,
+                    requestHeader,
+                    timeoutMillis,
+                    communicationMode,
+                    pullCallback);
+
+            return pullResult;
+        }
+
+        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+    }
+
+    public long recalculatePullFromWhichNode(final MessageQueue mq) {
+        if (this.isConnectBrokerByUser()) {
+            return this.defaultBrokerId;
+        }
+
+        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
+        if (suggest != null) {
+            return suggest.get();
+        }
+
+        return MixAll.MASTER_ID;
+    }
+
+    private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
+            throws MQClientException {
+        ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
+        if (topicRouteTable != null) {
+            TopicRouteData topicRouteData = topicRouteTable.get(topic);
+            List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);
+
+            if (list != null && !list.isEmpty()) {
+                return list.get(randomNum() % list.size());
+            }
+        }
+
+        throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "
+                + topic, null);
+    }
+
+    public boolean isConnectBrokerByUser() {
+        return connectBrokerByUser;
+    }
+
+    public int randomNum() {
+        int value = random.nextInt();
+        if (value < 0) {
+            value = Math.abs(value);
+            if (value < 0)
+                value = 0;
+        }
+        return value;
+    }
+
+    public void setConnectBrokerByUser(boolean connectBrokerByUser) {
+        this.connectBrokerByUser = connectBrokerByUser;
+
+    }
+
+    public void registerFilterMessageHook(ArrayList<FilterMessageHook> filterMessageHookList) {
+        this.filterMessageHookList = filterMessageHookList;
+    }
+
+    public long getDefaultBrokerId() {
+        return defaultBrokerId;
+    }
+
+    public void setDefaultBrokerId(long defaultBrokerId) {
+        this.defaultBrokerId = defaultBrokerId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
new file mode 100644
index 0000000..9f79543
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ServiceThread;
+import org.slf4j.Logger;
+
+import java.util.concurrent.*;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class PullMessageService extends ServiceThread {
+    private final Logger log = ClientLogger.getLog();
+    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
+    private final MQClientInstance mQClientFactory;
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newSingleThreadScheduledExecutor(new ThreadFactory() {
+                @Override
+                public Thread newThread(Runnable r) {
+                    return new Thread(r, "PullMessageServiceScheduledThread");
+                }
+            });
+
+    public PullMessageService(MQClientInstance mQClientFactory) {
+        this.mQClientFactory = mQClientFactory;
+    }
+
+    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
+        this.scheduledExecutorService.schedule(new Runnable() {
+
+            @Override
+            public void run() {
+                PullMessageService.this.executePullRequestImmediately(pullRequest);
+            }
+        }, timeDelay, TimeUnit.MILLISECONDS);
+    }
+
+    public void executePullRequestImmediately(final PullRequest pullRequest) {
+        try {
+            this.pullRequestQueue.put(pullRequest);
+        } catch (InterruptedException e) {
+            log.error("executePullRequestImmediately pullRequestQueue.put", e);
+        }
+    }
+
+    public void executeTaskLater(final Runnable r, final long timeDelay) {
+        this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
+    }
+
+    public ScheduledExecutorService getScheduledExecutorService() {
+        return scheduledExecutorService;
+    }
+
+    private void pullMessage(final PullRequest pullRequest) {
+        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
+        if (consumer != null) {
+            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
+            impl.pullMessage(pullRequest);
+        } else {
+            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
+        }
+    }
+
+
+    @Override
+    public void run() {
+        log.info(this.getServiceName() + " service started");
+
+        while (!this.isStopped()) {
+            try {
+                PullRequest pullRequest = this.pullRequestQueue.take();
+                if (pullRequest != null) {
+                    this.pullMessage(pullRequest);
+                }
+            } catch (InterruptedException e) {
+            } catch (Exception e) {
+                log.error("Pull Message Service Run Method exception", e);
+            }
+        }
+
+        log.info(this.getServiceName() + " service end");
+    }
+
+
+    @Override
+    public String getServiceName() {
+        return PullMessageService.class.getSimpleName();
+    }
+
+
+}