You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2020/12/29 03:36:31 UTC

[rocketmq-client-cpp] 14/29: style: protocol

This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit 9c1c68959983e9b360eb9422bfba860636626602
Author: James Yin <yw...@hotmail.com>
AuthorDate: Thu Jul 30 16:03:56 2020 +0800

    style: protocol
---
 src/ClientRemotingProcessor.cpp                    |  32 ++--
 src/MQClientAPIImpl.cpp                            |   3 +-
 src/MQClientAPIImpl.h                              |   7 +-
 src/MQClientInstance.cpp                           |  28 ++--
 src/MQClientInstance.h                             |   6 +-
 src/common/FilterAPI.hpp                           |  18 +--
 src/common/UtilAll.cpp                             |   1 +
 src/consumer/DefaultMQPushConsumerImpl.cpp         |   4 +-
 src/consumer/DefaultMQPushConsumerImpl.h           |   2 +-
 src/consumer/LocalFileOffsetStore.cpp              |   2 +-
 src/consumer/MQConsumerInner.h                     |   4 +-
 src/consumer/ProcessQueue.cpp                      |   2 +-
 src/consumer/PullAPIWrapper.cpp                    |   2 +-
 src/consumer/PullAPIWrapper.h                      |   2 +-
 src/consumer/RebalanceImpl.cpp                     |  25 ++-
 src/consumer/RebalanceImpl.h                       |   2 +-
 src/consumer/RemoteBrokerOffsetStore.cpp           |   2 +-
 src/consumer/SubscriptionData.cpp                  |  87 ----------
 src/log/Logging.h                                  |   4 +
 src/producer/TopicPublishInfo.hpp                  |   2 +-
 src/protocol/HeartbeatData.h                       | 109 -------------
 src/protocol/MessageQueue.cpp                      |  29 ----
 src/protocol/{MessageQueue.h => MessageQueue.hpp}  |  14 +-
 src/protocol/TopicList.h                           |   6 +-
 src/protocol/{ => body}/ConsumerRunningInfo.cpp    |   0
 src/protocol/{ => body}/ConsumerRunningInfo.h      |   6 +-
 src/protocol/body/LockBatchBody.cpp                | 116 --------------
 src/protocol/body/LockBatchBody.h                  |  80 ----------
 src/protocol/body/LockBatchRequestBody.hpp         |  61 +++++++
 src/protocol/body/LockBatchResponseBody.hpp        |  57 +++++++
 .../ProcessQueueInfo.hpp}                          |   6 +-
 src/protocol/body/ResetOffsetBody.cpp              |  45 ------
 .../{ResetOffsetBody.h => ResetOffsetBody.hpp}     |  27 +++-
 src/protocol/{ => body}/TopicRouteData.hpp         |  10 +-
 src/protocol/body/UnlockBatchRequestBody.hpp       |  61 +++++++
 src/protocol/header/ReplyMessageRequestHeader.cpp  | 175 ---------------------
 src/protocol/header/ReplyMessageRequestHeader.h    |  92 -----------
 src/protocol/header/ReplyMessageRequestHeader.hpp  | 132 ++++++++++++++++
 src/protocol/heartbeat/ConsumerData.hpp            |  83 ++++++++++
 src/protocol/heartbeat/HeartbeatData.hpp           |  66 ++++++++
 .../ProducerData.hpp}                              |  30 ++--
 .../heartbeat/SubscriptionData.hpp}                |  76 ++++++---
 test/src/protocol/ConsumerRunningInfoTest.cpp      |   3 -
 test/src/protocol/HeartbeatDataTest.cpp            |  51 ++----
 test/src/protocol/LockBatchBodyTest.cpp            |  18 ++-
 test/src/protocol/MessageQueueTest.cpp             |   2 +-
 test/src/protocol/TopicRouteDataTest.cpp           |   2 +-
 test/src/transport/ClientRemotingProcessorTest.cpp |   2 +-
 48 files changed, 678 insertions(+), 916 deletions(-)

diff --git a/src/ClientRemotingProcessor.cpp b/src/ClientRemotingProcessor.cpp
index 518e464..cbc8363 100644
--- a/src/ClientRemotingProcessor.cpp
+++ b/src/ClientRemotingProcessor.cpp
@@ -16,16 +16,16 @@
  */
 #include "ClientRemotingProcessor.h"
 
-#include "ConsumerRunningInfo.h"
 #include "MessageDecoder.h"
 #include "MQProtos.h"
 #include "MessageAccessor.hpp"
 #include "MessageSysFlag.h"
 #include "RequestFutureTable.h"
 #include "SocketUtil.h"
-#include "protocol/body/ResetOffsetBody.h"
+#include "protocol/body/ConsumerRunningInfo.h"
+#include "protocol/body/ResetOffsetBody.hpp"
 #include "protocol/header/CommandHeader.h"
-#include "protocol/header/ReplyMessageRequestHeader.h"
+#include "protocol/header/ReplyMessageRequestHeader.hpp"
 
 namespace rocketmq {
 
@@ -106,7 +106,7 @@ RemotingCommand* ClientRemotingProcessor::resetOffset(RemotingCommand* request)
   if (requestBody != nullptr && requestBody->size() > 0) {
     std::unique_ptr<ResetOffsetBody> body(ResetOffsetBody::Decode(*requestBody));
     if (body != nullptr) {
-      client_instance_->resetOffset(responseHeader->getGroup(), responseHeader->getTopic(), body->getOffsetTable());
+      client_instance_->resetOffset(responseHeader->getGroup(), responseHeader->getTopic(), body->offset_table());
     } else {
       LOG_ERROR("resetOffset failed as received data could not be unserialized");
     }
@@ -148,20 +148,20 @@ RemotingCommand* ClientRemotingProcessor::receiveReplyMessage(RemotingCommand* r
   try {
     std::unique_ptr<MQMessageExt> msg(new MQMessageExt);
 
-    msg->set_topic(requestHeader->getTopic());
-    msg->set_queue_id(requestHeader->getQueueId());
-    msg->set_store_timestamp(requestHeader->getStoreTimestamp());
+    msg->set_topic(requestHeader->topic());
+    msg->set_queue_id(requestHeader->queue_id());
+    msg->set_store_timestamp(requestHeader->store_timestamp());
 
-    if (!requestHeader->getBornHost().empty()) {
-      msg->set_born_host(string2SocketAddress(requestHeader->getBornHost()));
+    if (!requestHeader->born_host().empty()) {
+      msg->set_born_host(string2SocketAddress(requestHeader->born_host()));
     }
 
-    if (!requestHeader->getStoreHost().empty()) {
-      msg->set_store_host(string2SocketAddress(requestHeader->getStoreHost()));
+    if (!requestHeader->store_host().empty()) {
+      msg->set_store_host(string2SocketAddress(requestHeader->store_host()));
     }
 
     auto body = request->body();
-    if ((requestHeader->getSysFlag() & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG) {
+    if ((requestHeader->sys_flag() & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG) {
       std::string origin_body;
       if (UtilAll::inflate(*body, origin_body)) {
         msg->set_body(std::move(origin_body));
@@ -172,12 +172,12 @@ RemotingCommand* ClientRemotingProcessor::receiveReplyMessage(RemotingCommand* r
       msg->set_body(std::string(body->array(), body->size()));
     }
 
-    msg->set_flag(requestHeader->getFlag());
-    MessageAccessor::setProperties(*msg, MessageDecoder::string2messageProperties(requestHeader->getProperties()));
+    msg->set_flag(requestHeader->flag());
+    MessageAccessor::setProperties(*msg, MessageDecoder::string2messageProperties(requestHeader->properties()));
     MessageAccessor::putProperty(*msg, MQMessageConst::PROPERTY_REPLY_MESSAGE_ARRIVE_TIME,
                                  UtilAll::to_string(receiveTime));
-    msg->set_born_timestamp(requestHeader->getBornTimestamp());
-    msg->set_reconsume_times(requestHeader->getReconsumeTimes());
+    msg->set_born_timestamp(requestHeader->born_timestamp());
+    msg->set_reconsume_times(requestHeader->reconsume_times());
     LOG_DEBUG_NEW("receive reply message:{}", msg->toString());
 
     processReplyMessage(std::move(msg));
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index cb17dfd..c3e010f 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -28,6 +28,7 @@
 #include "PullResultExt.hpp"
 #include "SendCallbackWrap.h"
 #include "TcpRemotingClient.h"
+#include "protocol/body/LockBatchResponseBody.hpp"
 
 namespace rocketmq {
 
@@ -595,7 +596,7 @@ void MQClientAPIImpl::lockBatchMQ(const std::string& addr,
       auto requestBody = response->body();
       if (requestBody != nullptr && requestBody->size() > 0) {
         std::unique_ptr<LockBatchResponseBody> body(LockBatchResponseBody::Decode(*requestBody));
-        mqs = body->getLockOKMQSet();
+        mqs = std::move(body->lock_ok_mq_set());
       } else {
         mqs.clear();
       }
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index c947692..b105ffe 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -19,7 +19,6 @@
 
 #include "CommunicationMode.h"
 #include "DefaultMQProducerImpl.h"
-#include "HeartbeatData.h"
 #include "KVTable.h"
 #include "MQException.h"
 #include "MQClientInstance.h"
@@ -30,9 +29,11 @@
 #include "TopicConfig.h"
 #include "TopicList.h"
 #include "TopicPublishInfo.hpp"
-#include "TopicRouteData.hpp"
-#include "protocol/body/LockBatchBody.h"
+#include "protocol/body/TopicRouteData.hpp"
+#include "protocol/body/LockBatchRequestBody.hpp"
+#include "protocol/body/UnlockBatchRequestBody.hpp"
 #include "protocol/header/CommandHeader.h"
+#include "protocol/heartbeat/HeartbeatData.hpp"
 
 namespace rocketmq {
 
diff --git a/src/MQClientInstance.cpp b/src/MQClientInstance.cpp
index f10f188..443b7c4 100644
--- a/src/MQClientInstance.cpp
+++ b/src/MQClientInstance.cpp
@@ -19,7 +19,7 @@
 #include <typeindex>
 
 #include "ClientRemotingProcessor.h"
-#include "ConsumerRunningInfo.h"
+#include "protocol/body/ConsumerRunningInfo.h"
 #include "Logging.h"
 #include "MQAdminImpl.h"
 #include "MQClientAPIImpl.h"
@@ -375,8 +375,8 @@ void MQClientInstance::persistAllConsumerOffset() {
 
 void MQClientInstance::sendHeartbeatToAllBroker() {
   std::unique_ptr<HeartbeatData> heartbeatData(prepareHeartbeatData());
-  bool producerEmpty = heartbeatData->isProducerDataSetEmpty();
-  bool consumerEmpty = heartbeatData->isConsumerDataSetEmpty();
+  bool producerEmpty = heartbeatData->producer_data_set().empty();
+  bool consumerEmpty = heartbeatData->consumer_data_set().empty();
   if (producerEmpty && consumerEmpty) {
     LOG_WARN_NEW("sending heartbeat, but no consumer and no producer");
     return;
@@ -481,7 +481,7 @@ HeartbeatData* MQClientInstance::prepareHeartbeatData() {
   HeartbeatData* pHeartbeatData = new HeartbeatData();
 
   // clientID
-  pHeartbeatData->setClientID(client_id_);
+  pHeartbeatData->set_client_id(client_id_);
 
   // Consumer
   insertConsumerInfoToHeartBeatData(pHeartbeatData);
@@ -492,29 +492,21 @@ HeartbeatData* MQClientInstance::prepareHeartbeatData() {
   return pHeartbeatData;
 }
 
-void MQClientInstance::insertConsumerInfoToHeartBeatData(HeartbeatData* pHeartbeatData) {
+void MQClientInstance::insertConsumerInfoToHeartBeatData(HeartbeatData* heartbeatData) {
   std::lock_guard<std::mutex> lock(consumer_table_mutex_);
   for (const auto& it : consumer_table_) {
     const auto* consumer = it.second;
-    ConsumerData consumerData;
-    consumerData.groupName = consumer->groupName();
-    consumerData.consumeType = consumer->consumeType();
-    consumerData.messageModel = consumer->messageModel();
-    consumerData.consumeFromWhere = consumer->consumeFromWhere();
-    std::vector<SubscriptionData> result = consumer->subscriptions();
-    consumerData.subscriptionDataSet.swap(result);
     // TODO: unitMode
-
-    pHeartbeatData->insertDataToConsumerDataSet(consumerData);
+    heartbeatData->consumer_data_set().emplace_back(consumer->groupName(), consumer->consumeType(),
+                                                    consumer->messageModel(), consumer->consumeFromWhere(),
+                                                    consumer->subscriptions());
   }
 }
 
-void MQClientInstance::insertProducerInfoToHeartBeatData(HeartbeatData* pHeartbeatData) {
+void MQClientInstance::insertProducerInfoToHeartBeatData(HeartbeatData* heartbeatData) {
   std::lock_guard<std::mutex> lock(producer_table_mutex_);
   for (const auto& it : producer_table_) {
-    ProducerData producerData;
-    producerData.groupName = it.first;
-    pHeartbeatData->insertDataToProducerDataSet(producerData);
+    heartbeatData->producer_data_set().emplace_back(it.first);
   }
 }
 
diff --git a/src/MQClientInstance.h b/src/MQClientInstance.h
index dc583ef..5a589aa 100644
--- a/src/MQClientInstance.h
+++ b/src/MQClientInstance.h
@@ -22,9 +22,8 @@
 #include <set>
 #include <utility>
 
-#include "ConsumerRunningInfo.h"
+#include "protocol/body/ConsumerRunningInfo.h"
 #include "FindBrokerResult.hpp"
-#include "HeartbeatData.h"
 #include "MQClientConfig.h"
 #include "MQException.h"
 #include "MQConsumerInner.h"
@@ -32,7 +31,8 @@
 #include "MQProducerInner.h"
 #include "ServiceState.h"
 #include "TopicPublishInfo.hpp"
-#include "TopicRouteData.hpp"
+#include "protocol/body/TopicRouteData.hpp"
+#include "protocol/heartbeat/HeartbeatData.hpp"
 #include "concurrent/executor.hpp"
 
 namespace rocketmq {
diff --git a/src/common/FilterAPI.hpp b/src/common/FilterAPI.hpp
index aa40eb5..9bb18bb 100644
--- a/src/common/FilterAPI.hpp
+++ b/src/common/FilterAPI.hpp
@@ -20,30 +20,30 @@
 #include <string>  // std::string
 
 #include "MQException.h"
-#include "SubscriptionData.h"
+#include "protocol/heartbeat/SubscriptionData.hpp"
 #include "UtilAll.h"
 
 namespace rocketmq {
 
 class FilterAPI {
  public:
-  static SubscriptionData* buildSubscriptionData(const std::string& topic, const std::string& subString) {
+  static SubscriptionData* buildSubscriptionData(const std::string& topic, const std::string& sub_string) {
     // delete in Rebalance
-    std::unique_ptr<SubscriptionData> subscriptionData(new SubscriptionData(topic, subString));
+    std::unique_ptr<SubscriptionData> subscription_data(new SubscriptionData(topic, sub_string));
 
-    if (subString.empty() || SUB_ALL == subString) {
-      subscriptionData->set_sub_string(SUB_ALL);
+    if (sub_string.empty() || SUB_ALL == sub_string) {
+      subscription_data->set_sub_string(SUB_ALL);
     } else {
       std::vector<std::string> tags;
-      UtilAll::Split(tags, subString, "||");
+      UtilAll::Split(tags, sub_string, "||");
 
       if (!tags.empty()) {
         for (auto tag : tags) {
           if (!tag.empty()) {
             UtilAll::Trim(tag);
             if (!tag.empty()) {
-              subscriptionData->put_tag(tag);
-              subscriptionData->put_code(UtilAll::hash_code(tag));
+              subscription_data->code_set().push_back(UtilAll::hash_code(tag));
+              subscription_data->tags_set().push_back(std::move(tag));
             }
           }
         }
@@ -52,7 +52,7 @@ class FilterAPI {
       }
     }
 
-    return subscriptionData.release();
+    return subscription_data.release();
   }
 };
 
diff --git a/src/common/UtilAll.cpp b/src/common/UtilAll.cpp
index f326d95..658695f 100644
--- a/src/common/UtilAll.cpp
+++ b/src/common/UtilAll.cpp
@@ -18,6 +18,7 @@
 
 #include <chrono>
 #include <iostream>
+#include <thread>
 
 #ifndef WIN32
 #include <netdb.h>     // gethostbyname
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index de90889..740e18b 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -22,7 +22,7 @@
 
 #include "CommunicationMode.h"
 #include "ConsumeMsgService.h"
-#include "ConsumerRunningInfo.h"
+#include "protocol/body/ConsumerRunningInfo.h"
 #include "FilterAPI.hpp"
 #include "Logging.h"
 #include "MQAdminImpl.h"
@@ -579,7 +579,7 @@ int DefaultMQPushConsumerImpl::getMaxReconsumeTimes() {
   }
 }
 
-std::string DefaultMQPushConsumerImpl::groupName() const {
+const std::string& DefaultMQPushConsumerImpl::groupName() const {
   return client_config_->group_name();
 }
 
diff --git a/src/consumer/DefaultMQPushConsumerImpl.h b/src/consumer/DefaultMQPushConsumerImpl.h
index e6bee69..1e3fdc7 100755
--- a/src/consumer/DefaultMQPushConsumerImpl.h
+++ b/src/consumer/DefaultMQPushConsumerImpl.h
@@ -85,7 +85,7 @@ class DefaultMQPushConsumerImpl : public std::enable_shared_from_this<DefaultMQP
   void resume() override;
 
  public:  // MQConsumerInner
-  std::string groupName() const override;
+  const std::string& groupName() const override;
   MessageModel messageModel() const override;
   ConsumeType consumeType() const override;
   ConsumeFromWhere consumeFromWhere() const override;
diff --git a/src/consumer/LocalFileOffsetStore.cpp b/src/consumer/LocalFileOffsetStore.cpp
index b2da588..ac66ece 100644
--- a/src/consumer/LocalFileOffsetStore.cpp
+++ b/src/consumer/LocalFileOffsetStore.cpp
@@ -20,7 +20,7 @@
 
 #include "Logging.h"
 #include "MQClientInstance.h"
-#include "MessageQueue.h"
+#include "MessageQueue.hpp"
 #include "UtilAll.h"
 
 namespace rocketmq {
diff --git a/src/consumer/MQConsumerInner.h b/src/consumer/MQConsumerInner.h
index 31a126b..b851316 100644
--- a/src/consumer/MQConsumerInner.h
+++ b/src/consumer/MQConsumerInner.h
@@ -21,7 +21,7 @@
 #include <vector>
 
 #include "ConsumeType.h"
-#include "SubscriptionData.h"
+#include "protocol/heartbeat/SubscriptionData.hpp"
 
 namespace rocketmq {
 
@@ -32,7 +32,7 @@ class MQConsumerInner {
   virtual ~MQConsumerInner() = default;
 
  public:  // MQConsumerInner in Java Client
-  virtual std::string groupName() const = 0;
+  virtual const std::string& groupName() const = 0;
   virtual MessageModel messageModel() const = 0;
   virtual ConsumeType consumeType() const = 0;
   virtual ConsumeFromWhere consumeFromWhere() const = 0;
diff --git a/src/consumer/ProcessQueue.cpp b/src/consumer/ProcessQueue.cpp
index bc6f29d..e2d3a39 100644
--- a/src/consumer/ProcessQueue.cpp
+++ b/src/consumer/ProcessQueue.cpp
@@ -17,7 +17,7 @@
 #include "ProcessQueue.h"
 
 #include "Logging.h"
-#include "ProcessQueueInfo.h"
+#include "protocol/body/ProcessQueueInfo.hpp"
 #include "UtilAll.h"
 
 static const uint64_t PULL_MAX_IDLE_TIME = 120000;  // ms
diff --git a/src/consumer/PullAPIWrapper.cpp b/src/consumer/PullAPIWrapper.cpp
index 54a5512..e671118 100644
--- a/src/consumer/PullAPIWrapper.cpp
+++ b/src/consumer/PullAPIWrapper.cpp
@@ -70,7 +70,7 @@ PullResult PullAPIWrapper::processPullResult(const MQMessageQueue& mq,
       msgListFilterAgain.reserve(msgList.size());
       for (const auto& msg : msgList) {
         const auto& msgTag = msg->tags();
-        if (subscriptionData->contain_tag(msgTag)) {
+        if (subscriptionData->containsTag(msgTag)) {
           msgListFilterAgain.push_back(msg);
         }
       }
diff --git a/src/consumer/PullAPIWrapper.h b/src/consumer/PullAPIWrapper.h
index 02055d0..7edc227 100644
--- a/src/consumer/PullAPIWrapper.h
+++ b/src/consumer/PullAPIWrapper.h
@@ -23,7 +23,7 @@
 #include "MQClientInstance.h"
 #include "MQMessageQueue.h"
 #include "PullCallback.h"
-#include "SubscriptionData.h"
+#include "protocol/heartbeat/SubscriptionData.hpp"
 
 namespace rocketmq {
 
diff --git a/src/consumer/RebalanceImpl.cpp b/src/consumer/RebalanceImpl.cpp
index 9459d5e..a3988cd 100644
--- a/src/consumer/RebalanceImpl.cpp
+++ b/src/consumer/RebalanceImpl.cpp
@@ -18,7 +18,6 @@
 
 #include "MQClientAPIImpl.h"
 #include "MQClientInstance.h"
-#include "protocol/body/LockBatchBody.h"
 
 namespace rocketmq {
 
@@ -42,9 +41,9 @@ void RebalanceImpl::unlock(MQMessageQueue mq, const bool oneway) {
       client_instance_->findBrokerAddressInSubscribe(mq.broker_name(), MASTER_ID, true));
   if (findBrokerResult) {
     std::unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(new UnlockBatchRequestBody());
-    unlockBatchRequest->setConsumerGroup(consumer_group_);
-    unlockBatchRequest->setClientId(client_instance_->getClientId());
-    unlockBatchRequest->getMqSet().push_back(mq);
+    unlockBatchRequest->set_consumer_group(consumer_group_);
+    unlockBatchRequest->set_client_id(client_instance_->getClientId());
+    unlockBatchRequest->mq_set().push_back(mq);
 
     try {
       client_instance_->getMQClientAPIImpl()->unlockBatchMQ(findBrokerResult->broker_addr(), unlockBatchRequest.get(),
@@ -81,9 +80,9 @@ void RebalanceImpl::unlockAll(const bool oneway) {
         client_instance_->findBrokerAddressInSubscribe(brokerName, MASTER_ID, true));
     if (findBrokerResult) {
       std::unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(new UnlockBatchRequestBody());
-      unlockBatchRequest->setConsumerGroup(consumer_group_);
-      unlockBatchRequest->setClientId(client_instance_->getClientId());
-      unlockBatchRequest->setMqSet(mqs);
+      unlockBatchRequest->set_consumer_group(consumer_group_);
+      unlockBatchRequest->set_client_id(client_instance_->getClientId());
+      unlockBatchRequest->set_mq_set(mqs);
 
       try {
         client_instance_->getMQClientAPIImpl()->unlockBatchMQ(findBrokerResult->broker_addr(), unlockBatchRequest.get(),
@@ -125,9 +124,9 @@ bool RebalanceImpl::lock(MQMessageQueue mq) {
       client_instance_->findBrokerAddressInSubscribe(mq.broker_name(), MASTER_ID, true));
   if (findBrokerResult) {
     std::unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
-    lockBatchRequest->setConsumerGroup(consumer_group_);
-    lockBatchRequest->setClientId(client_instance_->getClientId());
-    lockBatchRequest->getMqSet().push_back(mq);
+    lockBatchRequest->set_consumer_group(consumer_group_);
+    lockBatchRequest->set_client_id(client_instance_->getClientId());
+    lockBatchRequest->mq_set().push_back(mq);
 
     try {
       LOG_DEBUG("try to lock mq:%s", mq.toString().c_str());
@@ -182,9 +181,9 @@ void RebalanceImpl::lockAll() {
         client_instance_->findBrokerAddressInSubscribe(brokerName, MASTER_ID, true));
     if (findBrokerResult) {
       std::unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
-      lockBatchRequest->setConsumerGroup(consumer_group_);
-      lockBatchRequest->setClientId(client_instance_->getClientId());
-      lockBatchRequest->setMqSet(mqs);
+      lockBatchRequest->set_consumer_group(consumer_group_);
+      lockBatchRequest->set_client_id(client_instance_->getClientId());
+      lockBatchRequest->set_mq_set(mqs);
 
       LOG_INFO("try to lock:" SIZET_FMT " mqs of broker:%s", mqs.size(), brokerName.c_str());
       try {
diff --git a/src/consumer/RebalanceImpl.h b/src/consumer/RebalanceImpl.h
index 8648aa2..0ef1a9a 100755
--- a/src/consumer/RebalanceImpl.h
+++ b/src/consumer/RebalanceImpl.h
@@ -26,7 +26,7 @@
 #include "MQMessageQueue.h"
 #include "ProcessQueue.h"
 #include "PullRequest.h"
-#include "SubscriptionData.h"
+#include "protocol/heartbeat/SubscriptionData.hpp"
 
 namespace rocketmq {
 
diff --git a/src/consumer/RemoteBrokerOffsetStore.cpp b/src/consumer/RemoteBrokerOffsetStore.cpp
index 8754d8b..537cf7f 100644
--- a/src/consumer/RemoteBrokerOffsetStore.cpp
+++ b/src/consumer/RemoteBrokerOffsetStore.cpp
@@ -19,7 +19,7 @@
 #include "Logging.h"
 #include "MQClientAPIImpl.h"
 #include "MQClientInstance.h"
-#include "MessageQueue.h"
+#include "MessageQueue.hpp"
 #include "UtilAll.h"
 
 namespace rocketmq {
diff --git a/src/consumer/SubscriptionData.cpp b/src/consumer/SubscriptionData.cpp
deleted file mode 100644
index ad1f21f..0000000
--- a/src/consumer/SubscriptionData.cpp
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "SubscriptionData.h"
-
-#include <algorithm>
-#include <sstream>
-#include <vector>
-
-#include "Logging.h"
-#include "UtilAll.h"
-
-namespace rocketmq {
-
-SubscriptionData::SubscriptionData() {
-  sub_version_ = UtilAll::currentTimeMillis();
-}
-
-SubscriptionData::SubscriptionData(const std::string& topic, const std::string& subString)
-    : topic_(topic), sub_string_(subString) {
-  sub_version_ = UtilAll::currentTimeMillis();
-}
-
-SubscriptionData::SubscriptionData(const SubscriptionData& other) {
-  sub_string_ = other.sub_string_;
-  sub_version_ = other.sub_version_;
-  tag_set_ = other.tag_set_;
-  topic_ = other.topic_;
-  code_set_ = other.code_set_;
-}
-
-bool SubscriptionData::operator==(const SubscriptionData& other) const {
-  if (topic_ != other.topic_) {
-    return false;
-  }
-  if (sub_string_ != other.sub_string_) {
-    return false;
-  }
-  if (sub_version_ != other.sub_version_) {
-    return false;
-  }
-  if (tag_set_.size() != other.tag_set_.size()) {
-    return false;
-  }
-  return true;
-}
-
-bool SubscriptionData::operator<(const SubscriptionData& other) const {
-  int ret = topic_.compare(other.topic_);
-  if (ret == 0) {
-    return sub_string_.compare(other.sub_string_) < 0;
-  } else {
-    return ret < 0;
-  }
-}
-
-Json::Value SubscriptionData::toJson() const {
-  Json::Value outJson;
-  outJson["topic"] = topic_;
-  outJson["subString"] = sub_string_;
-  outJson["subVersion"] = UtilAll::to_string(sub_version_);
-
-  for (const auto& tag : tag_set_) {
-    outJson["tagsSet"].append(tag);
-  }
-
-  for (const auto& code : code_set_) {
-    outJson["codeSet"].append(code);
-  }
-
-  return outJson;
-}
-
-}  // namespace rocketmq
diff --git a/src/log/Logging.h b/src/log/Logging.h
index 6c361e5..5afcc56 100644
--- a/src/log/Logging.h
+++ b/src/log/Logging.h
@@ -23,7 +23,11 @@
 
 // clang-format off
 #include <spdlog/spdlog.h>
+#if !defined(SPDLOG_FMT_EXTERNAL)
 #include <spdlog/fmt/bundled/printf.h>
+#else
+#include <fmt/printf.h>
+#endif
 // clang-format on
 
 namespace rocketmq {
diff --git a/src/producer/TopicPublishInfo.hpp b/src/producer/TopicPublishInfo.hpp
index 341e066..94c5375 100644
--- a/src/producer/TopicPublishInfo.hpp
+++ b/src/producer/TopicPublishInfo.hpp
@@ -23,7 +23,7 @@
 
 #include "MQException.h"
 #include "MQMessageQueue.h"
-#include "TopicRouteData.hpp"
+#include "protocol/body/TopicRouteData.hpp"
 
 namespace rocketmq {
 
diff --git a/src/protocol/HeartbeatData.h b/src/protocol/HeartbeatData.h
deleted file mode 100644
index 1453b63..0000000
--- a/src/protocol/HeartbeatData.h
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __HEARTBEAT_DATA_H__
-#define __HEARTBEAT_DATA_H__
-
-#include <string>
-#include <vector>
-
-#include "ConsumeType.h"
-#include "RemotingSerializable.h"
-#include "SubscriptionData.h"
-
-namespace rocketmq {
-
-class ProducerData {
- public:
-  bool operator<(const ProducerData& pd) const { return groupName < pd.groupName; }
-
-  Json::Value toJson() const {
-    Json::Value outJson;
-    outJson["groupName"] = groupName;
-    return outJson;
-  }
-
- public:
-  std::string groupName;
-};
-
-class ConsumerData {
- public:
-  bool operator<(const ConsumerData& cd) const { return groupName < cd.groupName; }
-
-  Json::Value toJson() const {
-    Json::Value outJson;
-    outJson["groupName"] = groupName;
-    outJson["consumeFromWhere"] = consumeFromWhere;
-    outJson["consumeType"] = consumeType;
-    outJson["messageModel"] = messageModel;
-
-    for (const auto& sd : subscriptionDataSet) {
-      outJson["subscriptionDataSet"].append(sd.toJson());
-    }
-
-    return outJson;
-  }
-
- public:
-  std::string groupName;
-  ConsumeType consumeType;
-  MessageModel messageModel;
-  ConsumeFromWhere consumeFromWhere;
-  std::vector<SubscriptionData> subscriptionDataSet;
-};
-
-class HeartbeatData : public RemotingSerializable {
- public:
-  std::string encode() {
-    Json::Value root;
-
-    // id
-    root["clientID"] = m_clientID;
-
-    // consumer
-    for (const auto& cd : m_consumerDataSet) {
-      root["consumerDataSet"].append(cd.toJson());
-    }
-
-    // producer
-    for (const auto& pd : m_producerDataSet) {
-      root["producerDataSet"].append(pd.toJson());
-    }
-
-    // output
-    return RemotingSerializable::toJson(root);
-  }
-
-  void setClientID(const std::string& clientID) { m_clientID = clientID; }
-
-  bool isProducerDataSetEmpty() { return m_producerDataSet.empty(); }
-
-  void insertDataToProducerDataSet(ProducerData& producerData) { m_producerDataSet.push_back(producerData); }
-
-  bool isConsumerDataSetEmpty() { return m_consumerDataSet.empty(); }
-
-  void insertDataToConsumerDataSet(ConsumerData& consumerData) { m_consumerDataSet.push_back(consumerData); }
-
- private:
-  std::string m_clientID;
-  std::vector<ProducerData> m_producerDataSet;
-  std::vector<ConsumerData> m_consumerDataSet;
-};
-
-}  // namespace rocketmq
-
-#endif  // __HEARTBEAT_DATA_H__
diff --git a/src/protocol/MessageQueue.cpp b/src/protocol/MessageQueue.cpp
deleted file mode 100644
index ab9af65..0000000
--- a/src/protocol/MessageQueue.cpp
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "MessageQueue.h"
-
-namespace rocketmq {
-
-Json::Value toJson(const MQMessageQueue& mq) {
-  Json::Value outJson;
-  outJson["topic"] = mq.topic();
-  outJson["brokerName"] = mq.broker_name();
-  outJson["queueId"] = mq.queue_id();
-  return outJson;
-}
-
-}  // namespace rocketmq
diff --git a/src/protocol/MessageQueue.h b/src/protocol/MessageQueue.hpp
similarity index 73%
rename from src/protocol/MessageQueue.h
rename to src/protocol/MessageQueue.hpp
index 50ce546..9f6af19 100644
--- a/src/protocol/MessageQueue.h
+++ b/src/protocol/MessageQueue.hpp
@@ -14,8 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __MESSAGE_QUEUE_H__
-#define __MESSAGE_QUEUE_H__
+#ifndef ROCKETMQ_PROTOCOL_MESSAGEQUEUE_H_
+#define ROCKETMQ_PROTOCOL_MESSAGEQUEUE_H_
 
 #include <json/json.h>
 
@@ -23,8 +23,14 @@
 
 namespace rocketmq {
 
-Json::Value toJson(const MQMessageQueue& mq);
+inline Json::Value toJson(const MQMessageQueue& mq) {
+  Json::Value root;
+  root["topic"] = mq.topic();
+  root["brokerName"] = mq.broker_name();
+  root["queueId"] = mq.queue_id();
+  return root;
+}
 
 }  // namespace rocketmq
 
-#endif  // __MESSAGE_QUEUE_H__
+#endif  // ROCKETMQ_PROTOCOL_MESSAGEQUEUE_H_
diff --git a/src/protocol/TopicList.h b/src/protocol/TopicList.h
index 450e6b2..bbcb7cc 100644
--- a/src/protocol/TopicList.h
+++ b/src/protocol/TopicList.h
@@ -14,8 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __TOPIC_LIST_H__
-#define __TOPIC_LIST_H__
+#ifndef ROCKETMQ_PROTOCOL_TOPICLIST_H_
+#define ROCKETMQ_PROTOCOL_TOPICLIST_H_
 
 #include <string>  // std::string
 #include <vector>  // std::vector
@@ -34,4 +34,4 @@ class TopicList {
 
 }  // namespace rocketmq
 
-#endif  // __TOPIC_LIST_H__
+#endif  // ROCKETMQ_PROTOCOL_TOPICLIST_H_
diff --git a/src/protocol/ConsumerRunningInfo.cpp b/src/protocol/body/ConsumerRunningInfo.cpp
similarity index 100%
rename from src/protocol/ConsumerRunningInfo.cpp
rename to src/protocol/body/ConsumerRunningInfo.cpp
diff --git a/src/protocol/ConsumerRunningInfo.h b/src/protocol/body/ConsumerRunningInfo.h
similarity index 96%
rename from src/protocol/ConsumerRunningInfo.h
rename to src/protocol/body/ConsumerRunningInfo.h
index 63d4c5d..5bfb886 100644
--- a/src/protocol/ConsumerRunningInfo.h
+++ b/src/protocol/body/ConsumerRunningInfo.h
@@ -17,9 +17,9 @@
 #ifndef ROCKETMQ_PROTOCOL_CONSUMERRUNNINGINFO_H_
 #define ROCKETMQ_PROTOCOL_CONSUMERRUNNINGINFO_H_
 
-#include "MessageQueue.h"
-#include "ProcessQueueInfo.h"
-#include "SubscriptionData.h"
+#include "MessageQueue.hpp"
+#include "ProcessQueueInfo.hpp"
+#include "protocol/heartbeat/SubscriptionData.hpp"
 
 namespace rocketmq {
 
diff --git a/src/protocol/body/LockBatchBody.cpp b/src/protocol/body/LockBatchBody.cpp
deleted file mode 100644
index fde8dcb..0000000
--- a/src/protocol/body/LockBatchBody.cpp
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "LockBatchBody.h"
-
-#include "Logging.h"
-#include "MessageQueue.h"
-
-namespace rocketmq {
-
-std::string LockBatchRequestBody::getConsumerGroup() {
-  return m_consumerGroup;
-}
-
-void LockBatchRequestBody::setConsumerGroup(std::string consumerGroup) {
-  m_consumerGroup = consumerGroup;
-}
-
-std::string LockBatchRequestBody::getClientId() {
-  return m_clientId;
-}
-
-void LockBatchRequestBody::setClientId(std::string clientId) {
-  m_clientId = clientId;
-}
-
-std::vector<MQMessageQueue>& LockBatchRequestBody::getMqSet() {
-  return m_mqSet;
-}
-
-void LockBatchRequestBody::setMqSet(std::vector<MQMessageQueue> mqSet) {
-  m_mqSet.swap(mqSet);
-}
-
-std::string LockBatchRequestBody::encode() {
-  Json::Value root;
-  root["consumerGroup"] = m_consumerGroup;
-  root["clientId"] = m_clientId;
-
-  for (const auto& mq : m_mqSet) {
-    root["mqSet"].append(rocketmq::toJson(mq));
-  }
-
-  return RemotingSerializable::toJson(root);
-}
-
-const std::vector<MQMessageQueue>& LockBatchResponseBody::getLockOKMQSet() {
-  return m_lockOKMQSet;
-}
-
-void LockBatchResponseBody::setLockOKMQSet(std::vector<MQMessageQueue> lockOKMQSet) {
-  m_lockOKMQSet.swap(lockOKMQSet);
-}
-
-LockBatchResponseBody* LockBatchResponseBody::Decode(const ByteArray& bodyData) {
-  Json::Value root = RemotingSerializable::fromJson(bodyData);
-  auto& mqs = root["lockOKMQSet"];
-  std::unique_ptr<LockBatchResponseBody> body(new LockBatchResponseBody());
-  for (const auto& qd : mqs) {
-    MQMessageQueue mq(qd["topic"].asString(), qd["brokerName"].asString(), qd["queueId"].asInt());
-    LOG_INFO("LockBatchResponseBody MQ:%s", mq.toString().c_str());
-    body->m_lockOKMQSet.push_back(std::move(mq));
-  }
-  return body.release();
-}
-
-std::string UnlockBatchRequestBody::getConsumerGroup() {
-  return m_consumerGroup;
-}
-
-void UnlockBatchRequestBody::setConsumerGroup(std::string consumerGroup) {
-  m_consumerGroup = consumerGroup;
-}
-
-std::string UnlockBatchRequestBody::getClientId() {
-  return m_clientId;
-}
-
-void UnlockBatchRequestBody::setClientId(std::string clientId) {
-  m_clientId = clientId;
-}
-
-std::vector<MQMessageQueue>& UnlockBatchRequestBody::getMqSet() {
-  return m_mqSet;
-}
-
-void UnlockBatchRequestBody::setMqSet(std::vector<MQMessageQueue> mqSet) {
-  m_mqSet.swap(mqSet);
-}
-
-std::string UnlockBatchRequestBody::encode() {
-  Json::Value root;
-  root["consumerGroup"] = m_consumerGroup;
-  root["clientId"] = m_clientId;
-
-  for (const auto& mq : m_mqSet) {
-    root["mqSet"].append(rocketmq::toJson(mq));
-  }
-
-  return RemotingSerializable::toJson(root);
-}
-
-}  // namespace rocketmq
diff --git a/src/protocol/body/LockBatchBody.h b/src/protocol/body/LockBatchBody.h
deleted file mode 100644
index ce38974..0000000
--- a/src/protocol/body/LockBatchBody.h
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __LOCK_BATCH_BODY_H__
-#define __LOCK_BATCH_BODY_H__
-
-#include <set>
-#include <string>
-
-#include "MQMessageQueue.h"
-#include "RemotingSerializable.h"
-#include "UtilAll.h"
-
-namespace rocketmq {
-
-class LockBatchRequestBody : public RemotingSerializable {
- public:
-  std::string getConsumerGroup();
-  void setConsumerGroup(std::string consumerGroup);
-
-  std::string getClientId();
-  void setClientId(std::string clientId);
-
-  std::vector<MQMessageQueue>& getMqSet();
-  void setMqSet(std::vector<MQMessageQueue> mqSet);
-
-  std::string encode() override;
-
- private:
-  std::string m_consumerGroup;
-  std::string m_clientId;
-  std::vector<MQMessageQueue> m_mqSet;
-};
-
-class LockBatchResponseBody {
- public:
-  static LockBatchResponseBody* Decode(const ByteArray& bodyData);
-
-  const std::vector<MQMessageQueue>& getLockOKMQSet();
-  void setLockOKMQSet(std::vector<MQMessageQueue> lockOKMQSet);
-
- private:
-  std::vector<MQMessageQueue> m_lockOKMQSet;
-};
-
-class UnlockBatchRequestBody : public RemotingSerializable {
- public:
-  std::string getConsumerGroup();
-  void setConsumerGroup(std::string consumerGroup);
-
-  std::string getClientId();
-  void setClientId(std::string clientId);
-
-  std::vector<MQMessageQueue>& getMqSet();
-  void setMqSet(std::vector<MQMessageQueue> mqSet);
-
-  std::string encode() override;
-
- private:
-  std::string m_consumerGroup;
-  std::string m_clientId;
-  std::vector<MQMessageQueue> m_mqSet;
-};
-
-}  // namespace rocketmq
-
-#endif  // __LOCK_BATCH_BODY_H__
diff --git a/src/protocol/body/LockBatchRequestBody.hpp b/src/protocol/body/LockBatchRequestBody.hpp
new file mode 100644
index 0000000..6b75be4
--- /dev/null
+++ b/src/protocol/body/LockBatchRequestBody.hpp
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_PROTOCOL_BODY_LOCKBATCHREQUESTBODY_HPP_
+#define ROCKETMQ_PROTOCOL_BODY_LOCKBATCHREQUESTBODY_HPP_
+
+#include <algorithm>  // std::move
+#include <vector>     // std::vector
+
+#include "MessageQueue.hpp"
+#include "RemotingSerializable.h"
+
+namespace rocketmq {
+
+class LockBatchRequestBody : public RemotingSerializable {  // JSON body: consumer group, client id, message queues
+ public:
+  std::string encode() override {  // returns RemotingSerializable::toJson() of the object built below
+    Json::Value root;
+    root["consumerGroup"] = consumer_group_;
+    root["clientId"] = client_id_;
+
+    for (const auto& mq : mq_set_) {
+      root["mqSet"].append(rocketmq::toJson(mq));  // "mqSet" only exists once at least one queue is appended
+    }
+
+    return RemotingSerializable::toJson(root);
+  }
+
+ public:
+  inline const std::string& consumer_group() const { return consumer_group_; }  // const: callable on const objects
+  inline void set_consumer_group(const std::string& consumerGroup) { consumer_group_ = consumerGroup; }
+
+  inline const std::string& client_id() const { return client_id_; }  // const: callable on const objects
+  inline void set_client_id(const std::string& clientId) { client_id_ = clientId; }
+
+  inline std::vector<MQMessageQueue>& mq_set() { return mq_set_; }  // mutable access, callers may edit in place
+  inline void set_mq_set(const std::vector<MQMessageQueue>& mq_set) { mq_set_ = mq_set; }
+  inline void set_mq_set(std::vector<MQMessageQueue>&& mq_set) { mq_set_ = std::move(mq_set); }
+
+ private:
+  std::string consumer_group_;
+  std::string client_id_;
+  std::vector<MQMessageQueue> mq_set_;
+};
+
+}  // namespace rocketmq
+
+#endif  // ROCKETMQ_PROTOCOL_BODY_LOCKBATCHREQUESTBODY_HPP_
diff --git a/src/protocol/body/LockBatchResponseBody.hpp b/src/protocol/body/LockBatchResponseBody.hpp
new file mode 100644
index 0000000..af16bf2
--- /dev/null
+++ b/src/protocol/body/LockBatchResponseBody.hpp
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_PROTOCOL_BODY_LOCKBATCHRESPONSEBODY_HPP_
+#define ROCKETMQ_PROTOCOL_BODY_LOCKBATCHRESPONSEBODY_HPP_
+
+#include <algorithm>  // std::move
+#include <vector>     // std::vector
+
+#include "Logging.h"
+#include "MQMessageQueue.h"
+#include "RemotingSerializable.h"
+
+namespace rocketmq {
+
+class LockBatchResponseBody {  // holds the message queues listed under "lockOKMQSet" in a response body
+ public:
+  static LockBatchResponseBody* Decode(const ByteArray& bodyData) {  // returns a released (raw) pointer
+    Json::Value json = RemotingSerializable::fromJson(bodyData);
+    auto& locked = json["lockOKMQSet"];
+    std::unique_ptr<LockBatchResponseBody> decoded(new LockBatchResponseBody());
+    for (const auto& entry : locked) {
+      MQMessageQueue queue(entry["topic"].asString(), entry["brokerName"].asString(), entry["queueId"].asInt());
+      LOG_INFO_NEW("LockBatchResponseBody MQ:{}", queue.toString());
+      decoded->lock_ok_mq_set_.push_back(std::move(queue));
+    }
+    return decoded.release();
+  }
+
+ public:
+  std::vector<MQMessageQueue>& lock_ok_mq_set() { return lock_ok_mq_set_; }
+  const std::vector<MQMessageQueue>& lock_ok_mq_set() const { return lock_ok_mq_set_; }
+  void set_lock_ok_mq_set(const std::vector<MQMessageQueue>& lockOKMQSet) { lock_ok_mq_set_ = lockOKMQSet; }
+  void set_lock_ok_mq_set(std::vector<MQMessageQueue>&& lockOKMQSet) {
+    lock_ok_mq_set_ = std::move(lockOKMQSet);
+  }
+
+ private:
+  std::vector<MQMessageQueue> lock_ok_mq_set_;
+};
+
+}  // namespace rocketmq
+
+#endif  // ROCKETMQ_PROTOCOL_BODY_LOCKBATCHRESPONSEBODY_HPP_
diff --git a/src/protocol/ProcessQueueInfo.h b/src/protocol/body/ProcessQueueInfo.hpp
similarity index 94%
rename from src/protocol/ProcessQueueInfo.h
rename to src/protocol/body/ProcessQueueInfo.hpp
index 3d2ea6d..64c66c2 100644
--- a/src/protocol/ProcessQueueInfo.h
+++ b/src/protocol/body/ProcessQueueInfo.hpp
@@ -14,8 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __PROCESS_QUEUE_INFO_H__
-#define __PROCESS_QUEUE_INFO_H__
+#ifndef ROCKETMQ_PROTOCOL_BODY_PROCESS_QUEUE_INFO_HPP_
+#define ROCKETMQ_PROTOCOL_BODY_PROCESS_QUEUE_INFO_HPP_
 
 #include <json/json.h>
 
@@ -92,4 +92,4 @@ class ProcessQueueInfo {
 
 }  // namespace rocketmq
 
-#endif  // __PROCESS_QUEUE_INFO_H__
+#endif  // ROCKETMQ_PROTOCOL_BODY_PROCESS_QUEUE_INFO_HPP_
diff --git a/src/protocol/body/ResetOffsetBody.cpp b/src/protocol/body/ResetOffsetBody.cpp
deleted file mode 100644
index 497f591..0000000
--- a/src/protocol/body/ResetOffsetBody.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ResetOffsetBody.h"
-
-#include "RemotingSerializable.h"
-
-namespace rocketmq {
-
-ResetOffsetBody* ResetOffsetBody::Decode(const ByteArray& bodyData) {
-  // FIXME: object as key
-  Json::Value root = RemotingSerializable::fromJson(bodyData);
-  Json::Value qds = root["offsetTable"];
-  std::unique_ptr<ResetOffsetBody> body(new ResetOffsetBody());
-  for (unsigned int i = 0; i < qds.size(); i++) {
-    Json::Value qd = qds[i];
-    MQMessageQueue mq(qd["brokerName"].asString(), qd["topic"].asString(), qd["queueId"].asInt());
-    int64_t offset = qd["offset"].asInt64();
-    body->setOffsetTable(mq, offset);
-  }
-  return body.release();
-}
-
-std::map<MQMessageQueue, int64_t> ResetOffsetBody::getOffsetTable() {
-  return offset_table_;
-}
-
-void ResetOffsetBody::setOffsetTable(const MQMessageQueue& mq, int64_t offset) {
-  offset_table_[mq] = offset;
-}
-
-}  // namespace rocketmq
diff --git a/src/protocol/body/ResetOffsetBody.h b/src/protocol/body/ResetOffsetBody.hpp
similarity index 51%
copy from src/protocol/body/ResetOffsetBody.h
copy to src/protocol/body/ResetOffsetBody.hpp
index f1024b9..157e567 100644
--- a/src/protocol/body/ResetOffsetBody.h
+++ b/src/protocol/body/ResetOffsetBody.hpp
@@ -14,22 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __RESET_OFFSET_BODY__
-#define __RESET_OFFSET_BODY__
+#ifndef ROCKETMQ_PROTOCOL_BODY_RESETOFFSETBODY_HPP_
+#define ROCKETMQ_PROTOCOL_BODY_RESETOFFSETBODY_HPP_
 
 #include <map>  // std::map
 
-#include "ByteArray.h"
 #include "MQMessageQueue.h"
+#include "RemotingSerializable.h"
 
 namespace rocketmq {
 
 class ResetOffsetBody {
  public:
-  static ResetOffsetBody* Decode(const ByteArray& bodyData);
+  static ResetOffsetBody* Decode(const ByteArray& bodyData) {
+    // FIXME: object as key (each member name of "offsetTable" is itself parsed as a JSON-encoded queue)
+    Json::Value json = RemotingSerializable::fromJson(bodyData);
+    auto& table = json["offsetTable"];
+    std::unique_ptr<ResetOffsetBody> decoded(new ResetOffsetBody());
+    const Json::Value::Members names = table.getMemberNames();
+    for (const auto& name : names) {
+      Json::Value fields = RemotingSerializable::fromJson(name);
+      MQMessageQueue queue(fields["topic"].asString(), fields["brokerName"].asString(), fields["queueId"].asInt());
+      const int64_t offset = table[name].asInt64();
+      decoded->offset_table_.emplace(std::move(queue), offset);
+    }
+    return decoded.release();
+  }
 
-  std::map<MQMessageQueue, int64_t> getOffsetTable();
-  void setOffsetTable(const MQMessageQueue& mq, int64_t offset);
+ public:
+  std::map<MQMessageQueue, int64_t>& offset_table() { return offset_table_; }  // mutable reference to the map filled by Decode()
 
  private:
   std::map<MQMessageQueue, int64_t> offset_table_;
@@ -37,4 +50,4 @@ class ResetOffsetBody {
 
 }  // namespace rocketmq
 
-#endif  // __RESET_OFFSET_BODY__
+#endif  // ROCKETMQ_PROTOCOL_BODY_RESETOFFSETBODY_HPP_
diff --git a/src/protocol/TopicRouteData.hpp b/src/protocol/body/TopicRouteData.hpp
similarity index 96%
rename from src/protocol/TopicRouteData.hpp
rename to src/protocol/body/TopicRouteData.hpp
index 6d4b64c..38e1889 100644
--- a/src/protocol/TopicRouteData.hpp
+++ b/src/protocol/body/TopicRouteData.hpp
@@ -123,12 +123,12 @@ class TopicRouteData {
     for (auto bd : bds) {
       std::string broker_name = bd["brokerName"].asString();
       LOG_DEBUG_NEW("brokerName:{}", broker_name);
+      auto& bas = bd["brokerAddrs"];
+      Json::Value::Members members = bas.getMemberNames();
       std::map<int, std::string> broker_addrs;
-      Json::Value bas = bd["brokerAddrs"];
-      Json::Value::Members mbs = bas.getMemberNames();
-      for (const auto& key : mbs) {
-        int id = std::stoi(key);
-        std::string addr = bas[key].asString();
+      for (const auto& member : members) {
+        int id = std::stoi(member);
+        std::string addr = bas[member].asString();
-        broker_addrs.emplace(id, std::move(addr));
         LOG_DEBUG_NEW("brokerId:{}, brokerAddr:{}", id, addr);
+        broker_addrs.emplace(id, std::move(addr));  // move last: addr must still be intact for the log line above
       }
diff --git a/src/protocol/body/UnlockBatchRequestBody.hpp b/src/protocol/body/UnlockBatchRequestBody.hpp
new file mode 100644
index 0000000..ffee533
--- /dev/null
+++ b/src/protocol/body/UnlockBatchRequestBody.hpp
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_PROTOCOL_BODY_UNLOCKBATCHREQUESTBODY_HPP_
+#define ROCKETMQ_PROTOCOL_BODY_UNLOCKBATCHREQUESTBODY_HPP_
+
+#include <algorithm>  // std::move
+#include <vector>     // std::vector
+
+#include "MessageQueue.hpp"
+#include "RemotingSerializable.h"
+
+namespace rocketmq {
+
+class UnlockBatchRequestBody : public RemotingSerializable {  // JSON body: consumer group, client id, message queues
+ public:
+  std::string encode() override {  // returns RemotingSerializable::toJson() of the object built below
+    Json::Value root;
+    root["consumerGroup"] = consumer_group_;
+    root["clientId"] = client_id_;
+
+    for (const auto& mq : mq_set_) {
+      root["mqSet"].append(rocketmq::toJson(mq));  // "mqSet" only exists once at least one queue is appended
+    }
+
+    return RemotingSerializable::toJson(root);
+  }
+
+ public:
+  inline const std::string& consumer_group() const { return consumer_group_; }  // const: callable on const objects
+  inline void set_consumer_group(const std::string& consumerGroup) { consumer_group_ = consumerGroup; }
+
+  inline const std::string& client_id() const { return client_id_; }  // const: callable on const objects
+  inline void set_client_id(const std::string& clientId) { client_id_ = clientId; }
+
+  inline std::vector<MQMessageQueue>& mq_set() { return mq_set_; }  // mutable access, callers may edit in place
+  inline void set_mq_set(const std::vector<MQMessageQueue>& mq_set) { mq_set_ = mq_set; }
+  inline void set_mq_set(std::vector<MQMessageQueue>&& mq_set) { mq_set_ = std::move(mq_set); }
+
+ private:
+  std::string consumer_group_;
+  std::string client_id_;
+  std::vector<MQMessageQueue> mq_set_;
+};
+
+}  // namespace rocketmq
+
+#endif  // ROCKETMQ_PROTOCOL_BODY_UNLOCKBATCHREQUESTBODY_HPP_
diff --git a/src/protocol/header/ReplyMessageRequestHeader.cpp b/src/protocol/header/ReplyMessageRequestHeader.cpp
deleted file mode 100644
index 7fd3625..0000000
--- a/src/protocol/header/ReplyMessageRequestHeader.cpp
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ReplyMessageRequestHeader.h"
-
-#include <memory>
-
-#include "UtilAll.h"
-
-namespace rocketmq {
-
-ReplyMessageRequestHeader* ReplyMessageRequestHeader::Decode(std::map<std::string, std::string>& extFields) {
-  std::unique_ptr<ReplyMessageRequestHeader> header(new ReplyMessageRequestHeader());
-
-  header->producerGroup = extFields.at("producerGroup");
-  header->topic = extFields.at("topic");
-  header->defaultTopic = extFields.at("defaultTopic");
-  header->defaultTopicQueueNums = std::stoi(extFields.at("defaultTopicQueueNums"));
-  header->queueId = std::stoi(extFields.at("queueId"));
-  header->sysFlag = std::stoi(extFields.at("sysFlag"));
-  header->bornTimestamp = std::stoll(extFields.at("bornTimestamp"));
-  header->flag = std::stoi(extFields.at("flag"));
-
-  auto it = extFields.find("properties");
-  if (it != extFields.end()) {
-    header->properties = it->second;
-  }
-
-  it = extFields.find("reconsumeTimes");
-  if (it != extFields.end()) {
-    header->reconsumeTimes = std::stoi(it->second);
-  } else {
-    header->reconsumeTimes = 0;
-  }
-
-  it = extFields.find("unitMode");
-  if (it != extFields.end()) {
-    header->unitMode = UtilAll::stob(it->second);
-  } else {
-    header->unitMode = false;
-  }
-
-  header->bornHost = extFields.at("bornHost");
-  header->storeHost = extFields.at("storeHost");
-  header->storeTimestamp = std::stoll(extFields.at("storeTimestamp"));
-
-  return header.release();
-}
-
-const std::string& ReplyMessageRequestHeader::getProducerGroup() const {
-  return this->producerGroup;
-}
-
-void ReplyMessageRequestHeader::setProducerGroup(const std::string& producerGroup) {
-  this->producerGroup = producerGroup;
-}
-
-const std::string& ReplyMessageRequestHeader::getTopic() const {
-  return this->topic;
-}
-
-void ReplyMessageRequestHeader::setTopic(const std::string& topic) {
-  this->topic = topic;
-}
-
-const std::string& ReplyMessageRequestHeader::getDefaultTopic() const {
-  return this->defaultTopic;
-}
-
-void ReplyMessageRequestHeader::setDefaultTopic(const std::string& defaultTopic) {
-  this->defaultTopic = defaultTopic;
-}
-
-int32_t ReplyMessageRequestHeader::getDefaultTopicQueueNums() const {
-  return this->defaultTopicQueueNums;
-}
-
-void ReplyMessageRequestHeader::setDefaultTopicQueueNums(int32_t defaultTopicQueueNums) {
-  this->defaultTopicQueueNums = defaultTopicQueueNums;
-}
-
-int32_t ReplyMessageRequestHeader::getQueueId() const {
-  return this->queueId;
-}
-
-void ReplyMessageRequestHeader::setQueueId(int32_t queueId) {
-  this->queueId = queueId;
-}
-
-int32_t ReplyMessageRequestHeader::getSysFlag() const {
-  return this->sysFlag;
-}
-
-void ReplyMessageRequestHeader::setSysFlag(int32_t sysFlag) {
-  this->sysFlag = sysFlag;
-}
-
-int64_t ReplyMessageRequestHeader::getBornTimestamp() const {
-  return this->bornTimestamp;
-}
-
-void ReplyMessageRequestHeader::setBornTimestamp(int64_t bornTimestamp) {
-  this->bornTimestamp = bornTimestamp;
-}
-
-int32_t ReplyMessageRequestHeader::getFlag() const {
-  return this->flag;
-}
-
-void ReplyMessageRequestHeader::setFlag(int32_t flag) {
-  this->flag = flag;
-}
-
-const std::string& ReplyMessageRequestHeader::getProperties() const {
-  return this->properties;
-}
-
-void ReplyMessageRequestHeader::setProperties(const std::string& properties) {
-  this->properties = properties;
-}
-
-int32_t ReplyMessageRequestHeader::getReconsumeTimes() const {
-  return this->reconsumeTimes;
-}
-
-void ReplyMessageRequestHeader::setReconsumeTimes(int32_t reconsumeTimes) {
-  this->reconsumeTimes = reconsumeTimes;
-}
-
-bool ReplyMessageRequestHeader::getUnitMode() const {
-  return this->unitMode;
-}
-
-void ReplyMessageRequestHeader::setUnitMode(bool unitMode) {
-  this->unitMode = unitMode;
-}
-
-const std::string& ReplyMessageRequestHeader::getBornHost() const {
-  return this->bornHost;
-}
-
-void ReplyMessageRequestHeader::setBornHost(const std::string& bornHost) {
-  this->bornHost = bornHost;
-}
-
-const std::string& ReplyMessageRequestHeader::getStoreHost() const {
-  return this->storeHost;
-}
-
-void ReplyMessageRequestHeader::setStoreHost(const std::string& storeHost) {
-  this->storeHost = storeHost;
-}
-
-int64_t ReplyMessageRequestHeader::getStoreTimestamp() const {
-  return this->storeTimestamp;
-}
-
-void ReplyMessageRequestHeader::setStoreTimestamp(int64_t storeTimestamp) {
-  this->storeTimestamp = storeTimestamp;
-}
-
-}  // namespace rocketmq
diff --git a/src/protocol/header/ReplyMessageRequestHeader.h b/src/protocol/header/ReplyMessageRequestHeader.h
deleted file mode 100644
index 5fd341e..0000000
--- a/src/protocol/header/ReplyMessageRequestHeader.h
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __REPLY_MESSAGE_REQUEST_HEADER_H__
-#define __REPLY_MESSAGE_REQUEST_HEADER_H__
-
-#include <vector>
-
-#include "CommandCustomHeader.h"
-
-namespace rocketmq {
-
-class ReplyMessageRequestHeader : public CommandCustomHeader {
- public:
-  static ReplyMessageRequestHeader* Decode(std::map<std::string, std::string>& extFields);
-
-  const std::string& getProducerGroup() const;
-  void setProducerGroup(const std::string& producerGroup);
-
-  const std::string& getTopic() const;
-  void setTopic(const std::string& topic);
-
-  const std::string& getDefaultTopic() const;
-  void setDefaultTopic(const std::string& defaultTopic);
-
-  int32_t getDefaultTopicQueueNums() const;
-  void setDefaultTopicQueueNums(int32_t defaultTopicQueueNums);
-
-  int32_t getQueueId() const;
-  void setQueueId(int32_t queueId);
-
-  int32_t getSysFlag() const;
-  void setSysFlag(int32_t sysFlag);
-
-  int64_t getBornTimestamp() const;
-  void setBornTimestamp(int64_t bornTimestamp);
-
-  int32_t getFlag() const;
-  void setFlag(int32_t flag);
-
-  const std::string& getProperties() const;
-  void setProperties(const std::string& properties);
-
-  int32_t getReconsumeTimes() const;
-  void setReconsumeTimes(int32_t reconsumeTimes);
-
-  bool getUnitMode() const;
-  void setUnitMode(bool unitMode);
-
-  const std::string& getBornHost() const;
-  void setBornHost(const std::string& bornHost);
-
-  const std::string& getStoreHost() const;
-  void setStoreHost(const std::string& storeHost);
-
-  int64_t getStoreTimestamp() const;
-  void setStoreTimestamp(int64_t stroeTimestamp);
-
- private:
-  std::string producerGroup;
-  std::string topic;
-  std::string defaultTopic;
-  int32_t defaultTopicQueueNums;
-  int32_t queueId;
-  int32_t sysFlag;
-  int64_t bornTimestamp;
-  int32_t flag;
-  std::string properties;  // nullable
-  int32_t reconsumeTimes;  // nullable
-  bool unitMode;           // nullable
-
-  std::string bornHost;
-  std::string storeHost;
-  int64_t storeTimestamp;
-};
-
-}  // namespace rocketmq
-
-#endif  // __REPLY_MESSAGE_REQUEST_HEADER_H__
diff --git a/src/protocol/header/ReplyMessageRequestHeader.hpp b/src/protocol/header/ReplyMessageRequestHeader.hpp
new file mode 100644
index 0000000..3aaab7b
--- /dev/null
+++ b/src/protocol/header/ReplyMessageRequestHeader.hpp
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_PROTOCOL_HEADER_REPLY_MESSAGE_REQUEST_HEADER_HPP_
+#define ROCKETMQ_PROTOCOL_HEADER_REPLY_MESSAGE_REQUEST_HEADER_HPP_
+
+#include <vector>
+
+#include "CommandCustomHeader.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+// Custom command header of a reply-message request, filled from the command's extFields map.
+class ReplyMessageRequestHeader : public CommandCustomHeader {
+ public:
+  // Decodes a header from `extFields` and returns it as a raw pointer that the caller must own.
+  // Mandatory keys are read with map::at and numbers with std::stoi/std::stoll, so a missing key or
+  // malformed number surfaces as an exception. "properties", "reconsumeTimes" and "unitMode" are
+  // optional; when absent the members keep their in-class defaults ("", 0, false).
+  static ReplyMessageRequestHeader* Decode(std::map<std::string, std::string>& extFields) {
+    std::unique_ptr<ReplyMessageRequestHeader> header(new ReplyMessageRequestHeader());
+
+    header->producer_group_ = extFields.at("producerGroup");
+    header->topic_ = extFields.at("topic");
+    header->default_topic_ = extFields.at("defaultTopic");
+    header->default_topic_queue_nums_ = std::stoi(extFields.at("defaultTopicQueueNums"));
+    header->queue_id_ = std::stoi(extFields.at("queueId"));
+    header->sys_flag_ = std::stoi(extFields.at("sysFlag"));
+    header->born_timestamp_ = std::stoll(extFields.at("bornTimestamp"));
+    header->flag_ = std::stoi(extFields.at("flag"));
+
+    // Optional fields: only overwrite the defaults when the key is present.
+    auto it = extFields.find("properties");
+    if (it != extFields.end()) {
+      header->properties_ = it->second;
+    }
+    it = extFields.find("reconsumeTimes");
+    if (it != extFields.end()) {
+      header->reconsume_times_ = std::stoi(it->second);
+    }
+    it = extFields.find("unitMode");
+    if (it != extFields.end()) {
+      header->unit_mode_ = UtilAll::stob(it->second);
+    }
+
+    header->born_host_ = extFields.at("bornHost");
+    header->store_host_ = extFields.at("storeHost");
+    header->store_timestamp_ = std::stoll(extFields.at("storeTimestamp"));
+
+    return header.release();
+  }
+
+ public:
+  inline const std::string& producer_group() const { return producer_group_; }
+  inline void set_producer_group(const std::string& producerGroup) { producer_group_ = producerGroup; }
+
+  inline const std::string& topic() const { return topic_; }
+  inline void set_topic(const std::string& topic) { topic_ = topic; }
+
+  inline const std::string& default_topic() const { return default_topic_; }
+  inline void set_default_topic(const std::string& defaultTopic) { default_topic_ = defaultTopic; }
+
+  inline int32_t default_topic_queue_nums() const { return default_topic_queue_nums_; }
+  inline void set_default_topic_queue_nums(int32_t defaultTopicQueueNums) {
+    default_topic_queue_nums_ = defaultTopicQueueNums;
+  }
+
+  inline int32_t queue_id() const { return queue_id_; }
+  inline void set_queue_id(int32_t queueId) { queue_id_ = queueId; }
+
+  inline int32_t sys_flag() const { return sys_flag_; }
+  inline void set_sys_flag(int32_t sysFlag) { sys_flag_ = sysFlag; }
+
+  inline int64_t born_timestamp() const { return born_timestamp_; }
+  inline void set_born_timestamp(int64_t bornTimestamp) { born_timestamp_ = bornTimestamp; }
+
+  inline int32_t flag() const { return flag_; }
+  inline void set_flag(int32_t flag) { flag_ = flag; }
+
+  inline const std::string& properties() const { return properties_; }
+  inline void set_properties(const std::string& properties) { properties_ = properties; }
+
+  inline int32_t reconsume_times() const { return reconsume_times_; }
+  inline void set_reconsume_times(int32_t reconsumeTimes) { reconsume_times_ = reconsumeTimes; }
+
+  inline bool unit_mode() const { return unit_mode_; }
+  inline void set_unit_mode(bool unitMode) { unit_mode_ = unitMode; }
+
+  inline const std::string& born_host() const { return born_host_; }
+  inline void set_born_host(const std::string& bornHost) { born_host_ = bornHost; }
+
+  inline const std::string& store_host() const { return store_host_; }
+  inline void set_store_host(const std::string& storeHost) { store_host_ = storeHost; }
+
+  inline int64_t store_timestamp() const { return store_timestamp_; }
+  inline void set_store_timestamp(int64_t storeTimestamp) { store_timestamp_ = storeTimestamp; }
+
+ private:
+  std::string producer_group_;
+  std::string topic_;
+  std::string default_topic_;
+  int32_t default_topic_queue_nums_{0};
+  int32_t queue_id_{0};
+  int32_t sys_flag_{0};
+  int64_t born_timestamp_{0};
+  int32_t flag_{0};
+  std::string properties_;      // nullable
+  int32_t reconsume_times_{0};  // nullable
+  bool unit_mode_{false};       // nullable
+
+  std::string born_host_;
+  std::string store_host_;
+  int64_t store_timestamp_{0};
+};
+
+}  // namespace rocketmq
+
+#endif  // ROCKETMQ_PROTOCOL_HEADER_REPLY_MESSAGE_REQUEST_HEADER_HPP_
diff --git a/src/protocol/heartbeat/ConsumerData.hpp b/src/protocol/heartbeat/ConsumerData.hpp
new file mode 100644
index 0000000..470d78b
--- /dev/null
+++ b/src/protocol/heartbeat/ConsumerData.hpp
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_PROTOCOL_HEARTBEAT_CONSUMERDATA_H_
+#define ROCKETMQ_PROTOCOL_HEARTBEAT_CONSUMERDATA_H_
+
+#include <algorithm>  // std::move
+#include <string>     // std::string
+#include <vector>     // std::vector
+
+#include <json/json.h>
+
+#include "ConsumeType.h"
+#include "SubscriptionData.hpp"
+
+namespace rocketmq {
+
+// One consumer group as reported in a heartbeat: group name, consume settings and subscriptions.
+class ConsumerData {
+ public:
+  ConsumerData(const std::string& group_name,
+               ConsumeType consume_type,
+               MessageModel message_model,
+               ConsumeFromWhere consume_from_where,
+               std::vector<SubscriptionData>&& subscription_data_set)
+      : group_name_(group_name),
+        consume_type_(consume_type),
+        message_model_(message_model),
+        consume_from_where_(consume_from_where),
+        subscription_data_set_(std::move(subscription_data_set)) {}
+
+  bool operator<(const ConsumerData& other) const { return group_name_ < other.group_name_; }
+
+  // Serializes to JSON; "subscriptionDataSet" is only added when at least one subscription exists.
+  Json::Value toJson() const {
+    Json::Value root;
+    root["groupName"] = group_name_;
+    root["consumeType"] = consume_type_;
+    root["messageModel"] = message_model_;
+    root["consumeFromWhere"] = consume_from_where_;
+    for (const auto& sd : subscription_data_set_) {
+      root["subscriptionDataSet"].append(sd.toJson());
+    }
+    return root;
+  }
+
+ public:
+  inline const std::string& group_name() const { return group_name_; }
+  inline void set_group_name(const std::string& group_name) { group_name_ = group_name; }
+
+  inline ConsumeType consume_type() const { return consume_type_; }
+  inline void set_consume_type(ConsumeType consume_type) { consume_type_ = consume_type; }
+
+  inline MessageModel message_model() const { return message_model_; }
+  inline ConsumeFromWhere consume_from_where() const { return consume_from_where_; }
+
+  // Const accessor returning a reference: callable on const objects and does not copy the vector.
+  inline const std::vector<SubscriptionData>& subscription_data_set() const { return subscription_data_set_; }
+
+ private:
+  std::string group_name_;
+  ConsumeType consume_type_;
+  MessageModel message_model_;
+  ConsumeFromWhere consume_from_where_;
+  std::vector<SubscriptionData> subscription_data_set_;
+};
+
+}  // namespace rocketmq
+
+#endif  // ROCKETMQ_PROTOCOL_HEARTBEAT_CONSUMERDATA_H_
diff --git a/src/protocol/heartbeat/HeartbeatData.hpp b/src/protocol/heartbeat/HeartbeatData.hpp
new file mode 100644
index 0000000..f4af3ca
--- /dev/null
+++ b/src/protocol/heartbeat/HeartbeatData.hpp
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_PROTOCOL_HEARTBEAT_HEARTBEATDATA_H_
+#define ROCKETMQ_PROTOCOL_HEARTBEAT_HEARTBEATDATA_H_
+
+#include <string>  // std::string
+#include <vector>  // std::vector
+
+#include "RemotingSerializable.h"
+#include "ConsumerData.hpp"
+#include "ProducerData.hpp"
+
+namespace rocketmq {
+
+// Heartbeat payload: the client id plus the consumer and producer groups reported with it.
+class HeartbeatData : public RemotingSerializable {
+ public:
+  // Serializes the payload to a JSON string through RemotingSerializable::toJson.
+  std::string encode() {
+    Json::Value payload;
+    payload["clientID"] = client_id_;
+    appendAll(payload, "consumerDataSet", consumer_data_set_);
+    appendAll(payload, "producerDataSet", producer_data_set_);
+    return RemotingSerializable::toJson(payload);
+  }
+
+ public:
+  inline void set_client_id(const std::string& clientID) { client_id_ = clientID; }
+
+  // Mutable references: callers populate the sets in place.
+  inline std::vector<ConsumerData>& consumer_data_set() { return consumer_data_set_; }
+  inline std::vector<ProducerData>& producer_data_set() { return producer_data_set_; }
+
+ private:
+  // Appends item.toJson() for every item under `key`. The key is first touched inside the loop,
+  // so an empty container adds nothing to the document.
+  template <typename Container>
+  static void appendAll(Json::Value& doc, const char* key, const Container& items) {
+    for (const auto& item : items) {
+      doc[key].append(item.toJson());
+    }
+  }
+
+ private:
+  std::string client_id_;
+  std::vector<ConsumerData> consumer_data_set_;
+  std::vector<ProducerData> producer_data_set_;
+};
+
+}  // namespace rocketmq
+
+#endif  // ROCKETMQ_PROTOCOL_HEARTBEAT_HEARTBEATDATA_H_
diff --git a/src/protocol/body/ResetOffsetBody.h b/src/protocol/heartbeat/ProducerData.hpp
similarity index 55%
rename from src/protocol/body/ResetOffsetBody.h
rename to src/protocol/heartbeat/ProducerData.hpp
index f1024b9..751a87f 100644
--- a/src/protocol/body/ResetOffsetBody.h
+++ b/src/protocol/heartbeat/ProducerData.hpp
@@ -14,27 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __RESET_OFFSET_BODY__
-#define __RESET_OFFSET_BODY__
+#ifndef ROCKETMQ_PROTOCOL_HEARTBEAT_PRODUCERDATA_H_
+#define ROCKETMQ_PROTOCOL_HEARTBEAT_PRODUCERDATA_H_
 
-#include <map>  // std::map
+#include <string>  // std::string
 
-#include "ByteArray.h"
-#include "MQMessageQueue.h"
+#include <json/json.h>
 
 namespace rocketmq {
 
-class ResetOffsetBody {
+// Heartbeat entry for one producer group; toJson() emits it as {"groupName": <name>}.
+class ProducerData {
  public:
-  static ResetOffsetBody* Decode(const ByteArray& bodyData);
+  ProducerData(const std::string& group_name) : group_name_(group_name) {}
 
-  std::map<MQMessageQueue, int64_t> getOffsetTable();
-  void setOffsetTable(const MQMessageQueue& mq, int64_t offset);
+  bool operator<(const ProducerData& other) const { return group_name_ < other.group_name_; }
+  Json::Value toJson() const {
+    Json::Value root;
+    root["groupName"] = group_name_;
+    return root;
+  }
+
+  inline const std::string& group_name() const { return group_name_; }
+  inline void set_group_name(const std::string& group_name) { group_name_ = group_name; }
+  inline void group_name(const std::string& group_name) { set_group_name(group_name); }  // legacy spelling
 
  private:
-  std::map<MQMessageQueue, int64_t> offset_table_;
+  std::string group_name_;
 };
 
 }  // namespace rocketmq
 
-#endif  // __RESET_OFFSET_BODY__
+#endif  // ROCKETMQ_PROTOCOL_HEARTBEAT_PRODUCERDATA_H_
diff --git a/src/common/SubscriptionData.h b/src/protocol/heartbeat/SubscriptionData.hpp
similarity index 50%
rename from src/common/SubscriptionData.h
rename to src/protocol/heartbeat/SubscriptionData.hpp
index 277403c..82f8334 100644
--- a/src/common/SubscriptionData.h
+++ b/src/protocol/heartbeat/SubscriptionData.hpp
@@ -14,32 +14,72 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef ROCKETMQ_SUBSCRIPTIONDATA_H_
-#define ROCKETMQ_SUBSCRIPTIONDATA_H_
+#ifndef ROCKETMQ_PROTOCOL_HEARTBEAT_SUBSCRIPTIONDATA_HPP_
+#define ROCKETMQ_PROTOCOL_HEARTBEAT_SUBSCRIPTIONDATA_HPP_
 
-#include <json/json.h>
+#include <cstdint>  // int64_t
+
+#include <string>  // std::string
+#include <vector>  // std::vector
 
-#include <string>
-#include <vector>
+#include <json/json.h>
 
-#include "RocketMQClient.h"
+#include "UtilAll.h"
 
 namespace rocketmq {
 
-class ROCKETMQCLIENT_API SubscriptionData {
+class SubscriptionData {
  public:
-  SubscriptionData();
-  SubscriptionData(const std::string& topic, const std::string& subString);
-  SubscriptionData(const SubscriptionData& other);
+  SubscriptionData() : sub_version_(UtilAll::currentTimeMillis()) {}  // sub_version_ = UtilAll::currentTimeMillis()
+  SubscriptionData(const std::string& topic, const std::string& subString)  // version stamped at construction
+      : topic_(topic), sub_string_(subString), sub_version_(UtilAll::currentTimeMillis()) {}
+
+  SubscriptionData(const SubscriptionData& other) {  // copies the five fields assigned below
+    sub_string_ = other.sub_string_;
+    sub_version_ = other.sub_version_;
+    tag_set_ = other.tag_set_;
+    topic_ = other.topic_;
+    code_set_ = other.code_set_;
+  }
 
   virtual ~SubscriptionData() = default;
 
-  bool operator==(const SubscriptionData& other) const;
+  bool operator==(const SubscriptionData& other) const {
+    // FIXME: tags are compared by count only (not element-wise); code_set_ is not compared.
+    if (tag_set_.size() != other.tag_set_.size()) return false;
+    return topic_ == other.topic_ && sub_string_ == other.sub_string_ && sub_version_ == other.sub_version_;
+  }
   bool operator!=(const SubscriptionData& other) const { return !operator==(other); }
 
-  bool operator<(const SubscriptionData& other) const;
+  // Orders by topic first; equal topics are ordered by sub_string_.
+  bool operator<(const SubscriptionData& other) const {
+    const int topic_order = topic_.compare(other.topic_);
+    if (topic_order != 0) {
+      return topic_order < 0;
+    }
+    return sub_string_.compare(other.sub_string_) < 0;
+  }
+
+  inline bool containsTag(const std::string& tag) const {  // true when `tag` equals an entry of tag_set_
+    return std::any_of(tag_set_.begin(), tag_set_.end(), [&tag](const std::string& item) { return item == tag; });
+  }
 
-  Json::Value toJson() const;
+  // Builds the JSON form of this subscription. subVersion is emitted through UtilAll::to_string;
+  // "tagsSet" and "codeSet" are first touched inside their loops, so empty lists add no key.
+  Json::Value toJson() const {
+    Json::Value json;
+    json["topic"] = topic_;
+    json["subString"] = sub_string_;
+    json["subVersion"] = UtilAll::to_string(sub_version_);
+
+    for (const auto& tag : tag_set_) {
+      json["tagsSet"].append(tag);
+    }
+    for (const auto& code : code_set_) {
+      json["codeSet"].append(code);
+    }
+    return json;
+  }
 
  public:
   inline const std::string& topic() const { return topic_; }
@@ -51,13 +91,7 @@ class ROCKETMQCLIENT_API SubscriptionData {
 
   inline std::vector<std::string>& tags_set() { return tag_set_; }
 
-  inline void put_tag(const std::string& tag) { tag_set_.push_back(tag); }
-
-  inline bool contain_tag(const std::string& tag) const {
-    return std::find(tag_set_.begin(), tag_set_.end(), tag) != tag_set_.end();
-  }
-
-  inline void put_code(int32_t code) { code_set_.push_back(code); }
+  inline std::vector<int32_t>& code_set() { return code_set_; }  // mutable reference to the code list
 
  private:
   std::string topic_;
@@ -69,4 +103,4 @@ class ROCKETMQCLIENT_API SubscriptionData {
 
 }  // namespace rocketmq
 
-#endif  // ROCKETMQ_SUBSCRIPTIONDATA_H_
+#endif  // ROCKETMQ_PROTOCOL_HEARTBEAT_SUBSCRIPTIONDATA_HPP_
diff --git a/test/src/protocol/ConsumerRunningInfoTest.cpp b/test/src/protocol/ConsumerRunningInfoTest.cpp
index 7c427fd..7c3dc8f 100644
--- a/test/src/protocol/ConsumerRunningInfoTest.cpp
+++ b/test/src/protocol/ConsumerRunningInfoTest.cpp
@@ -24,9 +24,6 @@
 #include <string>
 
 #include "ConsumerRunningInfo.h"
-#include "MessageQueue.h"
-#include "ProcessQueueInfo.h"
-#include "SubscriptionData.h"
 
 using std::map;
 using std::string;
diff --git a/test/src/protocol/HeartbeatDataTest.cpp b/test/src/protocol/HeartbeatDataTest.cpp
index a1e5393..87f76b2 100644
--- a/test/src/protocol/HeartbeatDataTest.cpp
+++ b/test/src/protocol/HeartbeatDataTest.cpp
@@ -19,11 +19,7 @@
 
 #include <vector>
 
-#include "ConsumeType.h"
-#include "HeartbeatData.h"
-#include "SubscriptionData.h"
-
-using std::vector;
+#include "protocol/heartbeat/HeartbeatData.hpp"
 
 using testing::InitGoogleMock;
 using testing::InitGoogleTest;
@@ -38,24 +34,16 @@ using rocketmq::ProducerData;
 using rocketmq::SubscriptionData;
 
 TEST(HeartbeatDataTest, ProducerData) {
-  ProducerData producerData;
-  producerData.groupName = "testGroup";
+  ProducerData producerData("testGroup");  // the group name is now passed to the constructor
 
   Json::Value outJson = producerData.toJson();
   EXPECT_EQ(outJson["groupName"], "testGroup");
 }
 
 TEST(HeartbeatDataTest, ConsumerData) {
-  ConsumerData consumerData;
-  consumerData.groupName = "testGroup";
-  consumerData.consumeType = ConsumeType::CONSUME_ACTIVELY;
-  consumerData.messageModel = MessageModel::BROADCASTING;
-  consumerData.consumeFromWhere = ConsumeFromWhere::CONSUME_FROM_TIMESTAMP;
-
-  vector<SubscriptionData> subs;
-  subs.push_back(SubscriptionData("testTopic", "sub"));
-
-  consumerData.subscriptionDataSet = subs;
+  ConsumerData consumerData("testGroup", ConsumeType::CONSUME_ACTIVELY, MessageModel::BROADCASTING,
+                            ConsumeFromWhere::CONSUME_FROM_TIMESTAMP,
+                            std::vector<SubscriptionData>{SubscriptionData("testTopic", "sub")});
 
   Json::Value outJson = consumerData.toJson();
 
@@ -72,28 +60,17 @@ TEST(HeartbeatDataTest, ConsumerData) {
 
 TEST(HeartbeatDataTest, HeartbeatData) {
   HeartbeatData heartbeatData;
-  heartbeatData.setClientID("testClientId");
-
-  ProducerData producerData;
-  producerData.groupName = "testGroup";
-
-  EXPECT_TRUE(heartbeatData.isProducerDataSetEmpty());
-  heartbeatData.insertDataToProducerDataSet(producerData);
-  EXPECT_FALSE(heartbeatData.isProducerDataSetEmpty());
-
-  ConsumerData consumerData;
-  consumerData.groupName = "testGroup";
-  consumerData.consumeType = ConsumeType::CONSUME_ACTIVELY;
-  consumerData.messageModel = MessageModel::BROADCASTING;
-  consumerData.consumeFromWhere = ConsumeFromWhere::CONSUME_FROM_TIMESTAMP;
+  heartbeatData.set_client_id("testClientId");
 
-  vector<SubscriptionData> subs;
-  subs.push_back(SubscriptionData("testTopic", "sub"));
+  EXPECT_TRUE(heartbeatData.producer_data_set().empty());
+  heartbeatData.producer_data_set().emplace_back("testGroup");
+  EXPECT_FALSE(heartbeatData.producer_data_set().empty());
 
-  consumerData.subscriptionDataSet = subs;
-  EXPECT_TRUE(heartbeatData.isConsumerDataSetEmpty());
-  heartbeatData.insertDataToConsumerDataSet(consumerData);
-  EXPECT_FALSE(heartbeatData.isConsumerDataSetEmpty());
+  EXPECT_TRUE(heartbeatData.consumer_data_set().empty());
+  heartbeatData.consumer_data_set().emplace_back("testGroup", ConsumeType::CONSUME_ACTIVELY, MessageModel::BROADCASTING,
+                                                 ConsumeFromWhere::CONSUME_FROM_TIMESTAMP,
+                                                 std::vector<SubscriptionData>{SubscriptionData("testTopic", "sub")});
+  EXPECT_FALSE(heartbeatData.consumer_data_set().empty());
 
   std::string outData = heartbeatData.encode();
 
diff --git a/test/src/protocol/LockBatchBodyTest.cpp b/test/src/protocol/LockBatchBodyTest.cpp
index 64da9fb..8536489 100644
--- a/test/src/protocol/LockBatchBodyTest.cpp
+++ b/test/src/protocol/LockBatchBodyTest.cpp
@@ -21,7 +21,9 @@
 
 #include "ByteArray.h"
 #include "MQMessageQueue.h"
-#include "protocol/body/LockBatchBody.h"
+#include "protocol/body/LockBatchRequestBody.hpp"
+#include "protocol/body/LockBatchResponseBody.hpp"
+#include "protocol/body/UnlockBatchRequestBody.hpp"
 
 using testing::InitGoogleMock;
 using testing::InitGoogleTest;
@@ -36,18 +38,18 @@ using rocketmq::UnlockBatchRequestBody;
 TEST(LockBatchBodyTest, LockBatchRequestBody) {
   LockBatchRequestBody lockBatchRequestBody;
 
-  lockBatchRequestBody.setClientId("testClientId");
-  EXPECT_EQ(lockBatchRequestBody.getClientId(), "testClientId");
+  lockBatchRequestBody.set_client_id("testClientId");
+  EXPECT_EQ(lockBatchRequestBody.client_id(), "testClientId");
 
-  lockBatchRequestBody.setConsumerGroup("testGroup");
-  EXPECT_EQ(lockBatchRequestBody.getConsumerGroup(), "testGroup");
+  lockBatchRequestBody.set_consumer_group("testGroup");
+  EXPECT_EQ(lockBatchRequestBody.consumer_group(), "testGroup");
 
   std::vector<MQMessageQueue> messageQueueList;
   messageQueueList.push_back(MQMessageQueue("testTopic", "testBroker", 1));
   messageQueueList.push_back(MQMessageQueue("testTopic", "testBroker", 2));
 
-  lockBatchRequestBody.setMqSet(messageQueueList);
-  EXPECT_EQ(lockBatchRequestBody.getMqSet(), messageQueueList);
+  lockBatchRequestBody.set_mq_set(messageQueueList);
+  EXPECT_EQ(lockBatchRequestBody.mq_set(), messageQueueList);
 
   std::string outData = lockBatchRequestBody.encode();
 
@@ -81,7 +83,7 @@ TEST(LockBatchBodyTest, LockBatchResponseBody) {
   std::unique_ptr<LockBatchResponseBody> lockBatchResponseBody(LockBatchResponseBody::Decode(bodyData));
 
   MQMessageQueue messageQueue("testTopic", "testBroker", 1);
-  EXPECT_EQ(messageQueue, lockBatchResponseBody->getLockOKMQSet()[0]);
+  EXPECT_EQ(messageQueue, lockBatchResponseBody->lock_ok_mq_set()[0]);
 }
 
 int main(int argc, char* argv[]) {
diff --git a/test/src/protocol/MessageQueueTest.cpp b/test/src/protocol/MessageQueueTest.cpp
index 0f6f5fc..072b4f2 100644
--- a/test/src/protocol/MessageQueueTest.cpp
+++ b/test/src/protocol/MessageQueueTest.cpp
@@ -17,7 +17,7 @@
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
-#include "MessageQueue.h"
+#include "MessageQueue.hpp"
 
 using testing::InitGoogleMock;
 using testing::InitGoogleTest;
diff --git a/test/src/protocol/TopicRouteDataTest.cpp b/test/src/protocol/TopicRouteDataTest.cpp
index cb70c10..ef94cdb 100644
--- a/test/src/protocol/TopicRouteDataTest.cpp
+++ b/test/src/protocol/TopicRouteDataTest.cpp
@@ -20,7 +20,7 @@
 #include <memory>
 
 #include "ByteArray.h"
-#include "TopicRouteData.hpp"
+#include "protocol/body/TopicRouteData.hpp"
 
 using testing::InitGoogleMock;
 using testing::InitGoogleTest;
diff --git a/test/src/transport/ClientRemotingProcessorTest.cpp b/test/src/transport/ClientRemotingProcessorTest.cpp
index 34f7c6a..1a840d9 100644
--- a/test/src/transport/ClientRemotingProcessorTest.cpp
+++ b/test/src/transport/ClientRemotingProcessorTest.cpp
@@ -34,7 +34,7 @@
 #include "SessionCredentials.h"
 #include "TcpTransport.h"
 #include "UtilAll.h"
-#include "protocol/body/ResetOffsetBody.h"
+#include "protocol/body/ResetOffsetBody.hpp"
 #include "protocol/header/CommandHeader.h"
 
 using testing::_;