Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/09/04 05:04:52 UTC

[06/14] incubator-rocketmq-externals git commit: upload rocketmq-cpp code

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/DefaultMQPushConsumer.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/DefaultMQPushConsumer.cpp b/rocketmq-cpp/src/consumer/DefaultMQPushConsumer.cpp
new file mode 100755
index 0000000..8407350
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/DefaultMQPushConsumer.cpp
@@ -0,0 +1,897 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefaultMQPushConsumer.h"
+#include "CommunicationMode.h"
+#include "ConsumeMsgService.h"
+#include "ConsumerRunningInfo.h"
+#include "FilterAPI.h"
+#include "Logging.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientFactory.h"
+#include "MQClientManager.h"
+#include "MQProtos.h"
+#include "OffsetStore.h"
+#include "PullAPIWrapper.h"
+#include "PullSysFlag.h"
+#include "Rebalance.h"
+#include "UtilAll.h"
+#include "Validators.h"
+#include "task_queue.h"
+
+namespace rocketmq {
+
+class AsyncPullCallback : public PullCallback {
+ public:
+  AsyncPullCallback(DefaultMQPushConsumer* pushConsumer, PullRequest* request)
+      : m_callbackOwner(pushConsumer),
+        m_pullRequest(request),
+        m_bShutdown(false) {}
+  virtual ~AsyncPullCallback() {
+    m_callbackOwner = NULL;
+    m_pullRequest = NULL;
+  }
+  virtual void onSuccess(MQMessageQueue& mq, PullResult& result,
+                         bool bProducePullRequest) {
+    if (m_bShutdown == true) {
+      LOG_INFO("pullrequest for:%s in shutdown, return",
+               (m_pullRequest->m_messageQueue).toString().c_str());
+      return;
+    }
+
+    switch (result.pullStatus) {
+      case FOUND: {
+        if (!m_pullRequest->isDroped())  // if the request has been dropped,
+                                         // don't add msgFoundList to
+                                         // m_msgTreeMap and don't call
+                                         // producePullMsgTask
+        {  // avoids the race where pullMsg is sent out, a concurrent rebalance
+           // drops this request, and the pulled msgs arrive afterwards.
+          m_pullRequest->setNextOffset(result.nextBeginOffset);
+          m_pullRequest->putMessage(result.msgFoundList);
+
+          m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(
+              m_pullRequest, result.msgFoundList);
+
+          if (bProducePullRequest)
+            m_callbackOwner->producePullMsgTask(m_pullRequest);
+
+          LOG_DEBUG("FOUND:%s with size:%zu,nextBeginOffset:%lld",
+                    (m_pullRequest->m_messageQueue).toString().c_str(),
+                    result.msgFoundList.size(), result.nextBeginOffset);
+        }
+        break;
+      }
+      case NO_NEW_MSG: {
+        m_pullRequest->setNextOffset(result.nextBeginOffset);
+
+        vector<MQMessageExt> msgs;
+        m_pullRequest->getMessage(msgs);
+        if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
+          /*if the broker lost/cleared the msgs of one msgQueue but the
+          brokerOffset is kept, the consumer enters the following situation:
+          1>. pull offset is 0 after rebalance, and m_offsetTable[mq] is set
+          to 0;
+          2>. NO_NEW_MSG or NO_MATCHED_MSG is returned by pullMessage, and
+          nextBeginOffset increases by 800;
+          3>. request->getMessage(msgs) is always empty;
+          4>. we need to update consumerOffset to the nextBeginOffset
+          indicated by the broker.
+          But if really no new msg can be pulled, this CASE is also entered.
+
+          LOG_INFO("maybe misMatch between broker and client happens, update
+          consumerOffset to nextBeginOffset indicated by broker");*/
+          m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,
+                                               result.nextBeginOffset);
+        }
+        if (bProducePullRequest)
+          m_callbackOwner->producePullMsgTask(m_pullRequest);
+
+        /*LOG_INFO("NO_NEW_MSG:%s,nextBeginOffset:%lld",
+                 (m_pullRequest->m_messageQueue).toString().c_str(),
+                 result.nextBeginOffset);*/
+        break;
+      }
+      case NO_MATCHED_MSG: {
+        m_pullRequest->setNextOffset(result.nextBeginOffset);
+
+        vector<MQMessageExt> msgs;
+        m_pullRequest->getMessage(msgs);
+        if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
+          /*if the broker lost/cleared the msgs of one msgQueue but the
+          brokerOffset is kept, the consumer enters the following situation:
+          1>. pull offset is 0 after rebalance, and m_offsetTable[mq] is set
+          to 0;
+          2>. NO_NEW_MSG or NO_MATCHED_MSG is returned by pullMessage, and
+          nextBeginOffset increases by 800;
+          3>. request->getMessage(msgs) is always empty;
+          4>. we need to update consumerOffset to the nextBeginOffset
+          indicated by the broker.
+          But if really no new msg can be pulled, this CASE is also entered.
+
+          LOG_INFO("maybe misMatch between broker and client happens, update
+          consumerOffset to nextBeginOffset indicated by broker");*/
+          m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,
+                                               result.nextBeginOffset);
+        }
+        if (bProducePullRequest)
+          m_callbackOwner->producePullMsgTask(m_pullRequest);
+        /*LOG_INFO("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
+                 (m_pullRequest->m_messageQueue).toString().c_str(),
+                 result.nextBeginOffset);*/
+        break;
+      }
+      case OFFSET_ILLEGAL: {
+        m_pullRequest->setNextOffset(result.nextBeginOffset);
+        if (bProducePullRequest)
+          m_callbackOwner->producePullMsgTask(m_pullRequest);
+
+        /*LOG_INFO("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
+                 (m_pullRequest->m_messageQueue).toString().c_str(),
+                 result.nextBeginOffset);*/
+        break;
+      }
+      case BROKER_TIMEOUT: {  // BROKER_TIMEOUT is defined by the client; the
+                              // broker never returns this status, so this case
+                              // cannot be entered.
+        LOG_ERROR("impossible BROKER_TIMEOUT occurs");
+        m_pullRequest->setNextOffset(result.nextBeginOffset);
+        if (bProducePullRequest)
+          m_callbackOwner->producePullMsgTask(m_pullRequest);
+        break;
+      }
+    }
+  }
+
+  virtual void onException(MQException& e) {
+    if (m_bShutdown == true) {
+      LOG_INFO("pullrequest for:%s in shutdown, return",
+               (m_pullRequest->m_messageQueue).toString().c_str());
+      return;
+    }
+    LOG_WARN("pullrequest for:%s occurs exception, reproduce it",
+             (m_pullRequest->m_messageQueue).toString().c_str());
+    m_callbackOwner->producePullMsgTask(m_pullRequest);
+  }
+
+  void setShutdownStatus() { m_bShutdown = true; }
+
+ private:
+  DefaultMQPushConsumer* m_callbackOwner;
+  PullRequest* m_pullRequest;
+  bool m_bShutdown;
+};
+
+//<!***************************************************************************
+static boost::mutex m_asyncCallbackLock;
+DefaultMQPushConsumer::DefaultMQPushConsumer(const string& groupname)
+    : m_consumeFromWhere(CONSUME_FROM_LAST_OFFSET),
+      m_pOffsetStore(NULL),
+      m_pPullAPIWrapper(NULL),
+      m_pMessageListener(NULL),
+      m_consumeMessageBatchMaxSize(1),
+      m_maxMsgCacheSize(1000) {
+  //<!set default group name;
+  string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname;
+  setGroupName(gname);
+  m_asyncPull = true;
+  m_asyncPullTimeout = 30 * 1000;
+  setMessageModel(CLUSTERING);
+
+  m_startTime = UtilAll::currentTimeMillis();
+  m_consumeThreadCount = boost::thread::hardware_concurrency();
+  m_pullMsgThreadPoolNum = boost::thread::hardware_concurrency();
+  m_async_service_thread.reset(new boost::thread(
+      boost::bind(&DefaultMQPushConsumer::boost_asio_work, this)));
+}
+
+void DefaultMQPushConsumer::boost_asio_work() {
+  LOG_INFO("DefaultMQPushConsumer::boost asio async service runing");
+  boost::asio::io_service::work work(m_async_ioService);  // keep the async io
+                                                          // service from stopping
+                                                          // after the first timer
+                                                          // timeout callback
+  m_async_ioService.run();
+}
+
+DefaultMQPushConsumer::~DefaultMQPushConsumer() {
+  m_pMessageListener = NULL;
+  deleteAndZero(m_pullmsgQueue);
+  deleteAndZero(m_pRebalance);
+  deleteAndZero(m_pOffsetStore);
+  deleteAndZero(m_pPullAPIWrapper);
+  deleteAndZero(m_consumerServeice);
+  PullMAP::iterator it = m_PullCallback.begin();
+  for (; it != m_PullCallback.end(); ++it) {
+    deleteAndZero(it->second);
+  }
+  m_PullCallback.clear();
+  m_subTopics.clear();
+}
+
+void DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
+  try {
+    getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(
+        msg, getGroupName(), delayLevel, 3000, getSessionCredentials());
+  } catch (MQException& e) {
+    LOG_ERROR(e.what());
+  }
+}
+
+void DefaultMQPushConsumer::fetchSubscribeMessageQueues(
+    const string& topic, vector<MQMessageQueue>& mqs) {
+  mqs.clear();
+  try {
+    getFactory()->fetchSubscribeMessageQueues(topic, mqs,
+                                              getSessionCredentials());
+  } catch (MQException& e) {
+    LOG_ERROR(e.what());
+  }
+}
+
+void DefaultMQPushConsumer::doRebalance() {
+  if (isServiceStateOk()) {
+    try {
+      m_pRebalance->doRebalance();
+    } catch (MQException& e) {
+      LOG_ERROR(e.what());
+    }
+  }
+}
+
+void DefaultMQPushConsumer::persistConsumerOffset() {
+  if (isServiceStateOk()) {
+    m_pRebalance->persistConsumerOffset();
+  }
+}
+
+void DefaultMQPushConsumer::persistConsumerOffsetByResetOffset() {
+  if (isServiceStateOk()) {
+    m_pRebalance->persistConsumerOffsetByResetOffset();
+  }
+}
+
+void DefaultMQPushConsumer::start() {
+  /* Ignore the SIGPIPE */
+  struct sigaction sa;
+  sa.sa_handler = SIG_IGN;
+  sa.sa_flags = 0;
+  sigaction(SIGPIPE, &sa, 0);
+
+  switch (m_serviceState) {
+    case CREATE_JUST: {
+      m_serviceState = START_FAILED;
+      MQClient::start();
+      LOG_INFO("DefaultMQPushConsumer:%s start", m_GroupName.c_str());
+
+      //<!data;
+      checkConfig();
+
+      //<!create rebalance;
+      m_pRebalance = new RebalancePush(this, getFactory());
+
+      string groupname = getGroupName();
+      m_pPullAPIWrapper = new PullAPIWrapper(getFactory(), groupname);
+
+      if (m_pMessageListener) {
+        if (m_pMessageListener->getMessageListenerType() ==
+            messageListenerOrderly) {
+          LOG_INFO("start orderly consume service:%s", getGroupName().c_str());
+          m_consumerServeice = new ConsumeMessageOrderlyService(
+              this, m_consumeThreadCount, m_pMessageListener);
+        } else  // for backward compatibility, default and concurrently
+                // listeners both get a ConsumeMessageConcurrentlyService
+        {
+          LOG_INFO("start concurrently consume service:%s",
+                   getGroupName().c_str());
+          m_consumerServeice = new ConsumeMessageConcurrentlyService(
+              this, m_consumeThreadCount, m_pMessageListener);
+        }
+      }
+
+      m_pullmsgQueue = new TaskQueue(m_pullMsgThreadPoolNum);
+      m_pullmsgThread.reset(new boost::thread(boost::bind(
+          &DefaultMQPushConsumer::runPullMsgQueue, this, m_pullmsgQueue)));
+
+      copySubscription();
+
+      //<! register;
+      bool registerOK = getFactory()->registerConsumer(this);
+      if (!registerOK) {
+        m_serviceState = CREATE_JUST;
+        THROW_MQEXCEPTION(
+            MQClientException,
+            "The cousumer group[" + getGroupName() +
+                "] has been created before, specify another name please.",
+            -1);
+      }
+
+      //<!msg model;
+      switch (getMessageModel()) {
+        case BROADCASTING:
+          m_pOffsetStore = new LocalFileOffsetStore(groupname, getFactory());
+          break;
+        case CLUSTERING:
+          m_pOffsetStore = new RemoteBrokerOffsetStore(groupname, getFactory());
+          break;
+      }
+      m_pOffsetStore->load();
+      m_consumerServeice->start();
+
+      getFactory()->start();
+
+      //<! update topic subscribe info and send heartbeat to all brokers;
+      updateTopicSubscribeInfoWhenSubscriptionChanged();
+      getFactory()->sendHeartbeatToAllBroker();
+
+      m_serviceState = RUNNING;
+      break;
+    }
+    case RUNNING:
+    case START_FAILED:
+    case SHUTDOWN_ALREADY:
+      break;
+    default:
+      break;
+  }
+
+  getFactory()->rebalanceImmediately();
+}
+
+void DefaultMQPushConsumer::shutdown() {
+  switch (m_serviceState) {
+    case RUNNING: {
+      LOG_INFO("DefaultMQPushConsumer shutdown");
+      m_async_ioService.stop();
+      m_async_service_thread->interrupt();
+      m_async_service_thread->join();
+      m_pullmsgQueue->close();
+      m_pullmsgThread->interrupt();
+      m_pullmsgThread->join();
+      m_consumerServeice->shutdown();
+      persistConsumerOffset();
+      shutdownAsyncPullCallBack();  // delete async pullMsg resources
+      getFactory()->unregisterConsumer(this);
+      getFactory()->shutdown();
+      m_serviceState = SHUTDOWN_ALREADY;
+      break;
+    }
+    case CREATE_JUST:
+    case SHUTDOWN_ALREADY:
+      break;
+    default:
+      break;
+  }
+}
+
+void DefaultMQPushConsumer::registerMessageListener(
+    MQMessageListener* pMessageListener) {
+  if (NULL != pMessageListener) {
+    m_pMessageListener = pMessageListener;
+  }
+}
+
+MessageListenerType DefaultMQPushConsumer::getMessageListenerType() {
+  if (NULL != m_pMessageListener) {
+    return m_pMessageListener->getMessageListenerType();
+  }
+  return messageListenerDefaultly;
+}
+
+ConsumeMsgService* DefaultMQPushConsumer::getConsumerMsgService() const {
+  return m_consumerServeice;
+}
+
+OffsetStore* DefaultMQPushConsumer::getOffsetStore() const {
+  return m_pOffsetStore;
+}
+
+Rebalance* DefaultMQPushConsumer::getRebalance() const { return m_pRebalance; }
+
+void DefaultMQPushConsumer::subscribe(const string& topic,
+                                      const string& subExpression) {
+  m_subTopics[topic] = subExpression;
+}
+
+void DefaultMQPushConsumer::checkConfig() {
+  string groupname = getGroupName();
+  // check consumerGroup
+  Validators::checkGroup(groupname);
+
+  // consumerGroup
+  if (!groupname.compare(DEFAULT_CONSUMER_GROUP)) {
+    THROW_MQEXCEPTION(MQClientException,
+                      "consumerGroup can not equal DEFAULT_CONSUMER", -1);
+  }
+
+  if (getMessageModel() != BROADCASTING && getMessageModel() != CLUSTERING) {
+    THROW_MQEXCEPTION(MQClientException, "messageModel is valid ", -1);
+  }
+
+  if (m_pMessageListener == NULL) {
+    THROW_MQEXCEPTION(MQClientException, "messageListener is null ", -1);
+  }
+}
+
+void DefaultMQPushConsumer::copySubscription() {
+  map<string, string>::iterator it = m_subTopics.begin();
+  for (; it != m_subTopics.end(); ++it) {
+    LOG_INFO("buildSubscriptionData,:%s,%s", it->first.c_str(),
+             it->second.c_str());
+    unique_ptr<SubscriptionData> pSData(
+        FilterAPI::buildSubscriptionData(it->first, it->second));
+
+    m_pRebalance->setSubscriptionData(it->first, pSData.release());
+  }
+
+  switch (getMessageModel()) {
+    case BROADCASTING:
+      break;
+    case CLUSTERING: {
+      string retryTopic = UtilAll::getRetryTopic(getGroupName());
+
+      //<!this sub;
+      unique_ptr<SubscriptionData> pSData(
+          FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL));
+
+      m_pRebalance->setSubscriptionData(retryTopic, pSData.release());
+      break;
+    }
+    default:
+      break;
+  }
+}
+
+void DefaultMQPushConsumer::updateTopicSubscribeInfo(
+    const string& topic, vector<MQMessageQueue>& info) {
+  m_pRebalance->setTopicSubscribeInfo(topic, info);
+}
+
+void DefaultMQPushConsumer::updateTopicSubscribeInfoWhenSubscriptionChanged() {
+  map<string, SubscriptionData*>& subTable =
+      m_pRebalance->getSubscriptionInner();
+  map<string, SubscriptionData*>::iterator it = subTable.begin();
+  for (; it != subTable.end(); ++it) {
+    bool btopic = getFactory()->updateTopicRouteInfoFromNameServer(
+        it->first, getSessionCredentials());
+    if (btopic == false) {
+      LOG_WARN("The topic:[%s] not exist", it->first.c_str());
+    }
+  }
+}
+
+ConsumeType DefaultMQPushConsumer::getConsumeType() {
+  return CONSUME_PASSIVELY;
+}
+
+ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere() {
+  return m_consumeFromWhere;
+}
+
+void DefaultMQPushConsumer::setConsumeFromWhere(
+    ConsumeFromWhere consumeFromWhere) {
+  m_consumeFromWhere = consumeFromWhere;
+}
+
+void DefaultMQPushConsumer::getSubscriptions(vector<SubscriptionData>& result) {
+  map<string, SubscriptionData*>& subTable =
+      m_pRebalance->getSubscriptionInner();
+  map<string, SubscriptionData*>::iterator it = subTable.begin();
+  for (; it != subTable.end(); ++it) {
+    result.push_back(*(it->second));
+  }
+}
+
+void DefaultMQPushConsumer::updateConsumeOffset(const MQMessageQueue& mq,
+                                                int64 offset) {
+  if (offset >= 0) {
+    m_pOffsetStore->updateOffset(mq, offset);
+  } else {
+    LOG_ERROR("updateConsumeOffset of mq:%s error", mq.toString().c_str());
+  }
+}
+void DefaultMQPushConsumer::removeConsumeOffset(const MQMessageQueue& mq) {
+  m_pOffsetStore->removeOffset(mq);
+}
+
+void DefaultMQPushConsumer::triggerNextPullRequest(
+    boost::asio::deadline_timer* t, PullRequest* request) {
+  // LOG_INFO("trigger pullrequest for:%s",
+  // (request->m_messageQueue).toString().c_str());
+  producePullMsgTask(request);
+  deleteAndZero(t);
+}
+
+void DefaultMQPushConsumer::producePullMsgTask(PullRequest* request) {
+  if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) {
+    if (m_asyncPull) {
+      m_pullmsgQueue->produce(TaskBinder::gen(
+          &DefaultMQPushConsumer::pullMessageAsync, this, request));
+    } else {
+      m_pullmsgQueue->produce(
+          TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request));
+    }
+  }
+}
+
+void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue* pTaskQueue) {
+  pTaskQueue->run();
+}
+
+void DefaultMQPushConsumer::pullMessage(PullRequest* request) {
+  if (request == NULL || request->isDroped()) {
+    LOG_WARN("Pull request is set drop, return");
+    return;
+  }
+
+  MQMessageQueue& messageQueue = request->m_messageQueue;
+  if (m_consumerServeice->getConsumeMsgSerivceListenerType() ==
+      messageListenerOrderly) {
+    if (!request->isLocked() || request->isLockExpired()) {
+      if (!m_pRebalance->lock(messageQueue)) {
+        producePullMsgTask(request);
+        return;
+      }
+    }
+  }
+
+  if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
+    // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
+    // than:%d",  (request->m_messageQueue).toString().c_str(),
+    // request->getCacheMsgCount(), m_maxMsgCacheSize);
+    boost::asio::deadline_timer* t = new boost::asio::deadline_timer(
+        m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
+    t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
+                              this, t, request));
+    return;
+  }
+
+  bool commitOffsetEnable = false;
+  int64 commitOffsetValue = 0;
+  if (CLUSTERING == getMessageModel()) {
+    commitOffsetValue = m_pOffsetStore->readOffset(
+        messageQueue, READ_FROM_MEMORY, getSessionCredentials());
+    if (commitOffsetValue > 0) {
+      commitOffsetEnable = true;
+    }
+  }
+
+  string subExpression;
+  SubscriptionData* pSdata =
+      m_pRebalance->getSubscriptionData(messageQueue.getTopic());
+  if (pSdata == NULL) {
+    producePullMsgTask(request);
+    return;
+  }
+  subExpression = pSdata->getSubString();
+
+  int sysFlag =
+      PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset
+                                false,                   // suspend
+                                !subExpression.empty(),  // subscription
+                                false);                  // class filter
+
+  try {
+    request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+    unique_ptr<PullResult> result(
+        m_pPullAPIWrapper->pullKernelImpl(messageQueue,              // 1
+                                          subExpression,             // 2
+                                          pSdata->getSubVersion(),   // 3
+                                          request->getNextOffset(),  // 4
+                                          32,                        // 5
+                                          sysFlag,                   // 6
+                                          commitOffsetValue,         // 7
+                                          1000 * 15,                 // 8
+                                          1000 * 30,                 // 9
+                                          ComMode_SYNC,              // 10
+                                          NULL, getSessionCredentials()));
+
+    PullResult pullResult = m_pPullAPIWrapper->processPullResult(
+        messageQueue, result.get(), pSdata);
+
+    switch (pullResult.pullStatus) {
+      case FOUND: {
+        if (!request->isDroped())  // if the request has been dropped, don't add
+                                   // msgFoundList to m_msgTreeMap and don't
+                                   // call producePullMsgTask
+        {  // avoids the race where pullMsg is sent out, a concurrent rebalance
+           // drops this request, and the pulled msgs arrive afterwards.
+          request->setNextOffset(pullResult.nextBeginOffset);
+          request->putMessage(pullResult.msgFoundList);
+
+          m_consumerServeice->submitConsumeRequest(request,
+                                                   pullResult.msgFoundList);
+          producePullMsgTask(request);
+
+          LOG_DEBUG("FOUND:%s with size:%zu,nextBeginOffset:%lld",
+                    messageQueue.toString().c_str(),
+                    pullResult.msgFoundList.size(), pullResult.nextBeginOffset);
+        }
+        break;
+      }
+      case NO_NEW_MSG: {
+        request->setNextOffset(pullResult.nextBeginOffset);
+        vector<MQMessageExt> msgs;
+        request->getMessage(msgs);
+        if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+          /*if the broker lost/cleared the msgs of one msgQueue but the
+          brokerOffset is kept, the consumer enters the following situation:
+          1>. pull offset is 0 after rebalance, and m_offsetTable[mq] is set
+          to 0;
+          2>. NO_NEW_MSG or NO_MATCHED_MSG is returned by pullMessage, and
+          nextBeginOffset increases by 800;
+          3>. request->getMessage(msgs) is always empty;
+          4>. we need to update consumerOffset to the nextBeginOffset
+          indicated by the broker.
+          But if really no new msg can be pulled, this CASE is also entered.
+          */
+          // LOG_DEBUG("maybe misMatch between broker and client happens, update
+          // consumerOffset to nextBeginOffset indicated by broker");
+          updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
+        }
+        producePullMsgTask(request);
+        LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld",
+                  messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+        break;
+      }
+      case NO_MATCHED_MSG: {
+        request->setNextOffset(pullResult.nextBeginOffset);
+        vector<MQMessageExt> msgs;
+        request->getMessage(msgs);
+        if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+          // LOG_DEBUG("maybe misMatch between broker and client happens, update
+          // consumerOffset to nextBeginOffset indicated by broker");
+          updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
+        }
+        producePullMsgTask(request);
+
+        LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
+                  messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+        break;
+      }
+      case OFFSET_ILLEGAL: {
+        request->setNextOffset(pullResult.nextBeginOffset);
+        producePullMsgTask(request);
+
+        LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
+                  messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+        break;
+      }
+      case BROKER_TIMEOUT: {  // BROKER_TIMEOUT is defined by the client; the
+                              // broker never returns this status, so this case
+                              // cannot be entered.
+        LOG_ERROR("impossible BROKER_TIMEOUT occurs");
+        request->setNextOffset(pullResult.nextBeginOffset);
+        producePullMsgTask(request);
+        break;
+      }
+    }
+  } catch (MQException& e) {
+    LOG_ERROR(e.what());
+    producePullMsgTask(request);
+  }
+}
+
+AsyncPullCallback* DefaultMQPushConsumer::getAsyncPullCallBack(
+    PullRequest* request, MQMessageQueue msgQueue) {
+  boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
+  if (m_asyncPull && request) {
+    PullMAP::iterator it = m_PullCallback.find(msgQueue);
+    if (it == m_PullCallback.end()) {
+      LOG_INFO("new pull callback for mq:%s", msgQueue.toString().c_str());
+      m_PullCallback[msgQueue] = new AsyncPullCallback(this, request);
+    }
+    return m_PullCallback[msgQueue];
+  }
+
+  return NULL;
+}
+
+void DefaultMQPushConsumer::shutdownAsyncPullCallBack() {
+  boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
+  if (m_asyncPull) {
+    PullMAP::iterator it = m_PullCallback.begin();
+    for (; it != m_PullCallback.end(); ++it) {
+      if (it->second) {
+        it->second->setShutdownStatus();
+      } else {
+        LOG_ERROR("could not find asyncPullCallback for:%s",
+                  it->first.toString().c_str());
+      }
+    }
+  }
+}
+
+void DefaultMQPushConsumer::pullMessageAsync(PullRequest* request) {
+  if (request == NULL) {  // guard first: the log below dereferences request
+    LOG_WARN("Pull request is NULL, return");
+    return;
+  }
+  if (request->isDroped()) {
+    LOG_WARN("Pull request for mq:%s is dropped, return",
+             (request->m_messageQueue).toString().c_str());
+    return;
+  }
+
+  MQMessageQueue& messageQueue = request->m_messageQueue;
+  if (m_consumerServeice->getConsumeMsgSerivceListenerType() ==
+      messageListenerOrderly) {
+    if (!request->isLocked() || request->isLockExpired()) {
+      if (!m_pRebalance->lock(messageQueue)) {
+        producePullMsgTask(request);
+        return;
+      }
+    }
+  }
+
+  if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
+    // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
+    // than:%d",  (request->m_messageQueue).toString().c_str(),
+    // request->getCacheMsgCount(), m_maxMsgCacheSize);
+    boost::asio::deadline_timer* t = new boost::asio::deadline_timer(
+        m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
+    t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
+                              this, t, request));
+    return;
+  }
+
+  bool commitOffsetEnable = false;
+  int64 commitOffsetValue = 0;
+  if (CLUSTERING == getMessageModel()) {
+    commitOffsetValue = m_pOffsetStore->readOffset(
+        messageQueue, READ_FROM_MEMORY, getSessionCredentials());
+    if (commitOffsetValue > 0) {
+      commitOffsetEnable = true;
+    }
+  }
+
+  string subExpression;
+  SubscriptionData* pSdata =
+      (m_pRebalance->getSubscriptionData(messageQueue.getTopic()));
+  if (pSdata == NULL) {
+    producePullMsgTask(request);
+    return;
+  }
+  subExpression = pSdata->getSubString();
+
+  int sysFlag =
+      PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset
+                                true,                    // suspend
+                                !subExpression.empty(),  // subscription
+                                false);                  // class filter
+
+  AsyncArg arg;
+  arg.mq = messageQueue;
+  arg.subData = *pSdata;
+  arg.pPullWrapper = m_pPullAPIWrapper;
+
+  try {
+    request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+    m_pPullAPIWrapper->pullKernelImpl(
+        messageQueue,                                 // 1
+        subExpression,                                // 2
+        pSdata->getSubVersion(),                      // 3
+        request->getNextOffset(),                     // 4
+        32,                                           // 5
+        sysFlag,                                      // 6
+        commitOffsetValue,                            // 7
+        1000 * 15,                                    // 8
+        m_asyncPullTimeout,                           // 9
+        ComMode_ASYNC,                                // 10
+        getAsyncPullCallBack(request, messageQueue),  // 11
+        getSessionCredentials(),                      // 12
+        &arg);                                        // 13
+  } catch (MQException& e) {
+    LOG_ERROR(e.what());
+    producePullMsgTask(request);
+  }
+}
+
+void DefaultMQPushConsumer::setAsyncPull(bool asyncFlag) {
+  if (asyncFlag) {
+    LOG_INFO("set pushConsumer:%s to async pull mode (default)",
+             getGroupName().c_str());
+  } else {
+    LOG_INFO("set pushConsumer:%s to sync pull mode", getGroupName().c_str());
+  }
+  m_asyncPull = asyncFlag;
+}
+
+void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) {
+  if (threadCount > 0) {
+    m_consumeThreadCount = threadCount;
+  } else {
+    LOG_ERROR("setConsumeThreadCount with invalid value");
+  }
+}
+
+int DefaultMQPushConsumer::getConsumeThreadCount() const {
+  return m_consumeThreadCount;
+}
+
+void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) {
+  m_pullMsgThreadPoolNum = threadCount;
+}
+
+int DefaultMQPushConsumer::getPullMsgThreadPoolCount() const {
+  return m_pullMsgThreadPoolNum;
+}
+
+int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize() const {
+  return m_consumeMessageBatchMaxSize;
+}
+
+void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize(
+    int consumeMessageBatchMaxSize) {
+  if (consumeMessageBatchMaxSize >= 1)
+    m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
+}
+
+void DefaultMQPushConsumer::setMaxCacheMsgSizePerQueue(int maxCacheSize) {
+  if (maxCacheSize > 0 && maxCacheSize < 65535) {
+    LOG_INFO("set maxCacheSize to:%d for consumer:%s", maxCacheSize,
+             getGroupName().c_str());
+    m_maxMsgCacheSize = maxCacheSize;
+  }
+}
+
+int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const {
+  return m_maxMsgCacheSize;
+}
+
+ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo() {
+  ConsumerRunningInfo* info = new ConsumerRunningInfo();
+  if (info) {
+    if (m_consumerServeice->getConsumeMsgSerivceListenerType() ==
+        messageListenerOrderly)
+      info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");
+    else
+      info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "false");
+    info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE, UtilAll::to_string(m_consumeThreadCount));
+    info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP,
+                      UtilAll::to_string(m_startTime));
+
+    vector<SubscriptionData> result;
+    getSubscriptions(result);
+    info->setSubscriptionSet(result);
+
+    map<MQMessageQueue, PullRequest*> requestTable =
+        m_pRebalance->getPullRequestTable();
+    map<MQMessageQueue, PullRequest*>::iterator it = requestTable.begin();
+
+    for (; it != requestTable.end(); ++it) {
+      if (!it->second->isDroped()) {
+        map<MessageQueue, ProcessQueueInfo> queueTable;
+        MessageQueue queue((it->first).getTopic(), (it->first).getBrokerName(),
+                           (it->first).getQueueId());
+        ProcessQueueInfo processQueue;
+        processQueue.cachedMsgMinOffset = it->second->getCacheMinOffset();
+        processQueue.cachedMsgMaxOffset = it->second->getCacheMaxOffset();
+        processQueue.cachedMsgCount = it->second->getCacheMsgCount();
+        processQueue.setCommitOffset(m_pOffsetStore->readOffset(
+            it->first, MEMORY_FIRST_THEN_STORE, getSessionCredentials()));
+        processQueue.setDroped(it->second->isDroped());
+        processQueue.setLocked(it->second->isLocked());
+        processQueue.lastLockTimestamp = it->second->getLastLockTimestamp();
+        processQueue.lastPullTimestamp = it->second->getLastPullTimestamp();
+        processQueue.lastConsumeTimestamp =
+            it->second->getLastConsumeTimestamp();
+        info->setMqTable(queue, processQueue);
+      }
+    }
+
+    return info;
+  }
+  return NULL;
+}
+
+//<!************************************************************************
+}  //<!end namespace;
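
For reference, a minimal usage sketch of the push-consumer API defined above; it is not part of this commit. The listener class MessageListenerConcurrently, its consumeMessage()/ConsumeStatus signature, and setNamesrvAddr() are assumed from the rocketmq-cpp public headers rather than shown in this diff.

    #include <vector>
    #include "DefaultMQPushConsumer.h"

    using namespace rocketmq;

    // Assumed listener interface: consumeMessage() returns CONSUME_SUCCESS or
    // RECONSUME_LATER; the latter causes the messages to be sent back for retry.
    class ExampleListener : public MessageListenerConcurrently {
     public:
      ConsumeStatus consumeMessage(const std::vector<MQMessageExt>& msgs) {
        for (size_t i = 0; i < msgs.size(); ++i) {
          // handle msgs[i].getBody() here
        }
        return CONSUME_SUCCESS;
      }
    };

    int main() {
      DefaultMQPushConsumer consumer("example_consumer_group");
      consumer.setNamesrvAddr("127.0.0.1:9876");    // assumed setter on the MQClient base
      consumer.subscribe("ExampleTopic", "*");      // "*" means no tag filter (SUB_ALL)
      ExampleListener listener;
      consumer.registerMessageListener(&listener);  // must be set before start()
      consumer.setConsumeThreadCount(4);
      consumer.start();                             // registers, loads offsets, begins pulling
      // ... wait while messages are consumed ...
      consumer.shutdown();                          // persists offsets and unregisters
      return 0;
    }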

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/FindBrokerResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/FindBrokerResult.h b/rocketmq-cpp/src/consumer/FindBrokerResult.h
new file mode 100755
index 0000000..a224b14
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/FindBrokerResult.h
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __FINDBROKERRESULT_H__
+#define __FINDBROKERRESULT_H__
+
+namespace rocketmq {
+//<!************************************************************************
+struct FindBrokerResult {
+  FindBrokerResult(const std::string& sbrokerAddr, bool bslave)
+      : brokerAddr(sbrokerAddr), slave(bslave) {}
+
+ public:
+  std::string brokerAddr;
+  bool slave;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/OffsetStore.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/OffsetStore.cpp b/rocketmq-cpp/src/consumer/OffsetStore.cpp
new file mode 100755
index 0000000..33cf9ed
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/OffsetStore.cpp
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "OffsetStore.h"
+#include "Logging.h"
+#include "MQClientFactory.h"
+#include "MessageQueue.h"
+
+#include <fstream>
+#include <sstream>
+
+#include <boost/archive/binary_iarchive.hpp>
+#include <boost/archive/binary_oarchive.hpp>
+#include <boost/archive/text_iarchive.hpp>
+#include <boost/archive/text_oarchive.hpp>
+#include <boost/serialization/map.hpp>
+
+namespace rocketmq {
+
+//<!***************************************************************************
+OffsetStore::OffsetStore(const string& groupName, MQClientFactory* pfactory)
+    : m_groupName(groupName), m_pClientFactory(pfactory) {}
+
+OffsetStore::~OffsetStore() {
+  m_pClientFactory = NULL;
+  m_offsetTable.clear();
+}
+
+//<!***************************************************************************
+LocalFileOffsetStore::LocalFileOffsetStore(const string& groupName,
+                                           MQClientFactory* pfactory)
+    : OffsetStore(groupName, pfactory) {
+  MQConsumer* pConsumer = pfactory->selectConsumer(groupName);
+  if (pConsumer) {
+    LOG_INFO("new LocalFileOffsetStore");
+    string directoryName =
+        UtilAll::getLocalAddress() + "@" + pConsumer->getInstanceName();
+    m_storePath = ".rocketmq_offsets/" + directoryName + "/" + groupName + "/";
+    string homeDir(UtilAll::getHomeDirectory());
+    m_storeFile = homeDir + "/" + m_storePath + "offsets.Json";
+
+    string storePath(homeDir);
+    storePath.append(m_storePath);
+    if (access(storePath.c_str(), F_OK) != 0) {
+      if (mkdir(storePath.c_str(), S_IRWXU | S_IRWXG | S_IRWXO) != 0) {
+        LOG_ERROR("create data dir:%s error", storePath.c_str());
+      }
+    }
+  }
+}
+
+LocalFileOffsetStore::~LocalFileOffsetStore() {}
+
+void LocalFileOffsetStore::load() {
+  boost::lock_guard<boost::mutex> lock(m_lock);
+
+  std::ifstream ifs(m_storeFile.c_str(), std::ios::in);
+  if (ifs.good()) {
+    if (ifs.is_open()) {
+      if (ifs.peek() != std::ifstream::traits_type::eof()) {
+        map<string, int64> m_offsetTable_tmp;
+        boost::archive::text_iarchive ia(ifs);
+        ia >> m_offsetTable_tmp;
+        ifs.close();
+
+        for (map<string, int64>::iterator it = m_offsetTable_tmp.begin();
+             it != m_offsetTable_tmp.end(); ++it) {
+          // LOG_INFO("it->first:%s, it->second:%lld", it->first.c_str(),
+          // it->second);
+          Json::Reader reader;
+          Json::Value object;
+          reader.parse(it->first.c_str(), object);
+          MQMessageQueue mq(object["topic"].asString(),
+                            object["brokerName"].asString(),
+                            object["queueId"].asInt());
+          m_offsetTable[mq] = it->second;
+        }
+        m_offsetTable_tmp.clear();
+        /*for(map<MQMessageQueue, int64>::iterator it2 = m_offsetTable.begin();
+        it2!=m_offsetTable.end();++it2 ){
+            LOG_INFO("it->first:%s, it->second:%lld",
+        it2->first.toString().c_str(), it2->second);
+        }*/
+      } else {
+        LOG_ERROR(
+            "offset store file: %s is empty, please check whether it was "
+            "cleared by the operator; if so, delete this offsets.Json file "
+            "and then restart the consumer",
+            m_storeFile.c_str());
+        THROW_MQEXCEPTION(MQClientException,
+                          "offset store file is empty, please check whether "
+                          "offsets.Json was cleared by the operator; if so, "
+                          "delete this offsets.Json file and then restart the "
+                          "consumer",
+                          -1);
+      }
+    } else {
+      LOG_ERROR(
+          "open offset store file failed, please check whether file:%s was "
+          "deleted by the operator, and then restart the consumer",
+          m_storeFile.c_str());
+      THROW_MQEXCEPTION(MQClientException,
+                        "open offset store file failed, please check whether "
+                        "the store directory was deleted or offsets.Json was "
+                        "cleared by the operator, and then restart the "
+                        "consumer",
+                        -1);
+    }
+  } else {
+    LOG_WARN(
+        "offsets.Json file does not exist, maybe this is the first time of "
+        "consumption");
+  }
+}
+
+void LocalFileOffsetStore::updateOffset(const MQMessageQueue& mq,
+                                        int64 offset) {
+  boost::lock_guard<boost::mutex> lock(m_lock);
+  m_offsetTable[mq] = offset;
+}
+
+int64 LocalFileOffsetStore::readOffset(
+    const MQMessageQueue& mq, ReadOffsetType type,
+    const SessionCredentials& session_credentials) {
+  boost::lock_guard<boost::mutex> lock(m_lock);
+
+  switch (type) {
+    case MEMORY_FIRST_THEN_STORE:
+    case READ_FROM_MEMORY: {
+      MQ2OFFSET::iterator it = m_offsetTable.find(mq);
+      if (it != m_offsetTable.end()) {
+        return it->second;
+      } else if (READ_FROM_MEMORY == type) {
+        return -1;
+      }
+    }
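+    // intentional fall-through: MEMORY_FIRST_THEN_STORE falls back to
+    // READ_FROM_STORE when the queue has no offset cached in memory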
+    case READ_FROM_STORE: {
+      try {
+        load();
+      } catch (MQException& e) {
+        LOG_ERROR("catch exception when load local file");
+        return -1;
+      }
+      MQ2OFFSET::iterator it = m_offsetTable.find(mq);
+      if (it != m_offsetTable.end()) {
+        return it->second;
+      }
+    }
+    default:
+      break;
+  }
+  LOG_ERROR(
+      "can not readOffset from offsets.Json, maybe this is the first time of "
+      "consumption");
+  return -1;
+}
+
+void LocalFileOffsetStore::persist(
+    const MQMessageQueue& mq, const SessionCredentials& session_credentials) {}
+
+void LocalFileOffsetStore::persistAll(const std::vector<MQMessageQueue>& mqs) {
+  boost::lock_guard<boost::mutex> lock(m_lock);
+
+  map<string, int64> m_offsetTable_tmp;
+  vector<MQMessageQueue>::const_iterator it = mqs.begin();
+  for (; it != mqs.end(); ++it) {
+    MessageQueue mq_tmp((*it).getTopic(), (*it).getBrokerName(),
+                        (*it).getQueueId());
+    string mqKey = mq_tmp.toJson().toStyledString();
+    m_offsetTable_tmp[mqKey] = m_offsetTable[*it];
+  }
+
+  std::ofstream s;
+  s.open(m_storeFile.c_str(), std::ios::out);
+  if (s.is_open()) {
+    boost::archive::text_oarchive oa(s);
+    // Boost warns when archiving non-const class instances, because it might
+    // cause a problem with object tracking if different tracked objects use
+    // the same address, hence the const_cast below.
+    oa << const_cast<const map<string, int64>&>(m_offsetTable_tmp);
+    s.close();
+    m_offsetTable_tmp.clear();
+  } else {
+    LOG_ERROR("open offset store file failed");
+    m_offsetTable_tmp.clear();
+    THROW_MQEXCEPTION(MQClientException,
+                      "persistAll:open offset store file failed", -1);
+  }
+}
+
+void LocalFileOffsetStore::removeOffset(const MQMessageQueue& mq) {}
+
+//<!***************************************************************************
+RemoteBrokerOffsetStore::RemoteBrokerOffsetStore(const string& groupName,
+                                                 MQClientFactory* pfactory)
+    : OffsetStore(groupName, pfactory) {}
+
+RemoteBrokerOffsetStore::~RemoteBrokerOffsetStore() {}
+
+void RemoteBrokerOffsetStore::load() {}
+
+void RemoteBrokerOffsetStore::updateOffset(const MQMessageQueue& mq,
+                                           int64 offset) {
+  boost::lock_guard<boost::mutex> lock(m_lock);
+  m_offsetTable[mq] = offset;
+}
+
+int64 RemoteBrokerOffsetStore::readOffset(
+    const MQMessageQueue& mq, ReadOffsetType type,
+    const SessionCredentials& session_credentials) {
+  switch (type) {
+    case MEMORY_FIRST_THEN_STORE:
+    case READ_FROM_MEMORY: {
+      boost::lock_guard<boost::mutex> lock(m_lock);
+
+      MQ2OFFSET::iterator it = m_offsetTable.find(mq);
+      if (it != m_offsetTable.end()) {
+        return it->second;
+      } else if (READ_FROM_MEMORY == type) {
+        return -1;
+      }
+    }
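+    // intentional fall-through: MEMORY_FIRST_THEN_STORE fetches the offset
+    // from the broker when it is not cached in memory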
+    case READ_FROM_STORE: {
+      try {
+        int64 brokerOffset =
+            fetchConsumeOffsetFromBroker(mq, session_credentials);
+        //<!update;
+        updateOffset(mq, brokerOffset);
+        return brokerOffset;
+      } catch (MQBrokerException& e) {
+        LOG_ERROR(e.what());
+        return -1;
+      } catch (MQException& e) {
+        LOG_ERROR(e.what());
+        return -2;
+      }
+    }
+    default:
+      break;
+  }
+  return -1;
+}
+
+void RemoteBrokerOffsetStore::persist(
+    const MQMessageQueue& mq, const SessionCredentials& session_credentials) {
+  MQ2OFFSET offsetTable;
+  {
+    boost::lock_guard<boost::mutex> lock(m_lock);
+    offsetTable = m_offsetTable;
+  }
+
+  MQ2OFFSET::iterator it = offsetTable.find(mq);
+  if (it != offsetTable.end()) {
+    try {
+      updateConsumeOffsetToBroker(mq, it->second, session_credentials);
+    } catch (MQException& e) {
+      LOG_ERROR("updateConsumeOffsetToBroker error");
+    }
+  }
+}
+
+void RemoteBrokerOffsetStore::persistAll(
+    const std::vector<MQMessageQueue>& mq) {}
+
+void RemoteBrokerOffsetStore::removeOffset(const MQMessageQueue& mq) {
+  boost::lock_guard<boost::mutex> lock(m_lock);
+  if (m_offsetTable.find(mq) != m_offsetTable.end()) m_offsetTable.erase(mq);
+}
+
+void RemoteBrokerOffsetStore::updateConsumeOffsetToBroker(
+    const MQMessageQueue& mq, int64 offset,
+    const SessionCredentials& session_credentials) {
+  unique_ptr<FindBrokerResult> pFindBrokerResult(
+      m_pClientFactory->findBrokerAddressInAdmin(mq.getBrokerName()));
+
+  if (pFindBrokerResult == NULL) {
+    m_pClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic(),
+                                                         session_credentials);
+    pFindBrokerResult.reset(
+        m_pClientFactory->findBrokerAddressInAdmin(mq.getBrokerName()));
+  }
+
+  if (pFindBrokerResult != NULL) {
+    UpdateConsumerOffsetRequestHeader* pRequestHeader =
+        new UpdateConsumerOffsetRequestHeader();
+    pRequestHeader->topic = mq.getTopic();
+    pRequestHeader->consumerGroup = m_groupName;
+    pRequestHeader->queueId = mq.getQueueId();
+    pRequestHeader->commitOffset = offset;
+
+    try {
+      LOG_INFO(
+          "oneway updateConsumeOffsetToBroker of mq:%s, its offset is:%lld",
+          mq.toString().c_str(), offset);
+      return m_pClientFactory->getMQClientAPIImpl()->updateConsumerOffsetOneway(
+          pFindBrokerResult->brokerAddr, pRequestHeader, 1000 * 5,
+          session_credentials);
+    } catch (MQException& e) {
+      LOG_ERROR(e.what());
+    }
+  }
+  LOG_WARN("The broker not exist");
+}
+
+int64 RemoteBrokerOffsetStore::fetchConsumeOffsetFromBroker(
+    const MQMessageQueue& mq, const SessionCredentials& session_credentials) {
+  unique_ptr<FindBrokerResult> pFindBrokerResult(
+      m_pClientFactory->findBrokerAddressInAdmin(mq.getBrokerName()));
+
+  if (pFindBrokerResult == NULL) {
+    m_pClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic(),
+                                                         session_credentials);
+    pFindBrokerResult.reset(
+        m_pClientFactory->findBrokerAddressInAdmin(mq.getBrokerName()));
+  }
+
+  if (pFindBrokerResult != NULL) {
+    QueryConsumerOffsetRequestHeader* pRequestHeader =
+        new QueryConsumerOffsetRequestHeader();
+    pRequestHeader->topic = mq.getTopic();
+    pRequestHeader->consumerGroup = m_groupName;
+    pRequestHeader->queueId = mq.getQueueId();
+
+    return m_pClientFactory->getMQClientAPIImpl()->queryConsumerOffset(
+        pFindBrokerResult->brokerAddr, pRequestHeader, 1000 * 5,
+        session_credentials);
+  } else {
+    LOG_ERROR("The broker not exist when fetchConsumeOffsetFromBroker");
+    THROW_MQEXCEPTION(MQClientException, "The broker not exist", -1);
+  }
+}
+//<!***************************************************************************
+}  //<!end namespace;
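
A short sketch of how the readOffset() contract above is typically consumed; the helper name resolveStartOffset() is illustrative only and not part of this commit. A negative return value means "no offset known", which the caller resolves via the consumer's ConsumeFromWhere policy.

    #include "OffsetStore.h"

    using namespace rocketmq;

    // Illustrative helper: prefer the in-memory table, fall back to the
    // backing store (local file for BROADCASTING, broker for CLUSTERING).
    int64 resolveStartOffset(OffsetStore* store, const MQMessageQueue& mq,
                             const SessionCredentials& credentials) {
      int64 offset = store->readOffset(mq, MEMORY_FIRST_THEN_STORE, credentials);
      if (offset >= 0) {
        return offset;  // a committed offset exists; resume from it
      }
      // -1/-2: nothing stored yet (or the store was unreachable); the caller
      // then falls back to CONSUME_FROM_LAST_OFFSET / CONSUME_FROM_FIRST_OFFSET
      return -1;
    }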

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/OffsetStore.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/OffsetStore.h b/rocketmq-cpp/src/consumer/OffsetStore.h
new file mode 100755
index 0000000..269198f
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/OffsetStore.h
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __OFFSETSTORE_H__
+#define __OFFSETSTORE_H__
+
+#include <boost/asio.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/thread.hpp>
+#include <map>
+#include "MQMessageQueue.h"
+#include "RocketMQClient.h"
+#include "SessionCredentials.h"
+
+namespace rocketmq {
+class MQClientFactory;
+//<!***************************************************************************
+enum ReadOffsetType {
+  // read offset from local memory
+  READ_FROM_MEMORY,
+  // read offset from the backing store (local file or broker)
+  READ_FROM_STORE,
+  // read offset from memory first, then fall back to the backing store
+  MEMORY_FIRST_THEN_STORE,
+};
+
+//<!***************************************************************************
+class OffsetStore {
+ public:
+  OffsetStore(const std::string& groupName, MQClientFactory*);
+  virtual ~OffsetStore();
+
+  virtual void load() = 0;
+  virtual void updateOffset(const MQMessageQueue& mq, int64 offset) = 0;
+  virtual int64 readOffset(const MQMessageQueue& mq, ReadOffsetType type,
+                           const SessionCredentials& session_credentials) = 0;
+  virtual void persist(const MQMessageQueue& mq,
+                       const SessionCredentials& session_credentials) = 0;
+  virtual void persistAll(const std::vector<MQMessageQueue>& mq) = 0;
+  virtual void removeOffset(const MQMessageQueue& mq) = 0;
+
+ protected:
+  std::string m_groupName;
+  typedef std::map<MQMessageQueue, int64> MQ2OFFSET;
+  MQ2OFFSET m_offsetTable;
+  MQClientFactory* m_pClientFactory;
+  boost::mutex m_lock;
+};
+
+//<!***************************************************************************
+class LocalFileOffsetStore : public OffsetStore {
+ public:
+  LocalFileOffsetStore(const std::string& groupName, MQClientFactory*);
+  virtual ~LocalFileOffsetStore();
+
+  virtual void load();
+  virtual void updateOffset(const MQMessageQueue& mq, int64 offset);
+  virtual int64 readOffset(const MQMessageQueue& mq, ReadOffsetType type,
+                           const SessionCredentials& session_credentials);
+  virtual void persist(const MQMessageQueue& mq,
+                       const SessionCredentials& session_credentials);
+  virtual void persistAll(const std::vector<MQMessageQueue>& mq);
+  virtual void removeOffset(const MQMessageQueue& mq);
+
+ private:
+  std::string m_storePath;
+  std::string m_storeFile;
+};
+
+//<!***************************************************************************
+class RemoteBrokerOffsetStore : public OffsetStore {
+ public:
+  RemoteBrokerOffsetStore(const std::string& groupName, MQClientFactory*);
+  virtual ~RemoteBrokerOffsetStore();
+
+  virtual void load();
+  virtual void updateOffset(const MQMessageQueue& mq, int64 offset);
+  virtual int64 readOffset(const MQMessageQueue& mq, ReadOffsetType type,
+                           const SessionCredentials& session_credentials);
+  virtual void persist(const MQMessageQueue& mq,
+                       const SessionCredentials& session_credentials);
+  virtual void persistAll(const std::vector<MQMessageQueue>& mq);
+  virtual void removeOffset(const MQMessageQueue& mq);
+
+ private:
+  void updateConsumeOffsetToBroker(
+      const MQMessageQueue& mq, int64 offset,
+      const SessionCredentials& session_credentials);
+  int64 fetchConsumeOffsetFromBroker(
+      const MQMessageQueue& mq, const SessionCredentials& session_credentials);
+};
+//<!***************************************************************************
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/PullAPIWrapper.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/PullAPIWrapper.cpp b/rocketmq-cpp/src/consumer/PullAPIWrapper.cpp
new file mode 100755
index 0000000..6a4b507
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/PullAPIWrapper.cpp
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PullAPIWrapper.h"
+#include "CommunicationMode.h"
+#include "MQClientFactory.h"
+#include "PullResultExt.h"
+#include "PullSysFlag.h"
+namespace rocketmq {
+//<!************************************************************************
+PullAPIWrapper::PullAPIWrapper(MQClientFactory* mQClientFactory,
+                               const string& consumerGroup) {
+  m_MQClientFactory = mQClientFactory;
+  m_consumerGroup = consumerGroup;
+}
+
+PullAPIWrapper::~PullAPIWrapper() {
+  m_MQClientFactory = NULL;
+  m_pullFromWhichNodeTable.clear();
+}
+
+void PullAPIWrapper::updatePullFromWhichNode(const MQMessageQueue& mq,
+                                             int brokerId) {
+  boost::lock_guard<boost::mutex> lock(m_lock);
+  m_pullFromWhichNodeTable[mq] = brokerId;
+}
+
+int PullAPIWrapper::recalculatePullFromWhichNode(const MQMessageQueue& mq) {
+  boost::lock_guard<boost::mutex> lock(m_lock);
+  if (m_pullFromWhichNodeTable.find(mq) != m_pullFromWhichNodeTable.end()) {
+    return m_pullFromWhichNodeTable[mq];
+  }
+  return MASTER_ID;
+}
+
+PullResult PullAPIWrapper::processPullResult(
+    const MQMessageQueue& mq, PullResult* pullResult,
+    SubscriptionData* subscriptionData) {
+  PullResultExt* pResultExt = static_cast<PullResultExt*>(pullResult);
+  if (pResultExt == NULL) {
+    string errMsg("The pullResult NULL of");
+    errMsg.append(mq.toString());
+    THROW_MQEXCEPTION(MQClientException, errMsg, -1);
+  }
+
+  //<!update;
+  updatePullFromWhichNode(mq, pResultExt->suggestWhichBrokerId);
+
+  vector<MQMessageExt> msgFilterList;
+  if (pResultExt->pullStatus == FOUND) {
+    //<!decode all msg list;
+    vector<MQMessageExt> msgAllList;
+    MQDecoder::decodes(&pResultExt->msgMemBlock, msgAllList);
+
+    //<!filter msg list again;
+    if (subscriptionData != NULL && !subscriptionData->getTagsSet().empty()) {
+      msgFilterList.reserve(msgAllList.size());
+      vector<MQMessageExt>::iterator it = msgAllList.begin();
+      for (; it != msgAllList.end(); ++it) {
+        string msgTag = (*it).getTags();
+        if (subscriptionData->containTag(msgTag)) {
+          msgFilterList.push_back(*it);
+        }
+      }
+    } else {
+      msgFilterList.swap(msgAllList);
+    }
+  }
+
+  return PullResult(pResultExt->pullStatus, pResultExt->nextBeginOffset,
+                    pResultExt->minOffset, pResultExt->maxOffset,
+                    msgFilterList);
+}
+
+PullResult* PullAPIWrapper::pullKernelImpl(
+    const MQMessageQueue& mq,        // 1
+    string subExpression,            // 2
+    int64 subVersion,                // 3
+    int64 offset,                    // 4
+    int maxNums,                     // 5
+    int sysFlag,                     // 6
+    int64 commitOffset,              // 7
+    int brokerSuspendMaxTimeMillis,  // 8
+    int timeoutMillis,               // 9
+    int communicationMode,           // 10
+    PullCallback* pullCallback, const SessionCredentials& session_credentials,
+    void* pArg /*= NULL*/) {
+  unique_ptr<FindBrokerResult> pFindBrokerResult(
+      m_MQClientFactory->findBrokerAddressInSubscribe(
+          mq.getBrokerName(), recalculatePullFromWhichNode(mq), false));
+  //<!goto nameserver;
+  if (pFindBrokerResult == NULL) {
+    m_MQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic(),
+                                                          session_credentials);
+    pFindBrokerResult.reset(m_MQClientFactory->findBrokerAddressInSubscribe(
+        mq.getBrokerName(), recalculatePullFromWhichNode(mq), false));
+  }
+
+  if (pFindBrokerResult != NULL) {
+    int sysFlagInner = sysFlag;
+
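+    // consume offsets are only committed to the master, so the commit-offset
+    // flag is cleared when this pull is served by a slave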
+    if (pFindBrokerResult->slave) {
+      sysFlagInner = PullSysFlag::clearCommitOffsetFlag(sysFlagInner);
+    }
+
+    PullMessageRequestHeader* pRequestHeader = new PullMessageRequestHeader();
+    pRequestHeader->consumerGroup = m_consumerGroup;
+    pRequestHeader->topic = mq.getTopic();
+    pRequestHeader->queueId = mq.getQueueId();
+    pRequestHeader->queueOffset = offset;
+    pRequestHeader->maxMsgNums = maxNums;
+    pRequestHeader->sysFlag = sysFlagInner;
+    pRequestHeader->commitOffset = commitOffset;
+    pRequestHeader->suspendTimeoutMillis = brokerSuspendMaxTimeMillis;
+    pRequestHeader->subscription = subExpression;
+    pRequestHeader->subVersion = subVersion;
+
+    return m_MQClientFactory->getMQClientAPIImpl()->pullMessage(
+        pFindBrokerResult->brokerAddr, pRequestHeader, timeoutMillis,
+        communicationMode, pullCallback, pArg, session_credentials);
+  }
+  THROW_MQEXCEPTION(MQClientException, "The broker not exist", -1);
+}
+
+}  //<!end namespace;
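
Taken together, the two public methods above form one pull round trip: pullKernelImpl resolves the broker and issues the request, and processPullResult decodes and re-filters the returned batch. Below is a minimal sketch of a synchronous round trip, assuming the wrapper, queue, subscription data and credentials are prepared elsewhere, and assuming the PullSysFlag::buildSysFlag helper, the ComMode_SYNC constant and the SubscriptionData getters are named as in the surrounding sources:

    #include <memory>
    #include "CommunicationMode.h"
    #include "PullAPIWrapper.h"
    #include "PullSysFlag.h"
    using namespace rocketmq;

    // Minimal sketch: one synchronous pull followed by client-side re-filtering.
    // wrapper, mq, subData and credentials are assumed to be set up elsewhere.
    void pullOnceSketch(PullAPIWrapper& wrapper, const MQMessageQueue& mq,
                        SubscriptionData* subData,
                        const SessionCredentials& credentials, int64 offset) {
      int sysFlag = PullSysFlag::buildSysFlag(false,   // commitOffset
                                              true,    // suspend (long polling)
                                              true,    // subscription
                                              false);  // class filter
      std::unique_ptr<PullResult> raw(wrapper.pullKernelImpl(
          mq, subData->getSubString(), subData->getSubVersion(), offset,
          32,             // maxNums
          sysFlag,
          0,              // commitOffset (flag not set above)
          1000 * 15,      // brokerSuspendMaxTimeMillis
          1000 * 30,      // timeoutMillis
          ComMode_SYNC,   // assumed sync-mode constant
          NULL,           // no callback in sync mode
          credentials));
      PullResult result = wrapper.processPullResult(mq, raw.get(), subData);
      if (result.pullStatus == FOUND) {
        // result.msgFoundList now holds only the messages whose tags matched.
      }
    }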

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/PullAPIWrapper.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/PullAPIWrapper.h b/rocketmq-cpp/src/consumer/PullAPIWrapper.h
new file mode 100755
index 0000000..e3d0a1e
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/PullAPIWrapper.h
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _PULLAPIWRAPPER_H_
+#define _PULLAPIWRAPPER_H_
+
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/thread.hpp>
+#include "AsyncCallback.h"
+#include "MQMessageQueue.h"
+#include "SessionCredentials.h"
+#include "SubscriptionData.h"
+
+namespace rocketmq {
+class MQClientFactory;
+//<!***************************************************************************
+class PullAPIWrapper {
+ public:
+  PullAPIWrapper(MQClientFactory* mQClientFactory, const string& consumerGroup);
+  ~PullAPIWrapper();
+
+  PullResult processPullResult(const MQMessageQueue& mq, PullResult* pullResult,
+                               SubscriptionData* subscriptionData);
+
+  PullResult* pullKernelImpl(const MQMessageQueue& mq,        // 1
+                             string subExpression,            // 2
+                             int64 subVersion,                // 3
+                             int64 offset,                    // 4
+                             int maxNums,                     // 5
+                             int sysFlag,                     // 6
+                             int64 commitOffset,              // 7
+                             int brokerSuspendMaxTimeMillis,  // 8
+                             int timeoutMillis,               // 9
+                             int communicationMode,           // 10
+                             PullCallback* pullCallback,
+                             const SessionCredentials& session_credentials,
+                             void* pArg = NULL);
+
+ private:
+  void updatePullFromWhichNode(const MQMessageQueue& mq, int brokerId);
+
+  int recalculatePullFromWhichNode(const MQMessageQueue& mq);
+
+ private:
+  MQClientFactory* m_MQClientFactory;
+  string m_consumerGroup;
+  boost::mutex m_lock;
+  map<MQMessageQueue, int /* brokerId */> m_pullFromWhichNodeTable;
+};
+
+//<!***************************************************************************
+}  //<!end namespace;
+
+#endif  //<! _PULLAPIWRAPPER_H_

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/PullRequest.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/PullRequest.cpp b/rocketmq-cpp/src/consumer/PullRequest.cpp
new file mode 100755
index 0000000..d9b953f
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/PullRequest.cpp
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PullRequest.h"
+#include "Logging.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+const uint64 PullRequest::RebalanceLockInterval = 20 * 1000;
+const uint64 PullRequest::RebalanceLockMaxLiveTime = 30 * 1000;
+
+PullRequest::PullRequest(const string& groupname)
+    : m_groupname(groupname), m_nextOffset(0), m_queueOffsetMax(0),
+      m_bDroped(false), m_bLocked(false), m_lastLockTimestamp(0),
+      m_tryUnlockTimes(0), m_lastPullTimestamp(0), m_lastConsumeTimestamp(0) {}
+
+PullRequest::~PullRequest() {
+  m_msgTreeMapTemp.clear();
+  m_msgTreeMap.clear();
+}
+
+PullRequest& PullRequest::operator=(const PullRequest& other) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  if (this != &other) {
+    m_groupname = other.m_groupname;
+    m_nextOffset = other.m_nextOffset;
+    m_bDroped.store(other.m_bDroped.load());
+    m_queueOffsetMax = other.m_queueOffsetMax;
+    m_messageQueue = other.m_messageQueue;
+    m_msgTreeMap = other.m_msgTreeMap;
+    m_msgTreeMapTemp = other.m_msgTreeMapTemp;
+  }
+  return *this;
+}
+
+void PullRequest::putMessage(vector<MQMessageExt>& msgs) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+
+  vector<MQMessageExt>::iterator it = msgs.begin();
+  for (; it != msgs.end(); it++) {
+    m_msgTreeMap[it->getQueueOffset()] = *it;
+    m_queueOffsetMax = std::max(m_queueOffsetMax, it->getQueueOffset());
+  }
+  LOG_DEBUG("PullRequest: putMessage m_queueOffsetMax:%lld ", m_queueOffsetMax);
+}
+
+void PullRequest::getMessage(vector<MQMessageExt>& msgs) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+
+  map<int64, MQMessageExt>::iterator it = m_msgTreeMap.begin();
+  for (; it != m_msgTreeMap.end(); it++) {
+    msgs.push_back(it->second);
+  }
+}
+
+int64 PullRequest::getCacheMinOffset() {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  if (m_msgTreeMap.empty()) {
+    return 0;
+  } else {
+    map<int64, MQMessageExt>::iterator it = m_msgTreeMap.begin();
+    MQMessageExt msg = it->second;
+    return msg.getQueueOffset();
+  }
+}
+
+int64 PullRequest::getCacheMaxOffset() { return m_queueOffsetMax; }
+
+int PullRequest::getCacheMsgCount() {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  return m_msgTreeMap.size();
+}
+
+void PullRequest::getMessageByQueueOffset(vector<MQMessageExt>& msgs,
+                                          int64 minQueueOffset,
+                                          int64 maxQueueOffset) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+
+  int64 it = minQueueOffset;
+  for (; it <= maxQueueOffset; it++) {
+    msgs.push_back(m_msgTreeMap[it]);
+  }
+}
+
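+// Erase an acknowledged batch from the cache and return the offset that can
+// safely be committed: the smallest offset still cached, or m_queueOffsetMax+1
+// when the cache just became empty, or -1 when nothing was cached at all.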
+int64 PullRequest::removeMessage(vector<MQMessageExt>& msgs) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+
+  int64 result = -1;
+  LOG_DEBUG("m_queueOffsetMax is:%lld", m_queueOffsetMax);
+  if (!m_msgTreeMap.empty()) {
+    result = m_queueOffsetMax + 1;
+    LOG_DEBUG(" offset result is:%lld, m_queueOffsetMax is:%lld, msgs size:%zu",
+              result, m_queueOffsetMax, msgs.size());
+    vector<MQMessageExt>::iterator it = msgs.begin();
+    for (; it != msgs.end(); it++) {
+      LOG_DEBUG("remove these msg from m_msgTreeMap, its offset:%lld",
+                it->getQueueOffset());
+      m_msgTreeMap.erase(it->getQueueOffset());
+    }
+
+    if (!m_msgTreeMap.empty()) {
+      map<int64, MQMessageExt>::iterator it = m_msgTreeMap.begin();
+      result = it->first;
+      LOG_INFO("cache msg size:%zu of pullRequest:%s, return offset result is:%lld",
+               m_msgTreeMap.size(), m_messageQueue.toString().c_str(), result);
+    }
+  }
+
+  return result;
+}
+
+void PullRequest::clearAllMsgs() {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+
+  if (isDroped()) {
+    LOG_DEBUG("clear m_msgTreeMap as PullRequest had been dropped.");
+    m_msgTreeMap.clear();
+    m_msgTreeMapTemp.clear();
+  }
+}
+
+void PullRequest::updateQueueMaxOffset(int64 queueOffset) {
+  // Two cases may set queueOffset smaller than m_queueOffsetMax:
+  // 1. a resetOffset command;
+  // 2. during rebalance, when configured with CONSUMER_FROM_FIRST_OFFSET and
+  //    the readOffset call made by computePullFromWhere fails, m_nextOffset is
+  //    set to 0.
+  m_queueOffsetMax = queueOffset;
+}
+
+void PullRequest::setDroped(bool droped) {
+  m_bDroped.store(droped);
+  /*
+  m_queueOffsetMax = 0;
+  m_nextOffset = 0;
+  m_queueOffsetMax and m_nextOffset are intentionally NOT cleared here, because
+  the ConsumeMsgService and the rebalance that drops the mq run concurrently.
+  Consider the following sequence:
+      1>. ConsumeMsgService is running;
+      2>. doRebalance drops the mq and resets m_nextOffset and m_queueOffsetMax;
+      3>. ConsumeMsgService calls removeMessage; if no other msgs remain in
+          m_msgTreeMap, m_queueOffsetMax(0)+1 is returned;
+      4>. updateOffset is called with 1, which is smaller than the correct offset.
+  */
+}
+
+bool PullRequest::isDroped() const { return m_bDroped.load(); }
+
+int64 PullRequest::getNextOffset() {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  return m_nextOffset;
+}
+
+void PullRequest::setLocked(bool Locked) { m_bLocked.store(Locked); }
+
+bool PullRequest::isLocked() const { return m_bLocked.load(); }
+
+bool PullRequest::isLockExpired() const {
+  return (UtilAll::currentTimeMillis() - m_lastLockTimestamp) >
+         RebalanceLockMaxLiveTime;
+}
+
+void PullRequest::setLastLockTimestamp(int64 time) {
+  m_lastLockTimestamp = time;
+}
+
+int64 PullRequest::getLastLockTimestamp() const { return m_lastLockTimestamp; }
+
+void PullRequest::setLastPullTimestamp(uint64 time) {
+  m_lastPullTimestamp = time;
+}
+
+uint64 PullRequest::getLastPullTimestamp() const { return m_lastPullTimestamp; }
+
+void PullRequest::setLastConsumeTimestamp(uint64 time) {
+  m_lastConsumeTimestamp = time;
+}
+
+uint64 PullRequest::getLastConsumeTimestamp() const {
+  return m_lastConsumeTimestamp;
+}
+
+void PullRequest::setTryUnlockTimes(int time) { m_tryUnlockTimes = time; }
+
+int PullRequest::getTryUnlockTimes() const { return m_tryUnlockTimes; }
+
+void PullRequest::setNextOffset(int64 nextoffset) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  m_nextOffset = nextoffset;
+}
+
+string PullRequest::getGroupName() const { return m_groupname; }
+
+boost::timed_mutex& PullRequest::getPullRequestCriticalSection() {
+  return m_consumeLock;
+}
+
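+// Move up to batchSize messages from the cache into the in-flight map
+// (m_msgTreeMapTemp); typically used by the orderly consume path, which later
+// calls commit() on success or makeMessageToCosumeAgain() on failure.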
+void PullRequest::takeMessages(vector<MQMessageExt>& msgs, int batchSize) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  for (int i = 0; i != batchSize; i++) {
+    map<int64, MQMessageExt>::iterator it = m_msgTreeMap.begin();
+    if (it != m_msgTreeMap.end()) {
+      msgs.push_back(it->second);
+      m_msgTreeMapTemp[it->first] = it->second;
+      m_msgTreeMap.erase(it);
+    }
+  }
+}
+
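+// Put messages whose consumption did not succeed back into the cache so that
+// they will be taken and consumed again.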
+void PullRequest::makeMessageToCosumeAgain(vector<MQMessageExt>& msgs) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  for (unsigned int it = 0; it != msgs.size(); ++it) {
+    m_msgTreeMap[msgs[it].getQueueOffset()] = msgs[it];
+    m_msgTreeMapTemp.erase(msgs[it].getQueueOffset());
+  }
+}
+
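+// Acknowledge everything staged by takeMessages(): clear the in-flight map and
+// return the offset to persist (last staged offset + 1), or -1 when nothing
+// was staged.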
+int64 PullRequest::commit() {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  if (!m_msgTreeMapTemp.empty()) {
+    int64 offset = (--m_msgTreeMapTemp.end())->first;
+    m_msgTreeMapTemp.clear();
+    return offset + 1;
+  } else {
+    return -1;
+  }
+}
+
+//<!***************************************************************************
+}  //<!end namespace;
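
The two tree maps above give PullRequest its offset bookkeeping: putMessage caches pulled messages keyed by queue offset, and removeMessage reports the offset that is safe to commit once a batch has been consumed. A minimal sketch of that concurrent-consume path, with the consume step and the OffsetStore wiring left out:

    #include <vector>
    #include "PullRequest.h"
    using namespace rocketmq;

    // Minimal sketch: cache a pulled batch, and once it has been consumed ask
    // the PullRequest which offset can safely be committed.
    void commitAfterConsumeSketch(PullRequest& request,
                                  std::vector<MQMessageExt>& pulledBatch) {
      request.putMessage(pulledBatch);  // index the batch by queue offset
      // ... hand pulledBatch to the consume service and wait for success ...
      int64 committable = request.removeMessage(pulledBatch);
      if (committable >= 0) {
        // committable is the smallest offset still cached, or queueOffsetMax+1
        // when the cache just became empty; an OffsetStore would persist it.
      }
    }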

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/PullRequest.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/PullRequest.h b/rocketmq-cpp/src/consumer/PullRequest.h
new file mode 100755
index 0000000..6cd2180
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/PullRequest.h
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PULLREQUEST_H__
+#define __PULLREQUEST_H__
+
+#include <boost/atomic.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/thread.hpp>
+#include "ByteOrder.h"
+#include "MQMessageExt.h"
+#include "MQMessageQueue.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+class PullRequest {
+ public:
+  PullRequest(const string& groupname);
+  virtual ~PullRequest();
+
+  void putMessage(vector<MQMessageExt>& msgs);
+  void getMessage(vector<MQMessageExt>& msgs);
+  int64 getCacheMinOffset();
+  int64 getCacheMaxOffset();
+  int getCacheMsgCount();
+  void getMessageByQueueOffset(vector<MQMessageExt>& msgs, int64 minQueueOffset,
+                               int64 maxQueueOffset);
+  int64 removeMessage(vector<MQMessageExt>& msgs);
+  void clearAllMsgs();
+
+  PullRequest& operator=(const PullRequest& other);
+
+  void setDroped(bool droped);
+  bool isDroped() const;
+
+  int64 getNextOffset();
+  void setNextOffset(int64 nextoffset);
+
+  string getGroupName() const;
+
+  void updateQueueMaxOffset(int64 queueOffset);
+
+  void setLocked(bool Locked);
+  bool isLocked() const;
+  bool isLockExpired() const;
+  void setLastLockTimestamp(int64 time);
+  int64 getLastLockTimestamp() const;
+  void setLastPullTimestamp(uint64 time);
+  uint64 getLastPullTimestamp() const;
+  void setLastConsumeTimestamp(uint64 time);
+  uint64 getLastConsumeTimestamp() const;
+  void setTryUnlockTimes(int time);
+  int getTryUnlockTimes() const;
+  void takeMessages(vector<MQMessageExt>& msgs, int batchSize);
+  int64 commit();
+  void makeMessageToCosumeAgain(vector<MQMessageExt>& msgs);
+  boost::timed_mutex& getPullRequestCriticalSection();
+
+ public:
+  MQMessageQueue m_messageQueue;
+  static const uint64 RebalanceLockInterval;     // ms
+  static const uint64 RebalanceLockMaxLiveTime;  // ms
+
+ private:
+  string m_groupname;
+  int64 m_nextOffset;
+  int64 m_queueOffsetMax;
+  boost::atomic<bool> m_bDroped;
+  boost::atomic<bool> m_bLocked;
+  map<int64, MQMessageExt> m_msgTreeMap;
+  map<int64, MQMessageExt> m_msgTreeMapTemp;
+  boost::mutex m_pullRequestLock;
+  uint64 m_lastLockTimestamp;  // ms
+  uint64 m_tryUnlockTimes;
+  uint64 m_lastPullTimestamp;
+  uint64 m_lastConsumeTimestamp;
+  boost::timed_mutex m_consumeLock;
+};
+//<!************************************************************************
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/PullResult.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/PullResult.cpp b/rocketmq-cpp/src/consumer/PullResult.cpp
new file mode 100755
index 0000000..8648abe
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/PullResult.cpp
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PullResult.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!************************************************************************
+PullResult::PullResult()
+    : pullStatus(NO_MATCHED_MSG),
+      nextBeginOffset(0),
+      minOffset(0),
+      maxOffset(0) {}
+
+PullResult::PullResult(PullStatus status)
+    : pullStatus(status), nextBeginOffset(0), minOffset(0), maxOffset(0) {}
+
+PullResult::PullResult(PullStatus pullStatus, int64 nextBeginOffset,
+                       int64 minOffset, int64 maxOffset)
+    : pullStatus(pullStatus),
+      nextBeginOffset(nextBeginOffset),
+      minOffset(minOffset),
+      maxOffset(maxOffset) {}
+
+PullResult::PullResult(PullStatus pullStatus, int64 nextBeginOffset,
+                       int64 minOffset, int64 maxOffset,
+                       const vector<MQMessageExt>& src)
+    : pullStatus(pullStatus),
+      nextBeginOffset(nextBeginOffset),
+      minOffset(minOffset),
+      maxOffset(maxOffset) {
+  msgFoundList.reserve(src.size());
+  for (size_t i = 0; i < src.size(); i++) {
+    msgFoundList.push_back(src[i]);
+  }
+}
+
+PullResult::~PullResult() { msgFoundList.clear(); }
+
+//<!***************************************************************************
+}  //<!end namespace;
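
A pull ends with one of the PullStatus values carried by this class. Below is a minimal sketch of how a caller typically dispatches on pullStatus and keeps the PullRequest's next offset in sync; FOUND and NO_MATCHED_MSG appear in the sources above, while NO_NEW_MSG and OFFSET_ILLEGAL are assumed names from the public PullResult header:

    #include "PullRequest.h"
    #include "PullResult.h"
    using namespace rocketmq;

    // Minimal sketch: react to the pull status and keep the request's next
    // offset in sync. NO_NEW_MSG and OFFSET_ILLEGAL are assumed enum values.
    void handlePullResultSketch(PullResult& result, PullRequest& request) {
      switch (result.pullStatus) {
        case FOUND:
          request.setNextOffset(result.nextBeginOffset);
          request.putMessage(result.msgFoundList);  // consume service takes it next
          break;
        case NO_NEW_MSG:
        case NO_MATCHED_MSG:
          request.setNextOffset(result.nextBeginOffset);  // advance and re-pull
          break;
        case OFFSET_ILLEGAL:
        default:
          // typically drop the request and let rebalance / the offset store fix it
          break;
      }
    }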

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/PullResultExt.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/PullResultExt.h b/rocketmq-cpp/src/consumer/PullResultExt.h
new file mode 100755
index 0000000..ac6b8e9
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/PullResultExt.h
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PullResult.h"
+#include "UtilAll.h"
+#include "dataBlock.h"
+
+namespace rocketmq {
+/**
+ * For internal use only; not exposed to users.
+ */
+//<!***************************************************************************
+class PullResultExt : public PullResult {
+ public:
+  PullResultExt(PullStatus pullStatus, int64 nextBeginOffset, int64 minOffset,
+                int64 maxOffset, int suggestWhichBrokerId,
+                const MemoryBlock& messageBinary)
+      : PullResult(pullStatus, nextBeginOffset, minOffset, maxOffset),
+        suggestWhichBrokerId(suggestWhichBrokerId),
+        msgMemBlock(messageBinary) {}
+  PullResultExt(PullStatus pullStatus, int64 nextBeginOffset, int64 minOffset,
+                int64 maxOffset, int suggestWhichBrokerId)
+      : PullResult(pullStatus, nextBeginOffset, minOffset, maxOffset),
+        suggestWhichBrokerId(suggestWhichBrokerId) {}
+  virtual ~PullResultExt() {}
+
+ public:
+  int suggestWhichBrokerId;
+  MemoryBlock msgMemBlock;
+};
+
+}  //<!end namespace;