You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/09/04 05:04:51 UTC

[05/14] incubator-rocketmq-externals git commit: upload rocketmq-cpp code

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/Rebalance.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/Rebalance.cpp b/rocketmq-cpp/src/consumer/Rebalance.cpp
new file mode 100755
index 0000000..a19e7a7
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/Rebalance.cpp
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Rebalance.h"
+#include "DefaultMQPushConsumer.h"
+#include "LockBatchBody.h"
+#include "Logging.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientFactory.h"
+#include "OffsetStore.h"
+
+namespace rocketmq {
+//<!************************************************************************
+// Stores the (non-owned) consumer and client factory and creates the default
+// queue allocation strategy, which this object owns and frees in its
+// destructor.
+Rebalance::Rebalance(MQConsumer* consumer, MQClientFactory* pfactory)
+    : m_pAllocateMQStrategy(new AllocateMQAveragely()),
+      m_pConsumer(consumer),
+      m_pClientFactory(pfactory) {}
+
+// Frees every SubscriptionData and PullRequest stored in the tables, clears
+// the tables, drops the borrowed consumer/factory pointers and deletes the
+// allocation strategy.
+Rebalance::~Rebalance() {
+  for (map<string, SubscriptionData*>::iterator sub =
+           m_subscriptionData.begin();
+       sub != m_subscriptionData.end(); ++sub) {
+    deleteAndZero(sub->second);
+  }
+  m_subscriptionData.clear();
+
+  for (MQ2PULLREQ::iterator req = m_requestQueueTable.begin();
+       req != m_requestQueueTable.end(); ++req) {
+    delete req->second;
+    req->second = NULL;
+  }
+  m_requestQueueTable.clear();
+
+  m_topicSubscribeInfoTable.clear();
+
+  // The consumer and the factory are not deleted here, only forgotten.
+  m_pConsumer = NULL;
+  m_pClientFactory = NULL;
+  deleteAndZero(m_pAllocateMQStrategy);
+}
+
+/**
+ * Recomputes, for every subscribed topic, which message queues this consumer
+ * should hold and passes the result to updateRequestTableInRebalance().
+ *
+ * - BROADCASTING: all queues of the topic are handed to this consumer.
+ * - CLUSTERING: the sorted queues are divided among the sorted consumer ids
+ *   of the group by m_pAllocateMQStrategy.
+ *
+ * Topics without cached queue information are skipped. Any MQException
+ * raised while handling a topic is logged and ends the whole pass, so topics
+ * not yet visited are left untouched until the next call.
+ */
+void Rebalance::doRebalance() {
+  LOG_DEBUG("start doRebalance");
+  try {
+    map<string, SubscriptionData*>::iterator it = m_subscriptionData.begin();
+    for (; it != m_subscriptionData.end(); ++it) {
+      string topic = (it->first);
+      LOG_INFO("current topic is:%s", topic.c_str());
+      //<!topic -> mqs
+      vector<MQMessageQueue> mqAll;
+      if (!getTopicSubscribeInfo(topic, mqAll)) {
+        continue;
+      }
+      // An empty queue list is only tolerated for retry topics.
+      if (mqAll.empty()) {
+        if (!UtilAll::startsWith_retry(topic))
+          THROW_MQEXCEPTION(MQClientException, "doRebalance the topic is empty",
+                            -1);
+      }
+
+      //<!msg model;
+      switch (m_pConsumer->getMessageModel()) {
+        case BROADCASTING: {
+          bool changed = updateRequestTableInRebalance(topic, mqAll);
+          if (changed) {
+            messageQueueChanged(topic, mqAll, mqAll);
+          }
+          break;
+        }
+        case CLUSTERING: {
+          vector<string> cidAll;
+          m_pClientFactory->findConsumerIds(
+              topic, m_pConsumer->getGroupName(), cidAll,
+              m_pConsumer->getSessionCredentials());
+
+          if (cidAll.empty()) {
+            // Existing pull requests are deliberately NOT dropped here, so
+            // that consumption recovers quickly after a network failure.
+            THROW_MQEXCEPTION(MQClientException,
+                              "doRebalance the cidAll is empty", -1);
+          }
+          // log
+          for (int i = 0; i < (int)cidAll.size(); ++i) {
+            LOG_INFO("client id:%s of topic:%s", cidAll[i].c_str(),
+                     topic.c_str());
+          }
+          //<! sort; both lists are sorted before being handed to the strategy
+          sort(mqAll.begin(), mqAll.end());
+          sort(cidAll.begin(), cidAll.end());
+
+          //<! allocate;
+          vector<MQMessageQueue> allocateResult;
+          try {
+            m_pAllocateMQStrategy->allocate(m_pConsumer->getMQClientId(), mqAll,
+                                            cidAll, allocateResult);
+          } catch (MQException& e) {
+            THROW_MQEXCEPTION(MQClientException, "allocate error", -1);
+          }
+
+          // log
+          for (int i = 0; i < (int)allocateResult.size(); ++i) {
+            LOG_INFO("allocate mq:%s", allocateResult[i].toString().c_str());
+          }
+
+          //<!update local;
+          bool changed = updateRequestTableInRebalance(topic, allocateResult);
+          if (changed) {
+            messageQueueChanged(topic, mqAll, allocateResult);
+          }
+          // Always leave the case explicitly instead of falling through to
+          // `default` when nothing changed.
+          break;
+        }
+        default:
+          break;
+      }
+    }
+  } catch (MQException& e) {
+    // Never use the exception text as the format string itself: a '%' in the
+    // message would be interpreted as a conversion specifier.
+    LOG_ERROR("%s", e.what());
+  }
+}
+
+/**
+ * Persists the consume offset of every queue whose pull request exists and
+ * is not dropped.
+ *
+ * In BROADCASTING mode the whole set is handed to OffsetStore::persistAll();
+ * otherwise each queue is persisted individually with the consumer's session
+ * credentials.
+ *
+ * NOTE(review): m_pConsumer is static_cast to DefaultMQPushConsumer without a
+ * check - confirm this is never reached with another consumer type.
+ */
+void Rebalance::persistConsumerOffset() {
+  DefaultMQPushConsumer* pushConsumer =
+      static_cast<DefaultMQPushConsumer*>(m_pConsumer);
+  OffsetStore* offsetStore = pushConsumer->getOffsetStore();
+
+  // Collect the live queues under the table lock; the store is called after
+  // the lock has been released.
+  vector<MQMessageQueue> liveQueues;
+  {
+    boost::lock_guard<boost::mutex> guard(m_requestTableMutex);
+    for (MQ2PULLREQ::iterator entry = m_requestQueueTable.begin();
+         entry != m_requestQueueTable.end(); ++entry) {
+      PullRequest* request = entry->second;
+      if (request == NULL || request->isDroped()) {
+        continue;
+      }
+      liveQueues.push_back(entry->first);
+    }
+  }
+
+  if (pushConsumer->getMessageModel() == BROADCASTING) {
+    offsetStore->persistAll(liveQueues);
+    return;
+  }
+
+  for (vector<MQMessageQueue>::iterator mq = liveQueues.begin();
+       mq != liveQueues.end(); ++mq) {
+    offsetStore->persist(*mq, m_pConsumer->getSessionCredentials());
+  }
+}
+
+/**
+ * Persists the consume offset of every queue that has a pull request, whether
+ * or not that request is dropped; a resetOffset command must update dropped
+ * queues too.
+ *
+ * m_requestTableMutex is taken exactly once, only while the queue list is
+ * copied. The previous code locked the same non-recursive boost::mutex twice
+ * on the same thread, which deadlocks.
+ */
+void Rebalance::persistConsumerOffsetByResetOffset() {
+  DefaultMQPushConsumer* pConsumer =
+      static_cast<DefaultMQPushConsumer*>(m_pConsumer);
+  OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
+  vector<MQMessageQueue> mqs;
+  {
+    boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
+    MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
+    for (; it != m_requestQueueTable.end(); ++it) {
+      if (it->second) {  // even if it was dropped, also need update offset when
+                         // rcv resetOffset cmd
+        mqs.push_back(it->first);
+      }
+    }
+  }
+  // Persist outside the lock, as persistConsumerOffset() does.
+  vector<MQMessageQueue>::iterator it2 = mqs.begin();
+  for (; it2 != mqs.end(); ++it2) {
+    pOffsetStore->persist(*it2, m_pConsumer->getSessionCredentials());
+  }
+}
+
+// Returns the subscription registered for `topic`, or NULL when the topic is
+// not subscribed. The map is never modified by this lookup.
+SubscriptionData* Rebalance::getSubscriptionData(const string& topic) {
+  map<string, SubscriptionData*>::iterator found =
+      m_subscriptionData.find(topic);
+  return (found == m_subscriptionData.end()) ? NULL : found->second;
+}
+
+// Exposes the topic -> subscription map itself (by reference, not a copy).
+map<string, SubscriptionData*>& Rebalance::getSubscriptionInner() {
+  map<string, SubscriptionData*>& table = m_subscriptionData;
+  return table;
+}
+
+// Registers `pdata` for `topic` unless `pdata` is NULL or the topic already
+// has an entry; in both of those cases the call does nothing and the table
+// does not take the pointer.
+void Rebalance::setSubscriptionData(const string& topic,
+                                    SubscriptionData* pdata) {
+  if (pdata == NULL) {
+    return;
+  }
+  if (m_subscriptionData.find(topic) != m_subscriptionData.end()) {
+    return;
+  }
+  m_subscriptionData[topic] = pdata;
+}
+
+// Replaces the cached queue list of `topic` with `mqs`, but only when the
+// topic is currently subscribed; unknown topics are ignored. The table is
+// updated under m_topicSubscribeInfoTableMutex, the debug logging is not.
+void Rebalance::setTopicSubscribeInfo(const string& topic,
+                                      vector<MQMessageQueue>& mqs) {
+  if (m_subscriptionData.find(topic) == m_subscriptionData.end()) {
+    return;
+  }
+
+  {
+    boost::lock_guard<boost::mutex> guard(m_topicSubscribeInfoTableMutex);
+    // erase() is a no-op when the topic has no entry yet.
+    m_topicSubscribeInfoTable.erase(topic);
+    m_topicSubscribeInfoTable[topic] = mqs;
+  }
+
+  // log
+  for (vector<MQMessageQueue>::iterator mq = mqs.begin(); mq != mqs.end();
+       ++mq) {
+    LOG_DEBUG("topic [%s] has :%s", topic.c_str(), mq->toString().c_str());
+  }
+}
+
+// Copies the cached queue list of `topic` into `mqs` and returns true; when
+// the topic has no cached entry, returns false and leaves `mqs` untouched.
+bool Rebalance::getTopicSubscribeInfo(const string& topic,
+                                      vector<MQMessageQueue>& mqs) {
+  boost::lock_guard<boost::mutex> guard(m_topicSubscribeInfoTableMutex);
+  map<string, vector<MQMessageQueue> >::iterator found =
+      m_topicSubscribeInfoTable.find(topic);
+  if (found == m_topicSubscribeInfoTable.end()) {
+    return false;
+  }
+  mqs = found->second;
+  return true;
+}
+
+// Stores `pPullRequest` as the pull request of `mq`, overwriting whatever
+// pointer was stored for that queue before (the old pointer is not deleted
+// here).
+void Rebalance::addPullRequest(MQMessageQueue mq, PullRequest* pPullRequest) {
+  boost::lock_guard<boost::mutex> guard(m_requestTableMutex);
+  PullRequest*& slot = m_requestQueueTable[mq];
+  slot = pPullRequest;
+}
+
+// Looks up the pull request stored for `mq` under the table lock; returns
+// NULL when the queue has no entry.
+PullRequest* Rebalance::getPullRequest(MQMessageQueue mq) {
+  boost::lock_guard<boost::mutex> guard(m_requestTableMutex);
+  MQ2PULLREQ::iterator found = m_requestQueueTable.find(mq);
+  if (found == m_requestQueueTable.end()) {
+    return NULL;
+  }
+  return found->second;
+}
+
+// Returns a copy of the queue -> pull request table taken under the table
+// lock. Only the map is copied; the PullRequest objects are shared.
+map<MQMessageQueue, PullRequest*> Rebalance::getPullRequestTable() {
+  boost::lock_guard<boost::mutex> guard(m_requestTableMutex);
+  MQ2PULLREQ snapshot(m_requestQueueTable);
+  return snapshot;
+}
+
+/**
+ * Sends one unlock-batch request per broker for all queues whose pull request
+ * is not dropped, and clears the local "locked" flag of every queue that was
+ * part of a successful request.
+ *
+ * Fixes over the previous version:
+ *  - the first queue seen for a broker is now part of that broker's batch
+ *    (it used to be silently left out);
+ *  - after a successful unlock the pull request is marked NOT locked
+ *    (it used to be marked locked);
+ *  - NULL pull requests and a NULL broker lookup result are skipped instead
+ *    of being dereferenced;
+ *  - the per-broker vectors are held by value, so nothing leaks if an
+ *    exception other than MQException escapes.
+ *
+ * @param oneway currently unused.
+ */
+void Rebalance::unlockAll(bool oneway) {
+  // broker name -> queues to unlock on that broker
+  map<string, vector<MQMessageQueue> > brokerMqs;
+  MQ2PULLREQ requestQueueTable = getPullRequestTable();
+  for (MQ2PULLREQ::iterator it = requestQueueTable.begin();
+       it != requestQueueTable.end(); ++it) {
+    if (it->second && !(it->second->isDroped())) {
+      brokerMqs[it->first.getBrokerName()].push_back(it->first);
+    }
+  }
+  LOG_INFO("unLockAll %zu broker mqs", brokerMqs.size());
+  for (map<string, vector<MQMessageQueue> >::iterator itb = brokerMqs.begin();
+       itb != brokerMqs.end(); ++itb) {
+    unique_ptr<FindBrokerResult> pFindBrokerResult(
+        m_pClientFactory->findBrokerAddressInSubscribe(itb->first, MASTER_ID,
+                                                       true));
+    if (!pFindBrokerResult) {
+      LOG_ERROR("unlockBatchMQ fails, no address found for broker:%s",
+                itb->first.c_str());
+      continue;
+    }
+    unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(
+        new UnlockBatchRequestBody());
+    vector<MQMessageQueue>& mqs = itb->second;
+    unlockBatchRequest->setClientId(m_pConsumer->getMQClientId());
+    unlockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
+    unlockBatchRequest->setMqSet(mqs);
+
+    try {
+      m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(
+          pFindBrokerResult->brokerAddr, unlockBatchRequest.get(), 1000,
+          m_pConsumer->getSessionCredentials());
+      for (unsigned int i = 0; i != mqs.size(); ++i) {
+        PullRequest* pullreq = getPullRequest(mqs[i]);
+        if (pullreq) {
+          LOG_INFO("unlockBatchMQ success of mq:%s", mqs[i].toString().c_str());
+          pullreq->setLocked(false);
+        } else {
+          LOG_ERROR("unlockBatchMQ fails of mq:%s", mqs[i].toString().c_str());
+        }
+      }
+    } catch (MQException& e) {
+      LOG_ERROR("unlockBatchMQ fails");
+    }
+  }
+}
+
+/**
+ * Asks the master broker of `mq` to release this client's lock on the queue
+ * and, on success, clears the local "locked" flag of its pull request.
+ *
+ * Failures (broker address not found, MQException from the remote call) are
+ * logged and otherwise ignored.
+ */
+void Rebalance::unlock(MQMessageQueue mq) {
+  unique_ptr<FindBrokerResult> pFindBrokerResult(
+      m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
+                                                     MASTER_ID, true));
+  // The lookup result is a raw pointer and may be NULL; do not dereference it.
+  if (!pFindBrokerResult) {
+    LOG_ERROR("unlock fails of mq:%s", mq.toString().c_str());
+    return;
+  }
+  unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(
+      new UnlockBatchRequestBody());
+  vector<MQMessageQueue> mqs;
+  mqs.push_back(mq);
+  unlockBatchRequest->setClientId(m_pConsumer->getMQClientId());
+  unlockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
+  unlockBatchRequest->setMqSet(mqs);
+
+  try {
+    m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(
+        pFindBrokerResult->brokerAddr, unlockBatchRequest.get(), 1000,
+        m_pConsumer->getSessionCredentials());
+    for (unsigned int i = 0; i != mqs.size(); ++i) {
+      PullRequest* pullreq = getPullRequest(mqs[i]);
+      if (pullreq) {
+        LOG_INFO("unlock success of mq:%s", mqs[i].toString().c_str());
+        // The queue is no longer locked by this client.
+        pullreq->setLocked(false);
+      } else {
+        LOG_ERROR("unlock fails of mq:%s", mqs[i].toString().c_str());
+      }
+    }
+  } catch (MQException& e) {
+    LOG_ERROR("unlock fails of mq:%s", mq.toString().c_str());
+  }
+}
+
+/**
+ * Locks, on the master brokers, every queue whose pull request is not
+ * dropped. Queues are batched per (broker name + topic); for each queue the
+ * broker reports as locked, the local pull request is flagged as locked and
+ * its lock timestamp is refreshed.
+ *
+ * NULL pull requests and a NULL broker lookup result are skipped instead of
+ * being dereferenced, and the batches are held by value so nothing leaks if
+ * an exception other than MQException escapes.
+ */
+void Rebalance::lockAll() {
+  // (broker name + topic) -> queues to lock with one request
+  map<string, vector<MQMessageQueue> > brokerMqs;
+  MQ2PULLREQ requestQueueTable = getPullRequestTable();
+  for (MQ2PULLREQ::iterator it = requestQueueTable.begin();
+       it != requestQueueTable.end(); ++it) {
+    if (it->second && !(it->second->isDroped())) {
+      string brokerKey = it->first.getBrokerName() + it->first.getTopic();
+      brokerMqs[brokerKey].push_back(it->first);
+    }
+  }
+  LOG_INFO("LockAll %zu broker mqs", brokerMqs.size());
+  for (map<string, vector<MQMessageQueue> >::iterator itb = brokerMqs.begin();
+       itb != brokerMqs.end(); ++itb) {
+    vector<MQMessageQueue>& batch = itb->second;
+    // A batch is only created when a queue is pushed, so it is never empty;
+    // every queue in it shares the same broker name.
+    unique_ptr<FindBrokerResult> pFindBrokerResult(
+        m_pClientFactory->findBrokerAddressInSubscribe(
+            batch[0].getBrokerName(), MASTER_ID, true));
+    if (!pFindBrokerResult) {
+      LOG_ERROR("lockBatchMQ fails, no address found for broker:%s",
+                batch[0].getBrokerName().c_str());
+      continue;
+    }
+    unique_ptr<LockBatchRequestBody> lockBatchRequest(
+        new LockBatchRequestBody());
+    lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
+    lockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
+    lockBatchRequest->setMqSet(batch);
+    LOG_INFO("try to lock:%zu mqs of broker:%s", batch.size(),
+             itb->first.c_str());
+    try {
+      // Filled by the call with the queues the broker actually locked.
+      vector<MQMessageQueue> messageQueues;
+      m_pClientFactory->getMQClientAPIImpl()->lockBatchMQ(
+          pFindBrokerResult->brokerAddr, lockBatchRequest.get(), messageQueues,
+          1000, m_pConsumer->getSessionCredentials());
+      for (unsigned int i = 0; i != messageQueues.size(); ++i) {
+        PullRequest* pullreq = getPullRequest(messageQueues[i]);
+        if (pullreq) {
+          LOG_INFO("lockBatchMQ success of mq:%s",
+                   messageQueues[i].toString().c_str());
+          pullreq->setLocked(true);
+          pullreq->setLastLockTimestamp(UtilAll::currentTimeMillis());
+        } else {
+          LOG_ERROR("lockBatchMQ fails of mq:%s",
+                    messageQueues[i].toString().c_str());
+        }
+      }
+    } catch (MQException& e) {
+      LOG_ERROR("lockBatchMQ fails");
+    }
+  }
+}
+/**
+ * Tries to lock `mq` on its master broker.
+ *
+ * @return true when the broker reported at least one locked queue that has a
+ *         local pull request (which is then flagged as locked and gets a
+ *         fresh lock timestamp); false when the broker address cannot be
+ *         resolved, the broker locked nothing, no matching pull request
+ *         exists, or the remote call raised an MQException.
+ */
+bool Rebalance::lock(MQMessageQueue mq) {
+  unique_ptr<FindBrokerResult> pFindBrokerResult(
+      m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
+                                                     MASTER_ID, true));
+  // The lookup result is a raw pointer and may be NULL; do not dereference it.
+  if (!pFindBrokerResult) {
+    LOG_ERROR("lock fails of mq:%s", mq.toString().c_str());
+    return false;
+  }
+  unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
+  lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
+  lockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
+  vector<MQMessageQueue> in_mqSet;
+  in_mqSet.push_back(mq);
+  lockBatchRequest->setMqSet(in_mqSet);
+  bool lockResult = false;
+
+  try {
+    // Filled by the call with the queues the broker actually locked.
+    vector<MQMessageQueue> messageQueues;
+    LOG_DEBUG("try to lock mq:%s", mq.toString().c_str());
+    m_pClientFactory->getMQClientAPIImpl()->lockBatchMQ(
+        pFindBrokerResult->brokerAddr, lockBatchRequest.get(), messageQueues,
+        1000, m_pConsumer->getSessionCredentials());
+    if (messageQueues.empty()) {
+      LOG_ERROR("lock mq on broker:%s failed",
+                pFindBrokerResult->brokerAddr.c_str());
+      return false;
+    }
+    for (unsigned int i = 0; i != messageQueues.size(); ++i) {
+      PullRequest* pullreq = getPullRequest(messageQueues[i]);
+      if (pullreq) {
+        LOG_INFO("lock success of mq:%s", messageQueues[i].toString().c_str());
+        pullreq->setLocked(true);
+        pullreq->setLastLockTimestamp(UtilAll::currentTimeMillis());
+        lockResult = true;
+      } else {
+        LOG_ERROR("lock fails of mq:%s", messageQueues[i].toString().c_str());
+      }
+    }
+    return lockResult;
+  } catch (MQException& e) {
+    LOG_ERROR("lock fails of mq:%s", mq.toString().c_str());
+    return false;
+  }
+}
+
+//<!************************************************************************
+// Forwards both arguments to the Rebalance base; adds no state of its own.
+RebalancePull::RebalancePull(MQConsumer* consumer, MQClientFactory* pfactory)
+    : Rebalance(consumer, pfactory) {
+}
+
+// No-op implementation: touches neither argument and always reports that
+// nothing changed.
+bool RebalancePull::updateRequestTableInRebalance(
+    const string& topic, vector<MQMessageQueue>& mqsSelf) {
+  const bool changed = false;
+  return changed;
+}
+
+// Always answers offset 0, regardless of the queue passed in.
+int64 RebalancePull::computePullFromWhere(const MQMessageQueue& mq) {
+  const int64 startOffset = 0;
+  return startOffset;
+}
+
+// Intentionally empty: this class does nothing when the queue assignment
+// changes.
+void RebalancePull::messageQueueChanged(const string& topic,
+                                        vector<MQMessageQueue>& mqAll,
+                                        vector<MQMessageQueue>& mqDivided) {
+}
+
+// Intentionally empty: this class performs no cleanup for a removed queue.
+void RebalancePull::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) {
+}
+
+//<!***************************************************************************
+// Forwards both arguments to the Rebalance base; adds no state of its own.
+RebalancePush::RebalancePush(MQConsumer* consumer, MQClientFactory* pfactory)
+    : Rebalance(consumer, pfactory) {
+}
+
+/**
+ * Brings the pull request table in line with `mqsSelf`, the queues of `topic`
+ * now assigned to this consumer.
+ *
+ *  1. Remove: every stored pull request of `topic` whose queue is no longer
+ *     in `mqsSelf` is marked dropped, cleaned up through
+ *     removeUnnecessaryMessageQueue() and emptied of cached messages.
+ *  2. Resume: a queue in `mqsSelf` whose pull request exists but is dropped
+ *     gets a fresh start offset, is un-dropped and rescheduled.
+ *  3. Add: a queue in `mqsSelf` with no pull request gets a new one, which is
+ *     stored in the table and scheduled once the loop has finished.
+ *
+ * A queue whose start offset cannot be computed (negative result) is neither
+ * resumed nor added. The freshly allocated pull request is released in that
+ * case (it used to leak).
+ *
+ * @return true when at least one queue was removed, resumed or added.
+ */
+bool RebalancePush::updateRequestTableInRebalance(
+    const string& topic, vector<MQMessageQueue>& mqsSelf) {
+  LOG_DEBUG("updateRequestTableInRebalance Enter");
+  if (mqsSelf.empty()) {
+    LOG_WARN("allocated queue is empty for topic:%s", topic.c_str());
+  }
+
+  bool changed = false;
+
+  //<!remove
+  MQ2PULLREQ requestQueueTable(getPullRequestTable());
+  MQ2PULLREQ::iterator it = requestQueueTable.begin();
+  for (; it != requestQueueTable.end(); ++it) {
+    MQMessageQueue mqtemp = it->first;
+    if (mqtemp.getTopic().compare(topic) == 0) {
+      if (mqsSelf.empty() ||
+          (find(mqsSelf.begin(), mqsSelf.end(), mqtemp) == mqsSelf.end())) {
+        // Guard against a NULL entry before touching the pull request.
+        if (it->second && !(it->second->isDroped())) {
+          it->second->setDroped(true);
+          removeUnnecessaryMessageQueue(mqtemp);
+          it->second->clearAllMsgs();  // add clear operation to avoid bad state
+                                       // when dropped pullRequest returns
+                                       // normal
+          LOG_INFO("drop mq:%s", mqtemp.toString().c_str());
+        }
+        changed = true;
+      }
+    }
+  }
+
+  //<!add
+  vector<PullRequest*> pullrequestAdd;
+  DefaultMQPushConsumer* pConsumer =
+      static_cast<DefaultMQPushConsumer*>(m_pConsumer);
+  vector<MQMessageQueue>::iterator it2 = mqsSelf.begin();
+  for (; it2 != mqsSelf.end(); ++it2) {
+    PullRequest* pPullRequest(getPullRequest(*it2));
+    if (pPullRequest && pPullRequest->isDroped()) {
+      LOG_DEBUG(
+          "before resume the pull handle of this pullRequest, its mq is:%s, "
+          "its offset is:%lld",
+          (it2->toString()).c_str(), pPullRequest->getNextOffset());
+      pConsumer->getOffsetStore()->removeOffset(
+          *it2);  // remove dirty offset which may have been written to
+                  // OffsetStore::m_offsetTable by consuming after the last
+                  // drop
+      int64 nextOffset = computePullFromWhere(*it2);
+      if (nextOffset >= 0) {
+        pPullRequest->setDroped(false);
+        pPullRequest->clearAllMsgs();  // avoid consume accumulation and consume
+                                       // duplication issues
+        pPullRequest->setNextOffset(nextOffset);
+        pPullRequest->updateQueueMaxOffset(nextOffset);
+        LOG_INFO(
+            "after resume the pull handle of this pullRequest, its mq is:%s, "
+            "its offset is:%lld",
+            (it2->toString()).c_str(), pPullRequest->getNextOffset());
+        changed = true;
+        pConsumer->producePullMsgTask(pPullRequest);
+      } else {
+        LOG_ERROR(
+            "get fatel error QueryOffset of mq:%s, do not reconsume this queue",
+            (it2->toString()).c_str());
+      }
+    }
+
+    if (!pPullRequest) {
+      LOG_INFO("updateRequestTableInRebalance Doesn't find old mq");
+      // Owned locally until it is handed over to the request table, so it is
+      // freed automatically when no valid offset can be computed.
+      unique_ptr<PullRequest> pullRequest(
+          new PullRequest(m_pConsumer->getGroupName()));
+      pullRequest->m_messageQueue = *it2;
+
+      int64 nextOffset = computePullFromWhere(*it2);
+      if (nextOffset >= 0) {
+        pullRequest->setNextOffset(nextOffset);
+        pullRequest->clearAllMsgs();  // avoid consume accumulation and consume
+                                      // duplication issues
+        changed = true;
+        //<! mq-> pq; the table takes over ownership from here on
+        PullRequest* added = pullRequest.release();
+        addPullRequest(*it2, added);
+        pullrequestAdd.push_back(added);
+        LOG_INFO("add mq:%s, request initiall offset:%lld",
+                 (*it2).toString().c_str(), nextOffset);
+      } else {
+        LOG_ERROR("compute pull offset failed for mq:%s, do not add this queue",
+                  (it2->toString()).c_str());
+      }
+    }
+  }
+
+  // Newly added requests are scheduled only after the whole table is updated.
+  vector<PullRequest*>::iterator it3 = pullrequestAdd.begin();
+  for (; it3 != pullrequestAdd.end(); ++it3) {
+    LOG_DEBUG("start pull request");
+    pConsumer->producePullMsgTask(*it3);
+  }
+
+  LOG_DEBUG("updateRequestTableInRebalance exit");
+  return changed;
+}
+
+/**
+ * Decides the offset from which pulling of `mq` should start, based on the
+ * consumer's ConsumeFromWhere setting and the offset read from the offset
+ * store (READ_FROM_STORE).
+ *
+ * For all three handled settings a stored offset >= 0 is returned as is, and
+ * a stored offset below -1 is treated as an error. When the store answers -1:
+ *  - CONSUME_FROM_LAST_OFFSET: 0 for retry topics, otherwise the queue's
+ *    current max offset;
+ *  - CONSUME_FROM_FIRST_OFFSET: 0;
+ *  - CONSUME_FROM_TIMESTAMP: the queue's max offset for retry topics; for
+ *    other topics no lookup is implemented and -1 is returned.
+ *
+ * @return the start offset (>= 0), or -1 when it cannot be determined or the
+ *         setting is not one of the three handled values.
+ */
+int64 RebalancePush::computePullFromWhere(const MQMessageQueue& mq) {
+  int64 result = -1;
+  DefaultMQPushConsumer* pConsumer =
+      static_cast<DefaultMQPushConsumer*>(m_pConsumer);
+  ConsumeFromWhere consumeFromWhere = pConsumer->getConsumeFromWhere();
+  OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
+  switch (consumeFromWhere) {
+    case CONSUME_FROM_LAST_OFFSET: {
+      int64 lastOffset = pOffsetStore->readOffset(
+          mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
+      if (lastOffset >= 0) {
+        LOG_INFO("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is:%lld",
+                 mq.toString().c_str(), lastOffset);
+        result = lastOffset;
+      } else if (-1 == lastOffset) {
+        LOG_WARN("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is -1",
+                 mq.toString().c_str());
+        if (UtilAll::startsWith_retry(mq.getTopic())) {
+          LOG_INFO("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is 0",
+                   mq.toString().c_str());
+          result = 0;
+        } else {
+          try {
+            result = pConsumer->maxOffset(mq);
+            LOG_INFO("CONSUME_FROM_LAST_OFFSET, maxOffset of mq:%s is:%lld",
+                     mq.toString().c_str(), result);
+          } catch (MQException& e) {
+            LOG_ERROR(
+                "CONSUME_FROM_LAST_OFFSET error, lastOffset  of mq:%s is -1",
+                mq.toString().c_str());
+            result = -1;
+          }
+        }
+      } else {
+        LOG_ERROR("CONSUME_FROM_LAST_OFFSET error, lastOffset  of mq:%s is -1",
+                  mq.toString().c_str());
+        result = -1;
+      }
+      break;
+    }
+    case CONSUME_FROM_FIRST_OFFSET: {
+      int64 lastOffset = pOffsetStore->readOffset(
+          mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
+      if (lastOffset >= 0) {
+        LOG_INFO("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s is:%lld",
+                 mq.toString().c_str(), lastOffset);
+        result = lastOffset;
+      } else if (-1 == lastOffset) {
+        LOG_INFO("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s, return 0",
+                 mq.toString().c_str());
+        result = 0;
+      } else {
+        LOG_ERROR("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s, return -1",
+                  mq.toString().c_str());
+        result = -1;
+      }
+      break;
+    }
+    case CONSUME_FROM_TIMESTAMP: {
+      int64 lastOffset = pOffsetStore->readOffset(
+          mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
+      if (lastOffset >= 0) {
+        LOG_INFO("CONSUME_FROM_TIMESTAMP, lastOffset of mq:%s is:%lld",
+                 mq.toString().c_str(), lastOffset);
+        result = lastOffset;
+      } else if (-1 == lastOffset) {
+        if (UtilAll::startsWith_retry(mq.getTopic())) {
+          try {
+            result = pConsumer->maxOffset(mq);
+            LOG_INFO("CONSUME_FROM_TIMESTAMP, maxOffset  of mq:%s is:%lld",
+                     mq.toString().c_str(), result);
+          } catch (MQException& e) {
+            LOG_ERROR(
+                "CONSUME_FROM_TIMESTAMP error, lastOffset  of mq:%s is -1",
+                mq.toString().c_str());
+            result = -1;
+          }
+        } else {
+          // TODO: the timestamp-based offset lookup was never implemented
+          // (this branch used to be an empty try block with an unreachable
+          // handler). Report the failure instead of returning -1 silently.
+          LOG_ERROR(
+              "CONSUME_FROM_TIMESTAMP offset lookup is not implemented, "
+              "mq:%s, return -1",
+              mq.toString().c_str());
+          result = -1;
+        }
+      } else {
+        LOG_ERROR(
+            "CONSUME_FROM_TIMESTAMP error, lastOffset  of mq:%s, return -1",
+            mq.toString().c_str());
+        result = -1;
+      }
+      break;
+    }
+    default:
+      break;
+  }
+  return result;
+}
+
+// Intentionally empty: this class does nothing when the queue assignment
+// changes.
+void RebalancePush::messageQueueChanged(const string& topic,
+                                        vector<MQMessageQueue>& mqAll,
+                                        vector<MQMessageQueue>& mqDivided) {
+}
+
+// Cleanup for a queue this consumer no longer holds: persist its offset,
+// remove the offset from the offset store and, for orderly listeners, release
+// the broker-side lock on the queue.
+void RebalancePush::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) {
+  DefaultMQPushConsumer* pushConsumer =
+      static_cast<DefaultMQPushConsumer*>(m_pConsumer);
+  OffsetStore* offsetStore = pushConsumer->getOffsetStore();
+
+  // Persist first, then remove the offset from the store.
+  offsetStore->persist(mq, m_pConsumer->getSessionCredentials());
+  offsetStore->removeOffset(mq);
+
+  const bool orderly =
+      (pushConsumer->getMessageListenerType() == messageListenerOrderly);
+  if (orderly) {
+    unlock(mq);
+  }
+}
+
+//<!************************************************************************
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/Rebalance.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/Rebalance.h b/rocketmq-cpp/src/consumer/Rebalance.h
new file mode 100755
index 0000000..42f8667
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/Rebalance.h
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REBALANCEIMPL_H__
+#define __REBALANCEIMPL_H__
+
+#include "AllocateMQStrategy.h"
+#include "ConsumeType.h"
+#include "MQConsumer.h"
+#include "MQMessageQueue.h"
+#include "PullRequest.h"
+#include "SubscriptionData.h"
+
+#include <boost/thread/mutex.hpp>
+
+namespace rocketmq {
+class MQClientFactory;
+//<!************************************************************************
+// Base class that tracks a consumer's topic subscriptions, the message queues
+// known per topic and one PullRequest per held queue, and that redistributes
+// queues in doRebalance(). Subclasses supply the four pure virtual hooks.
+class Rebalance {
+ public:
+  Rebalance(MQConsumer*, MQClientFactory*);
+  virtual ~Rebalance();
+
+  // Hook called by doRebalance() after the request table changed.
+  virtual void messageQueueChanged(const string& topic,
+                                   vector<MQMessageQueue>& mqAll,
+                                   vector<MQMessageQueue>& mqDivided) = 0;
+
+  // Hook for cleaning up a queue that is no longer held.
+  virtual void removeUnnecessaryMessageQueue(const MQMessageQueue& mq) = 0;
+
+  // Hook returning the offset to start pulling `mq` from.
+  virtual int64 computePullFromWhere(const MQMessageQueue& mq) = 0;
+
+  // Hook that applies a new queue assignment; returns true when it changed
+  // anything.
+  virtual bool updateRequestTableInRebalance(
+      const string& topic, vector<MQMessageQueue>& mqsSelf) = 0;
+
+  void doRebalance();
+  void persistConsumerOffset();
+  void persistConsumerOffsetByResetOffset();
+
+  //<!m_subscriptionInner;
+  SubscriptionData* getSubscriptionData(const string& topic);
+  void setSubscriptionData(const string& topic, SubscriptionData* pdata);
+  map<string, SubscriptionData*>& getSubscriptionInner();
+
+  //<!m_topicSubscribeInfoTable;
+  void setTopicSubscribeInfo(const string& topic, vector<MQMessageQueue>& mqs);
+  bool getTopicSubscribeInfo(const string& topic, vector<MQMessageQueue>& mqs);
+
+  //<!m_requestQueueTable;
+  void addPullRequest(MQMessageQueue mq, PullRequest* pPullRequest);
+  PullRequest* getPullRequest(MQMessageQueue mq);
+  map<MQMessageQueue, PullRequest*> getPullRequestTable();
+
+  // Broker-side queue locking.
+  void lockAll();
+  bool lock(MQMessageQueue mq);
+  void unlockAll(bool oneway = false);
+  void unlock(MQMessageQueue mq);
+
+ protected:
+  // topic -> subscription; the entries are deleted by the destructor.
+  map<string, SubscriptionData*> m_subscriptionData;
+
+  // topic -> queues of that topic, with the mutex declared for it.
+  boost::mutex m_topicSubscribeInfoTableMutex;
+  map<string, vector<MQMessageQueue>> m_topicSubscribeInfoTable;
+
+  // queue -> pull request, with the mutex declared for it; the entries are
+  // deleted by the destructor.
+  typedef map<MQMessageQueue, PullRequest*> MQ2PULLREQ;
+  MQ2PULLREQ m_requestQueueTable;
+  boost::mutex m_requestTableMutex;
+
+  AllocateMQStrategy* m_pAllocateMQStrategy;
+  MQConsumer* m_pConsumer;
+  MQClientFactory* m_pClientFactory;
+};
+
+//<!************************************************************************
+// Rebalance subclass that implements the four hooks declared pure virtual in
+// Rebalance.
+class RebalancePull : public Rebalance {
+ public:
+  RebalancePull(MQConsumer*, MQClientFactory*);
+  virtual ~RebalancePull() {}
+
+  virtual void messageQueueChanged(const string& topic,
+                                   vector<MQMessageQueue>& mqAll,
+                                   vector<MQMessageQueue>& mqDivided);
+
+  virtual void removeUnnecessaryMessageQueue(const MQMessageQueue& mq);
+
+  virtual int64 computePullFromWhere(const MQMessageQueue& mq);
+
+  virtual bool updateRequestTableInRebalance(const string& topic,
+                                             vector<MQMessageQueue>& mqsSelf);
+};
+
+//<!***************************************************************************
+// Rebalance subclass that implements the four hooks declared pure virtual in
+// Rebalance.
+class RebalancePush : public Rebalance {
+ public:
+  RebalancePush(MQConsumer*, MQClientFactory*);
+  virtual ~RebalancePush() {}
+
+  virtual void messageQueueChanged(const string& topic,
+                                   vector<MQMessageQueue>& mqAll,
+                                   vector<MQMessageQueue>& mqDivided);
+
+  virtual void removeUnnecessaryMessageQueue(const MQMessageQueue& mq);
+
+  virtual int64 computePullFromWhere(const MQMessageQueue& mq);
+
+  virtual bool updateRequestTableInRebalance(const string& topic,
+                                             vector<MQMessageQueue>& mqsSelf);
+};
+
+//<!************************************************************************
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/SubscriptionData.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/SubscriptionData.cpp b/rocketmq-cpp/src/consumer/SubscriptionData.cpp
new file mode 100755
index 0000000..9b20642
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/SubscriptionData.cpp
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "SubscriptionData.h"
+#include <algorithm>
+#include <sstream>
+#include <vector>
+#include "UtilAll.h"
+#include "Logging.h"
+namespace rocketmq {
+//<!************************************************************************
+// Default constructor: topic and expression start empty, and the version is
+// stamped with the current time in milliseconds.
+SubscriptionData::SubscriptionData()
+    : m_subVersion(UtilAll::currentTimeMillis()) {}
+
+// Builds a subscription for `topic` with the raw expression `subString`; the
+// version is stamped with the current time in milliseconds.
+SubscriptionData::SubscriptionData(const string& topic, const string& subString)
+    : m_topic(topic),
+      m_subString(subString),
+      m_subVersion(UtilAll::currentTimeMillis()) {}
+
+// Copy constructor: duplicates every field, including the version stamp, so
+// the copy is not re-versioned.
+SubscriptionData::SubscriptionData(const SubscriptionData& other)
+    : m_topic(other.m_topic),
+      m_subString(other.m_subString),
+      m_subVersion(other.m_subVersion),
+      m_tagSet(other.m_tagSet),
+      m_codeSet(other.m_codeSet) {}
+
+// Topic this subscription was created for.
+const string& SubscriptionData::getTopic() const {
+  return m_topic;
+}
+
+// Raw subscription expression currently stored.
+const string& SubscriptionData::getSubString() const {
+  return m_subString;
+}
+
+// Replaces the stored expression; no other field is touched.
+void SubscriptionData::setSubString(const string& sub) {
+  m_subString = sub;
+}
+
+// Version stamp taken at construction (or copied from the source object).
+int64 SubscriptionData::getSubVersion() const {
+  return m_subVersion;
+}
+
+// Appends `tag` to the tag list. No de-duplication is done, so the same tag
+// may be stored more than once.
+void SubscriptionData::putTagsSet(const string& tag) { m_tagSet.push_back(tag); }
+
+// Returns true when `tag` is present in the tag list (linear search).
+bool SubscriptionData::containTag(const string& tag) {
+  vector<string>::const_iterator hit =
+      std::find(m_tagSet.begin(), m_tagSet.end(), tag);
+  return hit != m_tagSet.end();
+}
+
+// Gives mutable access to the stored tag list itself (not a copy).
+vector<string>& SubscriptionData::getTagsSet() {
+  return m_tagSet;
+}
+
+// Equality: two subscriptions are equal when their expression, version and
+// topic match and their tag lists hold the same number of entries.
+//
+// string::compare() returns 0 for equal strings, so the previous
+// `!a.compare(b)` checks returned false precisely when the strings matched
+// (and let differing strings through). The strings are now compared with
+// `!=` directly.
+bool SubscriptionData::operator==(const SubscriptionData& other) const {
+  if (m_subString != other.m_subString) {
+    return false;
+  }
+  if (m_subVersion != other.m_subVersion) {
+    return false;
+  }
+  // Only the tag count is compared, as before; tag contents and the code
+  // list are not inspected.
+  if (m_tagSet.size() != other.m_tagSet.size()) {
+    return false;
+  }
+  if (m_topic != other.m_topic) {
+    return false;
+  }
+  return true;
+}
+
+// Strict ordering: by topic first, then by subscription expression when the
+// topics are identical.
+bool SubscriptionData::operator<(const SubscriptionData& other) const {
+  const int topicOrder = m_topic.compare(other.m_topic);
+  if (topicOrder != 0) {
+    return topicOrder < 0;
+  }
+  return m_subString.compare(other.m_subString) < 0;
+}
+
+// Converts `tag` with atoi() and appends the result to the code list; text
+// that atoi() cannot parse is therefore stored as 0.
+void SubscriptionData::putCodeSet(const string& tag) {
+  m_codeSet.push_back(atoi(tag.c_str()));
+}
+
+// Serialises the subscription into a JSON object with the keys "subString",
+// "subVersion" (written as a string) and "topic". "tagsSet" / "codeSet" are
+// only added when the corresponding list has at least one entry.
+Json::Value SubscriptionData::toJson() const {
+  Json::Value result;
+  result["subString"] = m_subString;
+  result["subVersion"] = UtilAll::to_string(m_subVersion);
+  result["topic"] = m_topic;
+
+  for (size_t i = 0; i < m_tagSet.size(); ++i) {
+    result["tagsSet"].append(m_tagSet[i]);
+  }
+
+  for (size_t i = 0; i < m_codeSet.size(); ++i) {
+    result["codeSet"].append(m_codeSet[i]);
+  }
+
+  return result;
+}
+
+//<!***************************************************************************
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/SubscriptionData.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/SubscriptionData.h b/rocketmq-cpp/src/consumer/SubscriptionData.h
new file mode 100755
index 0000000..89be74f
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/SubscriptionData.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SUBSCRIPTIONDATA_H__
+#define __SUBSCRIPTIONDATA_H__
+
+#include <string>
+#include "UtilAll.h"
+#include "json/json.h"
+
+namespace rocketmq {
+//<!************************************************************************
+// Value type describing one topic subscription: the topic, the raw
+// subscription expression, a version stamp, and the tag / code lists.
+// It can be compared, ordered and serialised to JSON.
+class SubscriptionData {
+ public:
+  // Empty topic/expression.
+  SubscriptionData();
+  // The explicit clear() calls empty both lists before the members are
+  // destroyed.
+  virtual ~SubscriptionData() {
+    m_tagSet.clear();
+    m_codeSet.clear();
+  }
+  // Subscription for `topic` with the raw expression `subString`.
+  SubscriptionData(const string& topic, const string& subString);
+  // Copies every field.
+  SubscriptionData(const SubscriptionData& other);
+
+  const string& getTopic() const;
+  const string& getSubString() const;
+  void setSubString(const string& sub);
+  int64 getSubVersion() const;
+
+  // Tag list helpers: append a tag, test membership, access the list.
+  void putTagsSet(const string& tag);
+  bool containTag(const string& tag);
+  vector<string>& getTagsSet();
+
+  // Appends a code parsed from the textual `tag`.
+  void putCodeSet(const string& tag);
+
+  bool operator==(const SubscriptionData& other) const;
+  bool operator<(const SubscriptionData& other) const;
+
+  // JSON representation of this subscription.
+  Json::Value toJson() const;
+
+ private:
+  string m_topic;          // topic name
+  string m_subString;      // raw subscription expression
+  int64 m_subVersion;      // version stamp
+  vector<string> m_tagSet; // tags, in insertion order
+  vector<int> m_codeSet;   // integer codes, in insertion order
+};
+//<!***************************************************************************
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/dllmain.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/dllmain.cpp b/rocketmq-cpp/src/dllmain.cpp
new file mode 100755
index 0000000..72f61fd
--- /dev/null
+++ b/rocketmq-cpp/src/dllmain.cpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include "windows.h"
+
+// Windows DLL entry point. No set-up or tear-down is performed for any of the
+// process/thread attach or detach notifications; every call reports TRUE.
+BOOL APIENTRY DllMain(HMODULE hModule, DWORD ul_reason_for_call,
+                      LPVOID lpReserved) {
+  switch (ul_reason_for_call) {
+    case DLL_PROCESS_ATTACH:
+    case DLL_THREAD_ATTACH:
+    case DLL_THREAD_DETACH:
+    case DLL_PROCESS_DETACH:
+    default:
+      // Nothing to do for any notification.
+      break;
+  }
+  return TRUE;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/log/Logging.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/log/Logging.cpp b/rocketmq-cpp/src/log/Logging.cpp
new file mode 100644
index 0000000..9ac812f
--- /dev/null
+++ b/rocketmq-cpp/src/log/Logging.cpp
@@ -0,0 +1,96 @@
+#include "Logging.h"
+#include <boost/date_time/gregorian/gregorian.hpp>
+#include "UtilAll.h"
+#define BOOST_DATE_TIME_SOURCE
+
+namespace rocketmq {
+
+// Detaches every sink from the Boost.Log core when the adapter goes away.
+logAdapter::~logAdapter() {
+  logging::core::get()->remove_all_sinks();
+}
+
+// Returns the single logAdapter, created on the first call as a
+// function-local static.
+logAdapter& logAdapter::getLogInstance() {
+  static logAdapter theAdapter;
+  return theAdapter;
+}
+
+// Creates the file sink. Log files are written under
+// <home>/logs/metaq-client4cpp/ with the name pattern
+// <pid>_rocketmq-cpp.log.%N, and the core filter starts at severity info.
+logAdapter::logAdapter() : m_logLevel(eLOG_LEVEL_INFO) {
+  string homeDir(UtilAll::getHomeDirectory());
+  homeDir.append("/logs/metaq-client4cpp/");
+  m_logFile += homeDir;
+  std::string fileName =
+      UtilAll::to_string(getpid()) + "_" + "rocketmq-cpp.log.%N";
+  m_logFile += fileName;
+
+  // boost::log::expressions::attr<
+  // boost::log::attributes::current_thread_id::value_type>("ThreadID");
+  // Registers a formatter for the "Severity" attribute so the %Severity%
+  // placeholder in the format string below can be rendered.
+  boost::log::register_simple_formatter_factory<
+      boost::log::trivial::severity_level, char>("Severity");
+  m_logSink = logging::add_file_log(
+      keywords::file_name = m_logFile,
+      keywords::rotation_size = 10 * 1024 * 1024,
+      keywords::time_based_rotation =
+          sinks::file::rotation_at_time_point(0, 0, 0),
+      keywords::format = "[%TimeStamp%](%Severity%):%Message%",
+      keywords::min_free_space = 300 * 1024 * 1024, keywords::target = homeDir,
+      // NOTE(review): the original note said "keep 3 log files by default",
+      // but 20 MiB with a 10 MiB rotation size looks like 2 - confirm.
+      keywords::max_size = 20 * 1024 * 1024,
+      keywords::auto_flush = true);
+  logging::core::get()->set_filter(logging::trivial::severity >=
+                                   logging::trivial::info);
+
+  // Adds the common attributes; %TimeStamp% above relies on them.
+  logging::add_common_attributes();
+}
+
+// Remembers `logLevel` and installs the matching severity filter on the
+// Boost.Log core. eLOG_LEVEL_DISABLE filters for "above fatal" (nothing
+// passes); any value not listed below behaves like eLOG_LEVEL_INFO.
+void logAdapter::setLogLevel(elogLevel logLevel) {
+  m_logLevel = logLevel;
+
+  if (logLevel == eLOG_LEVEL_DISABLE) {
+    logging::core::get()->set_filter(logging::trivial::severity >
+                                     logging::trivial::fatal);
+    return;
+  }
+
+  // Lowest severity that is still let through.
+  logging::trivial::severity_level threshold = logging::trivial::info;
+  switch (logLevel) {
+    case eLOG_LEVEL_FATAL:
+      threshold = logging::trivial::fatal;
+      break;
+    case eLOG_LEVEL_ERROR:
+      threshold = logging::trivial::error;
+      break;
+    case eLOG_LEVEL_WARN:
+      threshold = logging::trivial::warning;
+      break;
+    case eLOG_LEVEL_INFO:
+      threshold = logging::trivial::info;
+      break;
+    case eLOG_LEVEL_DEBUG:
+      threshold = logging::trivial::debug;
+      break;
+    case eLOG_LEVEL_TRACE:
+      threshold = logging::trivial::trace;
+      break;
+    default:
+      break;
+  }
+
+  logging::core::get()->set_filter(logging::trivial::severity >= threshold);
+}
+
+// Level most recently passed to setLogLevel() (eLOG_LEVEL_INFO initially).
+elogLevel logAdapter::getLogLevel() {
+  return m_logLevel;
+}
+
+// Replaces the sink's file collector so that the log directory
+// (<home>/logs/metaq-client4cpp/) is capped at logNum * sizeOfPerFile MiB.
+//
+// logNum         number of log files to keep
+// sizeOfPerFile  size of one log file, in MiB
+//
+// The byte count is computed in 64-bit arithmetic: the previous int
+// multiplication overflowed once logNum * sizeOfPerFile reached 2048
+// (e.g. 3 files of 1024 MiB).
+void logAdapter::setLogFileNumAndSize(int logNum, int sizeOfPerFile) {
+  string homeDir(UtilAll::getHomeDirectory());
+  homeDir.append("/logs/metaq-client4cpp/");
+
+  const unsigned long long maxTotalBytes =
+      static_cast<unsigned long long>(logNum) * sizeOfPerFile * 1024 * 1024;
+
+  m_logSink->locked_backend()->set_file_collector(sinks::file::make_collector(
+      keywords::target = homeDir, keywords::max_size = maxTotalBytes));
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/log/Logging.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/log/Logging.h b/rocketmq-cpp/src/log/Logging.h
new file mode 100644
index 0000000..98e9659
--- /dev/null
+++ b/rocketmq-cpp/src/log/Logging.h
@@ -0,0 +1,75 @@
+#ifndef _ALOG_ADAPTER_H_
+#define _ALOG_ADAPTER_H_
+
+#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include <boost/log/core.hpp>
+#include <boost/log/expressions.hpp>
+#include <boost/log/sinks/text_file_backend.hpp>
+#include <boost/log/sources/record_ostream.hpp>
+#include <boost/log/sources/severity_logger.hpp>
+#include <boost/log/trivial.hpp>
+#include <boost/log/utility/manipulators/add_value.hpp>
+#include <boost/log/utility/setup/common_attributes.hpp>
+#include <boost/log/utility/setup/file.hpp>
+#include <boost/scoped_array.hpp>
+#include <boost/shared_ptr.hpp>
+#include "MQClient.h"
+
+namespace logging = boost::log;
+namespace src = boost::log::sources;
+namespace sinks = boost::log::sinks;
+namespace expr = boost::log::expressions;
+namespace keywords = boost::log::keywords;
+using namespace boost::log::trivial;
+namespace rocketmq {
+
+// Process-wide logging front end built on Boost.Log. The constructor is
+// private; the instance is obtained through getLogInstance().
+class logAdapter {
+ public:
+  ~logAdapter();
+  // Access to the single adapter instance.
+  static logAdapter& getLogInstance();
+  // Changes / reads the active log level.
+  void setLogLevel(elogLevel logLevel);
+  elogLevel getLogLevel();
+  // Reconfigures how many log files are kept and the size of each.
+  // NOTE(review): sizeOfPerFile appears to be in MiB - confirm in Logging.cpp.
+  void setLogFileNumAndSize(int logNum, int sizeOfPerFile);
+  // Logger object that the LOG_* macros write through (via AGENT_LOGGER).
+  src::severity_logger<boost::log::trivial::severity_level>&
+  getSeverityLogger() {
+    return m_severityLogger;
+  }
+
+ private:
+  logAdapter();
+  elogLevel m_logLevel;   // level last set through setLogLevel()
+  std::string m_logFile;  // log file path / name pattern
+  src::severity_logger<boost::log::trivial::severity_level> m_severityLogger;
+  typedef sinks::synchronous_sink<sinks::text_file_backend> logSink_t;
+  boost::shared_ptr<logSink_t> m_logSink;  // the file sink
+};
+
+#define ALOG_ADAPTER logAdapter::getLogInstance()
+
+#define AGENT_LOGGER ALOG_ADAPTER.getSeverityLogger()
+
+// printf-style helper behind the LOG_* macros.
+class LogUtil {
+ public:
+  // Formats `format` with the trailing arguments into a 1024-byte buffer
+  // (longer messages are truncated by vsnprintf) and writes the result to
+  // the shared severity logger at `level`.
+  static void VLogError(boost::log::trivial::severity_level level,
+                        const char* format, ...) {
+    const int kMaxMessageBytes = 1024;
+    boost::scoped_array<char> text(new char[kMaxMessageBytes]);
+
+    va_list varArgs;
+    va_start(varArgs, format);
+    vsnprintf(text.get(), kMaxMessageBytes, format, varArgs);
+    va_end(varArgs);
+
+    BOOST_LOG_SEV(AGENT_LOGGER, level) << text.get();
+  }
+};
+
+// printf-style logging shortcuts, one per severity. Each forwards its format
+// string and arguments to LogUtil::VLogError.
+//
+// They use the standard __VA_ARGS__ form (the format string is simply the
+// first variadic argument) instead of the GNU-only `args...` / `##args`
+// spelling, so the header is also accepted by compilers that lack that
+// extension. Existing call sites, with or without extra arguments, expand to
+// the same call as before.
+#define LOG_FATAL(...) \
+  LogUtil::VLogError(boost::log::trivial::fatal, __VA_ARGS__)
+#define LOG_ERROR(...) \
+  LogUtil::VLogError(boost::log::trivial::error, __VA_ARGS__)
+#define LOG_WARN(...) \
+  LogUtil::VLogError(boost::log::trivial::warning, __VA_ARGS__)
+#define LOG_INFO(...) \
+  LogUtil::VLogError(boost::log::trivial::info, __VA_ARGS__)
+#define LOG_DEBUG(...) \
+  LogUtil::VLogError(boost::log::trivial::debug, __VA_ARGS__)
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/message/MQDecoder.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/message/MQDecoder.cpp b/rocketmq-cpp/src/message/MQDecoder.cpp
new file mode 100755
index 0000000..4dde1f5
--- /dev/null
+++ b/rocketmq-cpp/src/message/MQDecoder.cpp
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MQDecoder.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sstream>
+#include "Logging.h"
+#include "MemoryOutputStream.h"
+#include "MessageSysFlag.h"
+#include "UtilAll.h"
+namespace rocketmq {
+//<!***************************************************************************
+// Size passed to the output buffer in createMessageId().
+// NOTE(review): presumably 8 bytes for host+port and 8 for the offset.
+const int MQDecoder::MSG_ID_LENGTH = 8 + 8;
+
+// Separators used by messageProperties2String()/string2messageProperties():
+// byte 1 sits between a property name and its value, byte 2 ends a property.
+const char MQDecoder::NAME_VALUE_SEPARATOR = 1;
+const char MQDecoder::PROPERTY_SEPARATOR = 2;
+
+// NOTE(review): these look like byte offsets of fields inside an encoded
+// message; they are not used in this file - confirm against their users.
+int MQDecoder::MessageMagicCodePostion = 4;
+int MQDecoder::MessageFlagPostion = 16;
+int MQDecoder::MessagePhysicOffsetPostion = 28;
+int MQDecoder::MessageStoreTimestampPostion = 56;
+//<!***************************************************************************
+// Builds the textual message id for a message stored at `addr` with commit
+// log `offset`: host, port and offset are written big-endian into a buffer
+// and the bytes are converted to a string with UtilAll::bytes2string().
+string MQDecoder::createMessageId(sockaddr addr, int64 offset) {
+  int hostPart, portPart;
+  socketAddress2IPPort(addr, hostPart, portPart);
+
+  MemoryOutputStream idBuffer(MSG_ID_LENGTH);
+  idBuffer.writeIntBigEndian(hostPart);
+  idBuffer.writeIntBigEndian(portPart);
+  idBuffer.writeInt64BigEndian(offset);
+
+  const char* rawId = static_cast<const char*>(idBuffer.getData());
+  int rawIdSize = idBuffer.getDataSize();
+  return UtilAll::bytes2string(rawId, rawIdSize);
+}
+
+// Parses a textual message id back into an MQMessageId (store address plus
+// commit log offset).
+//
+// Layout expected in `msgId`: 8 hex characters of IP, 8 hex characters of
+// port, then the offset in hex. A `msgId` shorter than 16 characters makes
+// string::substr throw std::out_of_range, as before.
+//
+// The sockaddr_in is now zeroed before it is filled in; previously its
+// padding (sin_zero) held indeterminate bytes that were copied into the
+// returned address.
+MQMessageId MQDecoder::decodeMessageId(const string& msgId) {
+  string ipstr = msgId.substr(0, 8);
+  string portstr = msgId.substr(8, 8);
+  string offsetstr = msgId.substr(16);
+
+  char* end;
+  int ipint = strtoul(ipstr.c_str(), &end, 16);
+  int portint = strtoul(portstr.c_str(), &end, 16);
+
+  int64 offset = UtilAll::hexstr2ull(offsetstr.c_str());
+
+  // Byte-order conversions are kept exactly as in the original code.
+  offset = n2hll(offset);
+
+  portint = ntohl(portint);
+  short port = portint;
+
+  struct sockaddr_in sa;
+  memset(&sa, 0, sizeof(sa));
+  sa.sin_family = AF_INET;
+  sa.sin_port = htons(port);
+  sa.sin_addr.s_addr = ipint;
+
+  sockaddr addr;
+  memcpy(&addr, &sa, sizeof(sockaddr));
+
+  MQMessageId id(addr, offset);
+
+  return id;
+}
+
+// Convenience overload: decodes one message including its body.
+// The caller receives a heap-allocated object.
+MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer) {
+  const bool readBody = true;
+  return decode(byteBuffer, readBody);
+}
+
+// Decodes one message from `byteBuffer`, consuming its fields in the fixed
+// order numbered below, and returns it as a heap-allocated MQMessageExt.
+//
+// byteBuffer  stream positioned at the start of an encoded message
+// readBody    when false the body bytes are skipped instead of copied
+//
+// NOTE(review): the object is created with a raw `new`; if any read below
+// throws, it is not released - confirm whether the stream calls can throw.
+MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer, bool readBody) {
+  MQMessageExt* msgExt = new MQMessageExt();
+
+  // 1 TOTALSIZE
+  int storeSize = byteBuffer.readIntBigEndian();
+  msgExt->setStoreSize(storeSize);
+
+  // 2 MAGICCODE sizeof(int) - skipped, not validated
+  byteBuffer.skipNextBytes(sizeof(int));
+
+  // 3 BODYCRC
+  int bodyCRC = byteBuffer.readIntBigEndian();
+  msgExt->setBodyCRC(bodyCRC);
+
+  // 4 QUEUEID
+  int queueId = byteBuffer.readIntBigEndian();
+  msgExt->setQueueId(queueId);
+
+  // 5 FLAG
+  int flag = byteBuffer.readIntBigEndian();
+  msgExt->setFlag(flag);
+
+  // 6 QUEUEOFFSET
+  int64 queueOffset = byteBuffer.readInt64BigEndian();
+  msgExt->setQueueOffset(queueOffset);
+
+  // 7 PHYSICALOFFSET
+  int64 physicOffset = byteBuffer.readInt64BigEndian();
+  msgExt->setCommitLogOffset(physicOffset);
+
+  // 8 SYSFLAG
+  int sysFlag = byteBuffer.readIntBigEndian();
+  msgExt->setSysFlag(sysFlag);
+
+  // 9 BORNTIMESTAMP
+  int64 bornTimeStamp = byteBuffer.readInt64BigEndian();
+  msgExt->setBornTimestamp(bornTimeStamp);
+
+  // 10 BORNHOST (host int followed by port int)
+  int bornHost = byteBuffer.readIntBigEndian();
+  int port = byteBuffer.readIntBigEndian();
+  sockaddr bornAddr = IPPort2socketAddress(bornHost, port);
+  msgExt->setBornHost(bornAddr);
+
+  // 11 STORETIMESTAMP
+  int64 storeTimestamp = byteBuffer.readInt64BigEndian();
+  msgExt->setStoreTimestamp(storeTimestamp);
+
+  // 12 STOREHOST (host int followed by port int)
+  int storeHost = byteBuffer.readIntBigEndian();
+  port = byteBuffer.readIntBigEndian();
+  sockaddr storeAddr = IPPort2socketAddress(storeHost, port);
+  msgExt->setStoreHost(storeAddr);
+
+  // 13 RECONSUMETIMES
+  int reconsumeTimes = byteBuffer.readIntBigEndian();
+  msgExt->setReconsumeTimes(reconsumeTimes);
+
+  // 14 Prepared Transaction Offset
+  int64 preparedTransactionOffset = byteBuffer.readInt64BigEndian();
+  msgExt->setPreparedTransactionOffset(preparedTransactionOffset);
+
+  // 15 BODY (length-prefixed; nothing is read when the length is <= 0)
+  int bodyLen = byteBuffer.readIntBigEndian();
+  if (bodyLen > 0) {
+    if (readBody) {
+      MemoryBlock block;
+      byteBuffer.readIntoMemoryBlock(block, bodyLen);
+
+      const char* const pBody = static_cast<const char*>(block.getData());
+      int len = block.getSize();
+      string msgbody(pBody, len);
+
+      // decompress body
+      if ((sysFlag & MessageSysFlag::CompressedFlag) ==
+          MessageSysFlag::CompressedFlag) {
+        string outbody;
+        // NOTE(review): when inflate() fails the body is silently left
+        // unset and no error is reported.
+        if (UtilAll::inflate(msgbody, outbody)) {
+          msgExt->setBody(outbody);
+        }
+      } else {
+        msgExt->setBody(msgbody);
+      }
+    } else {
+      // Body not wanted: advance past it so the next field lines up.
+      byteBuffer.skipNextBytes(bodyLen);
+    }
+  }
+
+  // 16 TOPIC (one length byte, then the topic bytes)
+  // NOTE(review): if readByte() returns a signed char, a length above 127
+  // becomes negative here - confirm its return type.
+  int topicLen = (int)byteBuffer.readByte();
+  MemoryBlock block;
+  byteBuffer.readIntoMemoryBlock(block, topicLen);
+  const char* const pTopic = static_cast<const char*>(block.getData());
+  // Use the number of bytes actually held by the block.
+  topicLen = block.getSize();
+  msgExt->setTopic(pTopic, topicLen);
+
+  // 17 properties (short length, then the encoded property string)
+  short propertiesLen = byteBuffer.readShortBigEndian();
+  if (propertiesLen > 0) {
+    // This `block` intentionally shadows the topic block above.
+    MemoryBlock block;
+    byteBuffer.readIntoMemoryBlock(block, propertiesLen);
+    const char* const pProperty = static_cast<const char*>(block.getData());
+    int len = block.getSize();
+    string propertiesString(pProperty, len);
+
+    map<string, string> propertiesMap;
+    string2messageProperties(propertiesString, propertiesMap);
+    msgExt->setProperties(propertiesMap);
+    propertiesMap.clear();
+  }
+
+  // 18 msg ID - not read from the stream; derived from store host and
+  // commit log offset decoded above
+  string msgId = createMessageId(msgExt->getStoreHost(),
+                                 (int64)msgExt->getCommitLogOffset());
+  msgExt->setMsgId(msgId);
+
+  // LOG_INFO("get msgExt from remote server, its contents
+  // are:%s",msgExt->toString().c_str());
+  return msgExt;
+}
+
+// Decodes every message in `mem`, bodies included, into `mqvec`. Any
+// previous content of `mqvec` is discarded first.
+void MQDecoder::decodes(const MemoryBlock* mem, vector<MQMessageExt>& mqvec) {
+  const bool readBody = true;
+  mqvec.clear();
+  decodes(mem, mqvec, readBody);
+}
+
+// Decodes messages from `mem` until no bytes remain and appends a copy of
+// each one to `mqvec` (which is not cleared here). `readBody` is passed
+// through to decode(). `mem` is dereferenced without a null check.
+void MQDecoder::decodes(const MemoryBlock* mem, vector<MQMessageExt>& mqvec,
+                        bool readBody) {
+  MemoryInputStream input(*mem, true);
+
+  while (input.getNumBytesRemaining() > 0) {
+    // decode() hands back a heap object; the unique_ptr releases it once
+    // the copy has been stored.
+    unique_ptr<MQMessageExt> decoded(decode(input, readBody));
+    mqvec.push_back(*decoded);
+  }
+}
+
+// Flattens `properties` into one string: for every entry, in map order, the
+// name, NAME_VALUE_SEPARATOR, the value and PROPERTY_SEPARATOR are appended.
+// An empty map yields an empty string.
+string MQDecoder::messageProperties2String(
+    const map<string, string>& properties) {
+  string encoded;
+
+  for (map<string, string>::const_iterator entry = properties.begin();
+       entry != properties.end(); ++entry) {
+    encoded += entry->first;
+    encoded += NAME_VALUE_SEPARATOR;
+    encoded += entry->second;
+    encoded += PROPERTY_SEPARATOR;
+  }
+
+  return encoded;
+}
+
+// Parses a string produced by messageProperties2String() back into
+// `properties`. The input is split on PROPERTY_SEPARATOR, then each piece on
+// NAME_VALUE_SEPARATOR; pieces that do not split into exactly two parts are
+// ignored. Existing entries in `properties` are kept unless overwritten.
+void MQDecoder::string2messageProperties(const string& propertiesString,
+                                         map<string, string>& properties) {
+  vector<string> pairs;
+  UtilAll::Split(pairs, propertiesString, PROPERTY_SEPARATOR);
+
+  for (vector<string>::size_type idx = 0; idx < pairs.size(); ++idx) {
+    vector<string> nameAndValue;
+    UtilAll::Split(nameAndValue, pairs[idx], NAME_VALUE_SEPARATOR);
+
+    if (nameAndValue.size() != 2) {
+      continue;
+    }
+    properties[nameAndValue[0]] = nameAndValue[1];
+  }
+}
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/message/MQDecoder.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/message/MQDecoder.h b/rocketmq-cpp/src/message/MQDecoder.h
new file mode 100755
index 0000000..393e4c7
--- /dev/null
+++ b/rocketmq-cpp/src/message/MQDecoder.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MESSAGEDECODER_H__
+#define __MESSAGEDECODER_H__
+
+#include "MQClientException.h"
+#include "MQMessageExt.h"
+#include "MQMessageId.h"
+#include "MemoryInputStream.h"
+#include "SocketUtil.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+// Static helpers for message ids, encoded messages and the message property
+// string format. The class holds no instance state.
+class MQDecoder {
+ public:
+  // Message id <-> (store address, commit log offset).
+  static string createMessageId(sockaddr addr, int64 offset);
+  static MQMessageId decodeMessageId(const string& msgId);
+
+  // Decodes all messages in `mem` into `mqvec`, bodies included.
+  static void decodes(const MemoryBlock* mem, vector<MQMessageExt>& mqvec);
+
+  // Same, with `readBody` controlling whether message bodies are read.
+  static void decodes(const MemoryBlock* mem, vector<MQMessageExt>& mqvec,
+                      bool readBody);
+
+  // Property map <-> separator-delimited string.
+  static string messageProperties2String(const map<string, string>& properties);
+  static void string2messageProperties(const string& propertiesString,
+                                       map<string, string>& properties);
+
+ private:
+  // Decode a single message; the result is heap-allocated.
+  static MQMessageExt* decode(MemoryInputStream& byteBuffer);
+  static MQMessageExt* decode(MemoryInputStream& byteBuffer, bool readBody);
+
+ public:
+  static const char NAME_VALUE_SEPARATOR;  // between a name and its value
+  static const char PROPERTY_SEPARATOR;    // after each property
+  static const int MSG_ID_LENGTH;
+  // NOTE(review): presumably byte offsets into an encoded message; they are
+  // mutable statics - confirm whether anything is meant to change them.
+  static int MessageMagicCodePostion;
+  static int MessageFlagPostion;
+  static int MessagePhysicOffsetPostion;
+  static int MessageStoreTimestampPostion;
+};
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/message/MQMessage.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/message/MQMessage.cpp b/rocketmq-cpp/src/message/MQMessage.cpp
new file mode 100755
index 0000000..db5487f
--- /dev/null
+++ b/rocketmq-cpp/src/message/MQMessage.cpp
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MQMessage.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+
+// Keys of the well-known entries in a message's property map. KEYS, TAGS,
+// WAIT and DELAY are read and written by the accessors in this file.
+// NOTE(review): the remaining keys are not used in this file - presumably
+// they are used elsewhere in the client.
+const string MQMessage::PROPERTY_KEYS = "KEYS";
+const string MQMessage::PROPERTY_TAGS = "TAGS";
+const string MQMessage::PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
+const string MQMessage::PROPERTY_DELAY_TIME_LEVEL = "DELAY";
+const string MQMessage::PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
+const string MQMessage::PROPERTY_REAL_TOPIC = "REAL_TOPIC";
+const string MQMessage::PROPERTY_REAL_QUEUE_ID = "REAL_QID";
+const string MQMessage::PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
+const string MQMessage::PROPERTY_PRODUCER_GROUP = "PGROUP";
+const string MQMessage::PROPERTY_MIN_OFFSET = "MIN_OFFSET";
+const string MQMessage::PROPERTY_MAX_OFFSET = "MAX_OFFSET";
+// Joins several keys into the single KEYS property (see setKeys(vector)).
+const string MQMessage::KEY_SEPARATOR = " ";
+//<!************************************************************************
+// All constructors funnel into Init(); arguments that an overload does not
+// take default to an empty string, flag 0 and waitStoreMsgOK == true.
+
+// Empty message.
+MQMessage::MQMessage() {
+  Init("", "", "", 0, "", true);
+}
+
+// Message with topic and body.
+MQMessage::MQMessage(const string& topic, const string& body) {
+  Init(topic, "", "", 0, body, true);
+}
+
+// Message with topic, tags and body.
+MQMessage::MQMessage(const string& topic, const string& tags,
+                     const string& body) {
+  Init(topic, tags, "", 0, body, true);
+}
+
+// Message with topic, tags, keys and body.
+MQMessage::MQMessage(const string& topic, const string& tags,
+                     const string& keys, const string& body) {
+  Init(topic, tags, keys, 0, body, true);
+}
+
+// Fully specified message.
+MQMessage::MQMessage(const string& topic, const string& tags,
+                     const string& keys, const int flag, const string& body,
+                     bool waitStoreMsgOK) {
+  Init(topic, tags, keys, flag, body, waitStoreMsgOK);
+}
+
+// Empties the property map before the members are destroyed.
+MQMessage::~MQMessage() {
+  m_properties.clear();
+}
+
+// Copy constructor: takes over body, topic, flag and properties of `other`
+// by reusing the copy assignment operator, which copies exactly those four
+// members.
+MQMessage::MQMessage(const MQMessage& other) {
+  *this = other;
+}
+
+// Copy assignment: copies body, topic, flag and properties. Assigning an
+// object to itself is a no-op.
+MQMessage& MQMessage::operator=(const MQMessage& other) {
+  if (this == &other) {
+    return *this;
+  }
+
+  m_body = other.m_body;
+  m_topic = other.m_topic;
+  m_flag = other.m_flag;
+  m_properties = other.m_properties;
+  return *this;
+}
+
+// Stores `value` under `name`, replacing any value already stored there.
+void MQMessage::setProperty(const string& name, const string& value) { m_properties[name] = value; }
+
+// Returns the value stored under `name`, or an empty string when the
+// property is not set. The map is not modified by the lookup.
+string MQMessage::getProperty(const string& name) const {
+  map<string, string>::const_iterator found = m_properties.find(name);
+  if (found == m_properties.end()) {
+    return "";
+  }
+  return found->second;
+}
+
+// Returns a copy of the topic.
+string MQMessage::getTopic() const {
+  return m_topic;
+}
+
+// Replaces the topic.
+void MQMessage::setTopic(const string& topic) {
+  m_topic = topic;
+}
+
+// Replaces the topic with the first `len` bytes at `body` (the bytes need
+// not be NUL-terminated). Despite its name, `body` carries the topic text.
+void MQMessage::setTopic(const char* body, int len) {
+  // Cleared first, then refilled; `body` must not point into m_topic.
+  m_topic.clear();
+  m_topic.append(body, len);
+}
+
+// Tags and keys are not separate members; they live in the property map
+// under PROPERTY_TAGS and PROPERTY_KEYS.
+
+// Returns the TAGS property, or "" when unset.
+string MQMessage::getTags() const {
+  return getProperty(PROPERTY_TAGS);
+}
+
+// Stores `tags` as the TAGS property.
+void MQMessage::setTags(const string& tags) { setProperty(PROPERTY_TAGS, tags); }
+
+// Returns the KEYS property, or "" when unset.
+string MQMessage::getKeys() const {
+  return getProperty(PROPERTY_KEYS);
+}
+
+// Stores `keys` as the KEYS property.
+void MQMessage::setKeys(const string& keys) { setProperty(PROPERTY_KEYS, keys); }
+
+// Joins `keys` with KEY_SEPARATOR and stores the result as the KEYS
+// property. An empty vector leaves the current KEYS property untouched.
+void MQMessage::setKeys(const vector<string>& keys) {
+  if (keys.empty()) {
+    return;
+  }
+
+  string joined(keys[0]);
+  for (vector<string>::size_type idx = 1; idx < keys.size(); ++idx) {
+    joined += KEY_SEPARATOR;
+    joined += keys[idx];
+  }
+
+  setKeys(joined);
+}
+
+// Returns the DELAY property parsed with atoi(), or 0 when the property is
+// unset or empty.
+int MQMessage::getDelayTimeLevel() const {
+  const string levelText = getProperty(PROPERTY_DELAY_TIME_LEVEL);
+  return levelText.empty() ? 0 : atoi(levelText.c_str());
+}
+
+// Stores `level` in decimal text form as the DELAY property.
+void MQMessage::setDelayTimeLevel(int level) {
+  // 16 bytes hold any 32-bit int in decimal, sign and terminator included.
+  char levelText[16];
+  sprintf(levelText, "%d", level);
+  setProperty(PROPERTY_DELAY_TIME_LEVEL, levelText);
+}
+
+// True when the WAIT property is unset/empty or equals "true"; any other
+// value means false.
+bool MQMessage::isWaitStoreMsgOK() {
+  const string waitText = getProperty(PROPERTY_WAIT_STORE_MSG_OK);
+  return waitText.empty() || waitText == "true";
+}
+
+// Stores the flag as the text "true" or "false" in the WAIT property.
+void MQMessage::setWaitStoreMsgOK(bool waitStoreMsgOK) {
+  setProperty(PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOK ? "true" : "false");
+}
+
+// Returns the message flag.
+int MQMessage::getFlag() const {
+  return m_flag;
+}
+
+// Replaces the message flag.
+void MQMessage::setFlag(int flag) {
+  m_flag = flag;
+}
+
+// Returns a copy of the message body.
+string MQMessage::getBody() const {
+  return m_body;
+}
+
+// Replaces the body with the first `len` bytes at `body`; the data may
+// contain NUL bytes.
+void MQMessage::setBody(const char* body, int len) {
+  // Cleared first, then refilled; `body` must not point into m_body.
+  m_body.clear();
+  m_body.append(body, len);
+}
+
+// Replaces the body with a copy of `body`.
+void MQMessage::setBody(const string &body) {
+  // NOTE(review): because the member is cleared before the append, passing
+  // a reference to this message's own body would leave it empty.
+  m_body.clear();
+  m_body.append(body);
+}
+
+// Returns a copy of the whole property map.
+map<string, string> MQMessage::getProperties() const {
+  return m_properties;
+}
+
+// Replaces the whole property map with a copy of `properties`; the argument
+// itself is left unchanged.
+void MQMessage::setProperties(map<string, string>& properties) { m_properties = properties; }
+
+// Shared constructor body: sets topic, flag and body, stores tags / keys as
+// properties only when they are non-empty, and always records the
+// waitStoreMsgOK flag.
+void MQMessage::Init(const string& topic, const string& tags,
+                     const string& keys, const int flag, const string& body,
+                     bool waitStoreMsgOK) {
+  m_topic = topic;
+  m_flag = flag;
+  m_body = body;
+
+  if (!tags.empty()) setTags(tags);
+  if (!keys.empty()) setKeys(keys);
+
+  setWaitStoreMsgOK(waitStoreMsgOK);
+}
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/message/MQMessageExt.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/message/MQMessageExt.cpp b/rocketmq-cpp/src/message/MQMessageExt.cpp
new file mode 100755
index 0000000..bfba42d
--- /dev/null
+++ b/rocketmq-cpp/src/message/MQMessageExt.cpp
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MQMessageExt.h"
+#include "MessageSysFlag.h"
+#include "SocketUtil.h"
+#include "TopicFilterType.h"
+
+namespace rocketmq {
+//<!************************************************************************
+// Default constructor: zeroes every numeric field, starts with an empty
+// message id and value-initialises both host addresses.
+//
+// m_bornHost / m_storeHost are plain sockaddr structs; without the explicit
+// `()` initialisers they would hold indeterminate bytes that getBornHost(),
+// getStoreHost() and the *String() helpers would then read.
+//
+// NOTE(review): m_reconsumeTimes starts at 3 rather than 0 -- confirm this is
+// the intended default.
+MQMessageExt::MQMessageExt()
+    : m_queueOffset(0),
+      m_commitLogOffset(0),
+      m_bornTimestamp(0),
+      m_storeTimestamp(0),
+      m_preparedTransactionOffset(0),
+      m_queueId(0),
+      m_storeSize(0),
+      m_sysFlag(0),
+      m_bodyCRC(0),
+      m_reconsumeTimes(3),
+      m_bornHost(),
+      m_storeHost(),
+      m_msgId("") {}
+
+// Builds a message from the supplied queue id, timestamps, host addresses and
+// message id. Offsets, store size, sys flag and body CRC start at zero.
+//
+// NOTE(review): m_reconsumeTimes starts at 3 rather than 0 -- confirm this is
+// the intended default.
+MQMessageExt::MQMessageExt(int queueId, int64 bornTimestamp, sockaddr bornHost,
+                           int64 storeTimestamp, sockaddr storeHost,
+                           string msgId)
+    : m_queueOffset(0),
+      m_commitLogOffset(0),
+      m_bornTimestamp(bornTimestamp),
+      m_storeTimestamp(storeTimestamp),
+      m_preparedTransactionOffset(0),
+      m_queueId(queueId),
+      m_storeSize(0),
+      m_sysFlag(0),
+      m_bodyCRC(0),
+      m_reconsumeTimes(3),
+      m_bornHost(bornHost),
+      m_storeHost(storeHost),
+      m_msgId(msgId) {
+}
+
+// Nothing to release explicitly.
+MQMessageExt::~MQMessageExt() {
+}
+
+// Returns the stored queue id.
+int MQMessageExt::getQueueId() const {
+  return m_queueId;
+}
+
+// Replaces the stored queue id.
+void MQMessageExt::setQueueId(int queueId) {
+  m_queueId = queueId;
+}
+
+// Returns the stored born timestamp.
+int64 MQMessageExt::getBornTimestamp() const {
+  return m_bornTimestamp;
+}
+
+// Replaces the stored born timestamp.
+void MQMessageExt::setBornTimestamp(int64 bornTimestamp) {
+  m_bornTimestamp = bornTimestamp;
+}
+
+// Returns a copy of the born-host address.
+sockaddr MQMessageExt::getBornHost() const {
+  return m_bornHost;
+}
+
+// Returns the born-host address rendered by socketAddress2String().
+string MQMessageExt::getBornHostString() const {
+  string rendered = socketAddress2String(m_bornHost);
+  return rendered;
+}
+
+// Returns the result of getHostName() for the born-host address.
+string MQMessageExt::getBornHostNameString() const {
+  string hostName = getHostName(m_bornHost);
+  return hostName;
+}
+
+// Replaces the born-host address.
+void MQMessageExt::setBornHost(const sockaddr& bornHost) {
+  m_bornHost = bornHost;
+}
+
+// Returns the stored store timestamp.
+int64 MQMessageExt::getStoreTimestamp() const {
+  return m_storeTimestamp;
+}
+
+// Replaces the stored store timestamp.
+void MQMessageExt::setStoreTimestamp(int64 storeTimestamp) {
+  m_storeTimestamp = storeTimestamp;
+}
+
+// Returns a copy of the store-host address.
+sockaddr MQMessageExt::getStoreHost() const {
+  return m_storeHost;
+}
+
+// Returns the store-host address rendered by socketAddress2String().
+string MQMessageExt::getStoreHostString() const {
+  string rendered = socketAddress2String(m_storeHost);
+  return rendered;
+}
+
+// Replaces the store-host address.
+void MQMessageExt::setStoreHost(const sockaddr& storeHost) {
+  m_storeHost = storeHost;
+}
+
+// Returns a reference to the stored message id (no copy is made).
+const string& MQMessageExt::getMsgId() const {
+  return m_msgId;
+}
+
+// Replaces the stored message id with a copy of `msgId`.
+void MQMessageExt::setMsgId(const string& msgId) {
+  m_msgId = msgId;
+}
+
+// Returns the stored system flag bits.
+int MQMessageExt::getSysFlag() const {
+  return m_sysFlag;
+}
+
+// Replaces the stored system flag bits.
+void MQMessageExt::setSysFlag(int sysFlag) {
+  m_sysFlag = sysFlag;
+}
+
+// Returns the stored body CRC value.
+int MQMessageExt::getBodyCRC() const {
+  return m_bodyCRC;
+}
+
+// Replaces the stored body CRC value.
+void MQMessageExt::setBodyCRC(int bodyCRC) {
+  m_bodyCRC = bodyCRC;
+}
+
+// Returns the stored queue offset.
+int64 MQMessageExt::getQueueOffset() const {
+  return m_queueOffset;
+}
+
+// Replaces the stored queue offset.
+void MQMessageExt::setQueueOffset(int64 queueOffset) {
+  m_queueOffset = queueOffset;
+}
+
+// Returns the stored commit-log offset.
+int64 MQMessageExt::getCommitLogOffset() const {
+  return m_commitLogOffset;
+}
+
+// Replaces the stored commit-log offset with `physicOffset`.
+void MQMessageExt::setCommitLogOffset(int64 physicOffset) {
+  m_commitLogOffset = physicOffset;
+}
+
+// Returns the stored store size.
+int MQMessageExt::getStoreSize() const {
+  return m_storeSize;
+}
+
+// Replaces the stored store size.
+void MQMessageExt::setStoreSize(int storeSize) {
+  m_storeSize = storeSize;
+}
+
+// Maps a sys-flag value to a topic filter type: MULTI_TAG when every bit of
+// MessageSysFlag::MultiTagsFlag is set in `sysFlag`, SINGLE_TAG otherwise.
+int MQMessageExt::parseTopicFilterType(int sysFlag) {
+  const bool hasMultiTags = (sysFlag & MessageSysFlag::MultiTagsFlag) ==
+                            MessageSysFlag::MultiTagsFlag;
+  return hasMultiTags ? MULTI_TAG : SINGLE_TAG;
+}
+
+// Returns the stored reconsume counter.
+int MQMessageExt::getReconsumeTimes() const {
+  return m_reconsumeTimes;
+}
+
+// Replaces the stored reconsume counter.
+void MQMessageExt::setReconsumeTimes(int reconsumeTimes) {
+  m_reconsumeTimes = reconsumeTimes;
+}
+
+// Returns the stored prepared-transaction offset.
+int64 MQMessageExt::getPreparedTransactionOffset() const {
+  return m_preparedTransactionOffset;
+}
+
+// Replaces the stored prepared-transaction offset.
+void MQMessageExt::setPreparedTransactionOffset(
+    int64 preparedTransactionOffset) {
+  m_preparedTransactionOffset = preparedTransactionOffset;
+}
+
+//<!***************************************************************************
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/message/MQMessageId.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/message/MQMessageId.h b/rocketmq-cpp/src/message/MQMessageId.h
new file mode 100755
index 0000000..366ac20
--- /dev/null
+++ b/rocketmq-cpp/src/message/MQMessageId.h
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MESSAGEID_H__
+#define __MESSAGEID_H__
+
+#include "SocketUtil.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+// Simple value holder pairing a socket address with an int64 offset.
+class MQMessageId {
+ public:
+  // Stores copies of the given address and offset.
+  MQMessageId(sockaddr address, int64 offset)
+      : m_address(address), m_offset(offset) {
+  }
+
+  // Returns a copy of the stored address.
+  sockaddr getAddress() const {
+    return m_address;
+  }
+
+  // Replaces the stored address.
+  void setAddress(sockaddr address) {
+    m_address = address;
+  }
+
+  // Returns the stored offset.
+  int64 getOffset() const {
+    return m_offset;
+  }
+
+  // Replaces the stored offset.
+  void setOffset(int64 offset) {
+    m_offset = offset;
+  }
+
+ private:
+  sockaddr m_address;  // address component of the id
+  int64 m_offset;      // offset component of the id
+};
+
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/message/MQMessageQueue.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/message/MQMessageQueue.cpp b/rocketmq-cpp/src/message/MQMessageQueue.cpp
new file mode 100755
index 0000000..60481e5
--- /dev/null
+++ b/rocketmq-cpp/src/message/MQMessageQueue.cpp
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MQMessageQueue.h"
+
+namespace rocketmq {
+//<!************************************************************************
+// Default constructor: empty topic and broker name, and a queue id of -1,
+// which marks the queue as invalid.
+MQMessageQueue::MQMessageQueue()
+    : m_topic(), m_brokerName(), m_queueId(-1) {}
+
+// Builds a queue descriptor from its three identifying fields.
+MQMessageQueue::MQMessageQueue(const std::string& topic,
+                               const std::string& brokerName, int queueId)
+    : m_topic(topic),
+      m_brokerName(brokerName),
+      m_queueId(queueId) {
+}
+
+// Copy constructor: duplicates topic, broker name and queue id from `other`.
+MQMessageQueue::MQMessageQueue(const MQMessageQueue& other)
+    : m_topic(other.m_topic),
+      m_brokerName(other.m_brokerName),
+      m_queueId(other.m_queueId) {
+}
+
+// Copy assignment: copies all three fields from `other`; self-assignment is a
+// no-op.
+MQMessageQueue& MQMessageQueue::operator=(const MQMessageQueue& other) {
+  if (this == &other) {
+    return *this;
+  }
+
+  m_brokerName = other.m_brokerName;
+  m_topic = other.m_topic;
+  m_queueId = other.m_queueId;
+  return *this;
+}
+
+// Returns a copy of the topic name.
+std::string MQMessageQueue::getTopic() const {
+  return m_topic;
+}
+
+// Replaces the topic name.
+void MQMessageQueue::setTopic(const std::string& topic) {
+  m_topic = topic;
+}
+
+// Returns a copy of the broker name.
+std::string MQMessageQueue::getBrokerName() const {
+  return m_brokerName;
+}
+
+// Replaces the broker name.
+void MQMessageQueue::setBrokerName(const std::string& brokerName) {
+  m_brokerName = brokerName;
+}
+
+// Returns the queue id.
+int MQMessageQueue::getQueueId() const {
+  return m_queueId;
+}
+
+// Replaces the queue id.
+void MQMessageQueue::setQueueId(int queueId) {
+  m_queueId = queueId;
+}
+
+// Two queues are equal when broker name, queue id and topic all match.
+bool MQMessageQueue::operator==(const MQMessageQueue& mq) const {
+  // Comparing an object with itself needs no field checks.
+  if (this == &mq) {
+    return true;
+  }
+
+  return m_brokerName == mq.m_brokerName && m_queueId == mq.m_queueId &&
+         m_topic == mq.m_topic;
+}
+
+// Three-way comparison ordered by topic, then broker name, then queue id.
+//
+// Returns a negative value when *this sorts before `mq`, zero when the two
+// are equivalent, and a positive value when *this sorts after `mq`.
+int MQMessageQueue::compareTo(const MQMessageQueue& mq) const {
+  int result = m_topic.compare(mq.m_topic);
+  if (result != 0) {
+    return result;
+  }
+
+  result = m_brokerName.compare(mq.m_brokerName);
+  if (result != 0) {
+    return result;
+  }
+
+  // Compare explicitly instead of subtracting: `a - b` overflows (undefined
+  // behaviour) for operands far apart, which could flip the sign.
+  if (m_queueId < mq.m_queueId) {
+    return -1;
+  }
+  return (m_queueId > mq.m_queueId) ? 1 : 0;
+}
+
+// Strict ordering derived from compareTo(): true when compareTo() reports a
+// negative result for `mq`.
+bool MQMessageQueue::operator<(const MQMessageQueue& mq) const {
+  const int ordering = compareTo(mq);
+  return ordering < 0;
+}
+
+//<!***************************************************************************
+}  //<!end namespace;