You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/09/04 05:04:50 UTC

[04/14] incubator-rocketmq-externals git commit: upload rocketmq-cpp code

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/producer/DefaultMQProducer.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/producer/DefaultMQProducer.cpp b/rocketmq-cpp/src/producer/DefaultMQProducer.cpp
new file mode 100755
index 0000000..9c53930
--- /dev/null
+++ b/rocketmq-cpp/src/producer/DefaultMQProducer.cpp
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefaultMQProducer.h"
+#include <assert.h>
+#include "CommandHeader.h"
+#include "CommunicationMode.h"
+#include "Logging.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientException.h"
+#include "MQClientFactory.h"
+#include "MQClientManager.h"
+#include "MQDecoder.h"
+#include "MQProtos.h"
+#include "MessageSysFlag.h"
+#include "TopicPublishInfo.h"
+#include "Validators.h"
+
+namespace rocketmq {
+
+//<!************************************************************************
+// Builds a producer with these defaults: send timeout 3000, compression for
+// bodies of 4 KiB and more at level 5, a 128 KiB message size limit and 5
+// send attempts. An empty groupname selects DEFAULT_PRODUCER_GROUP.
+DefaultMQProducer::DefaultMQProducer(const string& groupname)
+    : m_sendMsgTimeout(3000),
+      m_compressMsgBodyOverHowmuch(4 * 1024),
+      m_maxMessageSize(1024 * 128),
+      m_retryAnotherBrokerWhenNotStoreOK(false),
+      m_compressLevel(5),
+      m_retryTimes(5) {
+  //<!fall back to the default group name when none was given;
+  string effectiveGroup(groupname);
+  if (effectiveGroup.empty()) {
+    effectiveGroup = DEFAULT_PRODUCER_GROUP;
+  }
+  setGroupName(effectiveGroup);
+}
+
+// Empty on purpose: the destructor itself releases nothing and does not call
+// shutdown().
+DefaultMQProducer::~DefaultMQProducer() {
+}
+
+// Starts the producer.
+//
+// SIGPIPE is set to be ignored on every call. The rest only runs when the
+// service state is CREATE_JUST; in any other state the call returns without
+// doing anything further.
+//
+// Raises MQClientException through THROW_MQEXCEPTION when the factory refuses
+// to register this producer; the state is then put back to CREATE_JUST. If a
+// later step throws, the state is left at START_FAILED.
+void DefaultMQProducer::start() {
+  /* Ignore the SIGPIPE */
+  struct sigaction sa;
+  sa.sa_handler = SIG_IGN;
+  sa.sa_flags = 0;
+  // The mask has to be initialised too: handing sigaction() an indeterminate
+  // sa_mask is undefined behaviour.
+  sigemptyset(&sa.sa_mask);
+  sigaction(SIGPIPE, &sa, 0);
+
+  if (m_serviceState != CREATE_JUST) {
+    return;
+  }
+
+  // Pessimistically mark the start as failed until every step has succeeded.
+  m_serviceState = START_FAILED;
+  MQClient::start();
+  LOG_INFO("DefaultMQProducer:%s start", m_GroupName.c_str());
+
+  bool registerOK = getFactory()->registerProducer(this);
+  if (!registerOK) {
+    m_serviceState = CREATE_JUST;
+    THROW_MQEXCEPTION(
+        MQClientException,
+        "The producer group[" + getGroupName() +
+            "] has been created before, specify another name please.",
+        -1);
+  }
+
+  getFactory()->start();
+  getFactory()->sendHeartbeatToAllBroker();
+  m_serviceState = RUNNING;
+}
+
+// Stops a running producer: unregisters it from the factory, shuts the
+// factory down and sets the state to SHUTDOWN_ALREADY. In any state other
+// than RUNNING the call does nothing.
+void DefaultMQProducer::shutdown() {
+  if (m_serviceState != RUNNING) {
+    return;
+  }
+
+  LOG_INFO("DefaultMQProducer shutdown");
+  getFactory()->unregisterProducer(this);
+  getFactory()->shutdown();
+  m_serviceState = SHUTDOWN_ALREADY;
+}
+
+// Sends msg in ComMode_SYNC and returns the result of sendDefaultImpl.
+//
+// msg                 - checked with Validators::checkMessage against
+//                       getMaxMessageSize() before sending.
+// bSelectActiveBroker - forwarded to sendDefaultImpl as its bActiveMQ flag.
+//
+// An MQException raised while sending is logged and rethrown unchanged.
+SendResult DefaultMQProducer::send(MQMessage& msg, bool bSelectActiveBroker) {
+  Validators::checkMessage(msg, getMaxMessageSize());
+  try {
+    return sendDefaultImpl(msg, ComMode_SYNC, NULL, bSelectActiveBroker);
+  } catch (MQException& e) {
+    // Log the text as an argument, never as the format string: it may
+    // contain '%' characters.
+    LOG_ERROR("%s", e.what());
+    // A bare throw keeps the dynamic exception type; "throw e" would copy
+    // (and slice) it down to MQException.
+    throw;
+  }
+  return SendResult();  // not reached: the try returns and the catch throws
+}
+
+// Sends msg in ComMode_ASYNC through sendDefaultImpl.
+//
+// msg                 - checked with Validators::checkMessage against
+//                       getMaxMessageSize() before sending.
+// pSendCallback       - forwarded to sendDefaultImpl.
+// bSelectActiveBroker - forwarded to sendDefaultImpl as its bActiveMQ flag.
+//
+// An MQException raised while sending is logged and rethrown unchanged.
+void DefaultMQProducer::send(MQMessage& msg, SendCallback* pSendCallback,
+                             bool bSelectActiveBroker) {
+  Validators::checkMessage(msg, getMaxMessageSize());
+  try {
+    sendDefaultImpl(msg, ComMode_ASYNC, pSendCallback, bSelectActiveBroker);
+  } catch (MQException& e) {
+    // Log the text as an argument, never as the format string: it may
+    // contain '%' characters.
+    LOG_ERROR("%s", e.what());
+    // A bare throw keeps the dynamic exception type; "throw e" would copy
+    // (and slice) it down to MQException.
+    throw;
+  }
+}
+
+// Sends msg in ComMode_SYNC to the given queue and returns the result of
+// sendKernelImpl.
+//
+// msg - checked with Validators::checkMessage against getMaxMessageSize().
+// mq  - target queue; a topic that differs from msg's topic only produces a
+//       warning, the send still goes ahead.
+//
+// An MQException raised while sending is logged and rethrown unchanged.
+SendResult DefaultMQProducer::send(MQMessage& msg, const MQMessageQueue& mq) {
+  Validators::checkMessage(msg, getMaxMessageSize());
+  if (msg.getTopic() != mq.getTopic()) {
+    LOG_WARN("message's topic not equal mq's topic");
+  }
+  try {
+    return sendKernelImpl(msg, mq, ComMode_SYNC, NULL);
+  } catch (MQException& e) {
+    // Log the text as an argument, never as the format string: it may
+    // contain '%' characters.
+    LOG_ERROR("%s", e.what());
+    // A bare throw keeps the dynamic exception type; "throw e" would copy
+    // (and slice) it down to MQException.
+    throw;
+  }
+  return SendResult();  // not reached: the try returns and the catch throws
+}
+
+// Sends msg in ComMode_ASYNC to the given queue through sendKernelImpl.
+//
+// msg           - checked with Validators::checkMessage against
+//                 getMaxMessageSize().
+// mq            - target queue; a topic that differs from msg's topic only
+//                 produces a warning, the send still goes ahead.
+// pSendCallback - forwarded to sendKernelImpl.
+//
+// An MQException raised while sending is logged and rethrown unchanged.
+void DefaultMQProducer::send(MQMessage& msg, const MQMessageQueue& mq,
+                             SendCallback* pSendCallback) {
+  Validators::checkMessage(msg, getMaxMessageSize());
+  if (msg.getTopic() != mq.getTopic()) {
+    LOG_WARN("message's topic not equal mq's topic");
+  }
+  try {
+    sendKernelImpl(msg, mq, ComMode_ASYNC, pSendCallback);
+  } catch (MQException& e) {
+    // Log the text as an argument, never as the format string: it may
+    // contain '%' characters.
+    LOG_ERROR("%s", e.what());
+    // A bare throw keeps the dynamic exception type; "throw e" would copy
+    // (and slice) it down to MQException.
+    throw;
+  }
+}
+
+// Sends msg in ComMode_ONEWAY through sendDefaultImpl; no result is returned.
+//
+// msg                 - checked with Validators::checkMessage against
+//                       getMaxMessageSize() before sending.
+// bSelectActiveBroker - forwarded to sendDefaultImpl as its bActiveMQ flag.
+//
+// An MQException raised while sending is logged and rethrown unchanged.
+void DefaultMQProducer::sendOneway(MQMessage& msg, bool bSelectActiveBroker) {
+  Validators::checkMessage(msg, getMaxMessageSize());
+  try {
+    sendDefaultImpl(msg, ComMode_ONEWAY, NULL, bSelectActiveBroker);
+  } catch (MQException& e) {
+    // Log the text as an argument, never as the format string: it may
+    // contain '%' characters.
+    LOG_ERROR("%s", e.what());
+    // A bare throw keeps the dynamic exception type; "throw e" would copy
+    // (and slice) it down to MQException.
+    throw;
+  }
+}
+
+// Sends msg in ComMode_ONEWAY to the given queue through sendKernelImpl; no
+// result is returned.
+//
+// msg - checked with Validators::checkMessage against getMaxMessageSize().
+// mq  - target queue; a topic that differs from msg's topic only produces a
+//       warning, the send still goes ahead.
+//
+// An MQException raised while sending is logged and rethrown unchanged.
+void DefaultMQProducer::sendOneway(MQMessage& msg, const MQMessageQueue& mq) {
+  Validators::checkMessage(msg, getMaxMessageSize());
+  if (msg.getTopic() != mq.getTopic()) {
+    LOG_WARN("message's topic not equal mq's topic");
+  }
+  try {
+    sendKernelImpl(msg, mq, ComMode_ONEWAY, NULL);
+  } catch (MQException& e) {
+    // Log the text as an argument, never as the format string: it may
+    // contain '%' characters.
+    LOG_ERROR("%s", e.what());
+    // A bare throw keeps the dynamic exception type; "throw e" would copy
+    // (and slice) it down to MQException.
+    throw;
+  }
+}
+
+// Sends msg in ComMode_SYNC to the queue chosen by pSelector and returns the
+// result of sendSelectImpl (which also validates the message).
+//
+// pSelector - picks the target queue.
+// arg       - opaque value handed to pSelector->select().
+//
+// An MQException raised while sending is logged and rethrown unchanged.
+SendResult DefaultMQProducer::send(MQMessage& msg,
+                                   MessageQueueSelector* pSelector, void* arg) {
+  try {
+    return sendSelectImpl(msg, pSelector, arg, ComMode_SYNC, NULL);
+  } catch (MQException& e) {
+    // Log the text as an argument, never as the format string: it may
+    // contain '%' characters.
+    LOG_ERROR("%s", e.what());
+    // A bare throw keeps the dynamic exception type; "throw e" would copy
+    // (and slice) it down to MQException.
+    throw;
+  }
+  return SendResult();  // not reached: the try returns and the catch throws
+}
+
+// Sends msg in ComMode_SYNC, first to the queue chosen by pSelector and then,
+// on failure, to other queues; returns the result of sendAutoRetrySelectImpl
+// (which also validates the message).
+//
+// pSelector      - picks the queue for the first attempt.
+// arg            - opaque value handed to pSelector->select().
+// autoRetryTimes - number of retries after the first attempt.
+// bActiveBroker  - forwarded to sendAutoRetrySelectImpl as its bActiveMQ flag.
+//
+// An MQException raised while sending is logged and rethrown unchanged.
+SendResult DefaultMQProducer::send(MQMessage& msg,
+                                   MessageQueueSelector* pSelector, void* arg,
+                                   int autoRetryTimes, bool bActiveBroker) {
+  try {
+    return sendAutoRetrySelectImpl(msg, pSelector, arg, ComMode_SYNC, NULL,
+                                   autoRetryTimes, bActiveBroker);
+  } catch (MQException& e) {
+    // Log the text as an argument, never as the format string: it may
+    // contain '%' characters.
+    LOG_ERROR("%s", e.what());
+    // A bare throw keeps the dynamic exception type; "throw e" would copy
+    // (and slice) it down to MQException.
+    throw;
+  }
+  return SendResult();  // not reached: the try returns and the catch throws
+}
+
+// Sends msg in ComMode_ASYNC to the queue chosen by pSelector, through
+// sendSelectImpl (which also validates the message).
+//
+// pSelector     - picks the target queue.
+// arg           - opaque value handed to pSelector->select().
+// pSendCallback - forwarded to sendSelectImpl.
+//
+// An MQException raised while sending is logged and rethrown unchanged.
+void DefaultMQProducer::send(MQMessage& msg, MessageQueueSelector* pSelector,
+                             void* arg, SendCallback* pSendCallback) {
+  try {
+    sendSelectImpl(msg, pSelector, arg, ComMode_ASYNC, pSendCallback);
+  } catch (MQException& e) {
+    // Log the text as an argument, never as the format string: it may
+    // contain '%' characters.
+    LOG_ERROR("%s", e.what());
+    // A bare throw keeps the dynamic exception type; "throw e" would copy
+    // (and slice) it down to MQException.
+    throw;
+  }
+}
+
+// Sends msg in ComMode_ONEWAY to the queue chosen by pSelector, through
+// sendSelectImpl (which also validates the message); no result is returned.
+//
+// pSelector - picks the target queue.
+// arg       - opaque value handed to pSelector->select().
+//
+// An MQException raised while sending is logged and rethrown unchanged.
+void DefaultMQProducer::sendOneway(MQMessage& msg,
+                                   MessageQueueSelector* pSelector, void* arg) {
+  try {
+    sendSelectImpl(msg, pSelector, arg, ComMode_ONEWAY, NULL);
+  } catch (MQException& e) {
+    // Log the text as an argument, never as the format string: it may
+    // contain '%' characters.
+    LOG_ERROR("%s", e.what());
+    // A bare throw keeps the dynamic exception type; "throw e" would copy
+    // (and slice) it down to MQException.
+    throw;
+  }
+}
+
+// Returns the timeout value that is passed along with every send.
+int DefaultMQProducer::getSendMsgTimeout() const {
+  return m_sendMsgTimeout;
+}
+
+// Stores the timeout value to use for sends; the value is not range-checked.
+void DefaultMQProducer::setSendMsgTimeout(int sendMsgTimeout) {
+  this->m_sendMsgTimeout = sendMsgTimeout;
+}
+
+// Returns the body length from which tryToCompressMessage starts compressing.
+int DefaultMQProducer::getCompressMsgBodyOverHowmuch() const {
+  const int threshold = m_compressMsgBodyOverHowmuch;
+  return threshold;
+}
+
+// Stores the body length from which messages get compressed; the value is
+// not range-checked.
+void DefaultMQProducer::setCompressMsgBodyOverHowmuch(
+    int compressMsgBodyOverHowmuch) {
+  this->m_compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
+}
+
+// Returns the size limit handed to Validators::checkMessage.
+int DefaultMQProducer::getMaxMessageSize() const {
+  return m_maxMessageSize;
+}
+
+// Stores the size limit used when validating messages; the value is not
+// range-checked.
+void DefaultMQProducer::setMaxMessageSize(int maxMessageSize) {
+  this->m_maxMessageSize = maxMessageSize;
+}
+
+// Returns the level handed to UtilAll::deflate when a body is compressed.
+int DefaultMQProducer::getCompressLevel() const {
+  return m_compressLevel;
+}
+
+// Sets the level handed to UtilAll::deflate when a body is compressed.
+//
+// Accepted values are 0..9 and -1. Anything else trips the assert in debug
+// builds; when asserts are compiled out the value is rejected with a warning
+// and the current level is kept, instead of being stored unchecked.
+void DefaultMQProducer::setCompressLevel(int compressLevel) {
+  const bool levelIsValid =
+      (compressLevel >= 0 && compressLevel <= 9) || compressLevel == -1;
+  assert(levelIsValid);
+  if (!levelIsValid) {
+    LOG_WARN("set compress level illegal:%d, keep current value:%d",
+             compressLevel, m_compressLevel);
+    return;
+  }
+
+  m_compressLevel = compressLevel;
+}
+
+//<!************************************************************************
+// Sends msg to a queue taken from the topic's publish info, making up to
+// m_retryTimes attempts.
+//
+// msg               - message to send; handed to sendKernelImpl.
+// communicationMode - ComMode_SYNC, ComMode_ASYNC or ComMode_ONEWAY.
+// pSendCallback     - forwarded to sendKernelImpl.
+// bActiveMQ         - when true, queues are picked with
+//                     selectOneActiveMessageQueue and a queue whose send
+//                     failed is reported via updateNonServiceMessageQueue;
+//                     when false, selectOneMessageQueue is used.
+//
+// ASYNC and ONEWAY return after the first sendKernelImpl call that does not
+// throw. SYNC returns only a result whose status is SEND_OK; any other
+// status, and any exception from sendKernelImpl, moves on to the next
+// attempt. Once the attempts are used up an MQClientException is raised
+// through THROW_MQEXCEPTION.
+SendResult DefaultMQProducer::sendDefaultImpl(MQMessage& msg,
+                                              int communicationMode,
+                                              SendCallback* pSendCallback,
+                                              bool bActiveMQ) {
+  MQMessageQueue lastmq;
+  int mq_index = 0;
+  for (int times = 1; times <= m_retryTimes; times++) {
+    // The publish info is looked up again on every attempt.
+    boost::weak_ptr<TopicPublishInfo> weak_topicPublishInfo(
+        getFactory()->tryToFindTopicPublishInfo(msg.getTopic(),
+                                                getSessionCredentials()));
+    boost::shared_ptr<TopicPublishInfo> topicPublishInfo(
+        weak_topicPublishInfo.lock());
+    if (topicPublishInfo) {
+      // First attempt starts at the shared counter; retries step to the next
+      // index.
+      if (times == 1) {
+        mq_index = topicPublishInfo->getWhichQueue();
+      } else {
+        mq_index++;
+      }
+
+      SendResult sendResult;
+      MQMessageQueue mq;
+      if (bActiveMQ)
+        mq = topicPublishInfo->selectOneActiveMessageQueue(lastmq, mq_index);
+      else
+        mq = topicPublishInfo->selectOneMessageQueue(lastmq, mq_index);
+
+      lastmq = mq;
+      // A queue id of -1 is treated as "no queue selected": this attempt is
+      // skipped but still counts against m_retryTimes.
+      if (mq.getQueueId() == -1) {
+        // THROW_MQEXCEPTION(MQClientException, "the MQMessageQueue is
+        // invalide", -1);
+        continue;
+      }
+
+      try {
+        LOG_DEBUG("send to brokerName:%s", mq.getBrokerName().c_str());
+        sendResult = sendKernelImpl(msg, mq, communicationMode, pSendCallback);
+        switch (communicationMode) {
+          case ComMode_ASYNC:
+            return sendResult;
+          case ComMode_ONEWAY:
+            return sendResult;
+          case ComMode_SYNC:
+            if (sendResult.getSendStatus() != SEND_OK) {
+              if (bActiveMQ) {
+                topicPublishInfo->updateNonServiceMessageQueue(
+                    mq, getSendMsgTimeout());
+              }
+              // "continue" here applies to the enclosing for loop.
+              continue;
+            }
+            return sendResult;
+          default:
+            break;
+        }
+      } catch (...) {
+        // Every exception type is swallowed here so the next attempt can run.
+        LOG_ERROR("send failed of times:%d,brokerName:%s", times,
+                  mq.getBrokerName().c_str());
+        if (bActiveMQ) {
+          topicPublishInfo->updateNonServiceMessageQueue(mq,
+                                                         getSendMsgTimeout());
+        }
+        continue;
+      }
+    }  // end of if (topicPublishInfo)
+    // Reached when no publish info was available for this attempt, or when
+    // communicationMode matched none of the cases above.
+    LOG_WARN("Retry many times, still failed");
+  }
+  // NOTE(review): this is also the error callers get when every send attempt
+  // failed, not only when route info was missing.
+  THROW_MQEXCEPTION(MQClientException, "No route info of this topic, ", -1);
+}
+
+// Sends msg to the broker that owns mq and returns what
+// MQClientAPIImpl::sendMessage returns.
+//
+// msg               - message to send; its body may be replaced by a
+//                     compressed one (see tryToCompressMessage).
+// mq                - target queue; its broker name is resolved to an
+//                     address, refreshing the topic's publish info once if
+//                     the first lookup comes back empty.
+// communicationMode - forwarded to sendMessage.
+// sendCallback      - forwarded to sendMessage.
+//
+// Raises MQClientException through THROW_MQEXCEPTION when no address is
+// found for the broker. Exceptions from the send itself propagate unchanged.
+SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg,
+                                             const MQMessageQueue& mq,
+                                             int communicationMode,
+                                             SendCallback* sendCallback) {
+  string brokerAddr =
+      getFactory()->findBrokerAddressInPublish(mq.getBrokerName());
+
+  if (brokerAddr.empty()) {
+    getFactory()->tryToFindTopicPublishInfo(mq.getTopic(),
+                                            getSessionCredentials());
+    brokerAddr = getFactory()->findBrokerAddressInPublish(mq.getBrokerName());
+  }
+
+  if (brokerAddr.empty()) {
+    THROW_MQEXCEPTION(MQClientException,
+                      "The broker[" + mq.getBrokerName() + "] not exist", -1);
+  }
+
+  // No try/catch around the send: the previous "catch ... throw e;" only
+  // copied the exception and sliced it down to MQException. Letting it
+  // propagate keeps the original dynamic type.
+  LOG_DEBUG("produce before:%s to %s", msg.toString().c_str(),
+            mq.toString().c_str());
+  int sysFlag = 0;
+  // NOTE(review): tryToCompressMessage replaces msg's body in place, so a
+  // retry that re-enters this function sees the already-compressed body -
+  // confirm that is intended.
+  if (tryToCompressMessage(msg)) {
+    sysFlag |= MessageSysFlag::CompressedFlag;
+  }
+
+  string tranMsg = msg.getProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED);
+  if (tranMsg == "true") {
+    sysFlag |= MessageSysFlag::TransactionPreparedType;
+  }
+
+  // NOTE(review): presumably sendMessage takes ownership of requestHeader;
+  // nothing in this function deletes it - confirm against MQClientAPIImpl.
+  SendMessageRequestHeader* requestHeader = new SendMessageRequestHeader();
+  requestHeader->producerGroup = getGroupName();
+  requestHeader->topic = (msg.getTopic());
+  requestHeader->defaultTopic = DEFAULT_TOPIC;
+  requestHeader->defaultTopicQueueNums = 4;
+  requestHeader->queueId = (mq.getQueueId());
+  requestHeader->sysFlag = (sysFlag);
+  requestHeader->bornTimestamp = UtilAll::currentTimeMillis();
+  requestHeader->flag = (msg.getFlag());
+  requestHeader->properties =
+      (MQDecoder::messageProperties2String(msg.getProperties()));
+
+  return getFactory()->getMQClientAPIImpl()->sendMessage(
+      brokerAddr, mq.getBrokerName(), msg, requestHeader, getSendMsgTimeout(),
+      communicationMode, sendCallback, getSessionCredentials());
+}
+
+// Sends msg to the queue that pSelector picks from the topic's queue list;
+// a single attempt, no retry.
+//
+// msg               - checked with Validators::checkMessage against
+//                     getMaxMessageSize().
+// pSelector         - picks the target queue; must not be NULL.
+// pArg              - opaque value handed to pSelector->select().
+// communicationMode - forwarded to sendKernelImpl.
+// sendCallback      - forwarded to sendKernelImpl.
+//
+// Raises MQClientException through THROW_MQEXCEPTION when pSelector is NULL
+// or when no publish info is available for the topic.
+SendResult DefaultMQProducer::sendSelectImpl(MQMessage& msg,
+                                             MessageQueueSelector* pSelector,
+                                             void* pArg, int communicationMode,
+                                             SendCallback* sendCallback) {
+  Validators::checkMessage(msg, getMaxMessageSize());
+
+  // Fail with a catchable error instead of dereferencing a null selector.
+  if (pSelector == NULL) {
+    THROW_MQEXCEPTION(MQClientException, "MessageQueueSelector is null", -1);
+  }
+
+  boost::weak_ptr<TopicPublishInfo> weak_topicPublishInfo(
+      getFactory()->tryToFindTopicPublishInfo(msg.getTopic(),
+                                              getSessionCredentials()));
+  boost::shared_ptr<TopicPublishInfo> topicPublishInfo(
+      weak_topicPublishInfo.lock());
+  if (topicPublishInfo)  //&& topicPublishInfo->ok())
+  {
+    MQMessageQueue mq =
+        pSelector->select(topicPublishInfo->getMessageQueueList(), msg, pArg);
+    return sendKernelImpl(msg, mq, communicationMode, sendCallback);
+  }
+  THROW_MQEXCEPTION(MQClientException, "No route info for this topic", -1);
+}
+
+// Sends msg to the queue picked by pSelector and, if that fails, retries on
+// other queues of the topic; autoRetryTimes + 1 attempts in total.
+//
+// msg               - checked with Validators::checkMessage against
+//                     getMaxMessageSize().
+// pSelector         - picks the queue for the first attempt only.
+// pArg              - opaque value handed to pSelector->select().
+// communicationMode - ComMode_SYNC, ComMode_ASYNC or ComMode_ONEWAY.
+// pSendCallback     - forwarded to sendKernelImpl.
+// autoRetryTimes    - number of retries after the first attempt.
+// bActiveMQ         - when true, retry queues are picked with
+//                     selectOneActiveMessageQueue and a queue whose send
+//                     failed is reported via updateNonServiceMessageQueue;
+//                     when false, selectOneMessageQueue is used.
+//
+// ASYNC and ONEWAY return after the first sendKernelImpl call that does not
+// throw. SYNC returns only a result whose status is SEND_OK; any other
+// status, and any exception from sendKernelImpl, moves on to the next
+// attempt. Once the attempts are used up an MQClientException is raised
+// through THROW_MQEXCEPTION.
+// NOTE(review): pSelector is dereferenced without a NULL check.
+SendResult DefaultMQProducer::sendAutoRetrySelectImpl(
+    MQMessage& msg, MessageQueueSelector* pSelector, void* pArg,
+    int communicationMode, SendCallback* pSendCallback, int autoRetryTimes,
+    bool bActiveMQ) {
+  Validators::checkMessage(msg, getMaxMessageSize());
+
+  MQMessageQueue lastmq;
+  MQMessageQueue mq;
+  int mq_index = 0;
+  for (int times = 1; times <= autoRetryTimes + 1; times++) {
+    // The publish info is looked up again on every attempt.
+    boost::weak_ptr<TopicPublishInfo> weak_topicPublishInfo(
+        getFactory()->tryToFindTopicPublishInfo(msg.getTopic(),
+                                                getSessionCredentials()));
+    boost::shared_ptr<TopicPublishInfo> topicPublishInfo(
+        weak_topicPublishInfo.lock());
+    if (topicPublishInfo) {
+      SendResult sendResult;
+      if (times == 1) {  // always send to the selected MQ first, even if
+                         // bActiveMQ was set to true
+        mq = pSelector->select(topicPublishInfo->getMessageQueueList(), msg,
+                               pArg);
+        lastmq = mq;
+      } else {
+        LOG_INFO("sendAutoRetrySelectImpl with times:%d", times);
+        // Find the position of the previously used queue in a copy of the
+        // queue list; the search for the next queue starts from there.
+        vector<MQMessageQueue> mqs(topicPublishInfo->getMessageQueueList());
+        for (size_t i = 0; i < mqs.size(); i++) {
+          if (mqs[i] == lastmq) mq_index = i;
+        }
+        if (bActiveMQ)
+          mq = topicPublishInfo->selectOneActiveMessageQueue(lastmq, mq_index);
+        else
+          mq = topicPublishInfo->selectOneMessageQueue(lastmq, mq_index);
+        lastmq = mq;
+        // A queue id of -1 is treated as "no queue selected": this attempt
+        // is skipped but still counts against the retry budget.
+        if (mq.getQueueId() == -1) {
+          // THROW_MQEXCEPTION(MQClientException, "the MQMessageQueue is
+          // invalide", -1);
+          continue;
+        }
+      }
+
+      try {
+        LOG_DEBUG("send to broker:%s", mq.toString().c_str());
+        sendResult = sendKernelImpl(msg, mq, communicationMode, pSendCallback);
+        switch (communicationMode) {
+          case ComMode_ASYNC:
+            return sendResult;
+          case ComMode_ONEWAY:
+            return sendResult;
+          case ComMode_SYNC:
+            if (sendResult.getSendStatus() != SEND_OK) {
+              if (bActiveMQ) {
+                topicPublishInfo->updateNonServiceMessageQueue(
+                    mq, getSendMsgTimeout());
+              }
+              // "continue" here applies to the enclosing for loop.
+              continue;
+            }
+            return sendResult;
+          default:
+            break;
+        }
+      } catch (...) {
+        // Every exception type is swallowed here so the next attempt can run.
+        LOG_ERROR("send failed of times:%d,mq:%s", times,
+                  mq.toString().c_str());
+        if (bActiveMQ) {
+          topicPublishInfo->updateNonServiceMessageQueue(mq,
+                                                         getSendMsgTimeout());
+        }
+        continue;
+      }
+    }  // end of if (topicPublishInfo)
+    // Reached when no publish info was available for this attempt, or when
+    // communicationMode matched none of the cases above.
+    LOG_WARN("Retry many times, still failed");
+  }
+  // NOTE(review): this is also the error callers get when every send attempt
+  // failed, not only when route info was missing.
+  THROW_MQEXCEPTION(MQClientException, "No route info of this topic, ", -1);
+}
+
+// Compresses msg's body in place when its length reaches
+// getCompressMsgBodyOverHowmuch(). Returns true only when the body was
+// replaced by the output of UtilAll::deflate; otherwise msg is left untouched
+// and false is returned.
+bool DefaultMQProducer::tryToCompressMessage(MQMessage& msg) {
+  string original = msg.getBody();
+  const bool bigEnough =
+      (int)original.length() >= getCompressMsgBodyOverHowmuch();
+  if (!bigEnough) {
+    return false;
+  }
+
+  string compressed;
+  if (!UtilAll::deflate(original, compressed, getCompressLevel())) {
+    return false;
+  }
+
+  msg.setBody(compressed);
+  return true;
+}
+// Returns the number of attempts sendDefaultImpl makes per message.
+int DefaultMQProducer::getRetryTimes() const {
+  return m_retryTimes;
+}
+// Sets the number of attempts sendDefaultImpl makes per message.
+//
+// times <= 0 is rejected and the current value is kept; values above 15 are
+// clamped to 15; anything else is stored as given.
+void DefaultMQProducer::setRetryTimes(int times) {
+  if (times <= 0) {
+    // The stored value is left untouched here, so report that value instead
+    // of claiming a reset to the default.
+    LOG_WARN("set retry times illegal:%d, keep current value:%d", times,
+             m_retryTimes);
+    return;
+  }
+
+  if (times > 15) {
+    LOG_WARN("set retry times illegal, use max value:15");
+    m_retryTimes = 15;
+    return;
+  }
+  // A successful update is informational, not a warning.
+  LOG_INFO("set retry times to:%d", times);
+  m_retryTimes = times;
+}
+//<!***************************************************************************
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/producer/SendResult.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/producer/SendResult.cpp b/rocketmq-cpp/src/producer/SendResult.cpp
new file mode 100755
index 0000000..7fd844e
--- /dev/null
+++ b/rocketmq-cpp/src/producer/SendResult.cpp
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "SendResult.h"
+#include "UtilAll.h"
+#include "VirtualEnvUtil.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+// Creates a result with status SEND_OK and queue offset 0; the remaining
+// members are default-constructed.
+SendResult::SendResult()
+    : m_sendStatus(SEND_OK),
+      m_queueOffset(0) {
+}
+
+// Creates a result from its four parts; each argument is copied into the
+// member of the same name.
+SendResult::SendResult(const SendStatus& sendStatus, const string& msgId,
+                       const MQMessageQueue& messageQueue, int64 queueOffset)
+    : m_sendStatus(sendStatus), m_msgId(msgId), m_messageQueue(messageQueue),
+      m_queueOffset(queueOffset) {
+}
+
+// Copy constructor: copies status, message id, message queue and queue
+// offset from other.
+SendResult::SendResult(const SendResult& other)
+    : m_sendStatus(other.m_sendStatus),
+      m_msgId(other.m_msgId),
+      m_messageQueue(other.m_messageQueue),
+      m_queueOffset(other.m_queueOffset) {}
+
+// Copy assignment: copies all four members from other; assigning an object
+// to itself is a no-op. Returns *this.
+SendResult& SendResult::operator=(const SendResult& other) {
+  if (this == &other) {
+    return *this;
+  }
+
+  m_sendStatus = other.m_sendStatus;
+  m_msgId = other.m_msgId;
+  m_messageQueue = other.m_messageQueue;
+  m_queueOffset = other.m_queueOffset;
+  return *this;
+}
+
+// Empty on purpose: the destructor body releases nothing itself.
+SendResult::~SendResult() {
+}
+
+// Returns a reference to the stored message id; it stays valid only as long
+// as this SendResult does.
+const string& SendResult::getMsgId() const {
+  return m_msgId;
+}
+
+// Returns the stored send status.
+SendStatus SendResult::getSendStatus() const {
+  return m_sendStatus;
+}
+
+// Returns a copy of the stored message queue.
+MQMessageQueue SendResult::getMessageQueue() const {
+  return m_messageQueue;
+}
+
+// Returns the stored queue offset.
+int64 SendResult::getQueueOffset() const {
+  return m_queueOffset;
+}
+
+//<!************************************************************************
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/producer/TopicPublishInfo.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/producer/TopicPublishInfo.h b/rocketmq-cpp/src/producer/TopicPublishInfo.h
new file mode 100755
index 0000000..726b231
--- /dev/null
+++ b/rocketmq-cpp/src/producer/TopicPublishInfo.h
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TOPICPUBLISHINFO_H__
+#define __TOPICPUBLISHINFO_H__
+
+#include <boost/asio.hpp>
+#include <boost/asio/io_service.hpp>
+#include <boost/atomic.hpp>
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/thread.hpp>
+#include "Logging.h"
+#include "MQMessageQueue.h"
+
+namespace rocketmq {
+//<!************************************************************************/
+// Holds the message queues known for one topic and tracks which of them are
+// currently treated as "on service" or "non service".
+//
+// Every instance starts its own background thread that runs an asio
+// io_service with a 60 second timer; each tick moves queues that have been
+// "non service" for at least five minutes back to "on service".
+//
+// The queue containers are guarded by m_queuelock; the round-robin counter
+// m_sendWhichQueue is an atomic.
+class TopicPublishInfo {
+ public:
+  // Starts the background thread; it runs boost_asio_work() until the
+  // destructor stops the io_service.
+  TopicPublishInfo() : m_sendWhichQueue(0) {
+    m_async_service_thread.reset(new boost::thread(
+        boost::bind(&TopicPublishInfo::boost_asio_work, this)));
+  }
+
+  // Body of the background thread: arms the 60 second timer and runs the
+  // io_service until it is stopped.
+  // NOTE(review): the handler is bound to a default-constructed error_code
+  // instead of the asio error placeholder, so it never sees the timer's real
+  // completion status.
+  void boost_asio_work() {
+    boost::asio::io_service::work work(m_async_ioService);  // avoid async io
+                                                            // service stops
+                                                            // after first timer
+                                                            // timeout callback
+    boost::system::error_code e;
+    boost::asio::deadline_timer t(m_async_ioService,
+                                  boost::posix_time::seconds(60));
+    t.async_wait(boost::bind(
+        &TopicPublishInfo::op_resumeNonServiceMessageQueueList, this, e, &t));
+    boost::system::error_code ec;
+    m_async_ioService.run(ec);
+  }
+
+  // Stops the io_service, interrupts and joins the background thread, then
+  // empties all containers.
+  virtual ~TopicPublishInfo() {
+    m_async_ioService.stop();
+    m_async_service_thread->interrupt();
+    m_async_service_thread->join();
+      
+    m_nonSerivceQueues.clear();
+    m_onSerivceQueues.clear();
+    m_brokerTimerMap.clear();
+    m_queues.clear();
+  }
+
+  // Returns true when at least one queue has been added.
+  bool ok() {
+    boost::lock_guard<boost::mutex> lock(m_queuelock);
+    return !m_queues.empty();
+  }
+
+  // Appends mq to the queue list and marks it "on service". The map key is
+  // the broker name followed by the queue id.
+  void updateMessageQueueList(const MQMessageQueue& mq) {
+    boost::lock_guard<boost::mutex> lock(m_queuelock);
+    m_queues.push_back(mq);
+    string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId());
+    m_onSerivceQueues[key] = mq;
+    if (m_nonSerivceQueues.find(key) != m_nonSerivceQueues.end()) {
+      m_nonSerivceQueues.erase(key);  // if topicPublishInfo changed, erase this
+                                      // mq from m_nonSerivceQueues to avoid 2
+                                      // copies both in m_onSerivceQueues and
+                                      // m_nonSerivceQueues
+    }
+  }
+
+  // Timer handler: runs resumeNonServiceMessageQueueList(), then re-arms the
+  // timer 60 seconds after its previous expiry and waits again.
+  // The ec parameter is not read.
+  void op_resumeNonServiceMessageQueueList(boost::system::error_code& ec,
+                                           boost::asio::deadline_timer* t) {
+    resumeNonServiceMessageQueueList();
+    boost::system::error_code e;
+    t->expires_at(t->expires_at() + boost::posix_time::seconds(60), e);
+    t->async_wait(boost::bind(
+        &TopicPublishInfo::op_resumeNonServiceMessageQueueList, this, e, t));
+  }
+
+  // Moves every queue whose timestamp in m_brokerTimerMap is at least five
+  // minutes old from "non service" back to "on service".
+  // NOTE(review): entries are never removed from m_brokerTimerMap, so a
+  // resumed queue is processed again on every later tick.
+  void resumeNonServiceMessageQueueList() {
+    boost::lock_guard<boost::mutex> lock(m_queuelock);
+    for (map<MQMessageQueue, int64>::iterator it = m_brokerTimerMap.begin();
+         it != m_brokerTimerMap.end(); ++it) {
+      if (UtilAll::currentTimeMillis() - it->second >= 1000 * 60 * 5) {
+        string key = it->first.getBrokerName() +
+                     UtilAll::to_string(it->first.getQueueId());
+        if (m_nonSerivceQueues.find(key) != m_nonSerivceQueues.end()) {
+          m_nonSerivceQueues.erase(key);
+        }
+        m_onSerivceQueues[key] = it->first;
+      }
+    }
+  }
+
+  // Marks mq as "non service" and records the current time for it. Does
+  // nothing when mq is already marked. timeoutMilliseconds is not used.
+  void updateNonServiceMessageQueue(const MQMessageQueue& mq,
+                                    int timeoutMilliseconds) {
+    boost::lock_guard<boost::mutex> lock(m_queuelock);
+
+    string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId());
+    if (m_nonSerivceQueues.find(key) != m_nonSerivceQueues.end()) {
+      return;
+    }
+    LOG_INFO("updateNonServiceMessageQueue of mq:%s", mq.toString().c_str());
+    m_brokerTimerMap[mq] = UtilAll::currentTimeMillis();
+    m_nonSerivceQueues[key] = mq;
+    if (m_onSerivceQueues.find(key) != m_onSerivceQueues.end()) {
+      m_onSerivceQueues.erase(key);
+    }
+  }
+
+  // Returns a reference to the internal queue list.
+  // NOTE(review): the lock is released when this function returns, so the
+  // caller reads the vector through the reference without holding it.
+  vector<MQMessageQueue>& getMessageQueueList() {
+    boost::lock_guard<boost::mutex> lock(m_queuelock);
+    return m_queues;
+  }
+
+  // Returns the current value of the round-robin counter.
+  int getWhichQueue() {
+    return m_sendWhichQueue.load(boost::memory_order_acquire);
+  }
+
+  // Picks a queue starting at position mq_index modulo the list size.
+  //
+  // When lastmq has a broker name, the list is scanned from that position
+  // (wrapping around) for the first queue on a different broker; otherwise
+  // the queue at that position is returned. On success mq_index is set to
+  // the position of the returned queue.
+  //
+  // Returns a default-constructed MQMessageQueue when the list is empty,
+  // mq_index is negative, or every queue is on lastmq's broker.
+  MQMessageQueue selectOneMessageQueue(const MQMessageQueue& lastmq,
+                                       int& mq_index) {
+    boost::lock_guard<boost::mutex> lock(m_queuelock);
+
+    if (m_queues.size() > 0) {
+      LOG_DEBUG("selectOneMessageQueue Enter, queue size:%zu", m_queues.size());
+      unsigned int pos = 0;
+      if (mq_index >= 0) {
+        pos = mq_index % m_queues.size();
+      } else {
+        LOG_ERROR("mq_index is negative");
+        return MQMessageQueue();
+      }
+      if (!lastmq.getBrokerName().empty()) {
+        for (size_t i = 0; i < m_queues.size(); i++) {
+          // Reset the counter before it can overflow.
+          if (m_sendWhichQueue.load(boost::memory_order_acquire) ==
+              numeric_limits<int>::max()) {
+            m_sendWhichQueue.store(0, boost::memory_order_release);
+          }
+
+          if (pos >= m_queues.size()) pos = pos % m_queues.size();
+
+          ++m_sendWhichQueue;
+          MQMessageQueue mq = m_queues.at(pos);
+          LOG_DEBUG("lastmq broker not empty, m_sendWhichQueue:%d, pos:%d",
+                    m_sendWhichQueue.load(boost::memory_order_acquire), pos);
+          if (mq.getBrokerName().compare(lastmq.getBrokerName()) != 0) {
+            mq_index = pos;
+            return mq;
+          }
+          ++pos;
+        }
+        LOG_ERROR("could not find property mq");
+        return MQMessageQueue();
+      } else {
+        // Reset the counter before it can overflow.
+        if (m_sendWhichQueue.load(boost::memory_order_acquire) ==
+            numeric_limits<int>::max()) {
+          m_sendWhichQueue.store(0, boost::memory_order_release);
+        }
+
+        ++m_sendWhichQueue;
+        LOG_DEBUG("lastmq broker empty, m_sendWhichQueue:%d, pos:%d",
+                  m_sendWhichQueue.load(boost::memory_order_acquire), pos);
+        mq_index = pos;
+        return m_queues.at(pos);
+      }
+    } else {
+      LOG_ERROR("m_queues empty");
+      return MQMessageQueue();
+    }
+  }
+
+  // Like selectOneMessageQueue, but only accepts queues that are currently
+  // "on service".
+  //
+  // The list is scanned from position mq_index modulo the list size
+  // (wrapping around). When lastmq has a broker name, the queue must also be
+  // on a different broker. If the scan finds nothing, the first "non
+  // service" queue on a broker other than lastmq's is returned instead; in
+  // that fallback mq_index is not set to the returned queue's position.
+  //
+  // Returns a default-constructed MQMessageQueue when the list is empty,
+  // mq_index is negative, or no queue qualifies.
+  MQMessageQueue selectOneActiveMessageQueue(const MQMessageQueue& lastmq,
+                                             int& mq_index) {
+    boost::lock_guard<boost::mutex> lock(m_queuelock);
+
+    if (m_queues.size() > 0) {
+      unsigned int pos = 0;
+      if (mq_index >= 0) {
+        pos = mq_index % m_queues.size();
+      } else {
+        LOG_ERROR("mq_index is negative");
+        return MQMessageQueue();
+      }
+      if (!lastmq.getBrokerName().empty()) {
+        for (size_t i = 0; i < m_queues.size(); i++) {
+          // Reset the counter before it can overflow.
+          if (m_sendWhichQueue.load(boost::memory_order_acquire) ==
+              numeric_limits<int>::max()) {
+            m_sendWhichQueue.store(0, boost::memory_order_release);
+          }
+
+          if (pos >= m_queues.size()) pos = pos % m_queues.size();
+
+          ++m_sendWhichQueue;
+          MQMessageQueue mq = m_queues.at(pos);
+          string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId());
+          if ((mq.getBrokerName().compare(lastmq.getBrokerName()) != 0) &&
+              (m_onSerivceQueues.find(key) != m_onSerivceQueues.end())) {
+            mq_index = pos;
+            return mq;
+          }
+          ++pos;
+        }
+
+        for (MQMAP::iterator it = m_nonSerivceQueues.begin();
+             it != m_nonSerivceQueues.end();
+             ++it) {  // if no MQMessageQueue(except lastmq) in
+                      // m_onSerivceQueues, search m_nonSerivceQueues
+          if (it->second.getBrokerName().compare(lastmq.getBrokerName()) != 0)
+            return it->second;
+        }
+        LOG_ERROR("can not find property mq");
+        return MQMessageQueue();
+      } else {
+        for (size_t i = 0; i < m_queues.size(); i++) {
+          // Reset the counter before it can overflow.
+          if (m_sendWhichQueue.load(boost::memory_order_acquire) ==
+              numeric_limits<int>::max()) {
+            m_sendWhichQueue.store(0, boost::memory_order_release);
+          }
+          if (pos >= m_queues.size()) pos = pos % m_queues.size();
+
+          ++m_sendWhichQueue;
+          LOG_DEBUG("lastmq broker empty, m_sendWhichQueue:%d, pos:%d",
+                    m_sendWhichQueue.load(boost::memory_order_acquire), pos);
+          mq_index = pos;
+          MQMessageQueue mq = m_queues.at(pos);
+          string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId());
+          if (m_onSerivceQueues.find(key) != m_onSerivceQueues.end()) {
+            return mq;
+          } else {
+            ++pos;
+          }
+        }
+
+        for (MQMAP::iterator it = m_nonSerivceQueues.begin();
+             it != m_nonSerivceQueues.end();
+             ++it) {  // if no MQMessageQueue(except lastmq) in
+                      // m_onSerivceQueues, search m_nonSerivceQueues
+          if (it->second.getBrokerName().compare(lastmq.getBrokerName()) != 0)
+            return it->second;
+        }
+        LOG_ERROR("can not find property mq");
+        return MQMessageQueue();
+      }
+    } else {
+      LOG_ERROR("m_queues empty");
+      return MQMessageQueue();
+    }
+  }
+
+ private:
+  // Guards m_queues, both queue maps and m_brokerTimerMap.
+  boost::mutex m_queuelock;
+  typedef vector<MQMessageQueue> QueuesVec;
+  // All queues added through updateMessageQueueList, in insertion order.
+  QueuesVec m_queues;
+  // Keyed by broker name followed by queue id.
+  typedef map<string, MQMessageQueue> MQMAP;
+  MQMAP m_onSerivceQueues;
+  MQMAP m_nonSerivceQueues;
+  // Round-robin counter, incremented on every selection step.
+  boost::atomic<int> m_sendWhichQueue;
+  // Time (UtilAll::currentTimeMillis) at which a queue was marked "non
+  // service".
+  map<MQMessageQueue, int64> m_brokerTimerMap;
+  boost::asio::io_service m_async_ioService;
+  boost::scoped_ptr<boost::thread> m_async_service_thread;
+};
+
+//<!***************************************************************************
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/CommandHeader.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/CommandHeader.cpp b/rocketmq-cpp/src/protocol/CommandHeader.cpp
new file mode 100644
index 0000000..366ac2e
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/CommandHeader.cpp
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CommandHeader.h"
+#include <cstdlib>
+#include <sstream>
+#include "Logging.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!************************************************************************
+// Serializes this header into outData by storing the topic under "topic".
+void GetRouteInfoRequestHeader::Encode(Json::Value& outData) {
+  Json::Value& topicSlot = outData["topic"];
+  topicSlot = topic;
+}
+
+// Adds the "topic" field to requestMap. map::insert never overwrites, so a
+// key that is already present keeps its existing value.
+void GetRouteInfoRequestHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  requestMap.insert(Field("topic", topic));
+}
+//<!***************************************************************************
+// Copies the client id and both group names into the JSON header object.
+void UnregisterClientRequestHeader::Encode(Json::Value& outData) {
+  Json::Value& header = outData;
+  header["clientID"] = clientID;
+  header["producerGroup"] = producerGroup;
+  header["consumerGroup"] = consumerGroup;
+}
+
+// Adds clientID, producerGroup and consumerGroup to requestMap. Keys that
+// already exist are left untouched (map::insert semantics).
+void UnregisterClientRequestHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  requestMap.insert(Field("clientID", clientID));
+  requestMap.insert(Field("producerGroup", producerGroup));
+  requestMap.insert(Field("consumerGroup", consumerGroup));
+}
+//<!************************************************************************
+// Writes all topic-creation fields into outData. The numeric fields are
+// stored as JSON numbers, the remaining ones as JSON strings.
+void CreateTopicRequestHeader::Encode(Json::Value& outData) {
+  Json::Value& header = outData;
+  // Textual fields.
+  header["topic"] = topic;
+  header["defaultTopic"] = defaultTopic;
+  // Integer fields.
+  header["readQueueNums"] = readQueueNums;
+  header["writeQueueNums"] = writeQueueNums;
+  header["perm"] = perm;
+  header["topicFilterType"] = topicFilterType;
+}
+// Adds every topic-creation field to requestMap as text; integers are
+// rendered with UtilAll::to_string. Existing keys are not overwritten.
+void CreateTopicRequestHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  requestMap.insert(Field("topic", topic));
+  requestMap.insert(Field("defaultTopic", defaultTopic));
+  requestMap.insert(Field("readQueueNums", UtilAll::to_string(readQueueNums)));
+  requestMap.insert(
+      Field("writeQueueNums", UtilAll::to_string(writeQueueNums)));
+  requestMap.insert(Field("perm", UtilAll::to_string(perm)));
+  requestMap.insert(Field("topicFilterType", topicFilterType));
+}
+
+//<!************************************************************************
+// Writes the send-message fields into outData. bornTimestamp is stored as
+// text (via UtilAll::to_string); the other numeric fields are stored as JSON
+// numbers. reconsumeTimes and unitMode are only emitted when ONS is defined.
+void SendMessageRequestHeader::Encode(Json::Value& outData) {
+  Json::Value& header = outData;
+  header["producerGroup"] = producerGroup;
+  header["topic"] = topic;
+  header["defaultTopic"] = defaultTopic;
+  header["defaultTopicQueueNums"] = defaultTopicQueueNums;
+  header["queueId"] = queueId;
+  header["sysFlag"] = sysFlag;
+  const string bornTimestampText = UtilAll::to_string(bornTimestamp);
+  header["bornTimestamp"] = bornTimestampText;
+  header["flag"] = flag;
+  header["properties"] = properties;
+#ifdef ONS
+  header["reconsumeTimes"] = UtilAll::to_string(reconsumeTimes);
+  header["unitMode"] = UtilAll::to_string(unitMode);
+#endif
+}
+
+// Returns the stored reconsumeTimes value.
+int SendMessageRequestHeader::getReconsumeTimes() {
+  return reconsumeTimes;
+}
+
+// Replaces the stored reconsumeTimes value with input_reconsumeTimes.
+void SendMessageRequestHeader::setReconsumeTimes(int input_reconsumeTimes) {
+  this->reconsumeTimes = input_reconsumeTimes;
+}
+
+// Logs the header at debug level and then adds every send-message field to
+// requestMap as text. Numeric fields are rendered once with
+// UtilAll::to_string and reused for both the log line and the map entries.
+// Existing keys are not overwritten; reconsumeTimes and unitMode are only
+// added when ONS is defined.
+void SendMessageRequestHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  const string queueNumsText = UtilAll::to_string(defaultTopicQueueNums);
+  const string queueIdText = UtilAll::to_string(queueId);
+  const string sysFlagText = UtilAll::to_string(sysFlag);
+  const string bornTimestampText = UtilAll::to_string(bornTimestamp);
+  const string flagText = UtilAll::to_string(flag);
+  LOG_DEBUG(
+      "SendMessageRequestHeader producerGroup is:%s,topic is:%s, defaulttopic "
+      "is:%s, properties is:%s,UtilAll::to_string( defaultTopicQueueNums) "
+      "is:%s,UtilAll::to_string( queueId):%s, UtilAll::to_string( sysFlag) "
+      "is:%s, UtilAll::to_string( bornTimestamp) is:%s,UtilAll::to_string( "
+      "flag) is:%s",
+      producerGroup.c_str(), topic.c_str(), defaultTopic.c_str(),
+      properties.c_str(), queueNumsText.c_str(), queueIdText.c_str(),
+      sysFlagText.c_str(), bornTimestampText.c_str(), flagText.c_str());
+  requestMap.insert(Field("producerGroup", producerGroup));
+  requestMap.insert(Field("topic", topic));
+  requestMap.insert(Field("defaultTopic", defaultTopic));
+  requestMap.insert(Field("defaultTopicQueueNums", queueNumsText));
+  requestMap.insert(Field("queueId", queueIdText));
+  requestMap.insert(Field("sysFlag", sysFlagText));
+  requestMap.insert(Field("bornTimestamp", bornTimestampText));
+  requestMap.insert(Field("flag", flagText));
+  requestMap.insert(Field("properties", properties));
+#ifdef ONS
+  requestMap.insert(
+      Field("reconsumeTimes", UtilAll::to_string(reconsumeTimes)));
+  requestMap.insert(Field("unitMode", UtilAll::to_string(unitMode)));
+#endif
+}
+
+//<!************************************************************************
+// Builds a SendMessageResponseHeader from the extension fields in ext.
+// Only string-typed entries are consumed: "msgId" is copied verbatim,
+// "queueId" is parsed with atoi() and "queueOffset" with UtilAll::str2ll().
+// An entry that is absent or not a string leaves the matching member at the
+// value set by the constructor.
+// Returns a header allocated with new; nothing in this function frees it.
+CommandHeader* SendMessageResponseHeader::Decode(Json::Value& ext) {
+  SendMessageResponseHeader* h = new SendMessageResponseHeader();
+
+  // Each field is bound to its own reference. Re-assigning through a single
+  // Json::Value& (as the previous code did) copies the next field over
+  // ext["msgId"] and silently corrupts the caller's JSON object.
+  const Json::Value& msgIdField = ext["msgId"];
+  if (msgIdField.isString()) {
+    h->msgId = msgIdField.asString();
+  }
+
+  const Json::Value& queueIdField = ext["queueId"];
+  if (queueIdField.isString()) {
+    h->queueId = atoi(queueIdField.asCString());
+  }
+
+  const Json::Value& queueOffsetField = ext["queueOffset"];
+  if (queueOffsetField.isString()) {
+    h->queueOffset = UtilAll::str2ll(queueOffsetField.asCString());
+  }
+  return h;
+}
+
+// Adds msgId, queueId and queueOffset to requestMap as text. Existing keys
+// are not overwritten.
+void SendMessageResponseHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  requestMap.insert(Field("msgId", msgId));
+  requestMap.insert(Field("queueId", UtilAll::to_string(queueId)));
+  requestMap.insert(Field("queueOffset", UtilAll::to_string(queueOffset)));
+}
+//<!************************************************************************
+// Writes the pull-message fields into outData. queueOffset, commitOffset,
+// subVersion and suspendTimeoutMillis are stored as text (via
+// UtilAll::to_string); queueId, maxMsgNums and sysFlag as JSON numbers.
+void PullMessageRequestHeader::Encode(Json::Value& outData) {
+  Json::Value& header = outData;
+  header["consumerGroup"] = consumerGroup;
+  header["topic"] = topic;
+  header["queueId"] = queueId;
+  header["queueOffset"] = UtilAll::to_string(queueOffset);
+  header["maxMsgNums"] = maxMsgNums;
+  header["sysFlag"] = sysFlag;
+  header["commitOffset"] = UtilAll::to_string(commitOffset);
+  header["subVersion"] = UtilAll::to_string(subVersion);
+  header["suspendTimeoutMillis"] = UtilAll::to_string(suspendTimeoutMillis);
+  header["subscription"] = subscription;
+}
+
+// Adds every pull-message field to requestMap as text; numbers are rendered
+// with UtilAll::to_string. Existing keys are not overwritten.
+void PullMessageRequestHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  requestMap.insert(Field("consumerGroup", consumerGroup));
+  requestMap.insert(Field("topic", topic));
+  requestMap.insert(Field("queueId", UtilAll::to_string(queueId)));
+  requestMap.insert(Field("queueOffset", UtilAll::to_string(queueOffset)));
+  requestMap.insert(Field("maxMsgNums", UtilAll::to_string(maxMsgNums)));
+  requestMap.insert(Field("sysFlag", UtilAll::to_string(sysFlag)));
+  requestMap.insert(Field("commitOffset", UtilAll::to_string(commitOffset)));
+  requestMap.insert(Field("subVersion", UtilAll::to_string(subVersion)));
+  requestMap.insert(Field("suspendTimeoutMillis",
+                          UtilAll::to_string(suspendTimeoutMillis)));
+  requestMap.insert(Field("subscription", subscription));
+}
+//<!************************************************************************
+// Builds a PullMessageResponseHeader from the extension fields in ext.
+// "suggestWhichBrokerId", "nextBeginOffset", "minOffset" and "maxOffset" are
+// each parsed with UtilAll::str2ll() when present as JSON strings; otherwise
+// the matching member keeps the value set by the constructor.
+// Returns a header allocated with new; nothing in this function frees it.
+CommandHeader* PullMessageResponseHeader::Decode(Json::Value& ext) {
+  PullMessageResponseHeader* h = new PullMessageResponseHeader();
+
+  // Each field is bound to its own reference. Re-assigning through a single
+  // Json::Value& (as the previous code did) copies every later field over
+  // ext["suggestWhichBrokerId"] and corrupts the caller's JSON object.
+  const Json::Value& brokerIdField = ext["suggestWhichBrokerId"];
+  if (brokerIdField.isString()) {
+    h->suggestWhichBrokerId = UtilAll::str2ll(brokerIdField.asCString());
+  }
+
+  const Json::Value& nextBeginField = ext["nextBeginOffset"];
+  if (nextBeginField.isString()) {
+    h->nextBeginOffset = UtilAll::str2ll(nextBeginField.asCString());
+  }
+
+  const Json::Value& minOffsetField = ext["minOffset"];
+  if (minOffsetField.isString()) {
+    h->minOffset = UtilAll::str2ll(minOffsetField.asCString());
+  }
+
+  const Json::Value& maxOffsetField = ext["maxOffset"];
+  if (maxOffsetField.isString()) {
+    h->maxOffset = UtilAll::str2ll(maxOffsetField.asCString());
+  }
+
+  return h;
+}
+
+// Adds the four numeric response fields to requestMap as text. Existing keys
+// are not overwritten.
+void PullMessageResponseHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  requestMap.insert(Field("suggestWhichBrokerId",
+                          UtilAll::to_string(suggestWhichBrokerId)));
+  requestMap.insert(
+      Field("nextBeginOffset", UtilAll::to_string(nextBeginOffset)));
+  requestMap.insert(Field("minOffset", UtilAll::to_string(minOffset)));
+  requestMap.insert(Field("maxOffset", UtilAll::to_string(maxOffset)));
+}
+//<!************************************************************************
+// This header has no fields, so outData is deliberately left untouched.
+void GetConsumerListByGroupResponseHeader::Encode(Json::Value& outData) {
+  // Previously considered: outData = "{}";
+  (void)outData;
+}
+
+// This header declares no fields, so requestMap is left unchanged.
+void GetConsumerListByGroupResponseHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  (void)requestMap;
+}
+//<!***************************************************************************
+// Stores the topic (string) and queueId (number) in outData.
+void GetMinOffsetRequestHeader::Encode(Json::Value& outData) {
+  Json::Value& header = outData;
+  header["topic"] = topic;
+  header["queueId"] = queueId;
+}
+
+// Adds "topic" and "queueId" (as text) to requestMap without overwriting
+// keys that already exist.
+void GetMinOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  requestMap.insert(Field("topic", topic));
+  requestMap.insert(Field("queueId", UtilAll::to_string(queueId)));
+}
+//<!***************************************************************************
+// Creates a GetMinOffsetResponseHeader and fills offset from ext["offset"]
+// when that entry is a JSON string (parsed with UtilAll::str2ll); otherwise
+// offset keeps its constructor value. The result is allocated with new.
+CommandHeader* GetMinOffsetResponseHeader::Decode(Json::Value& ext) {
+  GetMinOffsetResponseHeader* decoded = new GetMinOffsetResponseHeader();
+  const Json::Value& offsetField = ext["offset"];
+  if (!offsetField.isString()) {
+    return decoded;
+  }
+  decoded->offset = UtilAll::str2ll(offsetField.asCString());
+  return decoded;
+}
+
+// Adds "offset" (as text) to requestMap; an existing key is not replaced.
+void GetMinOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  const string offsetText = UtilAll::to_string(offset);
+  requestMap.insert(Field("offset", offsetText));
+}
+//<!***************************************************************************
+// Stores the topic (string) and queueId (number) in outData.
+void GetMaxOffsetRequestHeader::Encode(Json::Value& outData) {
+  Json::Value& header = outData;
+  header["topic"] = topic;
+  header["queueId"] = queueId;
+}
+
+// Adds "topic" and "queueId" (as text) to requestMap without overwriting
+// keys that already exist.
+void GetMaxOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  requestMap.insert(Field("topic", topic));
+  requestMap.insert(Field("queueId", UtilAll::to_string(queueId)));
+}
+//<!***************************************************************************
+// Creates a GetMaxOffsetResponseHeader and fills offset from ext["offset"]
+// when that entry is a JSON string (parsed with UtilAll::str2ll); otherwise
+// offset keeps its constructor value. The result is allocated with new.
+CommandHeader* GetMaxOffsetResponseHeader::Decode(Json::Value& ext) {
+  GetMaxOffsetResponseHeader* decoded = new GetMaxOffsetResponseHeader();
+  const Json::Value& offsetField = ext["offset"];
+  if (!offsetField.isString()) {
+    return decoded;
+  }
+  decoded->offset = UtilAll::str2ll(offsetField.asCString());
+  return decoded;
+}
+
+// Adds "offset" (as text) to requestMap; an existing key is not replaced.
+void GetMaxOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  const string offsetText = UtilAll::to_string(offset);
+  requestMap.insert(Field("offset", offsetText));
+}
+//<!***************************************************************************
+// Stores topic (string), queueId (number) and timestamp (as text, via
+// UtilAll::to_string) in outData.
+void SearchOffsetRequestHeader::Encode(Json::Value& outData) {
+  Json::Value& header = outData;
+  header["topic"] = topic;
+  header["queueId"] = queueId;
+  const string timestampText = UtilAll::to_string(timestamp);
+  header["timestamp"] = timestampText;
+}
+
+// Adds "topic", "queueId" and "timestamp" to requestMap as text. Existing
+// keys are not overwritten.
+void SearchOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  requestMap.insert(Field("topic", topic));
+  requestMap.insert(Field("queueId", UtilAll::to_string(queueId)));
+  requestMap.insert(Field("timestamp", UtilAll::to_string(timestamp)));
+}
+//<!***************************************************************************
+// Creates a SearchOffsetResponseHeader and fills offset from ext["offset"]
+// when that entry is a JSON string (parsed with UtilAll::str2ll); otherwise
+// offset keeps its constructor value. The result is allocated with new.
+CommandHeader* SearchOffsetResponseHeader::Decode(Json::Value& ext) {
+  SearchOffsetResponseHeader* decoded = new SearchOffsetResponseHeader();
+  const Json::Value& offsetField = ext["offset"];
+  if (!offsetField.isString()) {
+    return decoded;
+  }
+  decoded->offset = UtilAll::str2ll(offsetField.asCString());
+  return decoded;
+}
+
+// Adds "offset" (as text) to requestMap; an existing key is not replaced.
+void SearchOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  const string offsetText = UtilAll::to_string(offset);
+  requestMap.insert(Field("offset", offsetText));
+}
+//<!***************************************************************************
+// Stores offset in outData as text (rendered with UtilAll::to_string).
+void ViewMessageRequestHeader::Encode(Json::Value& outData) {
+  const string offsetText = UtilAll::to_string(offset);
+  outData["offset"] = offsetText;
+}
+
+// Adds "offset" (as text) to requestMap; an existing key is not replaced.
+void ViewMessageRequestHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  const string offsetText = UtilAll::to_string(offset);
+  requestMap.insert(Field("offset", offsetText));
+}
+//<!***************************************************************************
+// Stores the topic (string) and queueId (number) in outData.
+void GetEarliestMsgStoretimeRequestHeader::Encode(Json::Value& outData) {
+  Json::Value& header = outData;
+  header["topic"] = topic;
+  header["queueId"] = queueId;
+}
+
+// Adds "topic" and "queueId" (as text) to requestMap without overwriting
+// keys that already exist.
+void GetEarliestMsgStoretimeRequestHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  requestMap.insert(Field("topic", topic));
+  requestMap.insert(Field("queueId", UtilAll::to_string(queueId)));
+}
+//<!***************************************************************************
+// Creates a GetEarliestMsgStoretimeResponseHeader and fills timestamp from
+// ext["timestamp"] when that entry is a JSON string (parsed with
+// UtilAll::str2ll); otherwise timestamp keeps its constructor value.
+// The result is allocated with new.
+CommandHeader* GetEarliestMsgStoretimeResponseHeader::Decode(
+    Json::Value& ext) {
+  GetEarliestMsgStoretimeResponseHeader* decoded =
+      new GetEarliestMsgStoretimeResponseHeader();
+  const Json::Value& timestampField = ext["timestamp"];
+  if (!timestampField.isString()) {
+    return decoded;
+  }
+  decoded->timestamp = UtilAll::str2ll(timestampField.asCString());
+  return decoded;
+}
+
+// Adds "timestamp" (as text) to requestMap; an existing key is not replaced.
+void GetEarliestMsgStoretimeResponseHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  const string timestampText = UtilAll::to_string(timestamp);
+  requestMap.insert(Field("timestamp", timestampText));
+}
+//<!***************************************************************************
+// Stores the consumer group name in outData under "consumerGroup".
+void GetConsumerListByGroupRequestHeader::Encode(Json::Value& outData) {
+  Json::Value& groupSlot = outData["consumerGroup"];
+  groupSlot = consumerGroup;
+}
+
+// Adds "consumerGroup" to requestMap; an existing key is not replaced.
+void GetConsumerListByGroupRequestHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  requestMap.insert(Field("consumerGroup", consumerGroup));
+}
+//<!***************************************************************************
+// Stores consumerGroup and topic (strings) and queueId (number) in outData.
+void QueryConsumerOffsetRequestHeader::Encode(Json::Value& outData) {
+  Json::Value& header = outData;
+  header["consumerGroup"] = consumerGroup;
+  header["topic"] = topic;
+  header["queueId"] = queueId;
+}
+
+// Adds "consumerGroup", "topic" and "queueId" (as text) to requestMap.
+// Existing keys are not overwritten.
+void QueryConsumerOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  requestMap.insert(Field("consumerGroup", consumerGroup));
+  requestMap.insert(Field("topic", topic));
+  requestMap.insert(Field("queueId", UtilAll::to_string(queueId)));
+}
+//<!***************************************************************************
+// Creates a QueryConsumerOffsetResponseHeader and fills offset from
+// ext["offset"] when that entry is a JSON string (parsed with
+// UtilAll::str2ll); otherwise offset keeps its constructor value.
+// The result is allocated with new.
+CommandHeader* QueryConsumerOffsetResponseHeader::Decode(
+    Json::Value& ext) {
+  QueryConsumerOffsetResponseHeader* decoded =
+      new QueryConsumerOffsetResponseHeader();
+  const Json::Value& offsetField = ext["offset"];
+  if (!offsetField.isString()) {
+    return decoded;
+  }
+  decoded->offset = UtilAll::str2ll(offsetField.asCString());
+  return decoded;
+}
+
+// Adds "offset" (as text) to requestMap; an existing key is not replaced.
+void QueryConsumerOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  const string offsetText = UtilAll::to_string(offset);
+  requestMap.insert(Field("offset", offsetText));
+}
+//<!***************************************************************************
+// Stores consumerGroup and topic (strings), queueId (number) and
+// commitOffset (as text, via UtilAll::to_string) in outData.
+void UpdateConsumerOffsetRequestHeader::Encode(Json::Value& outData) {
+  Json::Value& header = outData;
+  header["consumerGroup"] = consumerGroup;
+  header["topic"] = topic;
+  header["queueId"] = queueId;
+  const string commitOffsetText = UtilAll::to_string(commitOffset);
+  header["commitOffset"] = commitOffsetText;
+}
+
+// Adds "consumerGroup", "topic", "queueId" and "commitOffset" to requestMap
+// as text. Existing keys are not overwritten.
+void UpdateConsumerOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  requestMap.insert(Field("consumerGroup", consumerGroup));
+  requestMap.insert(Field("topic", topic));
+  requestMap.insert(Field("queueId", UtilAll::to_string(queueId)));
+  requestMap.insert(Field("commitOffset", UtilAll::to_string(commitOffset)));
+}
+//<!***************************************************************************
+// Stores group (string), delayLevel (number) and offset (as text) in
+// outData. originMsgId and originTopic are only emitted when ONS is defined.
+void ConsumerSendMsgBackRequestHeader::Encode(Json::Value& outData) {
+  Json::Value& header = outData;
+  header["group"] = group;
+  header["delayLevel"] = delayLevel;
+  const string offsetText = UtilAll::to_string(offset);
+  header["offset"] = offsetText;
+#ifdef ONS
+  header["originMsgId"] = originMsgId;
+  header["originTopic"] = originTopic;
+#endif
+}
+
+// Adds "group", "delayLevel" and "offset" to requestMap as text. Existing
+// keys are not overwritten.
+// NOTE(review): Encode() also emits originMsgId/originTopic under ONS, but
+// they are not added here - confirm whether that asymmetry is intended.
+void ConsumerSendMsgBackRequestHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  requestMap.insert(Field("group", group));
+  requestMap.insert(Field("delayLevel", UtilAll::to_string(delayLevel)));
+  requestMap.insert(Field("offset", UtilAll::to_string(offset)));
+}
+//<!***************************************************************************
+// Parses the JSON text held by mem and appends every string element of its
+// "consumerIdList" member to cids. cids is emptied first, so it stays empty
+// when parsing fails (an error is logged in that case). Non-string elements
+// are skipped.
+// NOTE(review): the buffer is handed to Json::Reader::parse as a C string,
+// which presumably requires it to be NUL-terminated - confirm with callers.
+void GetConsumerListByGroupResponseBody::Decode(const MemoryBlock* mem,
+                                                vector<string>& cids) {
+  cids.clear();
+  const char* const jsonText = static_cast<const char*>(mem->getData());
+  Json::Reader reader;
+  Json::Value root;
+  const bool parsed = reader.parse(jsonText, root);
+  if (!parsed) {
+    LOG_ERROR("GetConsumerListByGroupResponse error");
+    return;
+  }
+  const Json::Value idList = root["consumerIdList"];
+  const unsigned int total = idList.size();
+  for (unsigned int idx = 0; idx != total; ++idx) {
+    const Json::Value& entry = idList[idx];
+    if (!entry.isString()) {
+      continue;
+    }
+    cids.push_back(entry.asString());
+  }
+}
+
+// The response body declares no header fields, so requestMap is unchanged.
+void GetConsumerListByGroupResponseBody::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  (void)requestMap;
+}
+
+// Replaces the stored topic with a copy of tmp.
+void ResetOffsetRequestHeader::setTopic(const string& tmp) {
+  this->topic = tmp;
+}
+
+// Replaces the stored group with a copy of tmp.
+void ResetOffsetRequestHeader::setGroup(const string& tmp) {
+  this->group = tmp;
+}
+
+// Replaces the stored timestamp with tmp.
+void ResetOffsetRequestHeader::setTimeStamp(const int64& tmp) {
+  this->timestamp = tmp;
+}
+
+// Replaces the stored isForce flag with tmp.
+void ResetOffsetRequestHeader::setForceFlag(const bool& tmp) {
+  this->isForce = tmp;
+}
+
+// Returns a copy of the stored topic.
+const string ResetOffsetRequestHeader::getTopic() const {
+  return this->topic;
+}
+
+// Returns a copy of the stored group.
+const string ResetOffsetRequestHeader::getGroup() const {
+  return this->group;
+}
+
+// Returns the stored timestamp.
+const int64 ResetOffsetRequestHeader::getTimeStamp() const {
+  return this->timestamp;
+}
+
+// Returns the stored isForce flag.
+const bool ResetOffsetRequestHeader::getForceFlag() const {
+  return this->isForce;
+}
+
+// Builds a ResetOffsetRequestHeader from the extension fields in ext.
+// Only string-typed entries are consumed: "topic" and "group" are copied,
+// "timestamp" is parsed with UtilAll::str2ll() and "isForce" with
+// UtilAll::to_bool(). An entry that is absent or not a string leaves the
+// matching member as the constructor left it. The decoded values are logged
+// at info level.
+// Returns a header allocated with new; nothing in this function frees it.
+CommandHeader* ResetOffsetRequestHeader::Decode(Json::Value& ext) {
+  ResetOffsetRequestHeader* h = new ResetOffsetRequestHeader();
+
+  // Each field is bound to its own reference. Re-assigning through a single
+  // Json::Value& (as the previous code did) copies every later field over
+  // ext["topic"] and corrupts the caller's JSON object.
+  const Json::Value& topicField = ext["topic"];
+  if (topicField.isString()) {
+    h->topic = topicField.asString();
+  }
+
+  const Json::Value& groupField = ext["group"];
+  if (groupField.isString()) {
+    h->group = groupField.asString();
+  }
+
+  const Json::Value& timestampField = ext["timestamp"];
+  if (timestampField.isString()) {
+    h->timestamp = UtilAll::str2ll(timestampField.asCString());
+  }
+
+  const Json::Value& isForceField = ext["isForce"];
+  if (isForceField.isString()) {
+    h->isForce = UtilAll::to_bool(isForceField.asCString());
+  }
+
+  // asCString() is only valid on string values, so fall back to "" for the
+  // raw text when "isForce" is missing or has another JSON type. The cast
+  // keeps the %lld conversion correct whatever int64 is typedef'd to.
+  const char* rawIsForce =
+      isForceField.isString() ? isForceField.asCString() : "";
+  LOG_INFO("topic:%s, group:%s, timestamp:%lld, isForce:%d,isForce:%s",
+           h->topic.c_str(), h->group.c_str(),
+           static_cast<long long>(h->timestamp), h->isForce, rawIsForce);
+  return h;
+}
+
+// Builds a GetConsumerRunningInfoRequestHeader from the extension fields in
+// ext. Only string-typed entries are consumed: "consumerGroup" and
+// "clientId" are copied and "jstackEnable" is parsed with
+// UtilAll::to_bool(). An entry that is absent or not a string leaves the
+// matching member as the constructor left it. The decoded values are logged
+// at info level.
+// Returns a header allocated with new; nothing in this function frees it.
+CommandHeader* GetConsumerRunningInfoRequestHeader::Decode(
+    Json::Value& ext) {
+  GetConsumerRunningInfoRequestHeader* h =
+      new GetConsumerRunningInfoRequestHeader();
+
+  // Each field is bound to its own reference. Re-assigning through a single
+  // Json::Value& (as the previous code did) copies every later field over
+  // ext["consumerGroup"] and corrupts the caller's JSON object.
+  const Json::Value& consumerGroupField = ext["consumerGroup"];
+  if (consumerGroupField.isString()) {
+    h->consumerGroup = consumerGroupField.asString();
+  }
+
+  const Json::Value& clientIdField = ext["clientId"];
+  if (clientIdField.isString()) {
+    h->clientId = clientIdField.asString();
+  }
+
+  const Json::Value& jstackEnableField = ext["jstackEnable"];
+  if (jstackEnableField.isString()) {
+    h->jstackEnable = UtilAll::to_bool(jstackEnableField.asCString());
+  }
+  LOG_INFO("consumerGroup:%s, clientId:%s,  jstackEnable:%d",
+           h->consumerGroup.c_str(), h->clientId.c_str(), h->jstackEnable);
+  return h;
+}
+
+// Stores consumerGroup and clientId (strings) and jstackEnable (boolean) in
+// outData.
+void GetConsumerRunningInfoRequestHeader::Encode(Json::Value& outData) {
+  Json::Value& header = outData;
+  header["consumerGroup"] = consumerGroup;
+  header["clientId"] = clientId;
+  header["jstackEnable"] = jstackEnable;
+}
+
+// Adds "consumerGroup", "clientId" and "jstackEnable" (as text) to
+// requestMap. Existing keys are not overwritten.
+void GetConsumerRunningInfoRequestHeader::SetDeclaredFieldOfCommandHeader(
+    map<string, string>& requestMap) {
+  typedef map<string, string>::value_type Field;
+  requestMap.insert(Field("consumerGroup", consumerGroup));
+  requestMap.insert(Field("clientId", clientId));
+  requestMap.insert(Field("jstackEnable", UtilAll::to_string(jstackEnable)));
+}
+
+// Returns a copy of the stored consumer group name.
+const string GetConsumerRunningInfoRequestHeader::getConsumerGroup() const {
+  return this->consumerGroup;
+}
+
+// Replaces the stored consumer group name with a copy of Group.
+void GetConsumerRunningInfoRequestHeader::setConsumerGroup(
+    const string& Group) {
+  this->consumerGroup = Group;
+}
+
+// Returns a copy of the stored client id.
+const string GetConsumerRunningInfoRequestHeader::getClientId() const {
+  return this->clientId;
+}
+
+// Replaces the stored client id with a copy of input_clientId.
+void GetConsumerRunningInfoRequestHeader::setClientId(
+    const string& input_clientId) {
+  this->clientId = input_clientId;
+}
+
+// Returns the stored jstackEnable flag.
+const bool GetConsumerRunningInfoRequestHeader::isJstackEnable() const {
+  return this->jstackEnable;
+}
+
+// Replaces the stored jstackEnable flag with input_jstackEnable.
+void GetConsumerRunningInfoRequestHeader::setJstackEnable(
+    const bool& input_jstackEnable) {
+  this->jstackEnable = input_jstackEnable;
+}
+
+// Creates a NotifyConsumerIdsChangedRequestHeader and copies
+// ext["consumerGroup"] into it when that entry is a JSON string; otherwise
+// consumerGroup stays as the constructor left it.
+// The result is allocated with new.
+CommandHeader* NotifyConsumerIdsChangedRequestHeader::Decode(
+    Json::Value& ext) {
+  NotifyConsumerIdsChangedRequestHeader* decoded =
+      new NotifyConsumerIdsChangedRequestHeader();
+  const Json::Value& groupField = ext["consumerGroup"];
+  if (!groupField.isString()) {
+    return decoded;
+  }
+  decoded->consumerGroup = groupField.asString();
+  return decoded;
+}
+
+// Replaces the stored consumer group name with a copy of tmp.
+void NotifyConsumerIdsChangedRequestHeader::setGroup(const string& tmp) {
+  this->consumerGroup = tmp;
+}
+// Returns a copy of the stored consumer group name.
+const string NotifyConsumerIdsChangedRequestHeader::getGroup() const {
+  return this->consumerGroup;
+}
+
+//<!************************************************************************
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/CommandHeader.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/CommandHeader.h b/rocketmq-cpp/src/protocol/CommandHeader.h
new file mode 100644
index 0000000..5a55c55
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/CommandHeader.h
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef __COMMANDCUSTOMHEADER_H__
+#define __COMMANDCUSTOMHEADER_H__
+
+#include <string>
+#include "MQClientException.h"
+#include "MessageSysFlag.h"
+#include "UtilAll.h"
+#include "dataBlock.h"
+#include "json/json.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+
+// Base type for command headers. Both virtual hooks are no-ops by default,
+// so a subclass overrides only what it needs.
+class CommandHeader {
+ public:
+  virtual ~CommandHeader() {}
+
+  // Writes the header fields into a JSON object; the base version writes
+  // nothing.
+  virtual void Encode(Json::Value& /*outData*/) {}
+
+  // Adds the header fields to a string map; the base version adds nothing.
+  virtual void SetDeclaredFieldOfCommandHeader(
+      map<string, string>& /*requestMap*/) {}
+};
+
+//<!************************************************************************
+// Request header that carries a single topic name, fixed at construction.
+class GetRouteInfoRequestHeader : public CommandHeader {
+ public:
+  // Stores a copy of top as the topic.
+  GetRouteInfoRequestHeader(const string& top) : topic(top) {}
+  virtual ~GetRouteInfoRequestHeader() {}
+
+  // Serialization hooks; both emit the "topic" field.
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ private:
+  string topic;  // emitted as "topic"
+};
+
+//<!************************************************************************
+// Request header carrying a client id plus a producer group and a consumer
+// group name, all fixed at construction.
+class UnregisterClientRequestHeader : public CommandHeader {
+ public:
+  // Stores copies of the three strings.
+  UnregisterClientRequestHeader(string cID, string proGroup, string conGroup)
+      : clientID(cID), producerGroup(proGroup), consumerGroup(conGroup) {}
+  virtual ~UnregisterClientRequestHeader() {}
+
+  // Serialization hooks; both emit all three fields.
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ private:
+  string clientID;       // emitted as "clientID"
+  string producerGroup;  // emitted as "producerGroup"
+  string consumerGroup;  // emitted as "consumerGroup"
+};
+
+//<!************************************************************************
+// Request header describing a topic to create. All fields are public and
+// filled in by the caller; the integer fields start at 0.
+class CreateTopicRequestHeader : public CommandHeader {
+ public:
+  CreateTopicRequestHeader() : readQueueNums(0), writeQueueNums(0), perm(0) {}
+  virtual ~CreateTopicRequestHeader() {}
+
+  // Serialization hooks; both emit every field below.
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+  string topic;
+  string defaultTopic;
+  int readQueueNums;
+  int writeQueueNums;
+  int perm;
+  string topicFilterType;
+};
+
+//<!************************************************************************
+// Request header for sending a message. All fields are public and filled in
+// by the caller; numeric fields start at 0 and unitMode at false.
+class SendMessageRequestHeader : public CommandHeader {
+ public:
+  SendMessageRequestHeader()
+      : defaultTopicQueueNums(0),
+        queueId(0),
+        sysFlag(0),
+        bornTimestamp(0),
+        flag(0),
+        reconsumeTimes(0),
+        unitMode(false) {}
+  virtual ~SendMessageRequestHeader() {}
+
+  // Serialization hooks.
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+  // Accessors for reconsumeTimes.
+  int getReconsumeTimes();
+  void setReconsumeTimes(int input_reconsumeTimes);
+
+  string producerGroup;
+  string topic;
+  string defaultTopic;
+  int defaultTopicQueueNums;
+  int queueId;
+  int sysFlag;
+  int64 bornTimestamp;
+  int flag;
+  string properties;
+  int reconsumeTimes;
+  bool unitMode;
+};
+
+//<!************************************************************************
+// Response header for a sent message: message id, queue id and queue offset.
+class SendMessageResponseHeader : public CommandHeader {
+ public:
+  // msgId is default-constructed (empty); the numeric fields start at 0.
+  SendMessageResponseHeader() : msgId(), queueId(0), queueOffset(0) {}
+  virtual ~SendMessageResponseHeader() {}
+
+  // Builds a header from JSON extension fields; returns a new-allocated
+  // object.
+  static CommandHeader* Decode(Json::Value& ext);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+  string msgId;
+  int queueId;
+  int64 queueOffset;
+};
+
+//<!************************************************************************
+// Request header for pulling messages. All fields are public and filled in
+// by the caller; every numeric field starts at 0.
+class PullMessageRequestHeader : public CommandHeader {
+ public:
+  PullMessageRequestHeader()
+      : queueId(0),
+        maxMsgNums(0),
+        sysFlag(0),
+        queueOffset(0),
+        commitOffset(0),
+        suspendTimeoutMillis(0),
+        subVersion(0) {}
+  virtual ~PullMessageRequestHeader() {}
+
+  // Serialization hooks; both emit every field below.
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+  string consumerGroup;
+  string topic;
+  int queueId;
+  int maxMsgNums;
+  int sysFlag;
+  string subscription;
+  int64 queueOffset;
+  int64 commitOffset;
+  int64 suspendTimeoutMillis;
+  int64 subVersion;
+};
+
+//<!************************************************************************
+// Response header for a pull request: a suggested broker id and three
+// offsets. Every field starts at 0.
+class PullMessageResponseHeader : public CommandHeader {
+ public:
+  PullMessageResponseHeader()
+      : suggestWhichBrokerId(0),
+        nextBeginOffset(0),
+        minOffset(0),
+        maxOffset(0) {}
+  virtual ~PullMessageResponseHeader() {}
+
+  // Builds a header from JSON extension fields; returns a new-allocated
+  // object.
+  static CommandHeader* Decode(Json::Value& ext);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+  int64 suggestWhichBrokerId;
+  int64 nextBeginOffset;
+  int64 minOffset;
+  int64 maxOffset;
+};
+
+//<!************************************************************************
+// Field-less response header; it only declares the two serialization hooks.
+class GetConsumerListByGroupResponseHeader : public CommandHeader {
+ public:
+  GetConsumerListByGroupResponseHeader() {}
+  virtual ~GetConsumerListByGroupResponseHeader() {}
+
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+};
+
+//<!***************************************************************************
+// Request header identifying a queue by topic and queue id (initially 0).
+class GetMinOffsetRequestHeader : public CommandHeader {
+ public:
+  GetMinOffsetRequestHeader() : queueId(0) {}
+  virtual ~GetMinOffsetRequestHeader() {}
+
+  // Serialization hooks; both emit "topic" and "queueId".
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+  string topic;
+  int queueId;
+};
+
+//<!***************************************************************************
+// Response header carrying a single offset (initially 0).
+class GetMinOffsetResponseHeader : public CommandHeader {
+ public:
+  GetMinOffsetResponseHeader() : offset(0) {}
+  virtual ~GetMinOffsetResponseHeader() {}
+
+  // Builds a header from JSON extension fields; returns a new-allocated
+  // object.
+  static CommandHeader* Decode(Json::Value& ext);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+  int64 offset;
+};
+
+//<!***************************************************************************
+// Request header identifying a queue by topic and queue id (initially 0).
+class GetMaxOffsetRequestHeader : public CommandHeader {
+ public:
+  GetMaxOffsetRequestHeader() : queueId(0) {}
+  virtual ~GetMaxOffsetRequestHeader() {}
+
+  // Serialization hooks; both emit "topic" and "queueId".
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+  string topic;
+  int queueId;
+};
+
+//<!***************************************************************************
+// Response header carrying a single offset (initially 0).
+class GetMaxOffsetResponseHeader : public CommandHeader {
+ public:
+  GetMaxOffsetResponseHeader() : offset(0) {}
+  virtual ~GetMaxOffsetResponseHeader() {}
+
+  // Builds a header from JSON extension fields; returns a new-allocated
+  // object.
+  static CommandHeader* Decode(Json::Value& ext);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+  int64 offset;
+};
+
+//<!***************************************************************************
+// Request header carrying a topic, a queue id and a timestamp; the numeric
+// fields start at 0.
+class SearchOffsetRequestHeader : public CommandHeader {
+ public:
+  SearchOffsetRequestHeader() : queueId(0), timestamp(0) {}
+  virtual ~SearchOffsetRequestHeader() {}
+
+  // Serialization hooks; both emit "topic", "queueId" and "timestamp".
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+  string topic;
+  int queueId;
+  int64 timestamp;
+};
+
+//<!***************************************************************************
+// Response header carrying a single offset (initially 0).
+class SearchOffsetResponseHeader : public CommandHeader {
+ public:
+  SearchOffsetResponseHeader() : offset(0) {}
+  virtual ~SearchOffsetResponseHeader() {}
+
+  // Builds a header from JSON extension fields; returns a new-allocated
+  // object.
+  static CommandHeader* Decode(Json::Value& ext);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+  int64 offset;
+};
+
+//<!***************************************************************************
+// Request header carrying a single offset (initially 0).
+class ViewMessageRequestHeader : public CommandHeader {
+ public:
+  ViewMessageRequestHeader() : offset(0) {}
+  virtual ~ViewMessageRequestHeader() {}
+
+  // Serialization hooks; both emit "offset" as text.
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+  int64 offset;
+};
+
+//<!***************************************************************************
+// Request header for the "get earliest message store time" command;
+// identifies the target queue by topic and queue id.
+class GetEarliestMsgStoretimeRequestHeader : public CommandHeader {
+ public:
+  // Zeroes queueId; topic is default-constructed (empty string).
+  GetEarliestMsgStoretimeRequestHeader() : queueId(0){};
+  virtual ~GetEarliestMsgStoretimeRequestHeader() {}
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+  string topic;
+  int queueId;
+};
+
+//<!***************************************************************************
+// Response header for the "get earliest message store time" command; carries
+// a 64-bit timestamp.
+class GetEarliestMsgStoretimeResponseHeader : public CommandHeader {
+ public:
+  // timestamp starts at 0.
+  GetEarliestMsgStoretimeResponseHeader() : timestamp(0){};
+  virtual ~GetEarliestMsgStoretimeResponseHeader() {}
+  // Static factory taking the JSON `ext` value and returning a CommandHeader*.
+  // NOTE(review): ownership of the returned pointer is not visible here.
+  static CommandHeader* Decode(Json::Value& ext);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+  int64 timestamp;
+};
+
+//<!***************************************************************************
+// Request header for the "get consumer list by group" command; its only field
+// is the consumer group name.
+class GetConsumerListByGroupRequestHeader : public CommandHeader {
+ public:
+  // consumerGroup is default-constructed (empty string).
+  GetConsumerListByGroupRequestHeader(){};
+  virtual ~GetConsumerListByGroupRequestHeader() {}
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+  string consumerGroup;
+};
+
+//<!************************************************************************
+// Request header for the "query consumer offset" command; identifies the
+// offset to look up by consumer group, topic and queue id.
+class QueryConsumerOffsetRequestHeader : public CommandHeader {
+ public:
+  // Zeroes queueId; both strings are default-constructed (empty).
+  QueryConsumerOffsetRequestHeader() : queueId(0){};
+  virtual ~QueryConsumerOffsetRequestHeader() {}
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+  string consumerGroup;
+  string topic;
+  int queueId;
+};
+
+//<!************************************************************************
+// Response header for the "query consumer offset" command; carries a 64-bit
+// offset.
+class QueryConsumerOffsetResponseHeader : public CommandHeader {
+ public:
+  // offset starts at 0.
+  QueryConsumerOffsetResponseHeader() : offset(0){};
+  virtual ~QueryConsumerOffsetResponseHeader() {}
+  // Static factory taking the JSON `ext` value and returning a CommandHeader*.
+  // NOTE(review): ownership of the returned pointer is not visible here.
+  static CommandHeader* Decode(Json::Value& ext);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+  int64 offset;
+};
+
+//<!************************************************************************
+// Request header for the "update consumer offset" command; names the consumer
+// group, topic and queue id together with the offset to commit.
+class UpdateConsumerOffsetRequestHeader : public CommandHeader {
+ public:
+  // Zeroes queueId and commitOffset; both strings start empty.
+  UpdateConsumerOffsetRequestHeader() : queueId(0), commitOffset(0){};
+  virtual ~UpdateConsumerOffsetRequestHeader() {}
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+  string consumerGroup;
+  string topic;
+  int queueId;
+  int64 commitOffset;
+};
+
+//<!***************************************************************************
+// Request header for the "consumer send message back" command; carries a
+// group name, a delay level and a 64-bit offset.
+class ConsumerSendMsgBackRequestHeader : public CommandHeader {
+ public:
+  // Zeroes delayLevel and offset; group is default-constructed (empty).
+  ConsumerSendMsgBackRequestHeader() : delayLevel(0), offset(0){};
+  virtual ~ConsumerSendMsgBackRequestHeader() {}
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+  string group;
+  // NOTE(review): the meaning of individual delay levels is not defined in
+  // this header -- confirm against the broker configuration.
+  int delayLevel;
+  int64 offset;
+};
+
+//<!***************************************************************************
+// Response body for the "get consumer list by group" command. Holds no data
+// members; the useful entry point is the static Decode().
+// NOTE(review): this class does not derive from CommandHeader, yet it declares
+// a virtual SetDeclaredFieldOfCommandHeader() -- confirm that a definition
+// exists and that the method is actually needed.
+class GetConsumerListByGroupResponseBody {
+ public:
+  GetConsumerListByGroupResponseBody(){};
+  virtual ~GetConsumerListByGroupResponseBody() {}
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+  // Decodes the raw body in `mem` and reports string ids through `cids`.
+  // NOTE(review): whether `cids` is cleared first is not visible here.
+  static void Decode(const MemoryBlock* mem, vector<string>& cids);
+};
+
+// Header of a "reset offset" request. Carries the topic and group to reset, a
+// timestamp, and a force flag. Instances can be produced from JSON through the
+// static Decode(); fields are otherwise reached through the accessors below.
+class ResetOffsetRequestHeader : public CommandHeader {
+ public:
+  // Starts with timestamp 0 and the force flag cleared, so the getters never
+  // read indeterminate values on a freshly constructed header.
+  ResetOffsetRequestHeader() : timestamp(0), isForce(false) {}
+  virtual ~ResetOffsetRequestHeader() {}
+  // Static factory taking the JSON `ext` value and returning a CommandHeader*.
+  // NOTE(review): ownership of the returned pointer is not visible here.
+  static CommandHeader* Decode(Json::Value& ext);
+  void setTopic(const string& tmp);
+  void setGroup(const string& tmp);
+  void setTimeStamp(const int64& tmp);
+  void setForceFlag(const bool& tmp);
+  // Getters return by value.
+  const string getTopic() const;
+  const string getGroup() const;
+  const int64 getTimeStamp() const;
+  const bool getForceFlag() const;
+
+ private:
+  string topic;
+  string group;
+  int64 timestamp;
+  bool isForce;
+};
+
+// Header of a "get consumer running info" request. Names the consumer group
+// and client id being queried and carries a jstack-enable flag.
+class GetConsumerRunningInfoRequestHeader : public CommandHeader {
+ public:
+  // The flag starts cleared so isJstackEnable() never reads an indeterminate
+  // value on a freshly constructed header; both strings start empty.
+  GetConsumerRunningInfoRequestHeader() : jstackEnable(false) {}
+  virtual ~GetConsumerRunningInfoRequestHeader() {}
+  virtual void Encode(Json::Value& outData);
+  virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+  // Static factory taking the JSON `ext` value and returning a CommandHeader*.
+  // NOTE(review): ownership of the returned pointer is not visible here.
+  static CommandHeader* Decode(Json::Value& ext);
+  // Getters return by value.
+  const string getConsumerGroup() const;
+  void setConsumerGroup(const string& consumerGroup);
+  const string getClientId() const;
+  void setClientId(const string& clientId);
+  const bool isJstackEnable() const;
+  void setJstackEnable(const bool& jstackEnable);
+
+ private:
+  string consumerGroup;
+  string clientId;
+  bool jstackEnable;
+};
+
+// Header of a "notify consumer ids changed" request; its only field is the
+// consumer group name, exposed through setGroup()/getGroup().
+class NotifyConsumerIdsChangedRequestHeader : public CommandHeader {
+ public:
+  NotifyConsumerIdsChangedRequestHeader() {}
+  virtual ~NotifyConsumerIdsChangedRequestHeader() {}
+  // Static factory taking the JSON `ext` value and returning a CommandHeader*.
+  // NOTE(review): ownership of the returned pointer is not visible here.
+  static CommandHeader* Decode(Json::Value& ext);
+  void setGroup(const string& tmp);
+  // Returns by value.
+  const string getGroup() const;
+
+ private:
+  string consumerGroup;
+};
+
+//<!***************************************************************************
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/ConsumerRunningInfo.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/ConsumerRunningInfo.cpp b/rocketmq-cpp/src/protocol/ConsumerRunningInfo.cpp
new file mode 100644
index 0000000..10ac0aa
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/ConsumerRunningInfo.cpp
@@ -0,0 +1,109 @@
+#include "ConsumerRunningInfo.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+// Keys used in ConsumerRunningInfo::properties and emitted by encode().
+const string ConsumerRunningInfo::PROP_NAMESERVER_ADDR("PROP_NAMESERVER_ADDR");
+const string ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE(
+    "PROP_THREADPOOL_CORE_SIZE");
+// NOTE(review): this value has no underscore between CONSUME and ORDERLY,
+// unlike the identifier; it is a wire-visible key, so confirm with the peer
+// implementation before "fixing" it.
+const string ConsumerRunningInfo::PROP_CONSUME_ORDERLY("PROP_CONSUMEORDERLY");
+const string ConsumerRunningInfo::PROP_CONSUME_TYPE("PROP_CONSUME_TYPE");
+const string ConsumerRunningInfo::PROP_CLIENT_VERSION("PROP_CLIENT_VERSION");
+const string ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP(
+    "PROP_CONSUMER_START_TIMESTAMP");
+
+// Returns a copy of the complete property map.
+const map<string, string> ConsumerRunningInfo::getProperties() const {
+  const map<string, string>& current = this->properties;
+  return current;
+}
+
+// Replaces the complete property map with a copy of `input_properties`.
+void ConsumerRunningInfo::setProperties(
+    const map<string, string>& input_properties) {
+  this->properties = input_properties;
+}
+
+// Inserts `key` with `value`, or overwrites the value already stored for it.
+void ConsumerRunningInfo::setProperty(const string& key, const string& value) {
+  map<string, string>::iterator pos = this->properties.find(key);
+  if (pos == this->properties.end()) {
+    this->properties.insert(map<string, string>::value_type(key, value));
+  } else {
+    pos->second = value;
+  }
+}
+
+// Returns a copy of the queue -> process-queue-info table.
+const map<MessageQueue, ProcessQueueInfo> ConsumerRunningInfo::getMqTable()
+    const {
+  const map<MessageQueue, ProcessQueueInfo>& current = this->mqTable;
+  return current;
+}
+
+// Despite the name, this stores a single entry: `queueInfo` is recorded under
+// `queue`, replacing any entry already present for that queue.
+void ConsumerRunningInfo::setMqTable(MessageQueue queue,
+                                     ProcessQueueInfo queueInfo) {
+  ProcessQueueInfo& slot = this->mqTable[queue];
+  slot = queueInfo;
+}
+
+/*const map<string, ConsumeStatus> ConsumerRunningInfo::getStatusTable() const
+{
+return statusTable;
+}
+
+
+void ConsumerRunningInfo::setStatusTable(const map<string, ConsumeStatus>&
+input_statusTable)
+{
+statusTable = input_statusTable;
+}	*/
+
+// Returns a copy of the stored subscriptions.
+const vector<SubscriptionData> ConsumerRunningInfo::getSubscriptionSet() const {
+  const vector<SubscriptionData>& current = this->subscriptionSet;
+  return current;
+}
+
+// Replaces the stored subscriptions with a copy of `input_subscriptionSet`.
+void ConsumerRunningInfo::setSubscriptionSet(
+    const vector<SubscriptionData>& input_subscriptionSet) {
+  this->subscriptionSet = input_subscriptionSet;
+}
+
+// Returns a copy of the stored jstack text.
+const string ConsumerRunningInfo::getJstack() const {
+  return this->jstack;
+}
+
+// Stores a copy of `input_jstack`.
+void ConsumerRunningInfo::setJstack(const string& input_jstack) {
+  this->jstack = input_jstack;
+}
+
+// Serializes this object to a JSON-like string.
+//
+// "jstack", "properties" and "subscriptionSet" are written through
+// Json::FastWriter. The "mqTable" section is assembled by hand and spliced in
+// right after the opening brace, because each of its keys is itself the JSON
+// rendering of a MessageQueue rather than a plain string.
+//
+// An empty mqTable is emitted as "mqTable":{} (the brace pair stays balanced).
+string ConsumerRunningInfo::encode() {
+  Json::Value outData;
+
+  // operator[] on `properties` default-inserts an empty string for a key that
+  // was never set, so every one of the six keys is always present in the
+  // output.
+  outData[PROP_NAMESERVER_ADDR] = properties[PROP_NAMESERVER_ADDR];
+  outData[PROP_CONSUME_TYPE] = properties[PROP_CONSUME_TYPE];
+  outData[PROP_CLIENT_VERSION] = properties[PROP_CLIENT_VERSION];
+  outData[PROP_CONSUMER_START_TIMESTAMP] =
+      properties[PROP_CONSUMER_START_TIMESTAMP];
+  outData[PROP_CONSUME_ORDERLY] = properties[PROP_CONSUME_ORDERLY];
+  outData[PROP_THREADPOOL_CORE_SIZE] = properties[PROP_THREADPOOL_CORE_SIZE];
+
+  Json::Value root;
+  root["jstack"] = jstack;
+  root["properties"] = outData;
+
+  {
+    vector<SubscriptionData>::const_iterator it = subscriptionSet.begin();
+    for (; it != subscriptionSet.end(); ++it) {
+      root["subscriptionSet"].append(it->toJson());
+    }
+  }
+
+  Json::FastWriter fastwrite;
+  string finals = fastwrite.write(root);
+
+  // Build: "mqTable":{<queue>:<info>,<queue>:<info>,...},
+  string mqTableJson = "\"mqTable\":{";
+  for (map<MessageQueue, ProcessQueueInfo>::iterator it = mqTable.begin();
+       it != mqTable.end(); ++it) {
+    // Separator goes before every entry except the first, so there is never a
+    // trailing comma to strip (stripping one from an empty table used to eat
+    // the opening brace).
+    if (it != mqTable.begin()) {
+      mqTableJson.append(",");
+    }
+    mqTableJson.append((it->first).toJson().toStyledString());
+    // Drop the last character of the styled key (its trailing line break) so
+    // the ':' follows the key directly.
+    mqTableJson.erase(mqTableJson.end() - 1);
+    mqTableJson.append(":");
+    mqTableJson.append((it->second).toJson().toStyledString());
+  }
+  mqTableJson.append("},");
+
+  // `root` always has members, so `finals` starts with '{'; insert the
+  // mqTable section right after it.
+  finals.insert(1, mqTableJson);
+
+  return finals;
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/ConsumerRunningInfo.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/ConsumerRunningInfo.h b/rocketmq-cpp/src/protocol/ConsumerRunningInfo.h
new file mode 100644
index 0000000..6467ad5
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/ConsumerRunningInfo.h
@@ -0,0 +1,50 @@
+#ifndef __CONSUMERRUNNINGINFO_H__
+#define __CONSUMERRUNNINGINFO_H__
+
+#include "MessageQueue.h"
+#include "ProcessQueueInfo.h"
+#include "SubscriptionData.h"
+
+namespace rocketmq {
+
+// Snapshot of a consumer's runtime state: a string property map, the
+// subscriptions, a per-queue info table and a jstack string. encode() turns
+// the snapshot into a string.
+class ConsumerRunningInfo {
+ public:
+  ConsumerRunningInfo() {}
+  virtual ~ConsumerRunningInfo() {
+    properties.clear();
+    mqTable.clear();
+    subscriptionSet.clear();
+  }
+
+ public:
+  // Keys for entries of `properties`.
+  static const string PROP_NAMESERVER_ADDR;
+  static const string PROP_THREADPOOL_CORE_SIZE;
+  static const string PROP_CONSUME_ORDERLY;
+  static const string PROP_CONSUME_TYPE;
+  static const string PROP_CLIENT_VERSION;
+  static const string PROP_CONSUMER_START_TIMESTAMP;
+
+ public:
+  // All getters return by value, i.e. the caller receives a copy.
+  const map<string, string> getProperties() const;
+  void setProperties(const map<string, string>& input_properties);
+  void setProperty(const string& key, const string& value);
+  const map<MessageQueue, ProcessQueueInfo> getMqTable() const;
+  // Takes one (queue, queueInfo) pair rather than a whole table.
+  void setMqTable(MessageQueue queue, ProcessQueueInfo queueInfo);
+  // const map<string, ConsumeStatus> getStatusTable() const;
+  // void setStatusTable(const map<string, ConsumeStatus>& input_statusTable) ;
+  const vector<SubscriptionData> getSubscriptionSet() const;
+  void setSubscriptionSet(
+      const vector<SubscriptionData>& input_subscriptionSet);
+  const string getJstack() const;
+  void setJstack(const string& input_jstack);
+  // Non-const: declared without a const qualifier.
+  string encode();
+
+ private:
+  map<string, string> properties;
+  vector<SubscriptionData> subscriptionSet;
+  map<MessageQueue, ProcessQueueInfo> mqTable;
+  // map<string, ConsumeStatus> statusTable;
+  string jstack;
+};
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/HeartbeatData.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/HeartbeatData.h b/rocketmq-cpp/src/protocol/HeartbeatData.h
new file mode 100755
index 0000000..9b74280
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/HeartbeatData.h
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __HEARTBEATDATA_H__
+#define __HEARTBEATDATA_H__
+#include <boost/thread/thread.hpp>
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include "ConsumeType.h"
+#include "SubscriptionData.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+// Heartbeat entry describing one producer; identified solely by group name.
+class ProducerData {
+ public:
+  ProducerData(){};
+
+  // Orders entries by group name.
+  bool operator<(const ProducerData& pd) const {
+    return groupName.compare(pd.groupName) < 0;
+  }
+
+  // Renders this entry as a JSON object with the single key "groupName".
+  Json::Value toJson() const {
+    Json::Value json;
+    json["groupName"] = groupName;
+    return json;
+  }
+
+ public:
+  string groupName;
+};
+
+//<!***************************************************************************
+// Heartbeat entry describing one consumer: its group name, consume type,
+// message model, consume-from-where setting and subscriptions.
+class ConsumerData {
+ public:
+  // Value-initializes the three enum-typed members so that toJson() never
+  // serializes indeterminate values when a caller leaves one of them unset.
+  ConsumerData() : consumeType(), messageModel(), consumeFromWhere() {}
+  virtual ~ConsumerData() { subscriptionDataSet.clear(); }
+
+  // Orders entries by group name only.
+  bool operator<(const ConsumerData& cd) const {
+    return groupName < cd.groupName;
+  }
+
+  // Renders this entry as a JSON object. "subscriptionDataSet" is only present
+  // when there is at least one subscription, because the key is created by the
+  // first append().
+  Json::Value toJson() const {
+    Json::Value outJson;
+    outJson["groupName"] = groupName;
+    outJson["consumeFromWhere"] = consumeFromWhere;
+    outJson["consumeType"] = consumeType;
+    outJson["messageModel"] = messageModel;
+
+    vector<SubscriptionData>::const_iterator it = subscriptionDataSet.begin();
+    for (; it != subscriptionDataSet.end(); ++it) {
+      outJson["subscriptionDataSet"].append(it->toJson());
+    }
+
+    return outJson;
+  }
+
+ public:
+  string groupName;
+  ConsumeType consumeType;
+  MessageModel messageModel;
+  ConsumeFromWhere consumeFromWhere;
+  vector<SubscriptionData> subscriptionDataSet;
+};
+
+//<!***************************************************************************
+// Heartbeat payload: the client id plus the producer and consumer entries to
+// report. Each of the two entry lists is guarded by its own boost::mutex.
+class HeartbeatData {
+ public:
+  virtual ~HeartbeatData() {
+    m_producerDataSet.clear();
+    m_consumerDataSet.clear();
+  }
+
+  // Writes the JSON form of this heartbeat into `outData`. A list that is
+  // empty contributes no key at all, since the key is created by the first
+  // append().
+  void Encode(string& outData) {
+    Json::Value doc;
+    doc["clientID"] = m_clientID;
+
+    // Consumers, read under the consumer lock.
+    {
+      boost::lock_guard<boost::mutex> guard(m_consumerDataMutex);
+      for (vector<ConsumerData>::size_type i = 0; i < m_consumerDataSet.size();
+           ++i) {
+        doc["consumerDataSet"].append(m_consumerDataSet[i].toJson());
+      }
+    }
+
+    // Producers, read under the producer lock.
+    {
+      boost::lock_guard<boost::mutex> guard(m_producerDataMutex);
+      for (vector<ProducerData>::size_type i = 0; i < m_producerDataSet.size();
+           ++i) {
+        doc["producerDataSet"].append(m_producerDataSet[i].toJson());
+      }
+    }
+
+    Json::FastWriter writer;
+    outData = writer.write(doc);
+  }
+
+  void setClientID(const string& clientID) { m_clientID = clientID; }
+
+  // True when no producer entry has been added.
+  bool isProducerDataSetEmpty() {
+    boost::lock_guard<boost::mutex> guard(m_producerDataMutex);
+    const bool none = m_producerDataSet.empty();
+    return none;
+  }
+
+  // Appends a copy of `producerData`.
+  void insertDataToProducerDataSet(ProducerData& producerData) {
+    boost::lock_guard<boost::mutex> guard(m_producerDataMutex);
+    m_producerDataSet.push_back(producerData);
+  }
+
+  // True when no consumer entry has been added.
+  bool isConsumerDataSetEmpty() {
+    boost::lock_guard<boost::mutex> guard(m_consumerDataMutex);
+    const bool none = m_consumerDataSet.empty();
+    return none;
+  }
+
+  // Appends a copy of `consumerData`.
+  void insertDataToConsumerDataSet(ConsumerData& consumerData) {
+    boost::lock_guard<boost::mutex> guard(m_consumerDataMutex);
+    m_consumerDataSet.push_back(consumerData);
+  }
+
+ private:
+  string m_clientID;
+  vector<ProducerData> m_producerDataSet;
+  vector<ConsumerData> m_consumerDataSet;
+  boost::mutex m_producerDataMutex;
+  boost::mutex m_consumerDataMutex;
+};
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/KVTable.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/KVTable.h b/rocketmq-cpp/src/protocol/KVTable.h
new file mode 100755
index 0000000..69191b7
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/KVTable.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __KVTABLE_H__
+#define __KVTABLE_H__
+#include <map>
+#include <string>
+#include "RemotingSerializable.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+// Thin wrapper around a string -> string map.
+class KVTable : public RemotingSerializable {
+ public:
+  virtual ~KVTable() {
+    m_table.clear();
+  }
+
+  // Intentionally a no-op here: `outData` is left untouched.
+  void Encode(string& outData) {}
+
+  // Read-only view of the wrapped map (no copy is made).
+  const map<string, string>& getTable() {
+    return m_table;
+  }
+
+  // Replaces the wrapped map with a copy of `table`.
+  void setTable(const map<string, string>& table) {
+    m_table = table;
+  }
+
+ private:
+  map<string, string> m_table;
+};
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/LockBatchBody.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/LockBatchBody.cpp b/rocketmq-cpp/src/protocol/LockBatchBody.cpp
new file mode 100755
index 0000000..c56c17f
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/LockBatchBody.cpp
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "LockBatchBody.h"
+#include "Logging.h"
+namespace rocketmq {  //<!begin namespace;
+
+// Returns a copy of the consumer group name.
+string LockBatchRequestBody::getConsumerGroup() {
+  return this->consumerGroup;
+}
+
+void LockBatchRequestBody::setConsumerGroup(string in_consumerGroup) {
+  this->consumerGroup = in_consumerGroup;
+}
+
+// Returns a copy of the client id.
+string LockBatchRequestBody::getClientId() {
+  return this->clientId;
+}
+
+void LockBatchRequestBody::setClientId(string in_clientId) {
+  this->clientId = in_clientId;
+}
+
+// Returns a copy of the queues to lock.
+vector<MQMessageQueue> LockBatchRequestBody::getMqSet() {
+  return this->mqSet;
+}
+
+// Takes over the contents of the by-value argument via swap; the previous
+// contents are released when the argument goes out of scope.
+void LockBatchRequestBody::setMqSet(vector<MQMessageQueue> in_mqSet) {
+  in_mqSet.swap(this->mqSet);
+}
+// Writes the JSON form of this request into `outData`: "consumerGroup",
+// "clientId" and, when at least one queue is set, an "mqSet" array.
+void LockBatchRequestBody::Encode(string& outData) {
+  Json::Value body;
+  body["consumerGroup"] = consumerGroup;
+  body["clientId"] = clientId;
+
+  for (vector<MQMessageQueue>::size_type i = 0; i < mqSet.size(); ++i) {
+    body["mqSet"].append(toJson(mqSet[i]));
+  }
+
+  Json::FastWriter writer;
+  outData = writer.write(body);
+}
+
+// Renders one queue as a JSON object with keys "topic", "brokerName" and
+// "queueId".
+Json::Value LockBatchRequestBody::toJson(const MQMessageQueue& mq) const {
+  Json::Value queueJson;
+  queueJson["topic"] = mq.getTopic();
+  queueJson["brokerName"] = mq.getBrokerName();
+  queueJson["queueId"] = mq.getQueueId();
+  return queueJson;
+}
+
+// Returns a copy of the stored queue list.
+vector<MQMessageQueue> LockBatchResponseBody::getLockOKMQSet() {
+  return this->lockOKMQSet;
+}
+
+// Takes over the contents of the by-value argument via swap.
+void LockBatchResponseBody::setLockOKMQSet(
+    vector<MQMessageQueue> in_lockOKMQSet) {
+  in_lockOKMQSet.swap(this->lockOKMQSet);
+}
+
+// Parses a lock-batch response body (JSON text held in `mem`) and appends one
+// MQMessageQueue to `messageQueues` per element of the "lockOKMQSet" array.
+//
+// `messageQueues` is always cleared first, so it is left empty when `mem` is
+// null, when the text is not valid JSON, or when "lockOKMQSet" is missing or
+// not an array.
+void LockBatchResponseBody::Decode(const MemoryBlock* mem,
+                                   vector<MQMessageQueue>& messageQueues) {
+  messageQueues.clear();
+  if (!mem) {
+    LOG_WARN("decode LockBatchResponseBody error");
+    return;
+  }
+
+  //<! decode;
+  // Parse the exact [begin, end) byte range instead of relying on the buffer
+  // being NUL-terminated.
+  const char* const begin = static_cast<const char*>(mem->getData());
+  const char* const end = begin + mem->getSize();
+
+  Json::Reader reader;
+  Json::Value root;
+  if (!reader.parse(begin, end, root)) {
+    LOG_WARN("decode LockBatchResponseBody error");
+    return;
+  }
+
+  // Positional indexing is only valid on arrays; anything else is treated as
+  // "no queues locked".
+  const Json::Value& mqs = root["lockOKMQSet"];
+  const unsigned int count = mqs.isArray() ? mqs.size() : 0;
+  LOG_DEBUG("LockBatchResponseBody mqs size:%u", count);
+  for (unsigned int i = 0; i < count; i++) {
+    const Json::Value& qd = mqs[i];
+    MQMessageQueue mq;
+    mq.setTopic(qd["topic"].asString());
+    mq.setBrokerName(qd["brokerName"].asString());
+    mq.setQueueId(qd["queueId"].asInt());
+    LOG_INFO("LockBatchResponseBody MQ:%s", mq.toString().c_str());
+    messageQueues.push_back(mq);
+  }
+}
+
+// Returns a copy of the consumer group name.
+string UnlockBatchRequestBody::getConsumerGroup() {
+  return this->consumerGroup;
+}
+
+void UnlockBatchRequestBody::setConsumerGroup(string in_consumerGroup) {
+  this->consumerGroup = in_consumerGroup;
+}
+
+// Returns a copy of the client id.
+string UnlockBatchRequestBody::getClientId() {
+  return this->clientId;
+}
+
+void UnlockBatchRequestBody::setClientId(string in_clientId) {
+  this->clientId = in_clientId;
+}
+
+// Returns a copy of the queues to unlock.
+vector<MQMessageQueue> UnlockBatchRequestBody::getMqSet() {
+  return this->mqSet;
+}
+
+// Takes over the contents of the by-value argument via swap; the previous
+// contents are released when the argument goes out of scope.
+void UnlockBatchRequestBody::setMqSet(vector<MQMessageQueue> in_mqSet) {
+  in_mqSet.swap(this->mqSet);
+}
+// Writes the JSON form of this request into `outData`: "consumerGroup",
+// "clientId" and, when at least one queue is set, an "mqSet" array.
+void UnlockBatchRequestBody::Encode(string& outData) {
+  Json::Value body;
+  body["consumerGroup"] = consumerGroup;
+  body["clientId"] = clientId;
+
+  for (vector<MQMessageQueue>::size_type i = 0; i < mqSet.size(); ++i) {
+    body["mqSet"].append(toJson(mqSet[i]));
+  }
+
+  Json::FastWriter writer;
+  outData = writer.write(body);
+}
+
+// Renders one queue as a JSON object with keys "topic", "brokerName" and
+// "queueId".
+Json::Value UnlockBatchRequestBody::toJson(
+    const MQMessageQueue& mq) const {
+  Json::Value queueJson;
+  queueJson["topic"] = mq.getTopic();
+  queueJson["brokerName"] = mq.getBrokerName();
+  queueJson["queueId"] = mq.getQueueId();
+  return queueJson;
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/LockBatchBody.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/LockBatchBody.h b/rocketmq-cpp/src/protocol/LockBatchBody.h
new file mode 100755
index 0000000..c1d7155
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/LockBatchBody.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOCKBATCHBODY_H__
+#define __LOCKBATCHBODY_H__
+#include <set>
+#include <string>
+#include "MQMessageQueue.h"
+#include "RemotingSerializable.h"
+#include "dataBlock.h"
+#include "json/json.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+
+// Body of a batch "lock" request: the consumer group and client id making the
+// request plus the message queues concerned.
+class LockBatchRequestBody {
+ public:
+  virtual ~LockBatchRequestBody() { mqSet.clear(); }
+  // Getters return by value (copies); setters take their argument by value.
+  string getConsumerGroup();
+  void setConsumerGroup(string consumerGroup);
+  string getClientId();
+  void setClientId(string clientId);
+  vector<MQMessageQueue> getMqSet();
+  void setMqSet(vector<MQMessageQueue> mqSet);
+  // Produces the serialized form of this body in `outData`.
+  void Encode(string& outData);
+  // Converts a single queue to its JSON representation.
+  Json::Value toJson(const MQMessageQueue& mq) const;
+
+ private:
+  string consumerGroup;
+  string clientId;
+  vector<MQMessageQueue> mqSet;
+};
+
+// Body of a batch "lock" response: the list of message queues stored in
+// lockOKMQSet.
+class LockBatchResponseBody {
+ public:
+  virtual ~LockBatchResponseBody() { lockOKMQSet.clear(); }
+  // Returns by value (a copy).
+  vector<MQMessageQueue> getLockOKMQSet();
+  void setLockOKMQSet(vector<MQMessageQueue> lockOKMQSet);
+  // Static: decodes the raw body in `mem` into `messageQueues` without
+  // touching any LockBatchResponseBody instance.
+  static void Decode(const MemoryBlock* mem,
+                     vector<MQMessageQueue>& messageQueues);
+
+ private:
+  vector<MQMessageQueue> lockOKMQSet;
+};
+
+// Body of a batch "unlock" request: the consumer group and client id making
+// the request plus the message queues concerned. Declares the same members as
+// LockBatchRequestBody.
+class UnlockBatchRequestBody {
+ public:
+  virtual ~UnlockBatchRequestBody() { mqSet.clear(); }
+  // Getters return by value (copies); setters take their argument by value.
+  string getConsumerGroup();
+  void setConsumerGroup(string consumerGroup);
+  string getClientId();
+  void setClientId(string clientId);
+  vector<MQMessageQueue> getMqSet();
+  void setMqSet(vector<MQMessageQueue> mqSet);
+  // Produces the serialized form of this body in `outData`.
+  void Encode(string& outData);
+  // Converts a single queue to its JSON representation.
+  Json::Value toJson(const MQMessageQueue& mq) const;
+
+ private:
+  string consumerGroup;
+  string clientId;
+  vector<MQMessageQueue> mqSet;
+};
+
+}  //<!end namespace;
+#endif