You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/01/07 02:46:47 UTC

[GitHub] duhenglucky closed pull request #53: [ISSUE #45] Fix unsafe code, build warnings and format code style

duhenglucky closed pull request #53: [ISSUE #45] Fix unsafe code, build warnings and format code style
URL: https://github.com/apache/rocketmq-client-cpp/pull/53
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff would not otherwise
be visible. It is therefore reproduced below for the sake of provenance:

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3a21ab4f..e9d5ca20 100755
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -139,6 +139,8 @@ set(CXX_FLAGS
  -fPIC
  -fno-strict-aliasing
  -std=c++11
+ -Wno-unused-local-typedef
+ -Wno-expansion-to-defined
  # -finline-limit=1000
  # -Wextra
  # -pedantic
diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index 9a8f41f3..b3ba2b2e 100755
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -97,7 +97,7 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public MQProducer {
   int m_sendMsgTimeout;
   int m_compressMsgBodyOverHowmuch;
   int m_maxMessageSize;  //<! default:128K;
-  bool m_retryAnotherBrokerWhenNotStoreOK;
+  //bool m_retryAnotherBrokerWhenNotStoreOK;
   int m_compressLevel;
   int m_retryTimes;
 };
diff --git a/src/common/sync_http_client.h b/src/common/sync_http_client.h
index b25cc77d..98366059 100755
--- a/src/common/sync_http_client.h
+++ b/src/common/sync_http_client.h
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 #ifndef ROCKETMQ_CLIENT4CPP__SYNC_HTTP_CLIENT_H_
-#define ROCKETMQ_CLIENT4CPP_SYNC_HTTP_CLIENT_H_
+#define ROCKETMQ_CLIENT4CPP__SYNC_HTTP_CLIENT_H_
 
 #include <string>
 
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index 4aa33f65..073a8017 100755
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -58,7 +58,8 @@ DefaultMQPullConsumer::~DefaultMQPullConsumer() {
 void DefaultMQPullConsumer::start() {
 #ifndef WIN32
   /* Ignore the SIGPIPE */
-  struct sigaction sa = {0};
+  struct sigaction sa;
+  memset(&sa,0, sizeof(struct sigaction));
   sa.sa_handler = SIG_IGN;
   sa.sa_flags = 0;
   sigaction(SIGPIPE, &sa, 0);
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 96e5ec20..159e0ff9 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -1,946 +1,963 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "DefaultMQPushConsumer.h"
-#include "CommunicationMode.h"
-#include "ConsumeMsgService.h"
-#include "ConsumerRunningInfo.h"
-#include "FilterAPI.h"
-#include "Logging.h"
-#include "MQClientAPIImpl.h"
-#include "MQClientFactory.h"
-#include "MQClientManager.h"
-#include "MQProtos.h"
-#include "OffsetStore.h"
-#include "PullAPIWrapper.h"
-#include "PullSysFlag.h"
-#include "Rebalance.h"
-#include "UtilAll.h"
-#include "Validators.h"
-#include "task_queue.h"
-
-namespace rocketmq {
-
-class AsyncPullCallback : public PullCallback {
- public:
-  AsyncPullCallback(DefaultMQPushConsumer* pushConsumer, PullRequest* request)
-      : m_callbackOwner(pushConsumer),
-        m_pullRequest(request),
-        m_bShutdown(false) {}
-  virtual ~AsyncPullCallback() {
-    m_callbackOwner = NULL;
-    m_pullRequest = NULL;
-  }
-  virtual void onSuccess(MQMessageQueue& mq, PullResult& result,
-                         bool bProducePullRequest) {
-    if (m_bShutdown == true) {
-      LOG_INFO("pullrequest for:%s in shutdown, return",
-               (m_pullRequest->m_messageQueue).toString().c_str());
-      m_pullRequest->removePullMsgEvent();
-      return;
-    }
-
-    switch (result.pullStatus) {
-      case FOUND: {
-        if (!m_pullRequest->isDroped())  // if request is setted to dropped,
-                                         // don't add msgFoundList to
-                                         // m_msgTreeMap and don't call
-                                         // producePullMsgTask
-        {  // avoid issue: pullMsg is sent out, rebalance is doing concurrently
-           // and this request is dropped, and then received pulled msgs.
-          m_pullRequest->setNextOffset(result.nextBeginOffset);
-          m_pullRequest->putMessage(result.msgFoundList);
-
-          m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(
-              m_pullRequest, result.msgFoundList);
-
-          if (bProducePullRequest)
-            m_callbackOwner->producePullMsgTask(m_pullRequest);
-          else
-            m_pullRequest->removePullMsgEvent();
-
-          LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ", nextBeginOffset:%lld",
-                    (m_pullRequest->m_messageQueue).toString().c_str(),
-                    result.msgFoundList.size(), result.nextBeginOffset);
-        } else {
-          LOG_INFO("remove pullmsg event of mq:%s",
-                   (m_pullRequest->m_messageQueue).toString().c_str());
-          m_pullRequest->removePullMsgEvent();
-        }
-        break;
-      }
-      case NO_NEW_MSG: {
-        m_pullRequest->setNextOffset(result.nextBeginOffset);
-
-        vector<MQMessageExt> msgs;
-        m_pullRequest->getMessage(msgs);
-        if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
-          /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
-          is kept, then consumer will enter following situation:
-          1>. get pull offset with 0 when do rebalance, and set
-          m_offsetTable[mq] to 0;
-          2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
-          offset increase by 800
-          3>. request->getMessage(msgs) always NULL
-          4>. we need update consumerOffset to nextBeginOffset indicated by
-          broker
-          but if really no new msg could be pulled, also go to this CASE
-
-          LOG_INFO("maybe misMatch between broker and client happens, update
-          consumerOffset to nextBeginOffset indicated by broker");*/
-          m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,
-                                               result.nextBeginOffset);
-        }
-        if (bProducePullRequest)
-          m_callbackOwner->producePullMsgTask(m_pullRequest);
-        else
-          m_pullRequest->removePullMsgEvent();
-
-        /*LOG_INFO("NO_NEW_MSG:%s,nextBeginOffset:%lld",
-                 (m_pullRequest->m_messageQueue).toString().c_str(),
-                 result.nextBeginOffset);*/
-        break;
-      }
-      case NO_MATCHED_MSG: {
-        m_pullRequest->setNextOffset(result.nextBeginOffset);
-
-        vector<MQMessageExt> msgs;
-        m_pullRequest->getMessage(msgs);
-        if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
-          /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
-          is kept, then consumer will enter following situation:
-          1>. get pull offset with 0 when do rebalance, and set
-          m_offsetTable[mq] to 0;
-          2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
-          offset increase by 800
-          3>. request->getMessage(msgs) always NULL
-          4>. we need update consumerOffset to nextBeginOffset indicated by
-          broker
-          but if really no new msg could be pulled, also go to this CASE
-
-          LOG_INFO("maybe misMatch between broker and client happens, update
-          consumerOffset to nextBeginOffset indicated by broker");*/
-          m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,
-                                               result.nextBeginOffset);
-        }
-        if (bProducePullRequest)
-          m_callbackOwner->producePullMsgTask(m_pullRequest);
-        else
-          m_pullRequest->removePullMsgEvent();
-        /*LOG_INFO("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
-                 (m_pullRequest->m_messageQueue).toString().c_str(),
-                 result.nextBeginOffset);*/
-        break;
-      }
-      case OFFSET_ILLEGAL: {
-        m_pullRequest->setNextOffset(result.nextBeginOffset);
-        if (bProducePullRequest)
-          m_callbackOwner->producePullMsgTask(m_pullRequest);
-        else
-          m_pullRequest->removePullMsgEvent();
-
-        /*LOG_INFO("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
-                 (m_pullRequest->m_messageQueue).toString().c_str(),
-                 result.nextBeginOffset);*/
-        break;
-      }
-      case BROKER_TIMEOUT: {  // as BROKER_TIMEOUT is defined by client, broker
-                              // will not returns this status, so this case
-                              // could not be entered.
-        LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
-        m_pullRequest->setNextOffset(result.nextBeginOffset);
-        if (bProducePullRequest)
-          m_callbackOwner->producePullMsgTask(m_pullRequest);
-        else
-          m_pullRequest->removePullMsgEvent();
-        break;
-      }
-    }
-  }
-
-  virtual void onException(MQException& e) {
-    if (m_bShutdown == true) {
-      LOG_INFO("pullrequest for:%s in shutdown, return",
-               (m_pullRequest->m_messageQueue).toString().c_str());
-      m_pullRequest->removePullMsgEvent();
-      return;
-    }
-    LOG_WARN("pullrequest for:%s occurs exception, reproduce it",
-             (m_pullRequest->m_messageQueue).toString().c_str());
-    m_callbackOwner->producePullMsgTask(m_pullRequest);
-  }
-
-  void setShutdownStatus() { m_bShutdown = true; }
-
- private:
-  DefaultMQPushConsumer* m_callbackOwner;
-  PullRequest* m_pullRequest;
-  bool m_bShutdown;
-};
-
-//<!***************************************************************************
-static boost::mutex m_asyncCallbackLock;
-DefaultMQPushConsumer::DefaultMQPushConsumer(const string& groupname)
-    : m_consumeFromWhere(CONSUME_FROM_LAST_OFFSET),
-      m_pOffsetStore(NULL),
-      m_pPullAPIWrapper(NULL),
-      m_pMessageListener(NULL),
-      m_consumeMessageBatchMaxSize(1),
-      m_maxMsgCacheSize(1000) {
-  //<!set default group name;
-  string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname;
-  setGroupName(gname);
-  m_asyncPull = true;
-  m_asyncPullTimeout = 30 * 1000;
-  setMessageModel(CLUSTERING);
-
-  m_startTime = UtilAll::currentTimeMillis();
-  m_consumeThreadCount = boost::thread::hardware_concurrency();
-  m_pullMsgThreadPoolNum = boost::thread::hardware_concurrency();
-  m_async_service_thread.reset(new boost::thread(
-      boost::bind(&DefaultMQPushConsumer::boost_asio_work, this)));
-}
-
-void DefaultMQPushConsumer::boost_asio_work() {
-  LOG_INFO("DefaultMQPushConsumer::boost asio async service runing");
-  boost::asio::io_service::work work(m_async_ioService);  // avoid async io
-                                                          // service stops after
-                                                          // first timer timeout
-                                                          // callback
-  m_async_ioService.run();
-}
-
-DefaultMQPushConsumer::~DefaultMQPushConsumer() {
-  m_pMessageListener = NULL;
-  if (m_pullmsgQueue != NULL) {
-    deleteAndZero(m_pullmsgQueue);
-  }
-  if (m_pRebalance != NULL) {
-    deleteAndZero(m_pRebalance);
-  }
-  if (m_pOffsetStore != NULL) {
-    deleteAndZero(m_pOffsetStore);
-  }
-  if (m_pPullAPIWrapper != NULL) {
-    deleteAndZero(m_pPullAPIWrapper);
-  }
-  if (m_consumerService != NULL) {
-    deleteAndZero(m_consumerService);
-  }
-  PullMAP::iterator it = m_PullCallback.begin();
-  for (; it != m_PullCallback.end(); ++it) {
-    deleteAndZero(it->second);
-  }
-  m_PullCallback.clear();
-  m_subTopics.clear();
-}
-
-void DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
-  try {
-    getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(
-        msg, getGroupName(), delayLevel, 3000, getSessionCredentials());
-  } catch (MQException& e) {
-    LOG_ERROR(e.what());
-  }
-}
-
-void DefaultMQPushConsumer::fetchSubscribeMessageQueues(
-    const string& topic, vector<MQMessageQueue>& mqs) {
-  mqs.clear();
-  try {
-    getFactory()->fetchSubscribeMessageQueues(topic, mqs,
-                                              getSessionCredentials());
-  } catch (MQException& e) {
-    LOG_ERROR(e.what());
-  }
-}
-
-void DefaultMQPushConsumer::doRebalance() {
-  if (isServiceStateOk()) {
-    try {
-      m_pRebalance->doRebalance();
-    } catch (MQException& e) {
-      LOG_ERROR(e.what());
-    }
-  }
-}
-
-void DefaultMQPushConsumer::persistConsumerOffset() {
-  if (isServiceStateOk()) {
-    m_pRebalance->persistConsumerOffset();
-  }
-}
-
-void DefaultMQPushConsumer::persistConsumerOffsetByResetOffset() {
-  if (isServiceStateOk()) {
-    m_pRebalance->persistConsumerOffsetByResetOffset();
-  }
-}
-
-void DefaultMQPushConsumer::start() {
-#ifndef WIN32
-  /* Ignore the SIGPIPE */
-  struct sigaction sa ={0};
-  sa.sa_handler = SIG_IGN;
-  sa.sa_flags = 0;
-  sigaction(SIGPIPE, &sa, 0);
-#endif
-  switch (m_serviceState) {
-    case CREATE_JUST: {
-      m_serviceState = START_FAILED;
-      MQClient::start();
-      LOG_INFO("DefaultMQPushConsumer:%s start", m_GroupName.c_str());
-
-      //<!data;
-      checkConfig();
-
-      //<!create rebalance;
-      m_pRebalance = new RebalancePush(this, getFactory());
-
-      string groupname = getGroupName();
-      m_pPullAPIWrapper = new PullAPIWrapper(getFactory(), groupname);
-
-      if (m_pMessageListener) {
-        if (m_pMessageListener->getMessageListenerType() ==
-            messageListenerOrderly) {
-          LOG_INFO("start orderly consume service:%s", getGroupName().c_str());
-          m_consumerService = new ConsumeMessageOrderlyService(
-              this, m_consumeThreadCount, m_pMessageListener);
-        } else  // for backward compatible, defaultly and concurrently listeners
-                // are allocating ConsumeMessageConcurrentlyService
-        {
-          LOG_INFO("start concurrently consume service:%s",
-                   getGroupName().c_str());
-          m_consumerService = new ConsumeMessageConcurrentlyService(
-              this, m_consumeThreadCount, m_pMessageListener);
-        }
-      }
-
-      m_pullmsgQueue = new TaskQueue(m_pullMsgThreadPoolNum);
-      m_pullmsgThread.reset(new boost::thread(boost::bind(
-          &DefaultMQPushConsumer::runPullMsgQueue, this, m_pullmsgQueue)));
-
-      copySubscription();
-
-      //<! registe;
-      bool registerOK = getFactory()->registerConsumer(this);
-      if (!registerOK) {
-        m_serviceState = CREATE_JUST;
-        THROW_MQEXCEPTION(
-            MQClientException,
-            "The cousumer group[" + getGroupName() +
-                "] has been created before, specify another name please.",
-            -1);
-      }
-
-      //<!msg model;
-      switch (getMessageModel()) {
-        case BROADCASTING:
-          m_pOffsetStore = new LocalFileOffsetStore(groupname, getFactory());
-          break;
-        case CLUSTERING:
-          m_pOffsetStore = new RemoteBrokerOffsetStore(groupname, getFactory());
-          break;
-      }
-      bool bStartFailed = false;
-      string errorMsg;
-      try {
-        m_pOffsetStore->load();
-      } catch (MQClientException& e) {
-        bStartFailed = true;
-        errorMsg = std::string(e.what());
-      }
-      m_consumerService->start();
-
-      getFactory()->start();
-
-      updateTopicSubscribeInfoWhenSubscriptionChanged();
-      getFactory()->sendHeartbeatToAllBroker();
-
-      m_serviceState = RUNNING;
-      if (bStartFailed) {
-        shutdown();
-        THROW_MQEXCEPTION(MQClientException, errorMsg, -1);
-      }
-      break;
-    }
-    case RUNNING:
-    case START_FAILED:
-    case SHUTDOWN_ALREADY:
-      break;
-    default:
-      break;
-  }
-
-  getFactory()->rebalanceImmediately();
-}
-
-void DefaultMQPushConsumer::shutdown() {
-  switch (m_serviceState) {
-    case RUNNING: {
-      LOG_INFO("DefaultMQPushConsumer shutdown");
-      m_async_ioService.stop();
-      m_async_service_thread->interrupt();
-      m_async_service_thread->join();
-      m_pullmsgQueue->close();
-      m_pullmsgThread->interrupt();
-      m_pullmsgThread->join();
-      m_consumerService->shutdown();
-      persistConsumerOffset();
-      shutdownAsyncPullCallBack();  // delete aync pullMsg resources
-      getFactory()->unregisterConsumer(this);
-      getFactory()->shutdown();
-      m_serviceState = SHUTDOWN_ALREADY;
-      break;
-    }
-    case CREATE_JUST:
-    case SHUTDOWN_ALREADY:
-      break;
-    default:
-      break;
-  }
-}
-
-void DefaultMQPushConsumer::registerMessageListener(
-    MQMessageListener* pMessageListener) {
-  if (NULL != pMessageListener) {
-    m_pMessageListener = pMessageListener;
-  }
-}
-
-MessageListenerType DefaultMQPushConsumer::getMessageListenerType() {
-  if (NULL != m_pMessageListener) {
-    return m_pMessageListener->getMessageListenerType();
-  }
-  return messageListenerDefaultly;
-}
-
-ConsumeMsgService* DefaultMQPushConsumer::getConsumerMsgService() const {
-  return m_consumerService;
-}
-
-OffsetStore* DefaultMQPushConsumer::getOffsetStore() const {
-  return m_pOffsetStore;
-}
-
-Rebalance* DefaultMQPushConsumer::getRebalance() const { return m_pRebalance; }
-
-void DefaultMQPushConsumer::subscribe(const string& topic,
-                                      const string& subExpression) {
-  m_subTopics[topic] = subExpression;
-}
-
-void DefaultMQPushConsumer::checkConfig() {
-  string groupname = getGroupName();
-  // check consumerGroup
-  Validators::checkGroup(groupname);
-
-  // consumerGroup
-  if (!groupname.compare(DEFAULT_CONSUMER_GROUP)) {
-    THROW_MQEXCEPTION(MQClientException,
-                      "consumerGroup can not equal DEFAULT_CONSUMER", -1);
-  }
-
-  if (getMessageModel() != BROADCASTING && getMessageModel() != CLUSTERING) {
-    THROW_MQEXCEPTION(MQClientException, "messageModel is valid ", -1);
-  }
-
-  if (m_pMessageListener == NULL) {
-    THROW_MQEXCEPTION(MQClientException, "messageListener is null ", -1);
-  }
-}
-
-void DefaultMQPushConsumer::copySubscription() {
-  map<string, string>::iterator it = m_subTopics.begin();
-  for (; it != m_subTopics.end(); ++it) {
-    LOG_INFO("buildSubscriptionData,:%s,%s", it->first.c_str(),
-             it->second.c_str());
-    unique_ptr<SubscriptionData> pSData(
-        FilterAPI::buildSubscriptionData(it->first, it->second));
-
-    m_pRebalance->setSubscriptionData(it->first, pSData.release());
-  }
-
-  switch (getMessageModel()) {
-    case BROADCASTING:
-      break;
-    case CLUSTERING: {
-      string retryTopic = UtilAll::getRetryTopic(getGroupName());
-
-      //<!this sub;
-      unique_ptr<SubscriptionData> pSData(
-          FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL));
-
-      m_pRebalance->setSubscriptionData(retryTopic, pSData.release());
-      break;
-    }
-    default:
-      break;
-  }
-}
-
-void DefaultMQPushConsumer::updateTopicSubscribeInfo(
-    const string& topic, vector<MQMessageQueue>& info) {
-  m_pRebalance->setTopicSubscribeInfo(topic, info);
-}
-
-void DefaultMQPushConsumer::updateTopicSubscribeInfoWhenSubscriptionChanged() {
-  map<string, SubscriptionData*>& subTable =
-      m_pRebalance->getSubscriptionInner();
-  map<string, SubscriptionData*>::iterator it = subTable.begin();
-  for (; it != subTable.end(); ++it) {
-    bool btopic = getFactory()->updateTopicRouteInfoFromNameServer(
-        it->first, getSessionCredentials());
-    if (btopic == false) {
-      LOG_WARN("The topic:[%s] not exist", it->first.c_str());
-    }
-  }
-}
-
-ConsumeType DefaultMQPushConsumer::getConsumeType() {
-  return CONSUME_PASSIVELY;
-}
-
-ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere() {
-  return m_consumeFromWhere;
-}
-
-void DefaultMQPushConsumer::setConsumeFromWhere(
-    ConsumeFromWhere consumeFromWhere) {
-  m_consumeFromWhere = consumeFromWhere;
-}
-
-void DefaultMQPushConsumer::getSubscriptions(vector<SubscriptionData>& result) {
-  map<string, SubscriptionData*>& subTable =
-      m_pRebalance->getSubscriptionInner();
-  map<string, SubscriptionData*>::iterator it = subTable.begin();
-  for (; it != subTable.end(); ++it) {
-    result.push_back(*(it->second));
-  }
-}
-
-void DefaultMQPushConsumer::updateConsumeOffset(const MQMessageQueue& mq,
-                                                int64 offset) {
-  if (offset >= 0) {
-    m_pOffsetStore->updateOffset(mq, offset);
-  } else {
-    LOG_ERROR("updateConsumeOffset of mq:%s error", mq.toString().c_str());
-  }
-}
-void DefaultMQPushConsumer::removeConsumeOffset(const MQMessageQueue& mq) {
-  m_pOffsetStore->removeOffset(mq);
-}
-
-void DefaultMQPushConsumer::triggerNextPullRequest(
-    boost::asio::deadline_timer* t, PullRequest* request) {
-  // LOG_INFO("trigger pullrequest for:%s",
-  // (request->m_messageQueue).toString().c_str());
-  producePullMsgTask(request);
-  deleteAndZero(t);
-}
-
-void DefaultMQPushConsumer::producePullMsgTask(PullRequest* request) {
-  if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) {
-    request->addPullMsgEvent();
-    if (m_asyncPull) {
-      m_pullmsgQueue->produce(TaskBinder::gen(
-          &DefaultMQPushConsumer::pullMessageAsync, this, request));
-    } else {
-      m_pullmsgQueue->produce(
-          TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request));
-    }
-  } else {
-    LOG_WARN("produce pullmsg of mq:%s failed",
-             request->m_messageQueue.toString().c_str());
-  }
-}
-
-void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue* pTaskQueue) {
-  pTaskQueue->run();
-}
-
-void DefaultMQPushConsumer::pullMessage(PullRequest* request) {
-  if (request == NULL || request->isDroped()) {
-    LOG_WARN("Pull request is set drop with mq:%s, return",
-             (request->m_messageQueue).toString().c_str());
-    request->removePullMsgEvent();
-    return;
-  }
-
-  MQMessageQueue& messageQueue = request->m_messageQueue;
-  if (m_consumerService->getConsumeMsgSerivceListenerType() ==
-      messageListenerOrderly) {
-    if (!request->isLocked() || request->isLockExpired()) {
-      if (!m_pRebalance->lock(messageQueue)) {
-        producePullMsgTask(request);
-        return;
-      }
-    }
-  }
-
-  if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
-    // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
-    // than:%d",  (request->m_messageQueue).toString().c_str(),
-    // request->getCacheMsgCount(), m_maxMsgCacheSize);
-    boost::asio::deadline_timer* t = new boost::asio::deadline_timer(
-        m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
-    t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
-                              this, t, request));
-    return;
-  }
-
-  bool commitOffsetEnable = false;
-  int64 commitOffsetValue = 0;
-  if (CLUSTERING == getMessageModel()) {
-    commitOffsetValue = m_pOffsetStore->readOffset(
-        messageQueue, READ_FROM_MEMORY, getSessionCredentials());
-    if (commitOffsetValue > 0) {
-      commitOffsetEnable = true;
-    }
-  }
-
-  string subExpression;
-  SubscriptionData* pSdata =
-      m_pRebalance->getSubscriptionData(messageQueue.getTopic());
-  if (pSdata == NULL) {
-    producePullMsgTask(request);
-    return;
-  }
-  subExpression = pSdata->getSubString();
-
-  int sysFlag =
-      PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset
-                                false,                   // suspend
-                                !subExpression.empty(),  // subscription
-                                false);                  // class filter
-
-  try {
-    request->setLastPullTimestamp(UtilAll::currentTimeMillis());
-    unique_ptr<PullResult> result(
-        m_pPullAPIWrapper->pullKernelImpl(messageQueue,              // 1
-                                          subExpression,             // 2
-                                          pSdata->getSubVersion(),   // 3
-                                          request->getNextOffset(),  // 4
-                                          32,                        // 5
-                                          sysFlag,                   // 6
-                                          commitOffsetValue,         // 7
-                                          1000 * 15,                 // 8
-                                          1000 * 30,                 // 9
-                                          ComMode_SYNC,              // 10
-                                          NULL, getSessionCredentials()));
-
-    PullResult pullResult = m_pPullAPIWrapper->processPullResult(
-        messageQueue, result.get(), pSdata);
-
-    switch (pullResult.pullStatus) {
-      case FOUND: {
-        if (!request->isDroped())  // if request is setted to dropped, don't add
-                                   // msgFoundList to m_msgTreeMap and don't
-                                   // call producePullMsgTask
-        {  // avoid issue: pullMsg is sent out, rebalance is doing concurrently
-           // and this request is dropped, and then received pulled msgs.
-          request->setNextOffset(pullResult.nextBeginOffset);
-          request->putMessage(pullResult.msgFoundList);
-
-          m_consumerService->submitConsumeRequest(request,
-                                                  pullResult.msgFoundList);
-          producePullMsgTask(request);
-
-          LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ",nextBeginOffset:%lld",
-                    messageQueue.toString().c_str(),
-                    pullResult.msgFoundList.size(), pullResult.nextBeginOffset);
-        } else {
-          request->removePullMsgEvent();
-        }
-        break;
-      }
-      case NO_NEW_MSG: {
-        request->setNextOffset(pullResult.nextBeginOffset);
-        vector<MQMessageExt> msgs;
-        request->getMessage(msgs);
-        if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
-          /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
-          is kept, then consumer will enter following situation:
-          1>. get pull offset with 0 when do rebalance, and set
-          m_offsetTable[mq] to 0;
-          2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
-          offset increase by 800
-          3>. request->getMessage(msgs) always NULL
-          4>. we need update consumerOffset to nextBeginOffset indicated by
-          broker
-          but if really no new msg could be pulled, also go to this CASE
-       */
-          // LOG_DEBUG("maybe misMatch between broker and client happens, update
-          // consumerOffset to nextBeginOffset indicated by broker");
-          updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
-        }
-        producePullMsgTask(request);
-        LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld",
-                  messageQueue.toString().c_str(), pullResult.nextBeginOffset);
-        break;
-      }
-      case NO_MATCHED_MSG: {
-        request->setNextOffset(pullResult.nextBeginOffset);
-        vector<MQMessageExt> msgs;
-        request->getMessage(msgs);
-        if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
-          // LOG_DEBUG("maybe misMatch between broker and client happens, update
-          // consumerOffset to nextBeginOffset indicated by broker");
-          updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
-        }
-        producePullMsgTask(request);
-
-        LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
-                  messageQueue.toString().c_str(), pullResult.nextBeginOffset);
-        break;
-      }
-      case OFFSET_ILLEGAL: {
-        request->setNextOffset(pullResult.nextBeginOffset);
-        producePullMsgTask(request);
-
-        LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
-                  messageQueue.toString().c_str(), pullResult.nextBeginOffset);
-        break;
-      }
-      case BROKER_TIMEOUT: {  // as BROKER_TIMEOUT is defined by client, broker
-                              // will not returns this status, so this case
-                              // could not be entered.
-        LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
-        request->setNextOffset(pullResult.nextBeginOffset);
-        producePullMsgTask(request);
-        break;
-      }
-    }
-  } catch (MQException& e) {
-    LOG_ERROR(e.what());
-    producePullMsgTask(request);
-  }
-}
-
-AsyncPullCallback* DefaultMQPushConsumer::getAsyncPullCallBack(
-    PullRequest* request, MQMessageQueue msgQueue) {
-  boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
-  if (m_asyncPull && request) {
-    PullMAP::iterator it = m_PullCallback.find(msgQueue);
-    if (it == m_PullCallback.end()) {
-      LOG_INFO("new pull callback for mq:%s", msgQueue.toString().c_str());
-      m_PullCallback[msgQueue] = new AsyncPullCallback(this, request);
-    }
-    return m_PullCallback[msgQueue];
-  }
-
-  return NULL;
-}
-
-void DefaultMQPushConsumer::shutdownAsyncPullCallBack() {
-  boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
-  if (m_asyncPull) {
-    PullMAP::iterator it = m_PullCallback.begin();
-    for (; it != m_PullCallback.end(); ++it) {
-      if (it->second) {
-        it->second->setShutdownStatus();
-      } else {
-        LOG_ERROR("could not find asyncPullCallback for:%s",
-                  it->first.toString().c_str());
-      }
-    }
-  }
-}
-
-void DefaultMQPushConsumer::pullMessageAsync(PullRequest* request) {
-  if (request == NULL || request->isDroped()) {
-    LOG_WARN("Pull request is set drop with mq:%s, return",
-             (request->m_messageQueue).toString().c_str());
-    request->removePullMsgEvent();
-    return;
-  }
-
-  MQMessageQueue& messageQueue = request->m_messageQueue;
-  if (m_consumerService->getConsumeMsgSerivceListenerType() ==
-      messageListenerOrderly) {
-    if (!request->isLocked() || request->isLockExpired()) {
-      if (!m_pRebalance->lock(messageQueue)) {
-        producePullMsgTask(request);
-        return;
-      }
-    }
-  }
-
-  if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
-    // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
-    // than:%d",  (request->m_messageQueue).toString().c_str(),
-    // request->getCacheMsgCount(), m_maxMsgCacheSize);
-    boost::asio::deadline_timer* t = new boost::asio::deadline_timer(
-        m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
-    t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
-                              this, t, request));
-    return;
-  }
-
-  bool commitOffsetEnable = false;
-  int64 commitOffsetValue = 0;
-  if (CLUSTERING == getMessageModel()) {
-    commitOffsetValue = m_pOffsetStore->readOffset(
-        messageQueue, READ_FROM_MEMORY, getSessionCredentials());
-    if (commitOffsetValue > 0) {
-      commitOffsetEnable = true;
-    }
-  }
-
-  string subExpression;
-  SubscriptionData* pSdata =
-      (m_pRebalance->getSubscriptionData(messageQueue.getTopic()));
-  if (pSdata == NULL) {
-    producePullMsgTask(request);
-    return;
-  }
-  subExpression = pSdata->getSubString();
-
-  int sysFlag =
-      PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset
-                                true,                    // suspend
-                                !subExpression.empty(),  // subscription
-                                false);                  // class filter
-
-  AsyncArg arg;
-  arg.mq = messageQueue;
-  arg.subData = *pSdata;
-  arg.pPullWrapper = m_pPullAPIWrapper;
-
-  try {
-    request->setLastPullTimestamp(UtilAll::currentTimeMillis());
-    m_pPullAPIWrapper->pullKernelImpl(
-        messageQueue,                                 // 1
-        subExpression,                                // 2
-        pSdata->getSubVersion(),                      // 3
-        request->getNextOffset(),                     // 4
-        32,                                           // 5
-        sysFlag,                                      // 6
-        commitOffsetValue,                            // 7
-        1000 * 15,                                    // 8
-        m_asyncPullTimeout,                           // 9
-        ComMode_ASYNC,                                // 10
-        getAsyncPullCallBack(request, messageQueue),  // 11
-        getSessionCredentials(),                      // 12
-        &arg);                                        // 13
-  } catch (MQException& e) {
-    LOG_ERROR(e.what());
-    producePullMsgTask(request);
-  }
-}
-
-void DefaultMQPushConsumer::setAsyncPull(bool asyncFlag) {
-  if (asyncFlag) {
-    LOG_INFO("set pushConsumer:%s to async default pull mode",
-             getGroupName().c_str());
-  } else {
-    LOG_INFO("set pushConsumer:%s to sync pull mode", getGroupName().c_str());
-  }
-  m_asyncPull = asyncFlag;
-}
-
-void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) {
-  if (threadCount > 0) {
-    m_consumeThreadCount = threadCount;
-  } else {
-    LOG_ERROR("setConsumeThreadCount with invalid value");
-  }
-}
-
-int DefaultMQPushConsumer::getConsumeThreadCount() const {
-  return m_consumeThreadCount;
-}
-
-void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) {
-  m_pullMsgThreadPoolNum = threadCount;
-}
-
-int DefaultMQPushConsumer::getPullMsgThreadPoolCount() const {
-  return m_pullMsgThreadPoolNum;
-}
-
-int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize() const {
-  return m_consumeMessageBatchMaxSize;
-}
-
-void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize(
-    int consumeMessageBatchMaxSize) {
-  if (consumeMessageBatchMaxSize >= 1)
-    m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
-}
-
-void DefaultMQPushConsumer::setMaxCacheMsgSizePerQueue(int maxCacheSize) {
-  if (maxCacheSize > 0 && maxCacheSize < 65535) {
-    LOG_INFO("set maxCacheSize to:%d for consumer:%s", maxCacheSize,
-             getGroupName().c_str());
-    m_maxMsgCacheSize = maxCacheSize;
-  }
-}
-
-int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const {
-  return m_maxMsgCacheSize;
-}
-
-ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo() {
-  ConsumerRunningInfo* info = new ConsumerRunningInfo();
-  if (info) {
-    if (m_consumerService->getConsumeMsgSerivceListenerType() ==
-        messageListenerOrderly)
-      info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");
-    else
-      info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "flase");
-    info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE,
-                      UtilAll::to_string(m_consumeThreadCount));
-    info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP,
-                      UtilAll::to_string(m_startTime));
-
-    vector<SubscriptionData> result;
-    getSubscriptions(result);
-    info->setSubscriptionSet(result);
-
-    map<MQMessageQueue, PullRequest*> requestTable =
-        m_pRebalance->getPullRequestTable();
-    map<MQMessageQueue, PullRequest*>::iterator it = requestTable.begin();
-
-    for (; it != requestTable.end(); ++it) {
-      if (!it->second->isDroped()) {
-        map<MessageQueue, ProcessQueueInfo> queueTable;
-        MessageQueue queue((it->first).getTopic(), (it->first).getBrokerName(),
-                           (it->first).getQueueId());
-        ProcessQueueInfo processQueue;
-        processQueue.cachedMsgMinOffset = it->second->getCacheMinOffset();
-        processQueue.cachedMsgMaxOffset = it->second->getCacheMaxOffset();
-        processQueue.cachedMsgCount = it->second->getCacheMsgCount();
-        processQueue.setCommitOffset(m_pOffsetStore->readOffset(
-            it->first, MEMORY_FIRST_THEN_STORE, getSessionCredentials()));
-        processQueue.setDroped(it->second->isDroped());
-        processQueue.setLocked(it->second->isLocked());
-        processQueue.lastLockTimestamp = it->second->getLastLockTimestamp();
-        processQueue.lastPullTimestamp = it->second->getLastPullTimestamp();
-        processQueue.lastConsumeTimestamp =
-            it->second->getLastConsumeTimestamp();
-        info->setMqTable(queue, processQueue);
-      }
-    }
-
-    return info;
-  }
-  return NULL;
-}
-
-//<!************************************************************************
-}  //<!end namespace;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefaultMQPushConsumer.h"
+#include "CommunicationMode.h"
+#include "ConsumeMsgService.h"
+#include "ConsumerRunningInfo.h"
+#include "FilterAPI.h"
+#include "Logging.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientFactory.h"
+#include "MQClientManager.h"
+#include "MQProtos.h"
+#include "OffsetStore.h"
+#include "PullAPIWrapper.h"
+#include "PullSysFlag.h"
+#include "Rebalance.h"
+#include "UtilAll.h"
+#include "Validators.h"
+#include "task_queue.h"
+
+namespace rocketmq {
+
+    // Callback handed to the asynchronous pull path of DefaultMQPushConsumer.
+    // It keeps non-owning pointers to the consumer and to the PullRequest it
+    // serves. After every pull it either re-queues the request
+    // (producePullMsgTask) or releases its pending pull event
+    // (removePullMsgEvent).
+    class AsyncPullCallback : public PullCallback {
+    public:
+        AsyncPullCallback(DefaultMQPushConsumer *pushConsumer, PullRequest *request)
+                : m_callbackOwner(pushConsumer),
+                  m_pullRequest(request),
+                  m_bShutdown(false) {}
+
+        // Neither pointer is owned here, so both are only cleared.
+        virtual ~AsyncPullCallback() {
+            m_pullRequest = NULL;
+            m_callbackOwner = NULL;
+        }
+
+        // Handles a completed pull. `mq` is not read; the queue is taken from the
+        // stored PullRequest. `bProducePullRequest` selects whether the request
+        // is scheduled again once the result has been processed.
+        virtual void onSuccess(MQMessageQueue &mq, PullResult &result,
+                               bool bProducePullRequest) {
+            if (m_bShutdown) {
+                LOG_INFO("pullrequest for:%s in shutdown, return",
+                         (m_pullRequest->m_messageQueue).toString().c_str());
+                m_pullRequest->removePullMsgEvent();
+                return;
+            }
+
+            switch (result.pullStatus) {
+                case FOUND: {
+                    // A rebalance may have dropped the request while the pull was
+                    // in flight. Then the pulled messages are not stored and no
+                    // further pull is scheduled.
+                    if (m_pullRequest->isDroped()) {
+                        LOG_INFO("remove pullmsg event of mq:%s",
+                                 (m_pullRequest->m_messageQueue).toString().c_str());
+                        m_pullRequest->removePullMsgEvent();
+                        break;
+                    }
+
+                    m_pullRequest->setNextOffset(result.nextBeginOffset);
+                    m_pullRequest->putMessage(result.msgFoundList);
+                    m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(
+                            m_pullRequest, result.msgFoundList);
+                    rescheduleOrRelease(bProducePullRequest);
+
+                    LOG_DEBUG("FOUND:%s with size:"
+                                      SIZET_FMT
+                                      ", nextBeginOffset:%lld",
+                              (m_pullRequest->m_messageQueue).toString().c_str(),
+                              result.msgFoundList.size(), result.nextBeginOffset);
+                    break;
+                }
+                // Both "nothing to deliver" statuses are handled the same way.
+                case NO_NEW_MSG:
+                case NO_MATCHED_MSG: {
+                    m_pullRequest->setNextOffset(result.nextBeginOffset);
+                    adoptBrokerOffsetIfNothingCached(result);
+                    rescheduleOrRelease(bProducePullRequest);
+                    break;
+                }
+                case OFFSET_ILLEGAL: {
+                    m_pullRequest->setNextOffset(result.nextBeginOffset);
+                    rescheduleOrRelease(bProducePullRequest);
+                    break;
+                }
+                case BROKER_TIMEOUT: {
+                    // BROKER_TIMEOUT is a client-side status the broker never
+                    // returns, so this branch is not expected to run.
+                    LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
+                    m_pullRequest->setNextOffset(result.nextBeginOffset);
+                    rescheduleOrRelease(bProducePullRequest);
+                    break;
+                }
+            }
+        }
+
+        // Handles a failed pull. The request is re-queued unless shutdown was
+        // flagged, in which case only its pending pull event is released.
+        virtual void onException(MQException &e) {
+            if (m_bShutdown) {
+                LOG_INFO("pullrequest for:%s in shutdown, return",
+                         (m_pullRequest->m_messageQueue).toString().c_str());
+                m_pullRequest->removePullMsgEvent();
+                return;
+            }
+
+            LOG_WARN("pullrequest for:%s occurs exception, reproduce it",
+                     (m_pullRequest->m_messageQueue).toString().c_str());
+            m_callbackOwner->producePullMsgTask(m_pullRequest);
+        }
+
+        // After this call, later completions release the request and stop.
+        void setShutdownStatus() { m_bShutdown = true; }
+
+    private:
+        // Schedules the next pull, or releases the pending pull event when no
+        // follow-up pull was requested.
+        void rescheduleOrRelease(bool produceNext) {
+            if (produceNext) {
+                m_callbackOwner->producePullMsgTask(m_pullRequest);
+            } else {
+                m_pullRequest->removePullMsgEvent();
+            }
+        }
+
+        // If the broker lost or cleared a queue's messages but kept its offset,
+        // the consumer would start at offset 0 after a rebalance, keep receiving
+        // NO_NEW_MSG / NO_MATCHED_MSG with an advancing nextBeginOffset, and never
+        // have a cached message. In that situation the consume offset is moved
+        // to the offset reported by the broker. The same path is also taken
+        // when there simply is no new message.
+        void adoptBrokerOffsetIfNothingCached(PullResult &result) {
+            vector<MQMessageExt> cached;
+            m_pullRequest->getMessage(cached);
+            if (cached.empty() && (result.nextBeginOffset > 0)) {
+                m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,
+                                                     result.nextBeginOffset);
+            }
+        }
+
+        DefaultMQPushConsumer *m_callbackOwner;
+        PullRequest *m_pullRequest;
+        bool m_bShutdown;
+    };
+
+//<!***************************************************************************
+    // File-local mutex. As a namespace-scope static there is one instance for
+    // this translation unit, so every DefaultMQPushConsumer object shares it.
+    // NOTE(review): despite the m_ prefix this is not a class member. It
+    // presumably serialises access to m_PullCallback; confirm against the
+    // functions that lock it.
+    static boost::mutex m_asyncCallbackLock;
+
+    // Creates a push consumer for `groupname` (DEFAULT_CONSUMER_GROUP when the
+    // name is empty). Defaults set here: asynchronous pulling with a 30 * 1000
+    // pull timeout, CLUSTERING message model, and consume/pull thread counts
+    // equal to the hardware concurrency. The constructor also launches the
+    // thread that runs the asio service used for delayed pull retries.
+    DefaultMQPushConsumer::DefaultMQPushConsumer(const string &groupname)
+            : m_consumeFromWhere(CONSUME_FROM_LAST_OFFSET),
+              m_pOffsetStore(NULL),
+              m_pPullAPIWrapper(NULL),
+              m_pMessageListener(NULL),
+              m_consumeMessageBatchMaxSize(1),
+              m_maxMsgCacheSize(1000) {
+        // These pointers are only created by start(), yet the destructor compares
+        // them with NULL before deleting. Give them a defined value so that an
+        // object that is never started, or whose start() throws early, can be
+        // destroyed safely. They are assigned in the body rather than in the
+        // initializer list so the member declaration order does not matter.
+        m_pRebalance = NULL;
+        m_consumerService = NULL;
+        m_pullmsgQueue = NULL;
+
+        //<!set default group name;
+        string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname;
+        setGroupName(gname);
+        m_asyncPull = true;
+        m_asyncPullTimeout = 30 * 1000;
+        setMessageModel(CLUSTERING);
+
+        m_startTime = UtilAll::currentTimeMillis();
+        m_consumeThreadCount = boost::thread::hardware_concurrency();
+        m_pullMsgThreadPoolNum = boost::thread::hardware_concurrency();
+
+        // Started last: the thread runs boost_asio_work() on this object.
+        m_async_service_thread.reset(new boost::thread(
+                boost::bind(&DefaultMQPushConsumer::boost_asio_work, this)));
+    }
+
+    // Body of the thread created in the constructor: runs m_async_ioService
+    // until run() returns.
+    void DefaultMQPushConsumer::boost_asio_work() {
+        LOG_INFO("DefaultMQPushConsumer::boost asio async service runing");
+
+        // The work object keeps run() from returning once the first timer
+        // callback has completed.
+        boost::asio::io_service::work keepServiceAlive(m_async_ioService);
+        m_async_ioService.run();
+    }
+
+    // Releases everything the consumer owns. The message listener is not owned
+    // and is only detached.
+    DefaultMQPushConsumer::~DefaultMQPushConsumer() {
+        // The asio thread is started in the constructor and is otherwise stopped
+        // only by shutdown() in RUNNING state. Stop and join it here as well so
+        // it cannot keep running on a destroyed m_async_ioService. After a
+        // regular shutdown() the thread is no longer joinable and this is a no-op.
+        m_async_ioService.stop();
+        if (m_async_service_thread.get() != NULL &&
+            m_async_service_thread->joinable()) {
+            m_async_service_thread->join();
+        }
+
+        // Same for the pull thread: start() may have created it and then thrown
+        // before RUNNING was reached. A joinable pull thread implies start() has
+        // already created m_pullmsgQueue. The sequence mirrors shutdown().
+        if (m_pullmsgThread.get() != NULL && m_pullmsgThread->joinable()) {
+            m_pullmsgQueue->close();
+            m_pullmsgThread->interrupt();
+            m_pullmsgThread->join();
+        }
+
+        m_pMessageListener = NULL;
+        if (m_pullmsgQueue != NULL) {
+            deleteAndZero(m_pullmsgQueue);
+        }
+        if (m_pRebalance != NULL) {
+            deleteAndZero(m_pRebalance);
+        }
+        if (m_pOffsetStore != NULL) {
+            deleteAndZero(m_pOffsetStore);
+        }
+        if (m_pPullAPIWrapper != NULL) {
+            deleteAndZero(m_pPullAPIWrapper);
+        }
+        if (m_consumerService != NULL) {
+            deleteAndZero(m_consumerService);
+        }
+
+        // The async pull callbacks stored per message queue are owned here.
+        PullMAP::iterator it = m_PullCallback.begin();
+        for (; it != m_PullCallback.end(); ++it) {
+            deleteAndZero(it->second);
+        }
+        m_PullCallback.clear();
+        m_subTopics.clear();
+    }
+
+    // Sends `msg` back through the client API for this consumer group with the
+    // given delay level. The literal 3000 is presumably a timeout in
+    // milliseconds; confirm against consumerSendMessageBack. An MQException is
+    // logged and swallowed, so the caller receives no failure indication.
+    void DefaultMQPushConsumer::sendMessageBack(MQMessageExt &msg, int delayLevel) {
+        try {
+            getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(
+                    msg, getGroupName(), delayLevel, 3000, getSessionCredentials());
+        } catch (MQException &e) {
+            // The exception text must not be used as the format string itself:
+            // a '%' inside it would be read as a conversion specifier.
+            LOG_ERROR("%s", e.what());
+        }
+    }
+
+    // Fills `mqs` with the message queues of `topic` as reported by the client
+    // factory. `mqs` is cleared first. An MQException is logged and swallowed,
+    // leaving `mqs` with whatever the factory stored before the failure.
+    void DefaultMQPushConsumer::fetchSubscribeMessageQueues(
+            const string &topic, vector<MQMessageQueue> &mqs) {
+        mqs.clear();
+        try {
+            getFactory()->fetchSubscribeMessageQueues(topic, mqs,
+                                                      getSessionCredentials());
+        } catch (MQException &e) {
+            // Log through an explicit "%s" so that a '%' in the exception text
+            // is never interpreted as a format directive.
+            LOG_ERROR("%s", e.what());
+        }
+    }
+
+    // Runs one rebalance pass when the service state is OK. An MQException
+    // raised by the rebalance is logged and not propagated.
+    void DefaultMQPushConsumer::doRebalance() {
+        if (isServiceStateOk()) {
+            try {
+                m_pRebalance->doRebalance();
+            } catch (MQException &e) {
+                // Explicit "%s": the exception text is data, not a format string.
+                LOG_ERROR("%s", e.what());
+            }
+        }
+    }
+
+    // Forwards to the rebalance object to persist consumer offsets; does
+    // nothing unless the service state is OK.
+    void DefaultMQPushConsumer::persistConsumerOffset() {
+        if (!isServiceStateOk()) {
+            return;
+        }
+        m_pRebalance->persistConsumerOffset();
+    }
+
+    // Forwards to the rebalance object's persistConsumerOffsetByResetOffset();
+    // does nothing unless the service state is OK.
+    void DefaultMQPushConsumer::persistConsumerOffsetByResetOffset() {
+        if (!isServiceStateOk()) {
+            return;
+        }
+        m_pRebalance->persistConsumerOffsetByResetOffset();
+    }
+
+    // Starts the consumer. Work is only done when the state is CREATE_JUST; in
+    // every other state the switch is a no-op. In all cases the function ends
+    // by asking the factory for an immediate rebalance.
+    //
+    // Throws MQClientException when the configuration is rejected by
+    // checkConfig(), when the consumer group is already registered, or when
+    // loading the offset store failed (reported after the rest of the startup
+    // has run and shutdown() has been called).
+    void DefaultMQPushConsumer::start() {
+#ifndef WIN32
+        /* Ignore the SIGPIPE */
+        struct sigaction sa;
+        memset(&sa,0, sizeof(struct sigaction));
+        sa.sa_handler = SIG_IGN;
+        sa.sa_flags = 0;
+        sigaction(SIGPIPE, &sa, 0);
+#endif
+        switch (m_serviceState) {
+            case CREATE_JUST: {
+                // Mark the start as failed up front; the state only becomes
+                // RUNNING once every step below has completed without throwing.
+                m_serviceState = START_FAILED;
+                MQClient::start();
+                LOG_INFO("DefaultMQPushConsumer:%s start", m_GroupName.c_str());
+
+                //<!data;
+                checkConfig();
+
+                //<!create rebalance;
+                m_pRebalance = new RebalancePush(this, getFactory());
+
+                string groupname = getGroupName();
+                m_pPullAPIWrapper = new PullAPIWrapper(getFactory(), groupname);
+
+                // checkConfig() has already rejected a NULL listener, so this
+                // condition is expected to hold here.
+                if (m_pMessageListener) {
+                    if (m_pMessageListener->getMessageListenerType() ==
+                        messageListenerOrderly) {
+                        LOG_INFO("start orderly consume service:%s", getGroupName().c_str());
+                        m_consumerService = new ConsumeMessageOrderlyService(
+                                this, m_consumeThreadCount, m_pMessageListener);
+                    } else  // for backward compatible, defaultly and concurrently listeners
+                        // are allocating ConsumeMessageConcurrentlyService
+                    {
+                        LOG_INFO("start concurrently consume service:%s",
+                                 getGroupName().c_str());
+                        m_consumerService = new ConsumeMessageConcurrentlyService(
+                                this, m_consumeThreadCount, m_pMessageListener);
+                    }
+                }
+
+                // Task queue for pull requests, plus the thread that runs it.
+                m_pullmsgQueue = new TaskQueue(m_pullMsgThreadPoolNum);
+                m_pullmsgThread.reset(new boost::thread(boost::bind(
+                        &DefaultMQPushConsumer::runPullMsgQueue, this, m_pullmsgQueue)));
+
+                copySubscription();
+
+                //<! registe;
+                bool registerOK = getFactory()->registerConsumer(this);
+                if (!registerOK) {
+                    // NOTE(review): the state goes back to CREATE_JUST, but the
+                    // objects and the pull thread created above are not released
+                    // here. A second start() would allocate them again.
+                    m_serviceState = CREATE_JUST;
+                    THROW_MQEXCEPTION(
+                            MQClientException,
+                            "The cousumer group[" + getGroupName() +
+                            "] has been created before, specify another name please.",
+                            -1);
+                }
+
+                //<!msg model;
+                // No default branch: checkConfig() only lets these two models
+                // through.
+                switch (getMessageModel()) {
+                    case BROADCASTING:
+                        m_pOffsetStore = new LocalFileOffsetStore(groupname, getFactory());
+                        break;
+                    case CLUSTERING:
+                        m_pOffsetStore = new RemoteBrokerOffsetStore(groupname, getFactory());
+                        break;
+                }
+                // A failing load() does not abort immediately: the error is
+                // remembered, the remaining startup steps still run, and the
+                // failure is raised at the end after shutdown().
+                bool bStartFailed = false;
+                string errorMsg;
+                try {
+                    m_pOffsetStore->load();
+                } catch (MQClientException &e) {
+                    bStartFailed = true;
+                    errorMsg = std::string(e.what());
+                }
+                m_consumerService->start();
+
+                getFactory()->start();
+
+                updateTopicSubscribeInfoWhenSubscriptionChanged();
+                getFactory()->sendHeartbeatToAllBroker();
+
+                // RUNNING must be set before the deferred-failure path below,
+                // because shutdown() only acts in RUNNING state.
+                m_serviceState = RUNNING;
+                if (bStartFailed) {
+                    shutdown();
+                    THROW_MQEXCEPTION(MQClientException, errorMsg, -1);
+                }
+                break;
+            }
+            case RUNNING:
+            case START_FAILED:
+            case SHUTDOWN_ALREADY:
+                break;
+            default:
+                break;
+        }
+
+        getFactory()->rebalanceImmediately();
+    }
+
+    // Stops the consumer. Only a RUNNING consumer is shut down; in any other
+    // state the call returns without doing anything.
+    void DefaultMQPushConsumer::shutdown() {
+        if (m_serviceState != RUNNING) {
+            return;
+        }
+
+        LOG_INFO("DefaultMQPushConsumer shutdown");
+
+        // Stop the asio service and wait for its thread.
+        m_async_ioService.stop();
+        m_async_service_thread->interrupt();
+        m_async_service_thread->join();
+
+        // Close the pull task queue and wait for the thread that runs it.
+        m_pullmsgQueue->close();
+        m_pullmsgThread->interrupt();
+        m_pullmsgThread->join();
+
+        m_consumerService->shutdown();
+        persistConsumerOffset();
+        shutdownAsyncPullCallBack();  // delete aync pullMsg resources
+
+        getFactory()->unregisterConsumer(this);
+        getFactory()->shutdown();
+        m_serviceState = SHUTDOWN_ALREADY;
+    }
+
+    // Stores the listener that will receive messages. A NULL argument is
+    // ignored and leaves any previously registered listener in place.
+    void DefaultMQPushConsumer::registerMessageListener(
+            MQMessageListener *pMessageListener) {
+        if (pMessageListener == NULL) {
+            return;
+        }
+        m_pMessageListener = pMessageListener;
+    }
+
+    // Returns the type reported by the registered listener, or
+    // messageListenerDefaultly when no listener has been registered.
+    MessageListenerType DefaultMQPushConsumer::getMessageListenerType() {
+        if (m_pMessageListener == NULL) {
+            return messageListenerDefaultly;
+        }
+        return m_pMessageListener->getMessageListenerType();
+    }
+
+    // Accessor for the consume service pointer that start() assigns.
+    ConsumeMsgService *DefaultMQPushConsumer::getConsumerMsgService() const { return m_consumerService; }
+
+    // Accessor for the offset store pointer; NULL until start() has picked a
+    // store for the message model.
+    OffsetStore *DefaultMQPushConsumer::getOffsetStore() const { return m_pOffsetStore; }
+
+    // Accessor for the rebalance object pointer that start() assigns.
+    Rebalance *DefaultMQPushConsumer::getRebalance() const {
+        return m_pRebalance;
+    }
+
+    // Records (or overwrites) the subscription expression for `topic`. The
+    // entry is only stored here; copySubscription() later turns the stored
+    // entries into subscription data.
+    void DefaultMQPushConsumer::subscribe(const string &topic,
+                                          const string &subExpression) {
+        string &storedExpression = m_subTopics[topic];
+        storedExpression = subExpression;
+    }
+
+    // Validates the configuration before start() builds anything. Throws
+    // MQClientException when the group is still the built-in default, when the
+    // message model is neither BROADCASTING nor CLUSTERING, or when no message
+    // listener has been registered. Validators::checkGroup() runs first.
+    void DefaultMQPushConsumer::checkConfig() {
+        string groupname = getGroupName();
+        // check consumerGroup
+        Validators::checkGroup(groupname);
+
+        // The default group name is a placeholder and must be replaced.
+        if (groupname == DEFAULT_CONSUMER_GROUP) {
+            THROW_MQEXCEPTION(MQClientException,
+                              "consumerGroup can not equal DEFAULT_CONSUMER", -1);
+        }
+
+        // The previous text said "is valid", which contradicted the condition
+        // that raises the error.
+        if (getMessageModel() != BROADCASTING && getMessageModel() != CLUSTERING) {
+            THROW_MQEXCEPTION(MQClientException, "messageModel is invalid ", -1);
+        }
+
+        if (m_pMessageListener == NULL) {
+            THROW_MQEXCEPTION(MQClientException, "messageListener is null ", -1);
+        }
+    }
+
+    // Hands the recorded topic subscriptions to the rebalance object. In
+    // CLUSTERING mode the group's retry topic is also subscribed with SUB_ALL.
+    void DefaultMQPushConsumer::copySubscription() {
+        for (map<string, string>::iterator sub = m_subTopics.begin();
+             sub != m_subTopics.end(); ++sub) {
+            LOG_INFO("buildSubscriptionData,:%s,%s", sub->first.c_str(),
+                     sub->second.c_str());
+            // The built object is passed straight on to the rebalance object.
+            SubscriptionData *built =
+                    FilterAPI::buildSubscriptionData(sub->first, sub->second);
+            m_pRebalance->setSubscriptionData(sub->first, built);
+        }
+
+        // Only CLUSTERING adds anything further; BROADCASTING and any other
+        // model stop here.
+        if (getMessageModel() != CLUSTERING) {
+            return;
+        }
+
+        //<!this sub;
+        string retryTopic = UtilAll::getRetryTopic(getGroupName());
+        SubscriptionData *retrySub =
+                FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL);
+        m_pRebalance->setSubscriptionData(retryTopic, retrySub);
+    }
+
+    // Passes the message queue list of `topic` on to the rebalance object.
+    void DefaultMQPushConsumer::updateTopicSubscribeInfo(
+            const string &topic, vector<MQMessageQueue> &info) {
+        Rebalance *rebalance = m_pRebalance;
+        rebalance->setTopicSubscribeInfo(topic, info);
+    }
+
+    // Asks the factory to refresh route information from the name server for
+    // every subscribed topic. A topic whose refresh returns false is logged
+    // as a warning.
+    void DefaultMQPushConsumer::updateTopicSubscribeInfoWhenSubscriptionChanged() {
+        typedef map<string, SubscriptionData *> SubscriptionTable;
+
+        SubscriptionTable &subscriptions = m_pRebalance->getSubscriptionInner();
+        for (SubscriptionTable::iterator entry = subscriptions.begin();
+             entry != subscriptions.end(); ++entry) {
+            const bool routeUpdated = getFactory()->updateTopicRouteInfoFromNameServer(
+                    entry->first, getSessionCredentials());
+            if (!routeUpdated) {
+                LOG_WARN("The topic:[%s] not exist", entry->first.c_str());
+            }
+        }
+    }
+
+    // A push consumer always reports CONSUME_PASSIVELY.
+    ConsumeType DefaultMQPushConsumer::getConsumeType() { return CONSUME_PASSIVELY; }
+
+    // Returns the configured starting position; the constructor sets it to
+    // CONSUME_FROM_LAST_OFFSET.
+    ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere() { return m_consumeFromWhere; }
+
+    // Stores the starting position later returned by getConsumeFromWhere().
+    void DefaultMQPushConsumer::setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+        m_consumeFromWhere = consumeFromWhere;
+    }
+
+    // Appends a copy of every subscription held by the rebalance object to
+    // `result`. `result` is not cleared first, and every stored pointer is
+    // dereferenced without a NULL check.
+    void DefaultMQPushConsumer::getSubscriptions(vector<SubscriptionData> &result) {
+        typedef map<string, SubscriptionData *> SubscriptionTable;
+
+        SubscriptionTable &subscriptions = m_pRebalance->getSubscriptionInner();
+        for (SubscriptionTable::iterator entry = subscriptions.begin();
+             entry != subscriptions.end(); ++entry) {
+            SubscriptionData *data = entry->second;
+            result.push_back(*data);
+        }
+    }
+
+    // Records `offset` for `mq` in the offset store. A negative offset is
+    // logged as an error and not stored.
+    void DefaultMQPushConsumer::updateConsumeOffset(const MQMessageQueue &mq,
+                                                    int64 offset) {
+        if (offset < 0) {
+            LOG_ERROR("updateConsumeOffset of mq:%s error", mq.toString().c_str());
+            return;
+        }
+        m_pOffsetStore->updateOffset(mq, offset);
+    }
+
+    // Drops the offset entry kept for `mq` in the offset store.
+    void DefaultMQPushConsumer::removeConsumeOffset(const MQMessageQueue &mq) {
+        OffsetStore *store = m_pOffsetStore;
+        store->removeOffset(mq);
+    }
+
+    // Handler bound to the one-second retry timer created in pullMessage():
+    // re-queues the pull request, then frees the heap-allocated timer `t`
+    // that invoked this handler.
+    void DefaultMQPushConsumer::triggerNextPullRequest(
+            boost::asio::deadline_timer *t, PullRequest *request) {
+        producePullMsgTask(request);
+
+        // The timer was allocated with new solely for this single wait.
+        deleteAndZero(t);
+    }
+
+    // Queues one pull for `request` on the pull task queue, using the
+    // asynchronous or synchronous pull routine according to m_asyncPull. When
+    // the queue or the service state is not OK, nothing is queued and a
+    // warning is logged.
+    void DefaultMQPushConsumer::producePullMsgTask(PullRequest *request) {
+        // The queue status is checked first and the service state only if the
+        // queue is OK.
+        if (!m_pullmsgQueue->bTaskQueueStatusOK() || !isServiceStateOk()) {
+            LOG_WARN("produce pullmsg of mq:%s failed",
+                     request->m_messageQueue.toString().c_str());
+            return;
+        }
+
+        // Register the pending pull event before the task is handed over.
+        request->addPullMsgEvent();
+        if (!m_asyncPull) {
+            m_pullmsgQueue->produce(
+                    TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request));
+            return;
+        }
+        m_pullmsgQueue->produce(TaskBinder::gen(
+                &DefaultMQPushConsumer::pullMessageAsync, this, request));
+    }
+
+    // Body of the pull thread created in start(): runs the given task queue
+    // until its run() returns.
+    void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue *pTaskQueue) { pTaskQueue->run(); }
+
+    void DefaultMQPushConsumer::pullMessage(PullRequest *request) {
+        if (request == NULL) {
+            LOG_ERROR("Pull request is NULL, return");
+            return;
+        }
+        if (request->isDroped()) {
+            LOG_WARN("Pull request is set drop with mq:%s, return",
+                     (request->m_messageQueue).toString().c_str());
+            request->removePullMsgEvent();
+            return;
+        }
+
+        MQMessageQueue &messageQueue = request->m_messageQueue;
+        if (m_consumerService->getConsumeMsgSerivceListenerType() ==
+            messageListenerOrderly) {
+            if (!request->isLocked() || request->isLockExpired()) {
+                if (!m_pRebalance->lock(messageQueue)) {
+                    producePullMsgTask(request);
+                    return;
+                }
+            }
+        }
+
+        if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
+            // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
+            // than:%d",  (request->m_messageQueue).toString().c_str(),
+            // request->getCacheMsgCount(), m_maxMsgCacheSize);
+            boost::asio::deadline_timer *t = new boost::asio::deadline_timer(
+                    m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
+            t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
+                                      this, t, request));
+            return;
+        }
+
+        bool commitOffsetEnable = false;
+        int64 commitOffsetValue = 0;
+        if (CLUSTERING == getMessageModel()) {
+            commitOffsetValue = m_pOffsetStore->readOffset(
+                    messageQueue, READ_FROM_MEMORY, getSessionCredentials());
+            if (commitOffsetValue > 0) {
+                commitOffsetEnable = true;
+            }
+        }
+
+        string subExpression;
+        SubscriptionData *pSdata =
+                m_pRebalance->getSubscriptionData(messageQueue.getTopic());
+        if (pSdata == NULL) {
+            producePullMsgTask(request);
+            return;
+        }
+        subExpression = pSdata->getSubString();
+
+        int sysFlag =
+                PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset
+                                          false,                   // suspend
+                                          !subExpression.empty(),  // subscription
+                                          false);                  // class filter
+
+        try {
+            request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+            unique_ptr<PullResult> result(
+                    m_pPullAPIWrapper->pullKernelImpl(messageQueue,              // 1
+                                                      subExpression,             // 2
+                                                      pSdata->getSubVersion(),   // 3
+                                                      request->getNextOffset(),  // 4
+                                                      32,                        // 5
+                                                      sysFlag,                   // 6
+                                                      commitOffsetValue,         // 7
+                                                      1000 * 15,                 // 8
+                                                      1000 * 30,                 // 9
+                                                      ComMode_SYNC,              // 10
+                                                      NULL, getSessionCredentials()));
+
+            PullResult pullResult = m_pPullAPIWrapper->processPullResult(
+                    messageQueue, result.get(), pSdata);
+
+            switch (pullResult.pullStatus) {
+                case FOUND: {
+                    if (!request->isDroped())  // if request is setted to dropped, don't add
+                        // msgFoundList to m_msgTreeMap and don't
+                        // call producePullMsgTask
+                    {  // avoid issue: pullMsg is sent out, rebalance is doing concurrently
+                        // and this request is dropped, and then received pulled msgs.
+                        request->setNextOffset(pullResult.nextBeginOffset);
+                        request->putMessage(pullResult.msgFoundList);
+
+                        m_consumerService->submitConsumeRequest(request,
+                                                                pullResult.msgFoundList);
+                        producePullMsgTask(request);
+
+                        LOG_DEBUG("FOUND:%s with size:"
+                                          SIZET_FMT
+                                          ",nextBeginOffset:%lld",
+                                  messageQueue.toString().c_str(),
+                                  pullResult.msgFoundList.size(), pullResult.nextBeginOffset);
+                    } else {
+                        request->removePullMsgEvent();
+                    }
+                    break;
+                }
+                case NO_NEW_MSG: {
+                    request->setNextOffset(pullResult.nextBeginOffset);
+                    vector<MQMessageExt> msgs;
+                    request->getMessage(msgs);
+                    if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+                        /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
+                        is kept, then consumer will enter following situation:
+                        1>. get pull offset with 0 when do rebalance, and set
+                        m_offsetTable[mq] to 0;
+                        2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
+                        offset increase by 800
+                        3>. request->getMessage(msgs) always NULL
+                        4>. we need update consumerOffset to nextBeginOffset indicated by
+                        broker
+                        but if really no new msg could be pulled, also go to this CASE
+                     */
+                        // LOG_DEBUG("maybe misMatch between broker and client happens, update
+                        // consumerOffset to nextBeginOffset indicated by broker");
+                        updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
+                    }
+                    producePullMsgTask(request);
+                    LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld",
+                              messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+                    break;
+                }
+                case NO_MATCHED_MSG: {
+                    request->setNextOffset(pullResult.nextBeginOffset);
+                    vector<MQMessageExt> msgs;
+                    request->getMessage(msgs);
+                    if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+                        // LOG_DEBUG("maybe misMatch between broker and client happens, update
+                        // consumerOffset to nextBeginOffset indicated by broker");
+                        updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
+                    }
+                    producePullMsgTask(request);
+
+                    LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
+                              messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+                    break;
+                }
+                case OFFSET_ILLEGAL: {
+                    request->setNextOffset(pullResult.nextBeginOffset);
+                    producePullMsgTask(request);
+
+                    LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
+                              messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+                    break;
+                }
+                case BROKER_TIMEOUT: {  // as BROKER_TIMEOUT is defined by client, broker
+                    // will not returns this status, so this case
+                    // could not be entered.
+                    LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
+                    request->setNextOffset(pullResult.nextBeginOffset);
+                    producePullMsgTask(request);
+                    break;
+                }
+            }
+        } catch (MQException &e) {
+            LOG_ERROR(e.what());
+            producePullMsgTask(request);
+        }
+    }
+
+    AsyncPullCallback *DefaultMQPushConsumer::getAsyncPullCallBack(
+            PullRequest *request, MQMessageQueue msgQueue) {
+        boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
+        if (m_asyncPull && request) {
+            PullMAP::iterator it = m_PullCallback.find(msgQueue);
+            if (it == m_PullCallback.end()) {
+                LOG_INFO("new pull callback for mq:%s", msgQueue.toString().c_str());
+                m_PullCallback[msgQueue] = new AsyncPullCallback(this, request);
+            }
+            return m_PullCallback[msgQueue];
+        }
+
+        return NULL;
+    }
+
+    void DefaultMQPushConsumer::shutdownAsyncPullCallBack() {
+        boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
+        if (m_asyncPull) {
+            PullMAP::iterator it = m_PullCallback.begin();
+            for (; it != m_PullCallback.end(); ++it) {
+                if (it->second) {
+                    it->second->setShutdownStatus();
+                } else {
+                    LOG_ERROR("could not find asyncPullCallback for:%s",
+                              it->first.toString().c_str());
+                }
+            }
+        }
+    }
+
+    void DefaultMQPushConsumer::pullMessageAsync(PullRequest *request) {
+        if (request == NULL) {
+            LOG_ERROR("Pull request is NULL, return");
+            return;
+        }
+        if (request->isDroped()) {
+            LOG_WARN("Pull request is set drop with mq:%s, return",
+                     (request->m_messageQueue).toString().c_str());
+            request->removePullMsgEvent();
+            return;
+        }
+
+        MQMessageQueue &messageQueue = request->m_messageQueue;
+        if (m_consumerService->getConsumeMsgSerivceListenerType() ==
+            messageListenerOrderly) {
+            if (!request->isLocked() || request->isLockExpired()) {
+                if (!m_pRebalance->lock(messageQueue)) {
+                    producePullMsgTask(request);
+                    return;
+                }
+            }
+        }
+
+        if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
+            // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
+            // than:%d",  (request->m_messageQueue).toString().c_str(),
+            // request->getCacheMsgCount(), m_maxMsgCacheSize);
+            boost::asio::deadline_timer *t = new boost::asio::deadline_timer(
+                    m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
+            t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
+                                      this, t, request));
+            return;
+        }
+
+        bool commitOffsetEnable = false;
+        int64 commitOffsetValue = 0;
+        if (CLUSTERING == getMessageModel()) {
+            commitOffsetValue = m_pOffsetStore->readOffset(
+                    messageQueue, READ_FROM_MEMORY, getSessionCredentials());
+            if (commitOffsetValue > 0) {
+                commitOffsetEnable = true;
+            }
+        }
+
+        string subExpression;
+        SubscriptionData *pSdata =
+                (m_pRebalance->getSubscriptionData(messageQueue.getTopic()));
+        if (pSdata == NULL) {
+            producePullMsgTask(request);
+            return;
+        }
+        subExpression = pSdata->getSubString();
+
+        int sysFlag =
+                PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset
+                                          true,                    // suspend
+                                          !subExpression.empty(),  // subscription
+                                          false);                  // class filter
+
+        AsyncArg arg;
+        arg.mq = messageQueue;
+        arg.subData = *pSdata;
+        arg.pPullWrapper = m_pPullAPIWrapper;
+
+        try {
+            request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+            m_pPullAPIWrapper->pullKernelImpl(
+                    messageQueue,                                 // 1
+                    subExpression,                                // 2
+                    pSdata->getSubVersion(),                      // 3
+                    request->getNextOffset(),                     // 4
+                    32,                                           // 5
+                    sysFlag,                                      // 6
+                    commitOffsetValue,                            // 7
+                    1000 * 15,                                    // 8
+                    m_asyncPullTimeout,                           // 9
+                    ComMode_ASYNC,                                // 10
+                    getAsyncPullCallBack(request, messageQueue),  // 11
+                    getSessionCredentials(),                      // 12
+                    &arg);                                        // 13
+        } catch (MQException &e) {
+            LOG_ERROR(e.what());
+            producePullMsgTask(request);
+        }
+    }
+
+    void DefaultMQPushConsumer::setAsyncPull(bool asyncFlag) {
+        if (asyncFlag) {
+            LOG_INFO("set pushConsumer:%s to async default pull mode",
+                     getGroupName().c_str());
+        } else {
+            LOG_INFO("set pushConsumer:%s to sync pull mode", getGroupName().c_str());
+        }
+        m_asyncPull = asyncFlag;
+    }
+
+    void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) {
+        if (threadCount > 0) {
+            m_consumeThreadCount = threadCount;
+        } else {
+            LOG_ERROR("setConsumeThreadCount with invalid value");
+        }
+    }
+
+    int DefaultMQPushConsumer::getConsumeThreadCount() const {
+        return m_consumeThreadCount;
+    }
+
+    void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) {
+        m_pullMsgThreadPoolNum = threadCount;
+    }
+
+    int DefaultMQPushConsumer::getPullMsgThreadPoolCount() const {
+        return m_pullMsgThreadPoolNum;
+    }
+
+    int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize() const {
+        return m_consumeMessageBatchMaxSize;
+    }
+
+    void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize(
+            int consumeMessageBatchMaxSize) {
+        if (consumeMessageBatchMaxSize >= 1)
+            m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
+    }
+
+    void DefaultMQPushConsumer::setMaxCacheMsgSizePerQueue(int maxCacheSize) {
+        if (maxCacheSize > 0 && maxCacheSize < 65535) {
+            LOG_INFO("set maxCacheSize to:%d for consumer:%s", maxCacheSize,
+                     getGroupName().c_str());
+            m_maxMsgCacheSize = maxCacheSize;
+        }
+    }
+
+    int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const {
+        return m_maxMsgCacheSize;
+    }
+
+    ConsumerRunningInfo *DefaultMQPushConsumer::getConsumerRunningInfo() {
+        ConsumerRunningInfo *info = new ConsumerRunningInfo();
+        if (info) {
+            if (m_consumerService->getConsumeMsgSerivceListenerType() ==
+                messageListenerOrderly)
+                info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");
+            else
+                info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "flase");
+            info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE,
+                              UtilAll::to_string(m_consumeThreadCount));
+            info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP,
+                              UtilAll::to_string(m_startTime));
+
+            vector<SubscriptionData> result;
+            getSubscriptions(result);
+            info->setSubscriptionSet(result);
+
+            map<MQMessageQueue, PullRequest *> requestTable =
+                    m_pRebalance->getPullRequestTable();
+            map<MQMessageQueue, PullRequest *>::iterator it = requestTable.begin();
+
+            for (; it != requestTable.end(); ++it) {
+                if (!it->second->isDroped()) {
+                    map<MessageQueue, ProcessQueueInfo> queueTable;
+                    MessageQueue queue((it->first).getTopic(), (it->first).getBrokerName(),
+                                       (it->first).getQueueId());
+                    ProcessQueueInfo processQueue;
+                    processQueue.cachedMsgMinOffset = it->second->getCacheMinOffset();
+                    processQueue.cachedMsgMaxOffset = it->second->getCacheMaxOffset();
+                    processQueue.cachedMsgCount = it->second->getCacheMsgCount();
+                    processQueue.setCommitOffset(m_pOffsetStore->readOffset(
+                            it->first, MEMORY_FIRST_THEN_STORE, getSessionCredentials()));
+                    processQueue.setDroped(it->second->isDroped());
+                    processQueue.setLocked(it->second->isLocked());
+                    processQueue.lastLockTimestamp = it->second->getLastLockTimestamp();
+                    processQueue.lastPullTimestamp = it->second->getLastPullTimestamp();
+                    processQueue.lastConsumeTimestamp =
+                            it->second->getLastConsumeTimestamp();
+                    info->setMqTable(queue, processQueue);
+                }
+            }
+
+            return info;
+        }
+        return NULL;
+    }
+
+//<!************************************************************************
+}  //<!end namespace;
diff --git a/src/consumer/PullRequest.h b/src/consumer/PullRequest.h
index c62bd7d9..f7abfaf8 100644
--- a/src/consumer/PullRequest.h
+++ b/src/consumer/PullRequest.h
@@ -85,7 +85,7 @@ class PullRequest {
   map<int64, MQMessageExt> m_msgTreeMapTemp;
   boost::mutex m_pullRequestLock;
   uint64 m_lastLockTimestamp;  // ms
-  uint64 m_tryUnlockTimes;
+  //uint64 m_tryUnlockTimes;
   uint64 m_lastPullTimestamp;
   uint64 m_lastConsumeTimestamp;
   boost::timed_mutex m_consumeLock;
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 1411ebdf..f0b53cfa 100755
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -37,7 +37,7 @@ DefaultMQProducer::DefaultMQProducer(const string& groupname)
     : m_sendMsgTimeout(3000),
       m_compressMsgBodyOverHowmuch(4 * 1024),
       m_maxMessageSize(1024 * 128),
-      m_retryAnotherBrokerWhenNotStoreOK(false),
+      //m_retryAnotherBrokerWhenNotStoreOK(false),
       m_compressLevel(5),
       m_retryTimes(5) {
   //<!set default group name;
@@ -50,7 +50,8 @@ DefaultMQProducer::~DefaultMQProducer() {}
 void DefaultMQProducer::start() {
 #ifndef WIN32
   /* Ignore the SIGPIPE */
-  struct sigaction sa = {0};
+  struct sigaction sa;
+  memset(&sa,0, sizeof(struct sigaction));
   sa.sa_handler = SIG_IGN;
   sa.sa_flags = 0;
   sigaction(SIGPIPE, &sa, 0);
diff --git a/src/thread/disruptor/sequence.h b/src/thread/disruptor/sequence.h
index b3e58761..4fb61a4f 100755
--- a/src/thread/disruptor/sequence.h
+++ b/src/thread/disruptor/sequence.h
@@ -102,9 +102,9 @@ class PaddedSequence : public Sequence {
     PaddedSequence(int64_t initial_value = kInitialCursorValue) :
             Sequence(initial_value) {}
 
- private:
+ //private:
     // padding
-    int64_t padding_[ATOMIC_SEQUENCE_PADDING_LENGTH];
+    //int64_t padding_[ATOMIC_SEQUENCE_PADDING_LENGTH];
 
 };
 
@@ -133,8 +133,8 @@ class PaddedLong : public MutableLong {
  public:
      PaddedLong(int64_t initial_value = kInitialCursorValue) :
          MutableLong(initial_value) {}
- private:
-     int64_t padding_[SEQUENCE_PADDING_LENGTH];
+ //private:
+     //int64_t padding_[SEQUENCE_PADDING_LENGTH];
 };
 
 int64_t GetMinimumSequence(
diff --git a/src/transport/TcpTransport.cpp b/src/transport/TcpTransport.cpp
index 62cc19e0..011c4202 100644
--- a/src/transport/TcpTransport.cpp
+++ b/src/transport/TcpTransport.cpp
@@ -30,12 +30,12 @@ namespace rocketmq {
 TcpTransport::TcpTransport(TcpRemotingClient *pTcpRemointClient,
                            READ_CALLBACK handle /* = NULL */)
     : m_tcpConnectStatus(e_connectInit),
-      m_ReadDatathread(NULL),
-      m_readcallback(handle),
-      m_tcpRemotingClient(pTcpRemointClient),
       m_event_base_status(false),
       m_event_base_mtx(),
-      m_event_base_cv() {
+      m_event_base_cv(),
+      m_ReadDatathread(NULL),
+      m_readcallback(handle),
+      m_tcpRemotingClient(pTcpRemointClient){
   m_startTime = UtilAll::currentTimeMillis();
 #ifdef WIN32
   evthread_use_windows_threads();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services