You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:30 UTC

[34/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
deleted file mode 100644
index 82c342f..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ /dev/null
@@ -1,1071 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.QueryResult;
-import com.alibaba.rocketmq.client.Validators;
-import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
-import com.alibaba.rocketmq.client.consumer.PullCallback;
-import com.alibaba.rocketmq.client.consumer.PullResult;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import com.alibaba.rocketmq.client.consumer.store.LocalFileOffsetStore;
-import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
-import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType;
-import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.hook.ConsumeMessageContext;
-import com.alibaba.rocketmq.client.hook.ConsumeMessageHook;
-import com.alibaba.rocketmq.client.hook.FilterMessageHook;
-import com.alibaba.rocketmq.client.impl.CommunicationMode;
-import com.alibaba.rocketmq.client.impl.MQClientManager;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.client.stat.ConsumerStatsManager;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.ServiceState;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.filter.FilterAPI;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-import com.alibaba.rocketmq.common.message.*;
-import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus;
-import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import com.alibaba.rocketmq.common.protocol.body.ProcessQueueInfo;
-import com.alibaba.rocketmq.common.protocol.body.QueueTimeSpan;
-import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import com.alibaba.rocketmq.common.protocol.route.BrokerData;
-import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
-import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-import org.slf4j.Logger;
-
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * @author shijia.wxr
- */
-public class DefaultMQPushConsumerImpl implements MQConsumerInner {
-    /**
-     * Delay some time when exception occur
-     */
-    private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
-    /**
-     * Flow control interval
-     */
-    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
-    /**
-     * Delay some time when suspend pull service
-     */
-    private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;
-    private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
-    private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
-    private final Logger log = ClientLogger.getLog();
-    private final DefaultMQPushConsumer defaultMQPushConsumer;
-    private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
-    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
-    private final long consumerStartTimestamp = System.currentTimeMillis();
-    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
-    private final RPCHook rpcHook;
-    private ServiceState serviceState = ServiceState.CREATE_JUST;
-    private MQClientInstance mQClientFactory;
-    private PullAPIWrapper pullAPIWrapper;
-    private volatile boolean pause = false;
-    private boolean consumeOrderly = false;
-    private MessageListener messageListenerInner;
-    private OffsetStore offsetStore;
-    private ConsumeMessageService consumeMessageService;
-    private long flowControlTimes1 = 0;
-    private long flowControlTimes2 = 0;
-
-
-    public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
-        this.defaultMQPushConsumer = defaultMQPushConsumer;
-        this.rpcHook = rpcHook;
-    }
-
-    public void registerFilterMessageHook(final FilterMessageHook hook) {
-        this.filterMessageHookList.add(hook);
-        log.info("register FilterMessageHook Hook, {}", hook.hookName());
-    }
-
-    public boolean hasHook() {
-        return !this.consumeMessageHookList.isEmpty();
-    }
-
-    public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
-        this.consumeMessageHookList.add(hook);
-        log.info("register consumeMessageHook Hook, {}", hook.hookName());
-    }
-
-    public void executeHookBefore(final ConsumeMessageContext context) {
-        if (!this.consumeMessageHookList.isEmpty()) {
-            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
-                try {
-                    hook.consumeMessageBefore(context);
-                } catch (Throwable e) {
-                }
-            }
-        }
-    }
-
-    public void executeHookAfter(final ConsumeMessageContext context) {
-        if (!this.consumeMessageHookList.isEmpty()) {
-            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
-                try {
-                    hook.consumeMessageAfter(context);
-                } catch (Throwable e) {
-                }
-            }
-        }
-    }
-
-    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
-        createTopic(key, newTopic, queueNum, 0);
-    }
-
-    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
-        this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
-    }
-
-    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
-        Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
-        if (null == result) {
-            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
-            result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
-        }
-
-        if (null == result) {
-            throw new MQClientException("The topic[" + topic + "] not exist", null);
-        }
-
-        return result;
-    }
-
-    public DefaultMQPushConsumer getDefaultMQPushConsumer() {
-        return defaultMQPushConsumer;
-    }
-
-    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
-        return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
-    }
-
-    public long maxOffset(MessageQueue mq) throws MQClientException {
-        return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
-    }
-
-    public long minOffset(MessageQueue mq) throws MQClientException {
-        return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
-    }
-
-    public OffsetStore getOffsetStore() {
-        return offsetStore;
-    }
-
-    public void setOffsetStore(OffsetStore offsetStore) {
-        this.offsetStore = offsetStore;
-    }
-
-    public void pullMessage(final PullRequest pullRequest) {
-        final ProcessQueue processQueue = pullRequest.getProcessQueue();
-        if (processQueue.isDropped()) {
-            log.info("the pull request[{}] is dropped.", pullRequest.toString());
-            return;
-        }
-
-        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
-
-        try {
-            this.makeSureStateOK();
-        } catch (MQClientException e) {
-            log.warn("pullMessage exception, consumer state not ok", e);
-            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
-            return;
-        }
-
-        if (this.isPause()) {
-            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
-            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
-            return;
-        }
-
-        long size = processQueue.getMsgCount().get();
-        if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
-            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
-            if ((flowControlTimes1++ % 1000) == 0) {
-                log.warn(
-                        "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
-                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
-            }
-            return;
-        }
-
-        if (!this.consumeOrderly) {
-            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
-                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
-                if ((flowControlTimes2++ % 1000) == 0) {
-                    log.warn(
-                            "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
-                            processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
-                            pullRequest, flowControlTimes2);
-                }
-                return;
-            }
-        } else {
-            if (processQueue.isLocked()) {
-                if (!pullRequest.isLockedFirst()) {
-                    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
-                    boolean brokerBusy = offset < pullRequest.getNextOffset();
-                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
-                            pullRequest, offset, brokerBusy);
-                    if (brokerBusy) {
-                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
-                                pullRequest, offset);
-                    }
-
-                    pullRequest.setLockedFirst(true);
-                    pullRequest.setNextOffset(offset);
-                }
-            } else {
-                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
-                log.info("pull message later because not locked in broker, {}", pullRequest);
-                return;
-            }
-        }
-
-        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
-        if (null == subscriptionData) {
-            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
-            log.warn("find the consumer's subscription failed, {}", pullRequest);
-            return;
-        }
-
-        final long beginTimestamp = System.currentTimeMillis();
-
-        PullCallback pullCallback = new PullCallback() {
-            @Override
-            public void onSuccess(PullResult pullResult) {
-                if (pullResult != null) {
-                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
-                            subscriptionData);
-
-                    switch (pullResult.getPullStatus()) {
-                        case FOUND:
-                            long prevRequestOffset = pullRequest.getNextOffset();
-                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
-                            long pullRT = System.currentTimeMillis() - beginTimestamp;
-                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
-                                    pullRequest.getMessageQueue().getTopic(), pullRT);
-
-                            long firstMsgOffset = Long.MAX_VALUE;
-                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
-                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
-                            } else {
-                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
-
-                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
-                                        pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
-
-                                boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
-                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
-                                        pullResult.getMsgFoundList(), //
-                                        processQueue, //
-                                        pullRequest.getMessageQueue(), //
-                                        dispathToConsume);
-
-                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
-                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
-                                            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
-                                } else {
-                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
-                                }
-                            }
-
-                            if (pullResult.getNextBeginOffset() < prevRequestOffset//
-                                    || firstMsgOffset < prevRequestOffset) {
-                                log.warn(
-                                        "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
-                                        pullResult.getNextBeginOffset(), //
-                                        firstMsgOffset, //
-                                        prevRequestOffset);
-                            }
-
-                            break;
-                        case NO_NEW_MSG:
-                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
-
-                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
-
-                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
-                            break;
-                        case NO_MATCHED_MSG:
-                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
-
-                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
-
-                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
-                            break;
-                        case OFFSET_ILLEGAL:
-                            log.warn("the pull request offset illegal, {} {}", //
-                                    pullRequest.toString(), pullResult.toString());
-                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
-
-                            pullRequest.getProcessQueue().setDropped(true);
-                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
-
-                                @Override
-                                public void run() {
-                                    try {
-                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
-                                                pullRequest.getNextOffset(), false);
-
-                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
-
-                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
-
-                                        log.warn("fix the pull request offset, {}", pullRequest);
-                                    } catch (Throwable e) {
-                                        log.error("executeTaskLater Exception", e);
-                                    }
-                                }
-                            }, 10000);
-                            break;
-                        default:
-                            break;
-                    }
-                }
-            }
-
-
-            @Override
-            public void onException(Throwable e) {
-                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-                    log.warn("execute the pull request exception", e);
-                }
-
-                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
-            }
-        };
-
-        boolean commitOffsetEnable = false;
-        long commitOffsetValue = 0L;
-        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
-            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
-            if (commitOffsetValue > 0) {
-                commitOffsetEnable = true;
-            }
-        }
-
-        String subExpression = null;
-        boolean classFilter = false;
-        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
-        if (sd != null) {
-            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
-                subExpression = sd.getSubString();
-            }
-
-            classFilter = sd.isClassFilterMode();
-        }
-
-        int sysFlag = PullSysFlag.buildSysFlag(//
-                commitOffsetEnable, // commitOffset
-                true, // suspend
-                subExpression != null, // subscription
-                classFilter // class filter
-        );
-        try {
-            this.pullAPIWrapper.pullKernelImpl(//
-                    pullRequest.getMessageQueue(), // 1
-                    subExpression, // 2
-                    subscriptionData.getSubVersion(), // 3
-                    pullRequest.getNextOffset(), // 4
-                    this.defaultMQPushConsumer.getPullBatchSize(), // 5
-                    sysFlag, // 6
-                    commitOffsetValue, // 7
-                    BROKER_SUSPEND_MAX_TIME_MILLIS, // 8
-                    CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9
-                    CommunicationMode.ASYNC, // 10
-                    pullCallback// 11
-            );
-        } catch (Exception e) {
-            log.error("pullKernelImpl exception", e);
-            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
-        }
-    }
-
-    private void makeSureStateOK() throws MQClientException {
-        if (this.serviceState != ServiceState.RUNNING) {
-            throw new MQClientException("The consumer service state not OK, "//
-                    + this.serviceState//
-                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
-                    null);
-        }
-    }
-
-    private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
-        this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
-    }
-
-    public boolean isPause() {
-        return pause;
-    }
-
-    public void setPause(boolean pause) {
-        this.pause = pause;
-    }
-
-    public ConsumerStatsManager getConsumerStatsManager() {
-        return this.mQClientFactory.getConsumerStatsManager();
-    }
-
-    public void executePullRequestImmediately(final PullRequest pullRequest) {
-        this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
-    }
-
-    private void correctTagsOffset(final PullRequest pullRequest) {
-        if (0L == pullRequest.getProcessQueue().getMsgCount().get()) {
-            this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
-        }
-    }
-
-    public void executeTaskLater(final Runnable r, final long timeDelay) {
-        this.mQClientFactory.getPullMessageService().executeTaskLater(r, timeDelay);
-    }
-
-    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
-            throws MQClientException, InterruptedException {
-        return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
-    }
-
-    public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws MQClientException,
-            InterruptedException {
-        return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
-    }
-
-
-    public void registerMessageListener(MessageListener messageListener) {
-        this.messageListenerInner = messageListener;
-    }
-
-    public void resume() {
-        this.pause = false;
-        doRebalance();
-        log.info("resume this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup());
-    }
-
-    public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        try {
-            String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
-                    : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
-            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
-                    this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
-        } catch (Exception e) {
-            log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
-
-            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
-
-            String originMsgId = MessageAccessor.getOriginMessageId(msg);
-            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
-
-            newMsg.setFlag(msg.getFlag());
-            MessageAccessor.setProperties(newMsg, msg.getProperties());
-            MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
-            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
-            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
-            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
-
-            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
-        }
-    }
-
-    private int getMaxReconsumeTimes() {
-        // default reconsume times: 16
-        if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
-            return 16;
-        } else {
-            return this.defaultMQPushConsumer.getMaxReconsumeTimes();
-        }
-    }
-
-    public void shutdown() {
-        switch (this.serviceState) {
-            case CREATE_JUST:
-                break;
-            case RUNNING:
-                this.consumeMessageService.shutdown();
-                this.persistConsumerOffset();
-                this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
-                this.mQClientFactory.shutdown();
-                log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
-                this.rebalanceImpl.destroy();
-                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
-                break;
-            case SHUTDOWN_ALREADY:
-                break;
-            default:
-                break;
-        }
-    }
-
-    public void start() throws MQClientException {
-        switch (this.serviceState) {
-            case CREATE_JUST:
-                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
-                        this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
-                this.serviceState = ServiceState.START_FAILED;
-
-                this.checkConfig();
-
-                this.copySubscription();
-
-                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
-                    this.defaultMQPushConsumer.changeInstanceNameToPID();
-                }
-
-                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
-
-                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
-                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
-                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
-                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
-
-                this.pullAPIWrapper = new PullAPIWrapper(
-                        mQClientFactory,
-                        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
-                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
-
-                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
-                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
-                } else {
-                    switch (this.defaultMQPushConsumer.getMessageModel()) {
-                        case BROADCASTING:
-                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
-                            break;
-                        case CLUSTERING:
-                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
-                            break;
-                        default:
-                            break;
-                    }
-                }
-                this.offsetStore.load();
-
-                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
-                    this.consumeOrderly = true;
-                    this.consumeMessageService =
-                            new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
-                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
-                    this.consumeOrderly = false;
-                    this.consumeMessageService =
-                            new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
-                }
-
-                this.consumeMessageService.start();
-
-                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
-                if (!registerOK) {
-                    this.serviceState = ServiceState.CREATE_JUST;
-                    this.consumeMessageService.shutdown();
-                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
-                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
-                            null);
-                }
-
-                mQClientFactory.start();
-                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
-                this.serviceState = ServiceState.RUNNING;
-                break;
-            case RUNNING:
-            case START_FAILED:
-            case SHUTDOWN_ALREADY:
-                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
-                        + this.serviceState//
-                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
-                        null);
-            default:
-                break;
-        }
-
-        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
-
-        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
-
-        this.mQClientFactory.rebalanceImmediately();
-    }
-
-    private void checkConfig() throws MQClientException {
-        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());
-
-        if (null == this.defaultMQPushConsumer.getConsumerGroup()) {
-            throw new MQClientException(
-                    "consumerGroup is null"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-
-        if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
-            throw new MQClientException(
-                    "consumerGroup can not equal "
-                            + MixAll.DEFAULT_CONSUMER_GROUP
-                            + ", please specify another one."
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-
-        if (null == this.defaultMQPushConsumer.getMessageModel()) {
-            throw new MQClientException(
-                    "messageModel is null"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-
-        if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {
-            throw new MQClientException(
-                    "consumeFromWhere is null"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-
-        Date dt = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYY_MMDD_HHMMSS);
-        if (null == dt) {
-            throw new MQClientException(
-                    "consumeTimestamp is invalid, YYYY_MMDD_HHMMSS"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-
-        // allocateMessageQueueStrategy
-        if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) {
-            throw new MQClientException(
-                    "allocateMessageQueueStrategy is null"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-
-        // subscription
-        if (null == this.defaultMQPushConsumer.getSubscription()) {
-            throw new MQClientException(
-                    "subscription is null"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-
-        // messageListener
-        if (null == this.defaultMQPushConsumer.getMessageListener()) {
-            throw new MQClientException(
-                    "messageListener is null"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-
-        boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly;
-        boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently;
-        if (!orderly && !concurrently) {
-            throw new MQClientException(
-                    "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-
-        // consumeThreadMin
-        if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
-                || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000
-                || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
-            throw new MQClientException(
-                    "consumeThreadMin Out of range [1, 1000]"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-
-        // consumeThreadMax
-        if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
-            throw new MQClientException(
-                    "consumeThreadMax Out of range [1, 1000]"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-
-        // consumeConcurrentlyMaxSpan
-        if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
-                || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
-            throw new MQClientException(
-                    "consumeConcurrentlyMaxSpan Out of range [1, 65535]"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-
-        // pullThresholdForQueue
-        if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) {
-            throw new MQClientException(
-                    "pullThresholdForQueue Out of range [1, 65535]"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-
-        // pullInterval
-        if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {
-            throw new MQClientException(
-                    "pullInterval Out of range [0, 65535]"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-
-        // consumeMessageBatchMaxSize
-        if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1
-                || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
-            throw new MQClientException(
-                    "consumeMessageBatchMaxSize Out of range [1, 1024]"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-
-        // pullBatchSize
-        if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) {
-            throw new MQClientException(
-                    "pullBatchSize Out of range [1, 1024]"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
-        }
-    }
-
-    private void copySubscription() throws MQClientException {
-        try {
-            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
-            if (sub != null) {
-                for (final Map.Entry<String, String> entry : sub.entrySet()) {
-                    final String topic = entry.getKey();
-                    final String subString = entry.getValue();
-                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
-                            topic, subString);
-                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
-                }
-            }
-
-            if (null == this.messageListenerInner) {
-                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
-            }
-
-            switch (this.defaultMQPushConsumer.getMessageModel()) {
-                case BROADCASTING:
-                    break;
-                case CLUSTERING:
-                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
-                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
-                            retryTopic, SubscriptionData.SUB_ALL);
-                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
-                    break;
-                default:
-                    break;
-            }
-        } catch (Exception e) {
-            throw new MQClientException("subscription exception", e);
-        }
-    }
-
-    public MessageListener getMessageListenerInner() {
-        return messageListenerInner;
-    }
-
-    private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
-        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
-        if (subTable != null) {
-            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
-                final String topic = entry.getKey();
-                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
-            }
-        }
-    }
-
-    public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() {
-        return this.rebalanceImpl.getSubscriptionInner();
-    }
-
-    public void subscribe(String topic, String subExpression) throws MQClientException {
-        try {
-            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
-                    topic, subExpression);
-            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
-            if (this.mQClientFactory != null) {
-                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
-            }
-        } catch (Exception e) {
-            throw new MQClientException("subscription exception", e);
-        }
-    }
-
-    public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
-        try {
-            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
-                    topic, "*");
-            subscriptionData.setSubString(fullClassName);
-            subscriptionData.setClassFilterMode(true);
-            subscriptionData.setFilterClassSource(filterClassSource);
-            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
-            if (this.mQClientFactory != null) {
-                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
-            }
-
-        } catch (Exception e) {
-            throw new MQClientException("subscription exception", e);
-        }
-    }
-
-    public void suspend() {
-        this.pause = true;
-        log.info("suspend this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup());
-    }
-
-    public void unsubscribe(String topic) {
-        this.rebalanceImpl.getSubscriptionInner().remove(topic);
-    }
-
-    public void updateConsumeOffset(MessageQueue mq, long offset) {
-        this.offsetStore.updateOffset(mq, offset, false);
-    }
-
-    public void updateCorePoolSize(int corePoolSize) {
-        this.consumeMessageService.updateCorePoolSize(corePoolSize);
-    }
-
-    public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
-    }
-
-    public RebalanceImpl getRebalanceImpl() {
-        return rebalanceImpl;
-    }
-
-    public boolean isConsumeOrderly() {
-        return consumeOrderly;
-    }
-
-    public void setConsumeOrderly(boolean consumeOrderly) {
-        this.consumeOrderly = consumeOrderly;
-    }
-
-    public void resetOffsetByTimeStamp(long timeStamp)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        for (String topic : rebalanceImpl.getSubscriptionInner().keySet()) {
-            Set<MessageQueue> mqs = rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
-            Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
-            if (mqs != null) {
-                for (MessageQueue mq : mqs) {
-                    long offset = searchOffset(mq, timeStamp);
-                    offsetTable.put(mq, offset);
-                }
-                this.mQClientFactory.resetOffset(topic, groupName(), offsetTable);
-            }
-        }
-    }
-
-    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
-        return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
-    }
-
-    @Override
-    public String groupName() {
-        return this.defaultMQPushConsumer.getConsumerGroup();
-    }
-
-    @Override
-    public MessageModel messageModel() {
-        return this.defaultMQPushConsumer.getMessageModel();
-    }
-
-    @Override
-    public ConsumeType consumeType() {
-        return ConsumeType.CONSUME_PASSIVELY;
-    }
-
-    @Override
-    public ConsumeFromWhere consumeFromWhere() {
-        return this.defaultMQPushConsumer.getConsumeFromWhere();
-    }
-
-    @Override
-    public Set<SubscriptionData> subscriptions() {
-        Set<SubscriptionData> subSet = new HashSet<SubscriptionData>();
-
-        subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());
-
-        return subSet;
-    }
-
-    @Override
-    public void doRebalance() {
-        if (this.rebalanceImpl != null && !this.pause) {
-            this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
-        }
-    }
-
-    @Override
-    public void persistConsumerOffset() {
-        try {
-            this.makeSureStateOK();
-            Set<MessageQueue> mqs = new HashSet<MessageQueue>();
-            Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
-            if (allocateMq != null) {
-                mqs.addAll(allocateMq);
-            }
-
-            this.offsetStore.persistAll(mqs);
-        } catch (Exception e) {
-            log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
-        }
-    }
-
-    @Override
-    public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
-        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
-        if (subTable != null) {
-            if (subTable.containsKey(topic)) {
-                this.rebalanceImpl.topicSubscribeInfoTable.put(topic, info);
-            }
-        }
-    }
-
-    @Override
-    public boolean isSubscribeTopicNeedUpdate(String topic) {
-        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
-        if (subTable != null) {
-            if (subTable.containsKey(topic)) {
-                return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
-            }
-        }
-
-        return false;
-    }
-
-    @Override
-    public boolean isUnitMode() {
-        return this.defaultMQPushConsumer.isUnitMode();
-    }
-
-    @Override
-    public ConsumerRunningInfo consumerRunningInfo() {
-        ConsumerRunningInfo info = new ConsumerRunningInfo();
-
-        Properties prop = MixAll.object2Properties(this.defaultMQPushConsumer);
-
-        prop.put(ConsumerRunningInfo.PROP_CONSUME_ORDERLY, String.valueOf(this.consumeOrderly));
-        prop.put(ConsumerRunningInfo.PROP_THREADPOOL_CORE_SIZE, String.valueOf(this.consumeMessageService.getCorePoolSize()));
-        prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp));
-
-        info.setProperties(prop);
-
-        Set<SubscriptionData> subSet = this.subscriptions();
-        info.getSubscriptionSet().addAll(subSet);
-
-        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.rebalanceImpl.getProcessQueueTable().entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<MessageQueue, ProcessQueue> next = it.next();
-            MessageQueue mq = next.getKey();
-            ProcessQueue pq = next.getValue();
-
-            ProcessQueueInfo pqinfo = new ProcessQueueInfo();
-            pqinfo.setCommitOffset(this.offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE));
-            pq.fillProcessQueueInfo(pqinfo);
-            info.getMqTable().put(mq, pqinfo);
-        }
-
-        for (SubscriptionData sd : subSet) {
-            ConsumeStatus consumeStatus = this.mQClientFactory.getConsumerStatsManager().consumeStatus(this.groupName(), sd.getTopic());
-            info.getStatusTable().put(sd.getTopic(), consumeStatus);
-        }
-
-        return info;
-    }
-
-    public MQClientInstance getmQClientFactory() {
-        return mQClientFactory;
-    }
-
-    public void setmQClientFactory(MQClientInstance mQClientFactory) {
-        this.mQClientFactory = mQClientFactory;
-    }
-
-    public ServiceState getServiceState() {
-        return serviceState;
-    }
-
-    public void setServiceState(ServiceState serviceState) {
-        this.serviceState = serviceState;
-    }
-
-    public void adjustThreadPool() {
-        long computeAccTotal = this.computeAccumulationTotal();
-        long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();
-
-        long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0);
-
-        long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8);
-
-        if (computeAccTotal >= incThreshold) {
-            this.consumeMessageService.incCorePoolSize();
-        }
-
-        if (computeAccTotal < decThreshold) {
-            this.consumeMessageService.decCorePoolSize();
-        }
-    }
-
-    private long computeAccumulationTotal() {
-        long msgAccTotal = 0;
-        ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable();
-        Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<MessageQueue, ProcessQueue> next = it.next();
-            ProcessQueue value = next.getValue();
-            msgAccTotal += value.getMsgAccCnt();
-        }
-
-        return msgAccTotal;
-    }
-
-    public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic)
-            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
-        List<QueueTimeSpan> queueTimeSpan = new ArrayList<QueueTimeSpan>();
-        TopicRouteData routeData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, 3000);
-        for (BrokerData brokerData : routeData.getBrokerDatas()) {
-            String addr = brokerData.selectBrokerAddr();
-            queueTimeSpan.addAll(this.mQClientFactory.getMQClientAPIImpl().queryConsumeTimeSpan(addr, topic, groupName(), 3000));
-        }
-
-        return queueTimeSpan;
-    }
-
-
-    public ConsumeMessageService getConsumeMessageService() {
-        return consumeMessageService;
-    }
-
-
-    public void setConsumeMessageService(ConsumeMessageService consumeMessageService) {
-        this.consumeMessageService = consumeMessageService;
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java
deleted file mode 100644
index 1ff430b..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-
-import java.util.Set;
-
-
-/**
- * Consumer inner interface
- *
- * @author shijia.wxr
- */
-public interface MQConsumerInner {
-    String groupName();
-
-
-    MessageModel messageModel();
-
-
-    ConsumeType consumeType();
-
-
-    ConsumeFromWhere consumeFromWhere();
-
-
-    Set<SubscriptionData> subscriptions();
-
-
-    void doRebalance();
-
-
-    void persistConsumerOffset();
-
-
-    void updateTopicSubscribeInfo(final String topic, final Set<MessageQueue> info);
-
-
-    boolean isSubscribeTopicNeedUpdate(final String topic);
-
-
-    boolean isUnitMode();
-
-
-    ConsumerRunningInfo consumerRunningInfo();
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java
deleted file mode 100644
index 9de7ac0..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * Message lock,strictly ensure the single queue only one thread at a time consuming
- *
- * @author shijia.wxr
- */
-public class MessageQueueLock {
-    private ConcurrentHashMap<MessageQueue, Object> mqLockTable =
-            new ConcurrentHashMap<MessageQueue, Object>();
-
-
-    public Object fetchLockObject(final MessageQueue mq) {
-        Object objLock = this.mqLockTable.get(mq);
-        if (null == objLock) {
-            objLock = new Object();
-            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
-            if (prevLock != null) {
-                objLock = prevLock;
-            }
-        }
-
-        return objLock;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java
deleted file mode 100644
index 05ffeb7..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java
+++ /dev/null
@@ -1,451 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.message.MessageAccessor;
-import com.alibaba.rocketmq.common.message.MessageConst;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.protocol.body.ProcessQueueInfo;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-
-/**
- * Queue consumption snapshot
- *
- * @author shijia.wxr
- */
-public class ProcessQueue {
-    public final static long REBALANCE_LOCK_MAX_LIVE_TIME =
-            Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
-    public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
-    private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
-    private final Logger log = ClientLogger.getLog();
-    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
-    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
-    private final AtomicLong msgCount = new AtomicLong();
-    private final Lock lockConsume = new ReentrantLock();
-    private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
-    private final AtomicLong tryUnlockTimes = new AtomicLong(0);
-    private volatile long queueOffsetMax = 0L;
-    private volatile boolean dropped = false;
-    private volatile long lastPullTimestamp = System.currentTimeMillis();
-    private volatile long lastConsumeTimestamp = System.currentTimeMillis();
-    private volatile boolean locked = false;
-    private volatile long lastLockTimestamp = System.currentTimeMillis();
-    private volatile boolean consuming = false;
-    private volatile long msgAccCnt = 0;
-
-    public boolean isLockExpired() {
-        boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
-        return result;
-    }
-
-
-    public boolean isPullExpired() {
-        boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME;
-        return result;
-    }
-
-    /**
-
-     *
-     * @param pushConsumer
-     */
-    public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
-        if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
-            return;
-        }
-        
-        int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
-        for (int i = 0; i < loop; i++) {
-            MessageExt msg = null;
-            try {
-                this.lockTreeMap.readLock().lockInterruptibly();
-                try {
-                    if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
-                        msg = msgTreeMap.firstEntry().getValue();
-                    } else {
-
-                        break;
-                    }
-                } finally {
-                    this.lockTreeMap.readLock().unlock();
-                }
-            } catch (InterruptedException e) {
-                log.error("getExpiredMsg exception", e);
-            }
-
-            try {
-
-                pushConsumer.sendMessageBack(msg, 3);
-                log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
-                try {
-                    this.lockTreeMap.writeLock().lockInterruptibly();
-                    try {
-                        if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
-                            try {
-                                msgTreeMap.remove(msgTreeMap.firstKey());
-                            } catch (Exception e) {
-                                log.error("send expired msg exception", e);
-                            }
-                        }
-                    } finally {
-                        this.lockTreeMap.writeLock().unlock();
-                    }
-                } catch (InterruptedException e) {
-                    log.error("getExpiredMsg exception", e);
-                }
-            } catch (Exception e) {
-                log.error("send expired msg exception", e);
-            }
-        }
-    }
-
-
-    public boolean putMessage(final List<MessageExt> msgs) {
-        boolean dispatchToConsume = false;
-        try {
-            this.lockTreeMap.writeLock().lockInterruptibly();
-            try {
-                int validMsgCnt = 0;
-                for (MessageExt msg : msgs) {
-                    MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
-                    if (null == old) {
-                        validMsgCnt++;
-                        this.queueOffsetMax = msg.getQueueOffset();
-                    }
-                }
-                msgCount.addAndGet(validMsgCnt);
-
-                if (!msgTreeMap.isEmpty() && !this.consuming) {
-                    dispatchToConsume = true;
-                    this.consuming = true;
-                }
-
-                if (!msgs.isEmpty()) {
-                    MessageExt messageExt = msgs.get(msgs.size() - 1);
-                    String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
-                    if (property != null) {
-                        long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
-                        if (accTotal > 0) {
-                            this.msgAccCnt = accTotal;
-                        }
-                    }
-                }
-            } finally {
-                this.lockTreeMap.writeLock().unlock();
-            }
-        } catch (InterruptedException e) {
-            log.error("putMessage exception", e);
-        }
-
-        return dispatchToConsume;
-    }
-
-
-    public long getMaxSpan() {
-        try {
-            this.lockTreeMap.readLock().lockInterruptibly();
-            try {
-                if (!this.msgTreeMap.isEmpty()) {
-                    return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
-                }
-            } finally {
-                this.lockTreeMap.readLock().unlock();
-            }
-        } catch (InterruptedException e) {
-            log.error("getMaxSpan exception", e);
-        }
-
-        return 0;
-    }
-
-
-    /**
-     * Removes the given messages from {@code msgTreeMap} (keyed by queue offset) and lowers
-     * {@code msgCount} by the number of entries that were actually present.
-     * <p>
-     * {@code lastConsumeTimestamp} is refreshed as soon as the write lock is held, even when
-     * nothing ends up being removed.
-     *
-     * @param msgs messages whose queue offsets should be dropped from the map
-     * @return {@code -1} when the map was already empty or an error occurred before any
-     *         result was computed; otherwise the smallest remaining key, or
-     *         {@code queueOffsetMax + 1} when the map became empty
-     */
-    public long removeMessage(final List<MessageExt> msgs) {
-        long nextOffset = -1;
-        final long timestamp = System.currentTimeMillis();
-        try {
-            this.lockTreeMap.writeLock().lockInterruptibly();
-            this.lastConsumeTimestamp = timestamp;
-            try {
-                if (!msgTreeMap.isEmpty()) {
-                    // Default answer if every cached message gets removed below.
-                    nextOffset = this.queueOffsetMax + 1;
-
-                    int removed = 0;
-                    for (MessageExt message : msgs) {
-                        if (msgTreeMap.remove(message.getQueueOffset()) != null) {
-                            removed++;
-                        }
-                    }
-                    msgCount.addAndGet(-removed);
-
-                    if (!msgTreeMap.isEmpty()) {
-                        nextOffset = msgTreeMap.firstKey();
-                    }
-                }
-            } finally {
-                this.lockTreeMap.writeLock().unlock();
-            }
-        } catch (Throwable t) {
-            log.error("removeMessage exception", t);
-        }
-
-        return nextOffset;
-    }
-
-
-    /**
-     * Returns the {@code msgTreeMap} instance itself, not a copy. Other methods of this class
-     * mutate the same map while holding {@code lockTreeMap}.
-     */
-    public TreeMap<Long, MessageExt> getMsgTreeMap() {
-        return this.msgTreeMap;
-    }
-
-
-    /**
-     * Returns the {@code msgCount} counter itself, so callers share the same
-     * {@link AtomicLong} that this class updates.
-     */
-    public AtomicLong getMsgCount() {
-        return this.msgCount;
-    }
-
-
-    /**
-     * @return the current value of the {@code dropped} flag
-     */
-    public boolean isDropped() {
-        return this.dropped;
-    }
-
-
-    /**
-     * Overwrites the {@code dropped} flag.
-     *
-     * @param dropped new value of the flag
-     */
-    public void setDropped(boolean dropped) {
-        this.dropped = dropped;
-    }
-
-    /**
-     * @return the current value of the {@code locked} flag
-     */
-    public boolean isLocked() {
-        return this.locked;
-    }
-
-    /**
-     * Overwrites the {@code locked} flag.
-     *
-     * @param locked new value of the flag
-     */
-    public void setLocked(boolean locked) {
-        this.locked = locked;
-    }
-
-    /**
-     * Moves every entry of {@code msgTreeMapTemp} back into {@code msgTreeMap} and then
-     * empties {@code msgTreeMapTemp}, all under the write lock.
-     * <p>
-     * If acquiring the lock is interrupted, nothing is moved and the interruption is logged.
-     */
-    public void rollback() {
-        final Lock writeLock = this.lockTreeMap.writeLock();
-        try {
-            writeLock.lockInterruptibly();
-            try {
-                this.msgTreeMap.putAll(this.msgTreeMapTemp);
-                this.msgTreeMapTemp.clear();
-            } finally {
-                writeLock.unlock();
-            }
-        } catch (InterruptedException e) {
-            log.error("rollback exception", e);
-        }
-    }
-
-
-    /**
-     * Discards everything held in {@code msgTreeMapTemp}, lowers {@code msgCount} by the
-     * number of discarded entries, and reports the offset that follows the highest discarded
-     * key.
-     *
-     * @return the largest key of {@code msgTreeMapTemp} plus one; {@code -1} when
-     *         {@code msgTreeMapTemp} is empty or when acquiring the write lock was
-     *         interrupted
-     */
-    public long commit() {
-        try {
-            this.lockTreeMap.writeLock().lockInterruptibly();
-            try {
-                // lastKey() throws NoSuchElementException on an empty map rather than
-                // returning null, so emptiness has to be checked up front.
-                if (this.msgTreeMapTemp.isEmpty()) {
-                    return -1;
-                }
-
-                final long offset = this.msgTreeMapTemp.lastKey();
-                msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
-                this.msgTreeMapTemp.clear();
-                return offset + 1;
-            } finally {
-                this.lockTreeMap.writeLock().unlock();
-            }
-        } catch (InterruptedException e) {
-            log.error("commit exception", e);
-        }
-
-        return -1;
-    }
-
-
-    /**
-     * Moves the given messages from {@code msgTreeMapTemp} back into {@code msgTreeMap},
-     * keyed by their queue offset, under the write lock.
-     * <p>
-     * If acquiring the lock is interrupted, no message is moved and the interruption is
-     * logged.
-     *
-     * @param msgs messages to put back into {@code msgTreeMap}
-     */
-    public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
-        final Lock writeLock = this.lockTreeMap.writeLock();
-        try {
-            writeLock.lockInterruptibly();
-            try {
-                for (MessageExt message : msgs) {
-                    final long offset = message.getQueueOffset();
-                    this.msgTreeMapTemp.remove(offset);
-                    this.msgTreeMap.put(offset, message);
-                }
-            } finally {
-                writeLock.unlock();
-            }
-        } catch (InterruptedException e) {
-            log.error("makeMessageToCosumeAgain exception", e);
-        }
-    }
-
-
-    /**
-     * Polls up to {@code batchSize} entries from the head of {@code msgTreeMap}, records each
-     * of them in {@code msgTreeMapTemp}, and returns the polled messages in key order.
-     * <p>
-     * {@code lastConsumeTimestamp} is refreshed once the write lock is held. When nothing
-     * could be taken, {@code consuming} is set to {@code false}.
-     *
-     * @param batchSize maximum number of messages to take
-     * @return the messages taken; empty when the map was empty or when acquiring the write
-     *         lock was interrupted
-     */
-    public List<MessageExt> takeMessags(final int batchSize) {
-        List<MessageExt> batch = new ArrayList<MessageExt>(batchSize);
-        final long timestamp = System.currentTimeMillis();
-        try {
-            this.lockTreeMap.writeLock().lockInterruptibly();
-            this.lastConsumeTimestamp = timestamp;
-            try {
-                if (!this.msgTreeMap.isEmpty()) {
-                    int taken = 0;
-                    while (taken < batchSize) {
-                        Map.Entry<Long, MessageExt> head = this.msgTreeMap.pollFirstEntry();
-                        if (head == null) {
-                            // Map drained before the batch was full.
-                            break;
-                        }
-                        batch.add(head.getValue());
-                        msgTreeMapTemp.put(head.getKey(), head.getValue());
-                        taken++;
-                    }
-                }
-
-                if (batch.isEmpty()) {
-                    consuming = false;
-                }
-            } finally {
-                this.lockTreeMap.writeLock().unlock();
-            }
-        } catch (InterruptedException e) {
-            log.error("take Messages exception", e);
-        }
-
-        return batch;
-    }
-
-
-    /**
-     * Reports whether {@code msgTreeMap} currently holds any entry, checked under the read
-     * lock.
-     * <p>
-     * NOTE(review): despite the method name, this inspects {@code msgTreeMap} and not
-     * {@code msgTreeMapTemp} - confirm with the callers that this is intended.
-     *
-     * @return {@code true} when {@code msgTreeMap} is non-empty, and also when acquiring the
-     *         read lock was interrupted
-     */
-    public boolean hasTempMessage() {
-        boolean pending = true;
-        final Lock readLock = this.lockTreeMap.readLock();
-        try {
-            readLock.lockInterruptibly();
-            try {
-                pending = !this.msgTreeMap.isEmpty();
-            } finally {
-                readLock.unlock();
-            }
-        } catch (InterruptedException e) {
-        }
-
-        return pending;
-    }
-
-
-    /**
-     * Resets the cached state under the write lock: empties {@code msgTreeMap} and
-     * {@code msgTreeMapTemp}, sets {@code msgCount} to zero and {@code queueOffsetMax} to
-     * {@code 0}.
-     * <p>
-     * If acquiring the lock is interrupted, nothing is reset and the interruption is logged.
-     */
-    public void clear() {
-        try {
-            this.lockTreeMap.writeLock().lockInterruptibly();
-            try {
-                this.msgTreeMap.clear();
-                this.msgTreeMapTemp.clear();
-                this.msgCount.set(0);
-                this.queueOffsetMax = 0L;
-            } finally {
-                this.lockTreeMap.writeLock().unlock();
-            }
-        } catch (InterruptedException e) {
-            // Previously logged as "rollback exception", which pointed at the wrong method.
-            log.error("clear exception", e);
-        }
-    }
-
-
-    /**
-     * @return the current value of {@code lastLockTimestamp}
-     */
-    public long getLastLockTimestamp() {
-        return this.lastLockTimestamp;
-    }
-
-
-    /**
-     * Overwrites {@code lastLockTimestamp}.
-     *
-     * @param lastLockTimestamp new value to store
-     */
-    public void setLastLockTimestamp(long lastLockTimestamp) {
-        this.lastLockTimestamp = lastLockTimestamp;
-    }
-
-
-    /**
-     * @return the {@code lockConsume} lock instance held by this object
-     */
-    public Lock getLockConsume() {
-        return this.lockConsume;
-    }
-
-
-    /**
-     * @return the current value of {@code lastPullTimestamp}
-     */
-    public long getLastPullTimestamp() {
-        return this.lastPullTimestamp;
-    }
-
-
-    /**
-     * Overwrites {@code lastPullTimestamp}.
-     *
-     * @param lastPullTimestamp new value to store
-     */
-    public void setLastPullTimestamp(long lastPullTimestamp) {
-        this.lastPullTimestamp = lastPullTimestamp;
-    }
-
-
-    /**
-     * @return the current value of {@code msgAccCnt}
-     */
-    public long getMsgAccCnt() {
-        return this.msgAccCnt;
-    }
-
-
-    /**
-     * Overwrites {@code msgAccCnt}.
-     *
-     * @param msgAccCnt new value to store
-     */
-    public void setMsgAccCnt(long msgAccCnt) {
-        this.msgAccCnt = msgAccCnt;
-    }
-
-
-    /**
-     * @return the current value of the {@code tryUnlockTimes} counter
-     */
-    public long getTryUnlockTimes() {
-        return tryUnlockTimes.get();
-    }
-
-
-    /**
-     * Atomically adds one to the {@code tryUnlockTimes} counter.
-     */
-    public void incTryUnlockTimes() {
-        tryUnlockTimes.incrementAndGet();
-    }
-
-
-    /**
-     * Copies a snapshot of this object's state into {@code info} while holding the read lock:
-     * min/max key and size of {@code msgTreeMap} and of {@code msgTreeMapTemp} (each only when
-     * non-empty), plus the lock, drop and timestamp fields.
-     * <p>
-     * This is best-effort: any exception, including an interruption while waiting for the
-     * lock, is logged and {@code info} is left with whatever had been filled in so far.
-     *
-     * @param info target object to populate
-     */
-    public void fillProcessQueueInfo(final ProcessQueueInfo info) {
-        try {
-            this.lockTreeMap.readLock().lockInterruptibly();
-            // Only unlock once the lock is actually held; unlocking after a failed
-            // lockInterruptibly() would throw IllegalMonitorStateException.
-            try {
-                if (!this.msgTreeMap.isEmpty()) {
-                    info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
-                    info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
-                    info.setCachedMsgCount(this.msgTreeMap.size());
-                }
-
-                if (!this.msgTreeMapTemp.isEmpty()) {
-                    info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey());
-                    info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey());
-                    info.setTransactionMsgCount(this.msgTreeMapTemp.size());
-                }
-
-                info.setLocked(this.locked);
-                info.setTryUnlockTimes(this.tryUnlockTimes.get());
-                info.setLastLockTimestamp(this.lastLockTimestamp);
-
-                info.setDroped(this.dropped);
-                info.setLastPullTimestamp(this.lastPullTimestamp);
-                info.setLastConsumeTimestamp(this.lastConsumeTimestamp);
-            } finally {
-                this.lockTreeMap.readLock().unlock();
-            }
-        } catch (Exception e) {
-            log.error("fillProcessQueueInfo exception", e);
-        }
-    }
-
-
-    /**
-     * @return the current value of {@code lastConsumeTimestamp}
-     */
-    public long getLastConsumeTimestamp() {
-        return this.lastConsumeTimestamp;
-    }
-
-
-    /**
-     * Overwrites {@code lastConsumeTimestamp}.
-     *
-     * @param lastConsumeTimestamp new value to store
-     */
-    public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
-        this.lastConsumeTimestamp = lastConsumeTimestamp;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java
deleted file mode 100644
index 730b090..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.consumer.PullCallback;
-import com.alibaba.rocketmq.client.consumer.PullResult;
-import com.alibaba.rocketmq.client.consumer.PullStatus;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.hook.FilterMessageContext;
-import com.alibaba.rocketmq.client.hook.FilterMessageHook;
-import com.alibaba.rocketmq.client.impl.CommunicationMode;
-import com.alibaba.rocketmq.client.impl.FindBrokerResult;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.message.*;
-import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
-import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-import org.slf4j.Logger;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-/**
- * Client-side helper that sends pull requests to a broker and post-processes the results.
- * <p>
- * It remembers, per {@link MessageQueue}, the broker id that the most recent pull result
- * suggested, re-applies tag filtering to decoded messages, and runs the registered
- * {@link FilterMessageHook}s over them.
- *
- * @author shijia.wxr
- */
-public class PullAPIWrapper {
-    private final Logger log = ClientLogger.getLog();
-    private final MQClientInstance mQClientFactory;
-    private final String consumerGroup;
-    private final boolean unitMode;
-    private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
-            new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
-    private volatile boolean connectBrokerByUser = false;
-    private volatile long defaultBrokerId = MixAll.MASTER_ID;
-    private Random random = new Random(System.currentTimeMillis());
-    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
-
-    /**
-     * @param mQClientFactory client instance used for broker lookup, route refresh and the
-     *                        actual pull call
-     * @param consumerGroup   consumer group written into every pull request header
-     * @param unitMode        value passed to the {@link FilterMessageContext} handed to hooks
-     */
-    public PullAPIWrapper(MQClientInstance mQClientFactory, String consumerGroup, boolean unitMode) {
-        this.mQClientFactory = mQClientFactory;
-        this.consumerGroup = consumerGroup;
-        this.unitMode = unitMode;
-    }
-
-    /**
-     * Post-processes a raw pull result in place.
-     * <p>
-     * Always records the broker id suggested by the result and clears the binary payload.
-     * When the status is {@link PullStatus#FOUND}, the payload is decoded, filtered by the
-     * subscription's tag set (unless that set is empty or class-filter mode is on), passed to
-     * the registered hooks, stamped with the min/max offset properties, and stored as the
-     * found-message list.
-     *
-     * @param mq               queue the result belongs to
-     * @param pullResult       result to process; it is cast to {@link PullResultExt}
-     * @param subscriptionData subscription whose tag set drives the client-side filtering
-     * @return the same {@code pullResult} instance, after modification
-     */
-    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
-                                        final SubscriptionData subscriptionData) {
-        PullResultExt pullResultExt = (PullResultExt) pullResult;
-
-        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
-        if (PullStatus.FOUND == pullResult.getPullStatus()) {
-            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
-            List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
-
-            List<MessageExt> msgListFilterAgain = msgList;
-            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
-                msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
-                for (MessageExt msg : msgList) {
-                    // Messages without tags never match a non-empty tag subscription.
-                    if (msg.getTags() != null) {
-                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
-                            msgListFilterAgain.add(msg);
-                        }
-                    }
-                }
-            }
-
-            if (this.hasHook()) {
-                FilterMessageContext filterMessageContext = new FilterMessageContext();
-                filterMessageContext.setUnitMode(unitMode);
-                filterMessageContext.setMsgList(msgListFilterAgain);
-                this.executeHook(filterMessageContext);
-            }
-
-            for (MessageExt msg : msgListFilterAgain) {
-                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
-                        Long.toString(pullResult.getMinOffset()));
-                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
-                        Long.toString(pullResult.getMaxOffset()));
-            }
-
-            pullResultExt.setMsgFoundList(msgListFilterAgain);
-        }
-
-        // The raw bytes are not needed once decoding has happened (or was skipped).
-        pullResultExt.setMessageBinary(null);
-
-        return pullResult;
-    }
-
-    /**
-     * Records {@code brokerId} as the broker to pull from next for {@code mq}.
-     *
-     * @param mq       queue the suggestion applies to
-     * @param brokerId suggested broker id
-     */
-    public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
-        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
-        if (null == suggest) {
-            this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
-        } else {
-            suggest.set(brokerId);
-        }
-    }
-
-    /**
-     * @return {@code true} when at least one filter hook is registered
-     */
-    public boolean hasHook() {
-        return !this.filterMessageHookList.isEmpty();
-    }
-
-    /**
-     * Runs every registered hook against {@code context}. A hook that throws is logged,
-     * together with the thrown error, and does not stop the remaining hooks.
-     *
-     * @param context context handed to each hook
-     */
-    public void executeHook(final FilterMessageContext context) {
-        if (!this.filterMessageHookList.isEmpty()) {
-            for (FilterMessageHook hook : this.filterMessageHookList) {
-                try {
-                    hook.filterMessage(context);
-                } catch (Throwable e) {
-                    // Pass the throwable as the last argument so the stack trace is kept.
-                    log.error("execute hook error. hookName={}", hook.hookName(), e);
-                }
-            }
-        }
-    }
-
-    /**
-     * Builds a pull request header from the arguments and sends it to the broker chosen for
-     * {@code mq}.
-     * <p>
-     * If no broker address is known, the topic route is refreshed from the name server once
-     * and the lookup is retried. When the chosen broker is a slave, the commit-offset flag is
-     * cleared from the system flag. When the class-filter flag is set, the request goes to a
-     * filter server picked for that broker instead of the broker itself.
-     *
-     * @param mq                         queue to pull from
-     * @param subExpression              subscription expression put into the header
-     * @param subVersion                 subscription version put into the header
-     * @param offset                     queue offset to pull from
-     * @param maxNums                    maximum number of messages requested
-     * @param sysFlag                    pull system flags
-     * @param commitOffset               commit offset put into the header
-     * @param brokerSuspendMaxTimeMillis suspend timeout put into the header
-     * @param timeoutMillis              timeout passed to the pull call
-     * @param communicationMode          communication mode passed to the pull call
-     * @param pullCallback               callback passed to the pull call
-     * @return whatever the underlying pull call returns
-     * @throws MQClientException    when no broker address can be found for the queue, or no
-     *                              filter server can be found in class-filter mode
-     * @throws RemotingException    propagated from the underlying calls
-     * @throws MQBrokerException    propagated from the underlying calls
-     * @throws InterruptedException propagated from the underlying calls
-     */
-    public PullResult pullKernelImpl(
-            final MessageQueue mq,
-            final String subExpression,
-            final long subVersion,
-            final long offset,
-            final int maxNums,
-            final int sysFlag,
-            final long commitOffset,
-            final long brokerSuspendMaxTimeMillis,
-            final long timeoutMillis,
-            final CommunicationMode communicationMode,
-            final PullCallback pullCallback
-    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        FindBrokerResult findBrokerResult =
-                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
-                        this.recalculatePullFromWhichNode(mq), false);
-        if (null == findBrokerResult) {
-            // Unknown broker: refresh the route once, then look it up again.
-            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            findBrokerResult =
-                    this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
-                            this.recalculatePullFromWhichNode(mq), false);
-        }
-
-        if (findBrokerResult != null) {
-            int sysFlagInner = sysFlag;
-
-            if (findBrokerResult.isSlave()) {
-                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
-            }
-
-            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
-            requestHeader.setConsumerGroup(this.consumerGroup);
-            requestHeader.setTopic(mq.getTopic());
-            requestHeader.setQueueId(mq.getQueueId());
-            requestHeader.setQueueOffset(offset);
-            requestHeader.setMaxMsgNums(maxNums);
-            requestHeader.setSysFlag(sysFlagInner);
-            requestHeader.setCommitOffset(commitOffset);
-            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
-            requestHeader.setSubscription(subExpression);
-            requestHeader.setSubVersion(subVersion);
-
-            String brokerAddr = findBrokerResult.getBrokerAddr();
-            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
-                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
-            }
-
-            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
-                    brokerAddr,
-                    requestHeader,
-                    timeoutMillis,
-                    communicationMode,
-                    pullCallback);
-
-            return pullResult;
-        }
-
-        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
-    }
-
-    /**
-     * Decides which broker id to pull from for {@code mq}.
-     *
-     * @param mq queue about to be pulled
-     * @return {@code defaultBrokerId} when {@code connectBrokerByUser} is set; otherwise the
-     *         last recorded suggestion for the queue, or {@link MixAll#MASTER_ID} when none
-     *         has been recorded
-     */
-    public long recalculatePullFromWhichNode(final MessageQueue mq) {
-        if (this.isConnectBrokerByUser()) {
-            return this.defaultBrokerId;
-        }
-
-        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
-        if (suggest != null) {
-            return suggest.get();
-        }
-
-        return MixAll.MASTER_ID;
-    }
-
-    /**
-     * Picks one of the filter servers registered for {@code brokerAddr} under {@code topic},
-     * using {@link #randomNum()} to choose the index.
-     *
-     * @param topic      topic whose route data is consulted
-     * @param brokerAddr broker address used as the key into the filter-server table
-     * @return address of the chosen filter server
-     * @throws MQClientException when the topic has no route data or the broker has no filter
-     *                           server listed
-     */
-    private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
-            throws MQClientException {
-        ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
-        if (topicRouteTable != null) {
-            TopicRouteData topicRouteData = topicRouteTable.get(topic);
-            // A topic without route data must end in the MQClientException below, not an NPE.
-            if (topicRouteData != null) {
-                List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);
-
-                if (list != null && !list.isEmpty()) {
-                    return list.get(randomNum() % list.size());
-                }
-            }
-        }
-
-        throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "
-                + topic, null);
-    }
-
-    /**
-     * @return the current value of the {@code connectBrokerByUser} flag
-     */
-    public boolean isConnectBrokerByUser() {
-        return connectBrokerByUser;
-    }
-
-    /**
-     * Draws a non-negative pseudo-random {@code int}.
-     *
-     * @return a value in {@code [0, Integer.MAX_VALUE]}
-     */
-    public int randomNum() {
-        int value = random.nextInt();
-        if (value < 0) {
-            value = Math.abs(value);
-            // Math.abs(Integer.MIN_VALUE) is still negative; map that case to 0.
-            if (value < 0)
-                value = 0;
-        }
-        return value;
-    }
-
-    /**
-     * Overwrites the {@code connectBrokerByUser} flag.
-     *
-     * @param connectBrokerByUser new value of the flag
-     */
-    public void setConnectBrokerByUser(boolean connectBrokerByUser) {
-        this.connectBrokerByUser = connectBrokerByUser;
-
-    }
-
-    /**
-     * Replaces the hook list with the given list instance (it is stored, not copied).
-     *
-     * @param filterMessageHookList hooks to run from {@link #executeHook(FilterMessageContext)}
-     */
-    public void registerFilterMessageHook(ArrayList<FilterMessageHook> filterMessageHookList) {
-        this.filterMessageHookList = filterMessageHookList;
-    }
-
-    /**
-     * @return the current value of {@code defaultBrokerId}
-     */
-    public long getDefaultBrokerId() {
-        return defaultBrokerId;
-    }
-
-    /**
-     * Overwrites {@code defaultBrokerId}.
-     *
-     * @param defaultBrokerId new value to store
-     */
-    public void setDefaultBrokerId(long defaultBrokerId) {
-        this.defaultBrokerId = defaultBrokerId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java
deleted file mode 100644
index 161a039..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.ServiceThread;
-import org.slf4j.Logger;
-
-import java.util.concurrent.*;
-
-
-/**
- * Service thread that drains a queue of {@link PullRequest}s and hands each one to the
- * consumer selected for the request's consumer group. Requests can be enqueued at once or
- * after a delay through a single-threaded scheduler.
- *
- * @author shijia.wxr
- */
-public class PullMessageService extends ServiceThread {
-    private final Logger log = ClientLogger.getLog();
-    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
-    private final MQClientInstance mQClientFactory;
-    private final ScheduledExecutorService scheduledExecutorService = Executors
-            .newSingleThreadScheduledExecutor(new ThreadFactory() {
-                @Override
-                public Thread newThread(Runnable task) {
-                    return new Thread(task, "PullMessageServiceScheduledThread");
-                }
-            });
-
-    /**
-     * @param mQClientFactory client instance used to look up the consumer for each request
-     */
-    public PullMessageService(MQClientInstance mQClientFactory) {
-        this.mQClientFactory = mQClientFactory;
-    }
-
-    /**
-     * Schedules {@code pullRequest} to be enqueued after {@code timeDelay} milliseconds.
-     *
-     * @param pullRequest request to enqueue later
-     * @param timeDelay   delay in milliseconds
-     */
-    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
-        final Runnable enqueueTask = new Runnable() {
-            @Override
-            public void run() {
-                PullMessageService.this.executePullRequestImmediately(pullRequest);
-            }
-        };
-        this.scheduledExecutorService.schedule(enqueueTask, timeDelay, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Puts {@code pullRequest} on the queue consumed by {@link #run()}. An interruption while
-     * enqueuing is logged and the request is not enqueued.
-     *
-     * @param pullRequest request to enqueue
-     */
-    public void executePullRequestImmediately(final PullRequest pullRequest) {
-        try {
-            this.pullRequestQueue.put(pullRequest);
-        } catch (InterruptedException e) {
-            log.error("executePullRequestImmediately pullRequestQueue.put", e);
-        }
-    }
-
-    /**
-     * Schedules an arbitrary task on the internal scheduler.
-     *
-     * @param r         task to run
-     * @param timeDelay delay in milliseconds
-     */
-    public void executeTaskLater(final Runnable r, final long timeDelay) {
-        this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * @return the internal scheduler instance
-     */
-    public ScheduledExecutorService getScheduledExecutorService() {
-        return this.scheduledExecutorService;
-    }
-
-    /**
-     * Dispatches one request to the consumer registered for its group; the request is
-     * dropped with a warning when no consumer is found.
-     */
-    private void pullMessage(final PullRequest pullRequest) {
-        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
-        if (consumer == null) {
-            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
-            return;
-        }
-
-        ((DefaultMQPushConsumerImpl) consumer).pullMessage(pullRequest);
-    }
-
-
-    /**
-     * Main loop: takes requests off the queue and dispatches them until the service reports
-     * itself stopped. Exceptions other than interruption are logged and the loop continues.
-     */
-    @Override
-    public void run() {
-        log.info(this.getServiceName() + " service started");
-
-        while (!this.isStopped()) {
-            try {
-                final PullRequest next = this.pullRequestQueue.take();
-                if (next == null) {
-                    continue;
-                }
-                this.pullMessage(next);
-            } catch (InterruptedException e) {
-                // Ignored here; the loop condition re-checks isStopped().
-            } catch (Exception e) {
-                log.error("Pull Message Service Run Method exception", e);
-            }
-        }
-
-        log.info(this.getServiceName() + " service end");
-    }
-
-
-    /**
-     * @return the simple class name of this service
-     */
-    @Override
-    public String getServiceName() {
-        return PullMessageService.class.getSimpleName();
-    }
-
-
-}