You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by gx...@apache.org on 2020/09/15 11:47:05 UTC

[incubator-tubemq] 49/50: [TUBEMQ-350]C++ SDK client code adj (#262)

This is an automated email from the ASF dual-hosted git repository.

gxcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 1cc56e53bfec60ce7eb5fd9cd4c3d95091463212
Author: charlely <41...@users.noreply.github.com>
AuthorDate: Tue Sep 15 12:26:53 2020 +0800

    [TUBEMQ-350]C++ SDK client code adj (#262)
    
    Co-authored-by: charleli <ch...@tencent.com>
---
 .../tubemq-client-cpp/src/CMakeLists.txt           |   2 +-
 .../tubemq-client-cpp/src/baseconsumer.cc          |  64 ++--
 .../tubemq-client-cpp/src/client_connection.cc     | 287 ++++++++++++++
 .../tubemq-client-cpp/src/client_connection.h      | 101 +++++
 .../tubemq-client-cpp/src/client_service.h         | 417 ++++++---------------
 .../tubemq-client-cpp/src/client_subinfo.cc        |  55 +--
 .../tubemq-client-cpp/src/client_subinfo.h         |  79 ++++
 .../tubemq-client-cpp/src/connection.h             |  14 +-
 .../tubemq-client-cpp/src/connection_pool.h        |   5 +-
 .../tubemq-client-cpp/src/const_config.h           | 120 ++++++
 .../tubemq-client-cpp/src/const_rpc.h              |  91 +++++
 .../tubemq-client-cpp/src/executor_pool.cc         |  21 +-
 .../tubemq-client-cpp/src/executor_pool.h          |  95 +++++
 .../tubemq-client-cpp/src/file_ini.cc              |  26 +-
 .../tubemq-client-cpp/src/file_ini.h               |  51 +++
 .../tubemq-client-cpp/src/flowctrl_def.cc          | 244 ++++++------
 .../tubemq-client-cpp/src/flowctrl_def.h           | 144 +++++++
 .../tubemq-client-cpp/src/logger.cc                |   9 +-
 tubemq-client-twins/tubemq-client-cpp/src/logger.h | 118 ++++++
 .../tubemq-client-cpp/src/meta_info.cc             | 417 +++++++++++----------
 .../tubemq-client-cpp/src/meta_info.h              | 188 ++++++++++
 .../tubemq-client-cpp/src/noncopyable.h            |  37 ++
 .../tubemq-client-cpp/src/rmt_data_cache.cc        | 274 +++++++++-----
 .../tubemq-client-cpp/src/rmt_data_cache.h         | 166 ++++++++
 .../src/{thread_pool.h => singleton.h}             |  60 ++-
 .../tubemq-client-cpp/src/thread_pool.h            |   5 +-
 .../tubemq-client-cpp/src/tubemq_client.cc         | 184 +++++++++
 .../tubemq-client-cpp/src/tubemq_codec.h           |  21 +-
 .../tubemq-client-cpp/src/tubemq_config.cc         | 342 +++++++++--------
 .../tubemq-client-cpp/src/tubemq_message.cc        | 126 ++++---
 .../tubemq-client-cpp/src/tubemq_return.cc         | 123 +++---
 .../tubemq-client-cpp/src/unique_seq_id.h          |  41 ++
 tubemq-client-twins/tubemq-client-cpp/src/utils.cc | 238 +++++++++++-
 tubemq-client-twins/tubemq-client-cpp/src/utils.h  |  71 ++++
 .../tubemq-client-cpp/src/version.h                |  33 ++
 35 files changed, 3115 insertions(+), 1154 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt
index 5995d5c..938ca2a 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt
@@ -23,7 +23,7 @@ cmake_minimum_required (VERSION 3.1)
 
 AUX_SOURCE_DIRECTORY(. CURRENT_DIR_SRCS)                                        
 ADD_LIBRARY(tubemq STATIC ${CURRENT_DIR_SRCS})   
-TARGET_LINK_LIBRARIES (tubemq)
+TARGET_LINK_LIBRARIES (tubemq libprotobuf.a liblog4cplus.a tubemq_proto)
 
 
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc b/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
index 215892d..714b353 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
@@ -137,7 +137,7 @@ bool BaseConsumer::GetMessage(ConsumerResult& result) {
   string err_info;
   PartitionExt partition_ext;
   string confirm_context;
-  
+
   if (!IsConsumeReady(result)) {
     return false;
   }
@@ -146,7 +146,9 @@ bool BaseConsumer::GetMessage(ConsumerResult& result) {
     result.SetFailureResult(error_code, err_info);
     return false;
   }
-  long curr_offset = tb_config::kInvalidValue;
+
+  int64_t curr_offset = tb_config::kInvalidValue;
+
   bool filter_consume = sub_info_.IsFilterConsume(partition_ext.GetTopic());
   PeerInfo peer_info(partition_ext.GetBrokerHost(), partition_ext.GetPartitionId(),
     partition_ext.GetPartitionKey(), curr_offset);
@@ -206,9 +208,11 @@ bool BaseConsumer::IsConsumeReady(ConsumerResult& result) {
     if (err_code::kErrSuccess == ret_code) {
       return true;
     }
-    if ((config_.GetMaxPartCheckPeriodMs() > 0)
-      && (Utils::GetCurrentTimeMillis() - start_time 
-      > config_.GetMaxPartCheckPeriodMs())) {
+
+    if ((config_.GetMaxPartCheckPeriodMs() >= 0)
+      && (Utils::GetCurrentTimeMillis() - start_time
+      >= config_.GetMaxPartCheckPeriodMs())) {
+
       switch (ret_code) {
         case err_code::kErrNoPartAssigned: {
           result.SetFailureResult(ret_code,
@@ -277,7 +281,9 @@ bool BaseConsumer::Confirm(const string& confirm_context, bool is_consumed,
   string part_key = Utils::Trim(confirm_context.substr(0, pos1));
   string booked_time_str =
       Utils::Trim(confirm_context.substr(pos1 + token1.size(), confirm_context.size()));
-  long booked_time = atol(booked_time_str.c_str());
+
+  int64_t booked_time = atol(booked_time_str.c_str());
+
   pos1 = part_key.find(token2);
   if (string::npos == pos1) {
     result.SetFailureResult(err_code::kErrBadRequest,
@@ -305,7 +311,9 @@ bool BaseConsumer::Confirm(const string& confirm_context, bool is_consumed,
                             "Not found the partition by confirm_context!");
     return false;
   }
-  long curr_offset = tb_config::kInvalidValue;
+
+  int64_t curr_offset = tb_config::kInvalidValue;
+
   PeerInfo peer_info(partition_ext.GetBrokerHost(), partition_ext.GetPartitionId(),
     partition_ext.GetPartitionKey(), curr_offset);
   auto request = std::make_shared<RequestContext>();
@@ -344,7 +352,9 @@ bool BaseConsumer::Confirm(const string& confirm_context, bool is_consumed,
     if (rsp->success_) {
       CommitOffsetResponseB2C rsp_b2c;
       ret_result = rsp_b2c.ParseFromArray(rsp->rsp_body_.data().c_str(),
-                                          (int)(rsp->rsp_body_.data().length()));
+
+                                          (int32_t)(rsp->rsp_body_.data().length()));
+
       if (ret_result) {
         if (rsp_b2c.success()) {
           curr_offset = rsp_b2c.curroffset();
@@ -439,7 +449,8 @@ bool BaseConsumer::register2Master(int32_t& error_code, string& err_info, bool n
       error_code = error.Value();
       err_info = error.Message();
     }
-    if (error_code == err_code::kErrConsumeGroupForbidden 
+
+    if (error_code == err_code::kErrConsumeGroupForbidden
       || error_code == err_code::kErrConsumeContentForbidden) {
       // set regist process status to existed
       master_reg_status_.CompareAndSet(1, 0);
@@ -805,7 +816,8 @@ void BaseConsumer::processHeartBeat2Broker(NodeInfo broker_info) {
           if (rsp->success_) {
             HeartBeatResponseB2C rsp_b2c;
             bool result = rsp_b2c.ParseFromArray(rsp->rsp_body_.data().c_str(),
-                                                 (int)(rsp->rsp_body_.data().length()));
+                                                (int32_t)(rsp->rsp_body_.data().length()));
+
             if (result) {
               set<string> partition_keys;
               if (rsp_b2c.success()) {
@@ -1066,7 +1078,8 @@ bool BaseConsumer::processRegisterResponseM2C(int32_t& error_code, string& err_i
   }
   RegisterResponseM2C rsp_m2c;
   bool result = rsp_m2c.ParseFromArray(rsp_protocol->rsp_body_.data().c_str(),
-                                       (int)(rsp_protocol->rsp_body_.data().length()));
+                                       (int32_t)(rsp_protocol->rsp_body_.data().length()));
+
   if (!result) {
     error_code = err_code::kErrParseFailure;
     err_info = "Parse RegisterResponseM2C response failure!";
@@ -1223,7 +1236,8 @@ bool BaseConsumer::processRegResponseB2C(int32_t& error_code, string& err_info,
   }
   RegisterResponseB2C rsp_b2c;
   bool result = rsp_b2c.ParseFromArray(rsp_protocol->rsp_body_.data().c_str(),
-                                       (int)(rsp_protocol->rsp_body_.data().length()));
+                                       (int32_t)(rsp_protocol->rsp_body_.data().length()));
+
   if (!result) {
     error_code = err_code::kErrParseFailure;
     err_info = "Parse RegisterResponseB2C response failure!";
@@ -1259,10 +1273,8 @@ void BaseConsumer::convertMessages(int32_t& msg_size, list<Message>& message_lis
     int32_t payload_length = tsfMsg.payloaddata().length();
     int32_t calc_checksum = Utils::Crc32(tsfMsg.payloaddata());
     if (in_check_sum != calc_checksum) {
-
       LOG_TRACE("[CONSUMER] convertMessages [%d], Crc32 failure, in=%d, calc=%d, client=%s",
         i, in_check_sum, calc_checksum, client_uuid_.c_str());
-
       continue;
     }
     int read_pos = 0;
@@ -1272,10 +1284,8 @@ void BaseConsumer::convertMessages(int32_t& msg_size, list<Message>& message_lis
     memcpy(&payload_data[0], tsfMsg.payloaddata().c_str(), payload_length);
     if ((flag & tb_config::kMsgFlagIncProperties) == 1) {
       if (payload_length < 4) {
-
         LOG_TRACE("[CONSUMER] convertMessages [%d], payload_length(%d) < 4, client=%s",
           i, payload_length, client_uuid_.c_str());
-
         continue;
       }
       int32_t attr_len = ntohl(*(int*)(&payload_data[0]));
@@ -1322,6 +1332,8 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
                                              bool filter_consume, const PartitionExt& partition_ext,
                                              const string& confirm_context,
                                              const TubeMQCodec::RspProtocolPtr& rsp) {
+
+  // #lizard forgives
   string err_info;
 
   if (!rsp->success_) {
@@ -1334,8 +1346,9 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
     return false;
   }
   GetMessageResponseB2C rsp_b2c;
-  bool ret_result =
-      rsp_b2c.ParseFromArray(rsp->rsp_body_.data().c_str(), (int)(rsp->rsp_body_.data().length()));
+  bool ret_result = rsp_b2c.ParseFromArray(
+    rsp->rsp_body_.data().c_str(), (int32_t)(rsp->rsp_body_.data().length()));
+
   if (!ret_result) {
     rmtdata_cache_.RelPartition(err_info, filter_consume, confirm_context, false);
     result.SetFailureResult(err_code::kErrServerError,
@@ -1343,16 +1356,18 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
                             partition_ext.GetTopic(), peer_info);
 
     LOG_TRACE("[CONSUMER] processGetMessageRspB2C parse failure, client=%s", client_uuid_.c_str());
-
     return false;
   }
 
   switch (rsp_b2c.errcode()) {
     case err_code::kErrSuccess: {
       bool esc_limit = (rsp_b2c.has_escflowctrl() && rsp_b2c.escflowctrl());
-      long data_dltval =
+
+      int64_t data_dltval =
           rsp_b2c.has_currdatadlt() ? rsp_b2c.currdatadlt() : tb_config::kInvalidValue;
-      long curr_offset = rsp_b2c.has_curroffset() ? rsp_b2c.curroffset() : tb_config::kInvalidValue;
+      int64_t curr_offset = rsp_b2c.has_curroffset() ?
+        rsp_b2c.curroffset() : tb_config::kInvalidValue;
+
       bool req_slow = rsp_b2c.has_requireslow() ? rsp_b2c.requireslow() : false;
       int msg_size = 0;
       list<Message> message_list;
@@ -1377,7 +1392,8 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
 
     case err_code::kErrConsumeSpeedLimit: {
       // Process with server side speed limit
-      long def_dlttime = rsp_b2c.has_minlimittime() ? rsp_b2c.minlimittime()
+
+      int64_t def_dlttime = rsp_b2c.has_minlimittime() ? rsp_b2c.minlimittime()
                                                     : config_.GetMsgNotFoundWaitPeriodMs();
       rmtdata_cache_.RelPartition(err_info, filter_consume, confirm_context, false,
                                   tb_config::kInvalidValue, rsp_b2c.errcode(), false, 0,
@@ -1393,7 +1409,7 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
     case err_code::kErrServiceUnavilable:
     default: {
       // Slow down the request based on the limitation configuration when meet these errors
-      long limit_dlt = 300;
+      int64_t limit_dlt = 300;
       switch (rsp_b2c.errcode()) {
         case err_code::kErrForbidden: {
           limit_dlt = 2000;
@@ -1455,8 +1471,6 @@ int32_t BaseConsumer::getConsumeReadStatus(bool is_first_reg) {
       LOG_INFO("[Consumer From Max Offset Always], clientId=%s", client_uuid_.c_str());
     }
   }
-  LOG_INFO("[getConsumeReadStatus], readStatus=%d, is_first_reg=%d, config_.GetConsumePosition()=%d",
-    readStatus, is_first_reg, config_.GetConsumePosition());
   return readStatus;
 }
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
new file mode 100644
index 0000000..c795d35
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "client_connection.h"
+
+using namespace tubemq;
+
+void ClientConnection::AsyncWrite(RequestContextPtr& req) {
+  auto self = shared_from_this();
+  executor_->Post([self, this, req]() {
+    if (request_list_.find(req->request_id_) != request_list_.end()) {
+      LOG_ERROR("Write requestid[%d] is repeat", req->request_id_);
+      return;
+    }
+    auto& transport_req = request_list_[req->request_id_];
+    transport_req.req_ = req;
+    bool queue_empty = write_queue_.empty();
+    write_queue_.push_back(req->request_id_);
+    if (req->timeout_ > 0) {
+      transport_req.deadline_ = executor_->CreateSteadyTimer();
+      transport_req.deadline_->expires_after(std::chrono::milliseconds(req->timeout_));
+      transport_req.deadline_->async_wait(std::bind(&ClientConnection::requestTimeoutHandle,
+                                                    shared_from_this(), std::placeholders::_1,
+                                                    transport_req.req_));
+    }
+    if (IsConnected() && queue_empty) {
+      asyncWrite();
+    }
+  });
+}
+
+void ClientConnection::Close() {
+  auto self = shared_from_this();
+  executor_->Post([self, this]() { close(); });
+}
+
+void ClientConnection::requestTimeoutHandle(const std::error_code& ec, RequestContextPtr req) {
+  if (ec) {
+    return;
+  }
+  auto request_id = req->request_id_;
+  auto err = ErrorCode(err_code::kErrNetWorkTimeout, "Request is timeout");
+  requestCallback(request_id, &err);
+}
+
+void ClientConnection::close(const std::error_code* err) {
+  if (IsStop()) {
+    return;
+  }
+  status_ = kDisconnected;
+  LOG_INFO("%scloseed", ToString().c_str());
+  socket_->close();
+  if (notifier_ != nullptr) {
+    notifier_(err);
+  }
+  releaseAllRequest(err);
+}
+
+void ClientConnection::releaseAllRequest(const std::error_code* err) {
+  std::string msg = "connect close ";
+  if (err != nullptr) {
+    msg += "error_code, value:";
+    msg += std::to_string(err->value());
+    msg += ", msg:";
+    msg += err->message();
+    msg += ", category:";
+    msg += err->category().name();
+  }
+
+  auto terr = ErrorCode(err_code::kErrNetworkError, msg);
+  for (auto& it : request_list_) {
+    it.second.req_->promise_.SetFailed(terr);
+    it.second.deadline_->cancel();
+  }
+  request_list_.clear();
+  write_queue_.clear();
+  recv_buffer_->Reset();
+}
+
+void ClientConnection::connect(const asio::ip::tcp::resolver::results_type& endpoints) {
+  if (IsStop()) {
+    return;
+  }
+  status_ = kConnecting;
+  deadline_->expires_after(std::chrono::milliseconds(kConnnectMaxTimeMs));
+  deadline_->async_wait(std::bind(&ClientConnection::checkDeadline, this, std::placeholders::_1));
+  asio::async_connect(
+      *socket_, endpoints, [this](std::error_code ec, asio::ip::tcp::endpoint endpoint) {
+        deadline_->cancel();
+        if (ec) {
+          status_ = kDisconnected;
+          LOG_ERROR("%s[%s:%d]async connect error:%d, %s, %s", ToString().c_str(), ip_.c_str(),
+                    port_, ec.value(), ec.message().c_str(), ec.category().name());
+          close(&ec);
+          return;
+        }
+        status_ = kConnected;
+        socket_->set_option(asio::ip::tcp::no_delay(true));
+        // socket_->set_option(asio::ip::tcp::socket::reuse_address(true));
+        contextString();
+        LOG_INFO("%sis connected", ToString().c_str());
+
+        asyncWrite();
+        asyncRead();
+      });
+}
+
+void ClientConnection::checkDeadline(const std::error_code& ec) {
+  if (ec) {
+    return;
+  }
+  if (IsStop()) {
+    return;
+  }
+  LOG_ERROR("%s connect timeout", ToString().c_str());
+  close();
+}
+
+void ClientConnection::contextString() {
+  std::stringstream stream;
+  stream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] ";
+  context_string_ += stream.str();
+}
+
+void ClientConnection::asyncRead() {
+  if (IsStop()) {
+    return;
+  }
+  if (recv_buffer_->capacity() > rpc_config::kRpcRecvBufferMaxBytes) {
+    LOG_ERROR("%sbuffer capacity over config:%d", ToString().c_str(),
+              rpc_config::kRpcRecvBufferMaxBytes);
+    close();
+    return;
+  }
+  recv_buffer_->EnsureWritableBytes(rpc_config::kRpcEnsureWriteableBytes);
+  auto self = shared_from_this();
+  socket_->async_read_some(
+      asio::buffer(recv_buffer_->WriteBegin(), recv_buffer_->WritableBytes()),
+      [self, this](std::error_code ec, std::size_t len) {
+        if (ec) {
+          LOG_ERROR("[%s]async read error:%d, %s, %s", ToString().c_str(), ec.value(),
+                    ec.message().c_str(), ec.category().name());
+          close(&ec);
+          return;
+        }
+        if (len == 0) {
+          LOG_ERROR("[%s]async read zero", ToString().c_str());
+          close(&ec);
+          return;
+        }
+        recv_time_ = std::time(nullptr);
+        recv_buffer_->WriteBytes(len);
+        checkPackageDone();
+        LOG_TRACE("[%s]async read done, len:%ld, package_length_:%ld, recvbuffer:%s",
+                  ToString().c_str(), len, package_length_, recv_buffer_->String().c_str());
+        asyncRead();
+      });
+}
+
+void ClientConnection::checkPackageDone() {
+  if (check_ == nullptr) {
+    recv_buffer_->Reset();
+    LOG_ERROR("check codec func not set");
+    return;
+  }
+  if (package_length_ > recv_buffer_->length()) {
+    return;
+  }
+  uint32_t request_id = 0;
+  bool has_request_id = false;
+  Any check_out;
+  auto buff = recv_buffer_->Slice();
+  size_t package_length = 0;
+  auto result = check_(buff, check_out, request_id, has_request_id, package_length);
+  if (result < 0) {
+    package_length_ = 0;
+    LOG_ERROR("%s, check codec package result:%d", ToString().c_str(), result);
+    close();
+    return;
+  }
+  if (result == 0) {
+    package_length_ = package_length;
+    return;
+  }
+  ++read_package_number_;
+  package_length_ = 0;
+  recv_buffer_->Skip(result);
+  if (!has_request_id) {
+    auto it = request_list_.begin();
+    if (it == request_list_.end()) {
+      LOG_ERROR("%s, not find request", ToString().c_str());
+      return;
+    }
+    requestCallback(it->first, nullptr, &check_out);
+    return;
+  }
+  requestCallback(request_id, nullptr, &check_out);
+}
+
+void ClientConnection::requestCallback(uint32_t request_id, ErrorCode* err, Any* check_out) {
+  auto it = request_list_.find(request_id);
+  if (it == request_list_.end()) {
+    LOG_INFO("%srequest[%d] not find from request_list_", ToString().c_str(), request_id);
+    return;
+  }
+  auto req = &it->second;
+  req->deadline_->cancel();
+  if (err != nullptr) {
+    LOG_ERROR("%srequest[%d] error:%d, msg:%s", ToString().c_str(), request_id, err->Value(),
+              err->Message().c_str());
+    req->req_->promise_.SetFailed(*err);
+    request_list_.erase(it);
+    return;
+  }
+  if (check_out != nullptr) {
+    ResponseContext rsp;
+    BufferPtr* buff = any_cast<BufferPtr>(check_out);
+    if (buff != nullptr) {
+      req->req_->codec_->Decode(*buff, rsp.rsp_);
+    } else {
+      rsp.rsp_ = *check_out;
+    }
+    req->req_->promise_.SetValue(rsp);
+  } else {
+    req->req_->promise_.SetFailed(ErrorCode(err_code::kErrNetworkError, "response is null"));
+  }
+  request_list_.erase(it);
+}
+
+TransportRequest* ClientConnection::nextTransportRequest() {
+  uint32_t request_id;
+  TransportRequest* transport_req = nullptr;
+  while (!write_queue_.empty()) {
+    request_id = write_queue_.front();
+    write_queue_.pop_front();
+    auto it = request_list_.find(request_id);
+    if (it == request_list_.end()) {
+      continue;
+    }
+    transport_req = &it->second;
+    break;
+  }
+  return transport_req;
+}
+
+void ClientConnection::asyncWrite() {
+  if (IsStop()) {
+    return;
+  }
+  auto transport_req = nextTransportRequest();
+  if (transport_req == nullptr) {
+    return;
+  }
+  auto self = shared_from_this();
+  auto& req = transport_req->req_;
+  asio::async_write(
+      *socket_,
+      asio::buffer(transport_req->req_->buf_->data(), transport_req->req_->buf_->length()),
+      [self, this, req](std::error_code ec, std::size_t length) {
+        if (ec) {
+          close(&ec);
+          LOG_ERROR("[%s]async write error:%d, message:%s, category:%s", ToString().c_str(),
+                    ec.value(), ec.message().c_str(), ec.category().name());
+          return;
+        }
+        ++write_package_number_;
+        LOG_TRACE("[%s]async write done, request_id:%d", ToString().c_str(), req->request_id_);
+        asyncWrite();
+      });
+}
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.h b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.h
new file mode 100644
index 0000000..b6c7a36
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.h
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _TUBEMQ_CLIENT_CONNECTION_
+#define _TUBEMQ_CLIENT_CONNECTION_
+
+#include <stdlib.h>
+
+#include <chrono>
+#include <deque>
+#include <exception>
+#include <functional>
+#include <iostream>
+#include <list>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <unordered_map>
+
+#include "asio.hpp"
+#include "connection.h"
+#include "const_rpc.h"
+#include "executor_pool.h"
+#include "logger.h"
+#include "noncopyable.h"
+#include "transport.h"
+
+namespace tubemq {
+
+struct TransportRequest {
+  RequestContextPtr req_;
+  SteadyTimerPtr deadline_;
+};
+
+class ClientConnection : public Connection, public std::enable_shared_from_this<ClientConnection> {
+ public:
+  // executor: ExecutorPool.Get()
+  // endpoints: executor->CreateTcpResolver()->resolve("ip", port);
+  ClientConnection(ExecutorPtr& executor, const std::string& ip, uint16_t port)
+      : Connection(ip, port),
+        executor_(executor),
+        socket_(std::move(executor->CreateTcpSocket())),
+        recv_buffer_(std::make_shared<Buffer>(rpc_config::kRpcConnectInitBufferSize)),
+        deadline_(std::move(executor->CreateSteadyTimer())) {
+    auto endpoints = executor_->CreateTcpResolver()->resolve(ip_, std::to_string(port_));
+    connect(endpoints);
+  }
+
+  virtual ~ClientConnection() {}
+
+  virtual void AsyncWrite(RequestContextPtr& req);
+
+  virtual void Close();
+
+ private:
+  void requestTimeoutHandle(const std::error_code& ec, RequestContextPtr req);
+
+  void close(const std::error_code* err = nullptr);
+
+  void releaseAllRequest(const std::error_code* err = nullptr);
+  void connect(const asio::ip::tcp::resolver::results_type& endpoints);
+  void checkDeadline(const std::error_code& ec);
+  void contextString();
+  void asyncRead();
+  void checkPackageDone();
+  void requestCallback(uint32_t request_id, ErrorCode* err = nullptr, Any* check_out = nullptr);
+  TransportRequest* nextTransportRequest();
+  void asyncWrite();
+
+ private:
+  using BufferQueue = std::deque<uint32_t>;
+  static const uint32_t kConnnectMaxTimeMs{1000 * 20};  // ms
+  static const uint32_t kReadMaxTimeMs{1000 * 30};      // ms
+  ExecutorPtr executor_;
+  TcpSocketPtr socket_;
+  BufferPtr recv_buffer_;
+  BufferQueue write_queue_;
+  std::unordered_map<uint32_t, TransportRequest> request_list_;  // requestid->request context
+  SteadyTimerPtr deadline_;
+};
+using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
+
+}  // namespace tubemq
+
+#endif  // _TUBEMQ_CLIENT_CONNECTION_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_service.h b/tubemq-client-twins/tubemq-client-cpp/src/client_service.h
index c6e0aaa..11f5b42 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_service.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_service.h
@@ -1,296 +1,121 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "client_service.h"
-
-#include <sstream>
-
-#include "const_config.h"
-#include "logger.h"
-#include "utils.h"
-
-namespace tubemq {
-
-using std::lock_guard;
-using std::stringstream;
-
-BaseClient::BaseClient(bool is_producer) {
-  is_producer_ = is_producer;
-  client_index_ = tb_config::kInvalidValue;
-}
-
-BaseClient::~BaseClient() {
-  // no code
-}
-
-TubeMQService* TubeMQService::_instance = NULL;
-
-static mutex tubemq_mutex_service_;
-
-TubeMQService* TubeMQService::Instance() {
-  if (NULL == _instance) {
-    lock_guard<mutex> lck(tubemq_mutex_service_);
-    if (NULL == _instance) {
-      _instance = new TubeMQService;
-    }
-  }
-  return _instance;
-}
-
-TubeMQService::TubeMQService()
-    : timer_executor_(std::make_shared<ExecutorPool>(2)),
-      network_executor_(std::make_shared<ExecutorPool>(4)) {
-  service_status_.Set(0);
-  client_index_base_.Set(0);
-  last_check_time_ = 0;
-}
-
-TubeMQService::~TubeMQService() {
-  string err_info;
-  Stop(err_info);
-}
-
-bool TubeMQService::Start(string& err_info, string conf_file) {
-  // check configure file
-  bool result = false;
-  Fileini fileini;
-  string sector = "TubeMQ";
-
-  result = Utils::ValidConfigFile(err_info, conf_file);
-  if (!result) {
-    return result;
-  }
-  result = fileini.Loadini(err_info, conf_file);
-  if (!result) {
-    return result;
-  }
-  result = Utils::GetLocalIPV4Address(err_info, local_host_);
-  if (!result) {
-    return result;
-  }
-  if (!service_status_.CompareAndSet(0, 1)) {
-    err_info = "TubeMQ Service has startted or Stopped!";
-    return false;
-  }
-  iniLogger(fileini, sector);
-  iniPoolThreads(fileini, sector);
-  iniXfsThread(fileini, sector);
-  service_status_.Set(2);
-  err_info = "Ok!";
-  LOG_INFO("[TubeMQService] TubeMQ service startted!");
-
-  return true;
-}
-
-bool TubeMQService::Stop(string& err_info) {
-  if (service_status_.CompareAndSet(2, -1)) {
-    LOG_INFO("[TubeMQService] TubeMQ service begin to stop!");
-    if (dns_xfs_thread_.joinable()) {
-      dns_xfs_thread_.join();
-    }
-    shutDownClinets();
-    timer_executor_->Close();
-    network_executor_->Close();
-    connection_pool_ = nullptr;
-    thread_pool_ = nullptr;
-    LOG_INFO("[TubeMQService] TubeMQ service stopped!");
-  }
-  err_info = "OK!";
-  return true;
-}
-
-bool TubeMQService::IsRunning() { return (service_status_.Get() == 2); }
-
-void TubeMQService::iniLogger(const Fileini& fileini, const string& sector) {
-  string err_info;
-  int32_t log_num = 10;
-  int32_t log_size = 10;
-  int32_t log_level = 4;
-  string log_path = "../log/tubemq";
-  fileini.GetValue(err_info, sector, "log_num", log_num, 10);
-  fileini.GetValue(err_info, sector, "log_size", log_size, 100);
-  fileini.GetValue(err_info, sector, "log_path", log_path, "../log/tubemq");
-  fileini.GetValue(err_info, sector, "log_level", log_level, 4);
-  log_level = TUBEMQ_MID(log_level, 4, 0);
-  GetLogger().Init(log_path, Logger::Level(log_level), log_size, log_num);
-}
-
-void TubeMQService::iniXfsThread(const Fileini& fileini, const string& sector) {
-  string err_info;
-  int32_t dns_xfs_period_ms = 30 * 1000;
-  fileini.GetValue(err_info, sector, "dns_xfs_period_ms", dns_xfs_period_ms, 30 * 1000);
-  TUBEMQ_MID(dns_xfs_period_ms, tb_config::kMaxIntValue, 10000);
-  dns_xfs_thread_ = std::thread(&TubeMQService::thread_task_dnsxfs, this, dns_xfs_period_ms);
-}
-
-void TubeMQService::iniPoolThreads(const Fileini& fileini, const string& sector) {
-  string err_info;
-  int32_t timer_threads = 2;
-  int32_t network_threads = 4;
-  int32_t signal_threads = 8;
-  fileini.GetValue(err_info, sector, "timer_threads", timer_threads, 2);
-  TUBEMQ_MID(timer_threads, 50, 2);
-  fileini.GetValue(err_info, sector, "network_threads", network_threads, 4);
-  TUBEMQ_MID(network_threads, 50, 4);
-  fileini.GetValue(err_info, sector, "signal_threads", signal_threads, 8);
-  TUBEMQ_MID(signal_threads, 50, 4);
-  timer_executor_->Resize(timer_threads);
-  network_executor_->Resize(network_threads);
-  thread_pool_ = std::make_shared<ThreadPool>(signal_threads);
-  connection_pool_ = std::make_shared<ConnectionPool>(network_executor_);
-}
-
-int32_t TubeMQService::GetClientObjCnt() {
-  lock_guard<mutex> lck(mutex_);
-  return clients_map_.size();
-}
-
-bool TubeMQService::AddClientObj(string& err_info, BaseClient* client_obj) {
-  if (!IsRunning()) {
-    err_info = "Service not startted!";
-    return false;
-  }
-  int32_t client_index = client_index_base_.IncrementAndGet();
-  lock_guard<mutex> lck(mutex_);
-  clients_map_[client_index] = client_obj;
-  client_obj->SetClientIndex(client_index);
-  err_info = "Ok";
-  return true;
-}
-
-BaseClient* TubeMQService::GetClientObj(int32_t client_index) const {
-  BaseClient* client_obj = NULL;
-  map<int32_t, BaseClient*>::const_iterator it;
-
-  lock_guard<mutex> lck(mutex_);
-  it = clients_map_.find(client_index);
-  if (it != clients_map_.end()) {
-    client_obj = it->second;
-  }
-  return client_obj;
-}
-
-void TubeMQService::RmvClientObj(BaseClient* client_obj) {
-  map<int32_t, BaseClient*>::iterator it;
-  if (client_obj != NULL) {
-    lock_guard<mutex> lck(mutex_);
-    int32_t client_index = client_obj->GetClientIndex();
-    clients_map_.erase(client_index);
-    client_obj->SetClientIndex(tb_config::kInvalidValue);
-  }
-}
-
-void TubeMQService::shutDownClinets() const {
-  map<int32_t, BaseClient*>::const_iterator it;
-  lock_guard<mutex> lck(mutex_);
-  for (it = clients_map_.begin(); it != clients_map_.end(); it++) {
-    it->second->ShutDown();
-  }
-}
-
-bool TubeMQService::AddMasterAddress(string& err_info, const string& master_info) {
-  map<string, int32_t>::iterator it;
-  map<string, int32_t> tmp_addr_map;
-  Utils::Split(master_info, tmp_addr_map, delimiter::kDelimiterComma, delimiter::kDelimiterColon);
-  if (tmp_addr_map.empty()) {
-    err_info = "Illegal parameter: master_info is blank!";
-    return false;
-  }
-  for (it = tmp_addr_map.begin(); it != tmp_addr_map.end();) {
-    if (!Utils::NeedDnsXfs(it->first)) {
-      tmp_addr_map.erase(it++);
-    }
-  }
-  if (tmp_addr_map.empty()) {
-    err_info = "Ok";
-    return true;
-  }
-  if (addNeedDnsXfsAddr(tmp_addr_map)) {
-    updMasterAddrByDns();
-  }
-  err_info = "Ok";
-  return true;
-}
-
-void TubeMQService::GetXfsMasterAddress(const string& source, string& target) {
-  target = source;
-  lock_guard<mutex> lck(mutex_);
-  if (master_source_.find(source) != master_source_.end()) {
-    target = master_target_[source];
-  }
-}
-
-void TubeMQService::thread_task_dnsxfs(int dns_xfs_period_ms) {
-  LOG_INFO("[TubeMQService] DSN transfer thread startted!");
-  while (true) {
-    if (TubeMQService::Instance()->GetServiceStatus() <= 0) {
-      break;
-    }
-    if ((Utils::GetCurrentTimeMillis() - last_check_time_) >= dns_xfs_period_ms) {
-      TubeMQService::Instance()->updMasterAddrByDns();
-      last_check_time_ = Utils::GetCurrentTimeMillis();
-    }
-    std::this_thread::sleep_for(std::chrono::milliseconds(500));
-  }
-  LOG_INFO("[TubeMQService] DSN transfer thread stopped!");
-}
-
-bool TubeMQService::hasXfsTask(map<string, int32_t>& src_addr_map) {
-  lock_guard<mutex> lck(mutex_);
-  if (!master_source_.empty()) {
-    src_addr_map = master_source_;
-    return true;
-  }
-  return false;
-}
-
-bool TubeMQService::addNeedDnsXfsAddr(map<string, int32_t>& src_addr_map) {
-  bool added = false;
-  map<string, int32_t>::iterator it;
-  if (!src_addr_map.empty()) {
-    lock_guard<mutex> lck(mutex_);
-    for (it = src_addr_map.begin(); it != src_addr_map.end(); it++) {
-      if (master_source_.find(it->first) == master_source_.end()) {
-        added = true;
-        master_source_[it->first] = it->second;
-      }
-    }
-  }
-  return added;
-}
-
-void TubeMQService::updMasterAddrByDns() {
-  map<string, int32_t> tmp_src_addr_map;
-  map<string, string> tmp_tgt_addr_map;
-  map<string, int32_t>::iterator it;
-  if (!hasXfsTask(tmp_src_addr_map)) {
-    return;
-  }
-  Utils::XfsAddrByDns(tmp_src_addr_map, tmp_tgt_addr_map);
-  lock_guard<mutex> lck(mutex_);
-  if (tmp_tgt_addr_map.empty()) {
-    for (it = tmp_src_addr_map.begin(); it != tmp_src_addr_map.end(); it++) {
-      master_target_[it->first] = it->first;
-    }
-  } else {
-    master_target_ = tmp_tgt_addr_map;
-  }
-}
-
-}  // namespace tubemq
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+#ifndef TUBEMQ_CLIENT_BASE_CLIENT_H_
+#define TUBEMQ_CLIENT_BASE_CLIENT_H_
+
+#include <stdint.h>
+
+#include <map>
+#include <mutex>
+#include <string>
+#include <thread>
+
+#include "connection_pool.h"
+#include "file_ini.h"
+#include "noncopyable.h"
+#include "rmt_data_cache.h"
+#include "thread_pool.h"
+#include "tubemq/tubemq_atomic.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_message.h"
+#include "tubemq/tubemq_return.h"
+
+namespace tubemq {
+
+using std::map;
+using std::mutex;
+using std::string;
+using std::thread;
+
+class BaseClient {
+ public:
+  explicit BaseClient(bool is_producer);
+  virtual ~BaseClient();
+  virtual void ShutDown() {}
+  void SetClientIndex(int32_t client_index) { client_index_ = client_index; }
+  bool IsProducer() { return is_producer_; }
+  const int32_t GetClientIndex() { return client_index_; }
+
+ protected:
+  bool is_producer_;
+  int32_t client_index_;
+};
+
+class TubeMQService : public noncopyable {
+ public:
+  static TubeMQService* Instance();
+  bool Start(string& err_info, string conf_file = "../conf/tubemqclient.conf");
+  bool Stop(string& err_info);
+  bool IsRunning();
+  const int32_t GetServiceStatus() const { return service_status_.Get(); }
+  int32_t GetClientObjCnt();
+  bool AddClientObj(string& err_info, BaseClient* client_obj);
+  BaseClient* GetClientObj(int32_t client_index) const;
+  void RmvClientObj(BaseClient* client_obj);
+  const string& GetLocalHost() const { return local_host_; }
+  ExecutorPoolPtr GetTimerExecutorPool() { return timer_executor_; }
+  SteadyTimerPtr CreateTimer() { return timer_executor_->Get()->CreateSteadyTimer(); }
+  ExecutorPoolPtr GetNetWorkExecutorPool() { return network_executor_; }
+  ConnectionPoolPtr GetConnectionPool() { return connection_pool_; }
+  template <class function>
+  void Post(function f) {
+    if (thread_pool_ != nullptr) {
+      thread_pool_->Post(f);
+    }
+  }
+  bool AddMasterAddress(string& err_info, const string& master_info);
+  void GetXfsMasterAddress(const string& source, string& target);
+
+ protected:
+  void updMasterAddrByDns();
+
+ private:
+  TubeMQService();
+  ~TubeMQService();
+  void iniLogger(const Fileini& fileini, const string& sector);
+  void iniPoolThreads(const Fileini& fileini, const string& sector);
+  void iniXfsThread(const Fileini& fileini, const string& sector);
+  void thread_task_dnsxfs(int dns_xfs_period_ms);
+  void shutDownClinets() const;
+  bool hasXfsTask(map<string, int32_t>& src_addr_map);
+  bool addNeedDnsXfsAddr(map<string, int32_t>& src_addr_map);
+
+ private:
+  static TubeMQService* _instance;
+  string local_host_;
+  AtomicInteger service_status_;
+  AtomicInteger client_index_base_;
+  mutable mutex mutex_;
+  map<int32_t, BaseClient*> clients_map_;
+  ExecutorPoolPtr timer_executor_;
+  ExecutorPoolPtr network_executor_;
+  ConnectionPoolPtr connection_pool_;
+  std::shared_ptr<ThreadPool> thread_pool_;
+  thread dns_xfs_thread_;
+  mutable mutex dns_mutex_;
+  int64_t last_check_time_;
+  map<string, int32_t> master_source_;
+  map<string, string> master_target_;
+};
+
+}  // namespace tubemq
+
+#endif  // TUBEMQ_CLIENT_BASE_CLIENT_H_
\ No newline at end of file
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
index a6c0fc1..8df82fd 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
@@ -17,56 +17,15 @@
  * under the License.
  */
 
-#include "tubemq/client_subinfo.h"
-#include "tubemq/const_config.h"
-#include "tubemq/utils.h"
+#include "client_subinfo.h"
 
+#include "const_config.h"
+#include "utils.h"
 
 
-namespace tubemq {
-
-
-MasterAddrInfo::MasterAddrInfo() {
-  curr_master_addr_ = "";
-  master_addr_.clear();
-}
-
-bool MasterAddrInfo::InitMasterAddress(string& err_info, 
-                                       const string& master_info) {
-  master_addr_.clear();
-  Utils::Split(master_info, master_addr_, delimiter::kDelimiterComma,
-               delimiter::kDelimiterColon);
-  if (master_addr_.empty()) {
-    err_info = "Illegal parameter: master_info is blank!";
-    return false;
-  }
-
-  map<string, int32_t>::iterator it = master_addr_.begin();
-  curr_master_addr_ = it->first;
-  err_info = "Ok";
-  return true;
-}
 
-void MasterAddrInfo::GetNextMasterAddr(string& ipaddr, int32_t& port) {
-  map<string, int32_t>::iterator it;
-  it = master_addr_.find(curr_master_addr_);
-  if(it != master_addr_.end()) {
-    it++;
-    if (it == master_addr_.end()) {
-      it = master_addr_.begin();
-    }
-  } else {
-    it = master_addr_.begin();
-  }
-  port   = it->second;
-  ipaddr = it->first;
-  curr_master_addr_ = it->first;
-}
+namespace tubemq {
 
-void MasterAddrInfo::GetCurrentMasterAddr(string& ipaddr, int32_t& port) {
-   ipaddr = curr_master_addr_;
-   port = master_addr_[curr_master_addr_];
-}
 
 
 ClientSubInfo::ClientSubInfo() {
@@ -154,13 +113,15 @@ bool ClientSubInfo::IsFilterConsume(const string& topic) {
 
 void ClientSubInfo::GetAssignedPartOffset(const string& partition_key, int64_t& offset) {
   map<string, int64_t>::iterator it;
-  if (first_registered_.Get() && bound_consume_ && not_allocated_.Get()) {
+  offset = tb_config::kInvalidValue;
+  if (!first_registered_.Get()
+    && bound_consume_
+    && not_allocated_.Get()) {
     it = assigned_part_map_.find(partition_key);
     if (it != assigned_part_map_.end()) {
       offset = it->second;
     }
   }
-  offset = tb_config::kInvalidValue;
 }
 
 const map<string, set<string> >& ClientSubInfo::GetTopicFilterMap() const {
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.h b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.h
new file mode 100644
index 0000000..1b1e7a1
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.h
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_SUBINFO_H_
+#define TUBEMQ_CLIENT_SUBINFO_H_
+
+#include <stdint.h>
+#include <list>
+#include <map>
+#include <mutex>
+#include <set>
+#include <string>
+#include "tubemq/tubemq_atomic.h"
+#include "tubemq/tubemq_config.h"
+
+namespace tubemq {
+
+using std::list;
+using std::map;
+using std::set;
+using std::string;
+
+
+class ClientSubInfo {
+ public:
+  ClientSubInfo();
+  void SetConsumeTarget(const ConsumerConfig& config);
+  bool CompAndSetNotAllocated(bool expect, bool update);
+  void BookFstRegistered() { first_registered_.Set(true); }
+  bool IsBoundConsume() const { return bound_consume_; }
+  bool IsNotAllocated() const { return not_allocated_.Get(); }
+  const int64_t GetSubscribedTime() const { return subscribed_time_; }
+  const string& GetSessionKey() const { return session_key_; }
+  const uint32_t GetSourceCnt() const { return source_count_; }
+  bool SelectBig() { return select_big_; }
+  bool IsFilterConsume(const string& topic);
+  void GetAssignedPartOffset(const string& partition_key, int64_t& offset);
+  const string& GetBoundPartInfo() const { return bound_partions_; }
+  const list<string>& GetSubTopics() const { return topics_; }
+  const list<string>& GetTopicConds() const { return topic_conds_; }
+  const map<string, set<string> >& GetTopicFilterMap() const;
+
+ private:
+  bool bound_consume_;
+  AtomicBoolean first_registered_;
+  AtomicBoolean not_allocated_;
+  int64_t  subscribed_time_;
+  map<string, set<string> > topic_and_filter_map_;
+  list<string> topics_;
+  list<string> topic_conds_;
+  map<string, bool> topic_filter_map_;
+  // bound info
+  string session_key_;
+  uint32_t source_count_;
+  bool select_big_;
+  map<string, int64_t> assigned_part_map_;
+  string bound_partions_;
+};
+
+}  // namespace tubemq
+
+
+#endif  // TUBEMQ_CLIENT_SUBINFO_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/connection.h b/tubemq-client-twins/tubemq-client-cpp/src/connection.h
index 30db70b..d4423f5 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/connection.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/connection.h
@@ -50,7 +50,6 @@ class Connection : noncopyable {
       : connect_id_(unique_id_.Next()),
         status_(kConnecting),
         recv_time_(std::time(nullptr)),
-        package_length_(0),
         create_time_(std::time(nullptr)) {
     formatContextString();
   }
@@ -60,7 +59,6 @@ class Connection : noncopyable {
         connect_id_(unique_id_.Next()),
         status_(kConnecting),
         recv_time_(std::time(nullptr)),
-        package_length_(0),
         create_time_(std::time(nullptr)) {
     formatContextString();
   }
@@ -82,7 +80,13 @@ class Connection : noncopyable {
 
   inline std::time_t GetRecvTime() const { return recv_time_; }
 
-  inline const std::string& ToString() const { return context_string_; }
+  inline const std::string ToString() const {
+    std::stringstream stream;
+    stream << "[recvtime:" << recv_time_ << "]"
+           << "[read_package:" << read_package_number_ << "]"
+           << "[write_package:" << write_package_number_ << "]";
+    return context_string_ + stream.str();
+  }
 
  private:
   void formatContextString() {
@@ -101,7 +105,9 @@ class Connection : noncopyable {
   std::atomic<Status> status_;
   std::string context_string_;  // for log
   std::time_t recv_time_;
-  size_t package_length_;
+  size_t package_length_ = 0;
+  size_t read_package_number_ = 0;
+  size_t write_package_number_ = 0;
 
  private:
   std::time_t create_time_;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/connection_pool.h b/tubemq-client-twins/tubemq-client-cpp/src/connection_pool.h
index f3ec067..8ca1881 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/connection_pool.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/connection_pool.h
@@ -40,6 +40,9 @@
 #include "transport.h"
 
 namespace tubemq {
+
+using std::string;
+
 class ConnectionPool : noncopyable {
  public:
   explicit ConnectionPool(ExecutorPoolPtr& executor_pool)
@@ -70,8 +73,8 @@ class ConnectionPool : noncopyable {
       }
       if (it->second->GetRecvTime() + rpc_config::kRpcInvalidConnectOverTime < std::time(nullptr)) {
         it->second->Close();
-        it = connection_pool_.erase(it);
         LOG_ERROR("connection pool clear overtime connect:%s", it->second->ToString().c_str());
+        it = connection_pool_.erase(it);
         continue;
       }
       ++it;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/const_config.h b/tubemq-client-twins/tubemq-client-cpp/src/const_config.h
new file mode 100644
index 0000000..d22f7e7
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/const_config.h
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_CONST_CONFIG_H_
+#define TUBEMQ_CLIENT_CONST_CONFIG_H_
+
+#include <stdint.h>
+
+#include <algorithm>
+#include <map>
+#include <string>
+
+namespace tubemq {
+
+using std::string;
+
+#define TUBEMQ_MAX(a, b) std::max((a), (b))
+#define TUBEMQ_MIN(a, b) std::min((a), (b))
+#define TUBEMQ_MID(data, max, min) TUBEMQ_MAX(min, TUBEMQ_MIN((max), (data)))
+
+// configuration value setting
+namespace tb_config {
+// rpc timeout define
+static const int32_t kRpcTimoutDefMs = 15000;
+static const int32_t kRpcTimoutMaxMs = 300000;
+static const int32_t kRpcTimoutMinMs = 8000;
+
+// heartbeat period define
+static const int32_t kHeartBeatPeriodDefMs = 10000;
+static const int32_t kHeartBeatFailRetryTimesDef = 5;
+static const int32_t kHeartBeatSleepPeriodDefMs = 60000;
+// max masterAddrInfo length
+static const int32_t kMasterAddrInfoMaxLength = 1024;
+
+// max TopicName length
+static const int32_t kTopicNameMaxLength = 64;
+// max Consume GroupName length
+static const int32_t kGroupNameMaxLength = 1024;
+// max filter item length
+static const int32_t kFilterItemMaxLength = 256;
+// max allowed filter item count
+static const int32_t kFilterItemMaxCount = 500;
+// max session key length
+static const int32_t kSessionKeyMaxLength = 1024;
+
+// max subscribe info report times
+static const int32_t kSubInfoReportMaxIntervalTimes = 6;
+// default message not found response wait period
+static const int32_t kMsgNotfoundWaitPeriodMsDef = 400;
+// default confirm wait period if rebalance meeting
+static const int32_t kRebConfirmWaitPeriodMsDef = 3000;
+// max confirm wait period anyway
+static const int32_t kConfirmWaitPeriodMsMax = 60000;
+// default rebalance wait if shutdown meeting
+static const int32_t kRebWaitPeriodWhenShutdownMs = 10000;
+// default partition status check period
+static const int32_t kMaxPartCheckPeriodMsDef = 60 * 1000;
+// default partition status check slice
+static const int32_t kPartCheckSliceMsDef = 300;
+
+// max int value
+static const int32_t kMaxIntValue = 0x7fffffff;
+// max long value
+static const int64_t kMaxLongValue = 0x7fffffffffffffffL;
+
+// default broker port
+static const uint32_t kBrokerPortDef = 8123;
+// default broker TLS port
+static const uint32_t kBrokerTlsPortDef = 8124;
+
+// invalid value
+static const int32_t kInvalidValue = -2;
+
+// message flag's properties settings
+static const int32_t kMsgFlagIncProperties = 0x01;
+
+// reserved property key Filter Item
+static const char kRsvPropKeyFilterItem[] = "$msgType$";
+// reserved property key message send time
+static const char kRsvPropKeyMsgTime[] = "$msgTime$";
+
+}  // namespace tb_config
+
+namespace delimiter {
+static const char kDelimiterDot[] = ".";
+static const char kDelimiterEqual[] = "=";
+static const char kDelimiterAnd[] = "&";
+static const char kDelimiterComma[] = ",";
+static const char kDelimiterColon[] = ":";
+static const char kDelimiterAt[] = "@";
+static const char kDelimiterPound[] = "#";
+static const char kDelimiterSemicolon[] = ";";
+// Double slash
+static const char kDelimiterDbSlash[] = "//";
+// left square bracket
+static const char kDelimiterLftSB[] = "[";
+// right square bracket
+static const char kDelimiterRgtSB[] = "]";
+
+}  // namespace delimiter
+
+}  // namespace tubemq
+
+#endif  // TUBEMQ_CLIENT_CONST_CONFIG_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/const_rpc.h b/tubemq-client-twins/tubemq-client-cpp/src/const_rpc.h
new file mode 100644
index 0000000..313d536
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/const_rpc.h
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_CONST_RPC_H_
+#define TUBEMQ_CLIENT_CONST_RPC_H_
+
+namespace tubemq {
+
+#include <stdint.h>
+
+namespace rpc_config {
+
+// constant define
+static const uint32_t kRpcPrtBeginToken = 0xFF7FF4FE;
+static const uint32_t kRpcMaxBufferSize = 8192;
+static const uint32_t kRpcMaxFrameListCnt = (uint32_t)((1024 * 1024 * 8) / kRpcMaxBufferSize);
+
+// rpc protocol version
+static const uint32_t kRpcProtocolVersion = 2;
+
+// rps network const
+static const uint32_t kRpcConnectInitBufferSize = 64 * 1024;
+static const uint32_t kRpcEnsureWriteableBytes = 16 * 1024;
+static constexpr uint32_t kRpcRecvBufferMaxBytes =
+    uint32_t(kRpcMaxFrameListCnt * kRpcMaxBufferSize * 2 + 1024);
+static const uint32_t kRpcInvalidConnectOverTime = 60 * 3;  // second
+
+// msg type flag
+static const int32_t kRpcFlagMsgRequest = 0x0;
+static const int32_t kRpcFlagMsgResponse = 0x1;
+
+// service type
+static const int32_t kMasterService = 1;
+static const int32_t kBrokerReadService = 2;
+static const int32_t kBrokerWriteService = 3;
+static const int32_t kBrokerAdminService = 4;
+static const int32_t kMasterAdminService = 5;
+
+// request method
+// master rpc method
+static const int32_t kMasterMethoddProducerRegister = 1;
+static const int32_t kMasterMethoddProducerHeatbeat = 2;
+static const int32_t kMasterMethoddProducerClose = 3;
+static const int32_t kMasterMethoddConsumerRegister = 4;
+static const int32_t kMasterMethoddConsumerHeatbeat = 5;
+static const int32_t kMasterMethoddConsumerClose = 6;
+
+// broker rpc method
+static const int32_t kBrokerMethoddProducerRegister = 11;
+static const int32_t kBrokerMethoddProducerHeatbeat = 12;
+static const int32_t kBrokerMethoddProducerSendMsg = 13;
+static const int32_t kBrokerMethoddProducerClose = 14;
+static const int32_t kBrokerMethoddConsumerRegister = 15;
+static const int32_t kBrokerMethoddConsumerHeatbeat = 16;
+static const int32_t kBrokerMethoddConsumerGetMsg = 17;
+static const int32_t kBrokerMethoddConsumerCommit = 18;
+static const int32_t kBrokerMethoddConsumerClose = 19;
+static const int32_t kMethodInvalid = 99999;
+
+// register operate type
+static const int32_t kRegOpTypeRegister = 31;
+static const int32_t kRegOpTypeUnReg = 32;
+
+// rpc connect node timeout
+static const int32_t kRpcConnectTimeoutMs = 3000;
+
+static const int32_t kConsumeStatusNormal = 0;
+static const int32_t kConsumeStatusFromMax = 1;
+static const int32_t kConsumeStatusFromMaxAlways = 2;
+
+}  // namespace rpc_config
+
+}  // namespace tubemq
+
+#endif  // TUBEMQ_CLIENT_CONST_RPC_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.cc b/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.cc
index a2536ae..71a84ac 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.cc
@@ -17,11 +17,13 @@
  * under the License.
  */
 
-#include "tubemq/executor_pool.h"
+#include "executor_pool.h"
 
-#include <asio.hpp>
 #include <functional>
 #include <memory>
+#include <asio.hpp>
+
+
 
 namespace tubemq {
 
@@ -39,19 +41,20 @@ Executor::~Executor() {
 
 void Executor::StartWorker(std::shared_ptr<asio::io_context> io_context) { io_context_->run(); }
 
-SocketPtr Executor::CreateSocket() { return SocketPtr(new asio::ip::tcp::socket(*io_context_)); }
+TcpSocketPtr Executor::CreateTcpSocket() {
+  return std::make_shared<asio::ip::tcp::socket>(*io_context_);
+}
 
-TlsSocketPtr Executor::CreateTlsSocket(SocketPtr &socket, asio::ssl::context &ctx) {
-  return std::shared_ptr<asio::ssl::stream<asio::ip::tcp::socket &> >(
-      new asio::ssl::stream<asio::ip::tcp::socket &>(*socket, ctx));
+TlsSocketPtr Executor::CreateTlsSocket(TcpSocketPtr &socket, asio::ssl::context &ctx) {
+  return std::make_shared<asio::ssl::stream<asio::ip::tcp::socket &>>(*socket, ctx);
 }
 
 TcpResolverPtr Executor::CreateTcpResolver() {
-  return TcpResolverPtr(new asio::ip::tcp::resolver(*io_context_));
+  return std::make_shared<asio::ip::tcp::resolver>(*io_context_);
 }
 
 SteadyTimerPtr Executor::CreateSteadyTimer() {
-  return SteadyTimerPtr(new asio::steady_timer(*io_context_));
+  return std::make_shared<asio::steady_timer>(*io_context_);
 }
 
 void Executor::Close() {
@@ -61,8 +64,6 @@ void Executor::Close() {
   }
 }
 
-void Executor::Post(Executor::func task) { io_context_->post(task); }
-
 ExecutorPool::ExecutorPool(int nthreads) : executors_(nthreads), executorIdx_(0), mutex_() {}
 
 ExecutorPtr ExecutorPool::Get() {
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.h b/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.h
new file mode 100644
index 0000000..e858985
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.h
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _TUBEMQ_EXECUTOR_POOL_
+#define _TUBEMQ_EXECUTOR_POOL_
+
+#include <stdlib.h>
+
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include <asio.hpp>
+#include <asio/ssl.hpp>
+#include "noncopyable.h"
+
+namespace tubemq {
+
+using TcpSocketPtr = std::shared_ptr<asio::ip::tcp::socket>;
+using TlsSocketPtr = std::shared_ptr<asio::ssl::stream<asio::ip::tcp::socket &> >;
+using TcpResolverPtr = std::shared_ptr<asio::ip::tcp::resolver>;
+using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>;
+
+class Executor : noncopyable {
+ public:
+  Executor();
+  ~Executor();
+
+  TcpSocketPtr CreateTcpSocket();
+  TlsSocketPtr CreateTlsSocket(TcpSocketPtr &socket, asio::ssl::context &ctx);
+  TcpResolverPtr CreateTcpResolver();
+  SteadyTimerPtr CreateSteadyTimer();
+
+  inline void Post(std::function<void(void)> task) { io_context_->post(task); }
+
+  std::shared_ptr<asio::io_context> GetIoContext() { return io_context_; }
+
+  // Close executor and exit thread
+  void Close();
+
+ private:
+  void StartWorker(std::shared_ptr<asio::io_context> io_context);
+  std::shared_ptr<asio::io_context> io_context_;
+  using io_context_work = asio::executor_work_guard<asio::io_context::executor_type>;
+  io_context_work work_;
+  std::thread worker_;
+};
+
+typedef std::shared_ptr<Executor> ExecutorPtr;
+
+class ExecutorPool : noncopyable {
+ public:
+  explicit ExecutorPool(int nthreads = 2);
+
+  ExecutorPtr Get();
+
+  // Resize executor thread
+  void Resize(int nthreads) {
+    Lock lock(mutex_);
+    executors_.resize(nthreads);
+  }
+
+  void Close();
+
+ private:
+  typedef std::vector<ExecutorPtr> ExecutorList;
+  ExecutorList executors_;
+  uint32_t executorIdx_;
+  std::mutex mutex_;
+  typedef std::unique_lock<std::mutex> Lock;
+};
+
+typedef std::shared_ptr<ExecutorPool> ExecutorPoolPtr;
+
+}  // namespace tubemq
+
+#endif  // _TUBEMQ_EXECUTOR_POOL_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
index 9af90fc..09cc76d 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
@@ -17,15 +17,15 @@
  * under the License.
  */
 
-#include "tubemq/file_ini.h"
+#include "file_ini.h"
 
 #include <stdlib.h>
 
 #include <fstream>
 #include <sstream>
 
-#include "tubemq/const_config.h"
-#include "tubemq/utils.h"
+#include "const_config.h"
+#include "utils.h"
 
 namespace tubemq {
 
@@ -34,13 +34,13 @@ using std::ifstream;
 
 
 Fileini::Fileini() {
-  this->init_flag_ = false;
-  this->ini_map_.clear();
+  init_flag_ = false;
+  ini_map_.clear();
 }
 
 Fileini::~Fileini() {
-  this->init_flag_ = false;
-  this->ini_map_.clear();
+  init_flag_ = false;
+  ini_map_.clear();
 }
 
 bool Fileini::Loadini(string& err_info, const string& file_name) {
@@ -95,8 +95,8 @@ bool Fileini::Loadini(string& err_info, const string& file_name) {
       continue;
     }
     map<string, map<string, string> >::iterator it_sec;
-    it_sec = this->ini_map_.find(sector);
-    if (it_sec == this->ini_map_.end()) {
+    it_sec = ini_map_.find(sector);
+    if (it_sec == ini_map_.end()) {
       map<string, string> tmp_key_val_map;
       tmp_key_val_map[key] = value;
       ini_map_[sector] = tmp_key_val_map;
@@ -108,7 +108,7 @@ bool Fileini::Loadini(string& err_info, const string& file_name) {
   conf_file.close();
   conf_file.clear();
   // set parser status
-  this->init_flag_ = true;
+  init_flag_ = true;
   // end
   err_info = "Ok";
   return true;
@@ -116,7 +116,7 @@ bool Fileini::Loadini(string& err_info, const string& file_name) {
 
 bool Fileini::GetValue(string& err_info, const string& sector, const string& key, string& value,
                        const string& def) const {
-  if (!this->init_flag_) {
+  if (!init_flag_) {
     err_info = "Please load configure file first!";
     return false;
   }
@@ -125,8 +125,8 @@ bool Fileini::GetValue(string& err_info, const string& sector, const string& key
   // search key's value in sector
   map<string, map<string, string> >::const_iterator it_sec;
   map<string, string>::const_iterator it_keyval;
-  it_sec = this->ini_map_.find(sector);
-  if (it_sec == this->ini_map_.end()) {
+  it_sec = ini_map_.find(sector);
+  if (it_sec == ini_map_.end()) {
     value = def;
     return true;
   }
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.h b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.h
new file mode 100644
index 0000000..1bede63
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.h
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_FILE_INI_H_
+#define TUBEMQ_CLIENT_FILE_INI_H_
+
+#include <stdint.h>
+
+#include <map>
+#include <string>
+
+namespace tubemq {
+
+using std::map;
+using std::string;
+
+class Fileini {
+ public:
+  Fileini();
+  ~Fileini();
+  bool Loadini(string& err_info, const string& file_name);
+  bool GetValue(string& err_info, const string& sector, const string& key,
+                    string& value, const string& def) const;
+  bool GetValue(string& err_info, const string& sector, const string& key,
+                   int32_t& value, int32_t def) const;
+
+ private:
+  bool init_flag_;
+  // sector        key    value
+  map<string, map<string, string> > ini_map_;
+};
+
+}  // namespace tubemq
+
+#endif  // TUBEMQ_CLIENT_FILE_INI_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
index ef3357f..2b96f52 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-#include "tubemq/flowctrl_def.h"
+#include "flowctrl_def.h"
 
 #include <stdio.h>
 #include <time.h>
@@ -25,9 +25,9 @@
 
 #include <sstream>
 
-#include "tubemq/const_config.h"
-#include "tubemq/logger.h"
-#include "tubemq/utils.h"
+#include "const_config.h"
+#include "logger.h"
+#include "utils.h"
 
 namespace tubemq {
 
@@ -35,172 +35,172 @@ using std::stringstream;
 using std::lock_guard;
 
 FlowCtrlResult::FlowCtrlResult() {
-  this->datasize_limit_ = tb_config::kMaxIntValue;
-  this->freqms_limit_ = 0;
+  datasize_limit_ = tb_config::kMaxIntValue;
+  freqms_limit_ = 0;
 }
 
 FlowCtrlResult::FlowCtrlResult(int64_t datasize_limit, int32_t freqms_limit) {
-  this->datasize_limit_ = datasize_limit;
-  this->freqms_limit_ = freqms_limit;
+  datasize_limit_ = datasize_limit;
+  freqms_limit_ = freqms_limit;
 }
 
 FlowCtrlResult& FlowCtrlResult::operator=(const FlowCtrlResult& target) {
   if (this == &target) return *this;
-  this->datasize_limit_ = target.datasize_limit_;
-  this->freqms_limit_ = target.freqms_limit_;
+  datasize_limit_ = target.datasize_limit_;
+  freqms_limit_ = target.freqms_limit_;
   return *this;
 }
 
 void FlowCtrlResult::SetDataDltAndFreqLimit(int64_t datasize_limit, int32_t freqms_limit) {
-  this->datasize_limit_ = datasize_limit;
-  this->freqms_limit_ = freqms_limit;
+  datasize_limit_ = datasize_limit;
+  freqms_limit_ = freqms_limit;
 }
 
 void FlowCtrlResult::SetDataSizeLimit(int64_t datasize_limit) {
-  this->datasize_limit_ = datasize_limit;
+  datasize_limit_ = datasize_limit;
 }
 
-void FlowCtrlResult::SetFreqMsLimit(int32_t freqms_limit) { this->freqms_limit_ = freqms_limit; }
+void FlowCtrlResult::SetFreqMsLimit(int32_t freqms_limit) { freqms_limit_ = freqms_limit; }
 
-int64_t FlowCtrlResult::GetDataSizeLimit() { return this->datasize_limit_; }
+int64_t FlowCtrlResult::GetDataSizeLimit() { return datasize_limit_; }
 
-int32_t FlowCtrlResult::GetFreqMsLimit() { return this->freqms_limit_; }
+int32_t FlowCtrlResult::GetFreqMsLimit() { return freqms_limit_; }
 
 FlowCtrlItem::FlowCtrlItem() {
-  this->type_ = 0;
-  this->start_time_ = 2500;
-  this->end_time_ = tb_config::kInvalidValue;
-  this->datadlt_m_ = tb_config::kInvalidValue;
-  this->datasize_limit_ = tb_config::kInvalidValue;
-  this->freqms_limit_ = tb_config::kInvalidValue;
-  this->zero_cnt_ = tb_config::kInvalidValue;
+  type_ = 0;
+  start_time_ = 2500;
+  end_time_ = tb_config::kInvalidValue;
+  datadlt_m_ = tb_config::kInvalidValue;
+  datasize_limit_ = tb_config::kInvalidValue;
+  freqms_limit_ = tb_config::kInvalidValue;
+  zero_cnt_ = tb_config::kInvalidValue;
 }
 
 FlowCtrlItem::FlowCtrlItem(int32_t type, int32_t zero_cnt, int32_t freqms_limit) {
-  this->type_ = type;
-  this->start_time_ = 2500;
-  this->end_time_ = tb_config::kInvalidValue;
-  this->datadlt_m_ = tb_config::kInvalidValue;
-  this->datasize_limit_ = tb_config::kInvalidValue;
-  this->freqms_limit_ = freqms_limit;
-  this->zero_cnt_ = zero_cnt;
+  type_ = type;
+  start_time_ = 2500;
+  end_time_ = tb_config::kInvalidValue;
+  datadlt_m_ = tb_config::kInvalidValue;
+  datasize_limit_ = tb_config::kInvalidValue;
+  freqms_limit_ = freqms_limit;
+  zero_cnt_ = zero_cnt;
 }
 
 FlowCtrlItem::FlowCtrlItem(int32_t type, int32_t datasize_limit, int32_t freqms_limit,
                            int32_t min_data_filter_freqms) {
-  this->type_ = type;
-  this->start_time_ = 2500;
-  this->end_time_ = tb_config::kInvalidValue;
-  this->datadlt_m_ = tb_config::kInvalidValue;
-  this->datasize_limit_ = datasize_limit;
-  this->freqms_limit_ = freqms_limit;
-  this->zero_cnt_ = min_data_filter_freqms;
+  type_ = type;
+  start_time_ = 2500;
+  end_time_ = tb_config::kInvalidValue;
+  datadlt_m_ = tb_config::kInvalidValue;
+  datasize_limit_ = datasize_limit;
+  freqms_limit_ = freqms_limit;
+  zero_cnt_ = min_data_filter_freqms;
 }
 
 FlowCtrlItem::FlowCtrlItem(int32_t type, int32_t start_time, int32_t end_time, int64_t datadlt_m,
                            int64_t datasize_limit, int32_t freqms_limit) {
-  this->type_ = type;
-  this->start_time_ = start_time;
-  this->end_time_ = end_time;
-  this->datadlt_m_ = datadlt_m;
-  this->datasize_limit_ = datasize_limit;
-  this->freqms_limit_ = freqms_limit;
-  this->zero_cnt_ = tb_config::kInvalidValue;
+  type_ = type;
+  start_time_ = start_time;
+  end_time_ = end_time;
+  datadlt_m_ = datadlt_m;
+  datasize_limit_ = datasize_limit;
+  freqms_limit_ = freqms_limit;
+  zero_cnt_ = tb_config::kInvalidValue;
 }
 
 FlowCtrlItem& FlowCtrlItem::operator=(const FlowCtrlItem& target) {
   if (this == &target) return *this;
-  this->type_ = target.type_;
-  this->start_time_ = target.start_time_;
-  this->end_time_ = target.end_time_;
-  this->datadlt_m_ = target.datadlt_m_;
-  this->datasize_limit_ = target.datasize_limit_;
-  this->freqms_limit_ = target.freqms_limit_;
-  this->zero_cnt_ = target.zero_cnt_;
+  type_ = target.type_;
+  start_time_ = target.start_time_;
+  end_time_ = target.end_time_;
+  datadlt_m_ = target.datadlt_m_;
+  datasize_limit_ = target.datasize_limit_;
+  freqms_limit_ = target.freqms_limit_;
+  zero_cnt_ = target.zero_cnt_;
   return *this;
 }
 
 int32_t FlowCtrlItem::GetFreLimit(int32_t msg_zero_cnt) const {
-  if (this->type_ != 1) {
+  if (type_ != 1) {
     return -1;
   }
-  if (msg_zero_cnt >= this->zero_cnt_) {
-    return this->freqms_limit_;
+  if (msg_zero_cnt >= zero_cnt_) {
+    return freqms_limit_;
   }
   return -1;
 }
 
 void FlowCtrlItem::ResetFlowCtrlValue(int32_t type, int32_t datasize_limit, int32_t freqms_limit,
                                       int32_t min_data_filter_freqms) {
-  this->type_ = type;
-  this->start_time_ = 2500;
-  this->end_time_ = tb_config::kInvalidValue;
-  this->datadlt_m_ = tb_config::kInvalidValue;
-  this->datasize_limit_ = datasize_limit;
-  this->freqms_limit_ = freqms_limit;
-  this->zero_cnt_ = min_data_filter_freqms;
+  type_ = type;
+  start_time_ = 2500;
+  end_time_ = tb_config::kInvalidValue;
+  datadlt_m_ = tb_config::kInvalidValue;
+  datasize_limit_ = datasize_limit;
+  freqms_limit_ = freqms_limit;
+  zero_cnt_ = min_data_filter_freqms;
 }
 
 void FlowCtrlItem::Clear() {
-  this->type_ = 0;
-  this->start_time_ = 2500;
-  this->end_time_ = tb_config::kInvalidValue;
-  this->datadlt_m_ = tb_config::kInvalidValue;
-  this->datasize_limit_ = tb_config::kInvalidValue;
-  this->freqms_limit_ = tb_config::kInvalidValue;
-  this->zero_cnt_ = tb_config::kInvalidValue;
+  type_ = 0;
+  start_time_ = 2500;
+  end_time_ = tb_config::kInvalidValue;
+  datadlt_m_ = tb_config::kInvalidValue;
+  datasize_limit_ = tb_config::kInvalidValue;
+  freqms_limit_ = tb_config::kInvalidValue;
+  zero_cnt_ = tb_config::kInvalidValue;
 }
 
 bool FlowCtrlItem::GetDataLimit(int64_t datadlt_m, int32_t curr_time,
                                 FlowCtrlResult& flowctrl_result) const {
-  if (this->type_ != 0 || datadlt_m <= this->datadlt_m_) {
+  if (type_ != 0 || datadlt_m <= datadlt_m_) {
     return false;
   }
-  if (curr_time < this->start_time_ || curr_time > this->end_time_) {
+  if (curr_time < start_time_ || curr_time > end_time_) {
     return false;
   }
-  flowctrl_result.SetDataDltAndFreqLimit(this->datasize_limit_, this->freqms_limit_);
+  flowctrl_result.SetDataDltAndFreqLimit(datasize_limit_, freqms_limit_);
   return true;
 }
 
 FlowCtrlRuleHandler::FlowCtrlRuleHandler() {
-  this->flowctrl_id_.GetAndSet(tb_config::kInvalidValue);
-  this->flowctrl_info_ = "";
-  this->min_zero_cnt_.Set(tb_config::kMaxIntValue);
-  this->qrypriority_id_.Set(tb_config::kInvalidValue);
-  this->min_datadlt_limt_.Set(tb_config::kMaxLongValue);
-  this->datalimit_start_time_.Set(2500);
-  this->datalimit_end_time_.Set(tb_config::kInvalidValue);
-  this->last_update_time_ = Utils::GetCurrentTimeMillis();
+  flowctrl_id_.GetAndSet(tb_config::kInvalidValue);
+  flowctrl_info_ = "";
+  min_zero_cnt_.Set(tb_config::kMaxIntValue);
+  qrypriority_id_.Set(tb_config::kInvalidValue);
+  min_datadlt_limt_.Set(tb_config::kMaxLongValue);
+  datalimit_start_time_.Set(2500);
+  datalimit_end_time_.Set(tb_config::kInvalidValue);
+  last_update_time_ = Utils::GetCurrentTimeMillis();
 }
 
 FlowCtrlRuleHandler::~FlowCtrlRuleHandler() {
-  // 
+  //
 }
 
 void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, int32_t qrypriority_id,
                                                 int64_t flowctrl_id, const string& flowctrl_info) {
   map<int32_t, vector<FlowCtrlItem> > tmp_flowctrl_map;
-  if (flowctrl_id == this->flowctrl_id_.Get()) {
+  if (flowctrl_id == flowctrl_id_.Get()) {
     return;
   }
-  int64_t curr_flowctrl_id = this->flowctrl_id_.Get();
+  int64_t curr_flowctrl_id = flowctrl_id_.Get();
   if (flowctrl_info.length() > 0) {
     parseFlowCtrlInfo(flowctrl_info, tmp_flowctrl_map);
   }
   lock_guard<mutex> lck(config_lock_);
-  this->flowctrl_id_.Set(flowctrl_id);
-  this->qrypriority_id_.Set(qrypriority_id);
+  flowctrl_id_.Set(flowctrl_id);
+  qrypriority_id_.Set(qrypriority_id);
   clearStatisData();
   if (tmp_flowctrl_map.empty()) {
-    this->flowctrl_rules_.clear();
-    this->flowctrl_info_ = "";
+    flowctrl_rules_.clear();
+    flowctrl_info_ = "";
   } else {
-    this->flowctrl_rules_ = tmp_flowctrl_map;
-    this->flowctrl_info_ = flowctrl_info;
+    flowctrl_rules_ = tmp_flowctrl_map;
+    flowctrl_info_ = flowctrl_info;
     initialStatisData();
   }
-  this->last_update_time_ = Utils::GetCurrentTimeMillis();
+  last_update_time_ = Utils::GetCurrentTimeMillis();
   if (is_default) {
     LOG_INFO("[Flow Ctrl] Default FlowCtrl's flowctrl_id from %ld to %ld\n", curr_flowctrl_id,
              flowctrl_id);
@@ -215,55 +215,55 @@ void FlowCtrlRuleHandler::initialStatisData() {
   vector<FlowCtrlItem>::iterator it_vec;
   map<int, vector<FlowCtrlItem> >::iterator it_map;
 
-  it_map = this->flowctrl_rules_.find(0);
-  if (it_map != this->flowctrl_rules_.end()) {
+  it_map = flowctrl_rules_.find(0);
+  if (it_map != flowctrl_rules_.end()) {
     for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
       if (it_vec->GetType() != 0) {
         continue;
       }
 
-      if (it_vec->GetDltInM() < this->min_datadlt_limt_.Get()) {
-        this->min_datadlt_limt_.Set(it_vec->GetDltInM());
+      if (it_vec->GetDltInM() < min_datadlt_limt_.Get()) {
+        min_datadlt_limt_.Set(it_vec->GetDltInM());
       }
-      if (it_vec->GetStartTime() < this->datalimit_start_time_.Get()) {
-        this->datalimit_start_time_.Set(it_vec->GetStartTime());
+      if (it_vec->GetStartTime() < datalimit_start_time_.Get()) {
+        datalimit_start_time_.Set(it_vec->GetStartTime());
       }
-      if (it_vec->GetEndTime() > this->datalimit_end_time_.Get()) {
-        this->datalimit_end_time_.Set(it_vec->GetEndTime());
+      if (it_vec->GetEndTime() > datalimit_end_time_.Get()) {
+        datalimit_end_time_.Set(it_vec->GetEndTime());
       }
     }
   }
-  it_map = this->flowctrl_rules_.find(1);
-  if (it_map != this->flowctrl_rules_.end()) {
+  it_map = flowctrl_rules_.find(1);
+  if (it_map != flowctrl_rules_.end()) {
     for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
       if (it_vec->GetType() != 1) {
         continue;
       }
-      if (it_vec->GetZeroCnt() < this->min_zero_cnt_.Get()) {
-        this->min_zero_cnt_.Set(it_vec->GetZeroCnt());
+      if (it_vec->GetZeroCnt() < min_zero_cnt_.Get()) {
+        min_zero_cnt_.Set(it_vec->GetZeroCnt());
       }
     }
   }
-  it_map = this->flowctrl_rules_.find(3);
-  if (it_map != this->flowctrl_rules_.end()) {
+  it_map = flowctrl_rules_.find(3);
+  if (it_map != flowctrl_rules_.end()) {
     for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
       if (it_vec->GetType() != 3) {
         continue;
       }
       it_vec->GetDataSizeLimit();
-      this->filter_ctrl_item_.ResetFlowCtrlValue(3, (int)(it_vec->GetDataSizeLimit()),
+      filter_ctrl_item_.ResetFlowCtrlValue(3, (int32_t)(it_vec->GetDataSizeLimit()),
                                                  it_vec->GetFreqMsLimit(), it_vec->GetZeroCnt());
     }
   }
 }
 
 void FlowCtrlRuleHandler::clearStatisData() {
-  this->min_zero_cnt_.GetAndSet(tb_config::kMaxIntValue);
-  this->min_datadlt_limt_.GetAndSet(tb_config::kMaxLongValue);
-  this->qrypriority_id_.Set(tb_config::kInvalidValue);
-  this->datalimit_start_time_.Set(2500);
-  this->datalimit_end_time_.Set(tb_config::kInvalidValue);
-  this->filter_ctrl_item_.Clear();
+  min_zero_cnt_.GetAndSet(tb_config::kMaxIntValue);
+  min_datadlt_limt_.GetAndSet(tb_config::kMaxLongValue);
+  qrypriority_id_.Set(tb_config::kInvalidValue);
+  datalimit_start_time_.Set(2500);
+  datalimit_end_time_.Set(tb_config::kInvalidValue);
+  filter_ctrl_item_.Clear();
 }
 
 bool FlowCtrlRuleHandler::GetCurDataLimit(int64_t last_datadlt,
@@ -276,16 +276,16 @@ bool FlowCtrlRuleHandler::GetCurDataLimit(int64_t last_datadlt,
   time_t cur_time = time(NULL);
   gmtime_r(&cur_time, &utc_tm);
   int curr_time = (utc_tm.tm_hour + 8) % 24 * 100 + utc_tm.tm_min;
-  if ((last_datadlt < this->min_datadlt_limt_.Get())
-      || (curr_time < this->datalimit_start_time_.Get())
-      || (curr_time > this->datalimit_end_time_.Get())) {
+  if ((last_datadlt < min_datadlt_limt_.Get())
+      || (curr_time < datalimit_start_time_.Get())
+      || (curr_time > datalimit_end_time_.Get())) {
     return false;
   }
   // search total flowctrl rule
   lock_guard<mutex> lck(config_lock_);
-  it_map = this->flowctrl_rules_.find(0);
-  if (it_map != this->flowctrl_rules_.end()) {
-    for (it_vec = it_map->second.begin();it_vec != it_map->second.end(); ++it_vec) {
+  it_map = flowctrl_rules_.find(0);
+  if (it_map != flowctrl_rules_.end()) {
+    for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
       if (it_vec->GetDataLimit(last_datadlt, curr_time, flowctrl_result)) {
         result = true;
         break;
@@ -301,13 +301,13 @@ int32_t FlowCtrlRuleHandler::GetCurFreqLimitTime(int32_t msg_zero_cnt,
   vector<FlowCtrlItem>::const_iterator it_vec;
   map<int, vector<FlowCtrlItem> >::const_iterator it_map;
   // check min zero count
-  if (msg_zero_cnt < this->min_zero_cnt_.Get()) {
+  if (msg_zero_cnt < min_zero_cnt_.Get()) {
     return limit_data;
   }
   // search rule allow value
   lock_guard<mutex> lck(config_lock_);
-  it_map = this->flowctrl_rules_.find(1);
-  if (it_map != this->flowctrl_rules_.end()) {
+  it_map = flowctrl_rules_.find(1);
+  if (it_map != flowctrl_rules_.end()) {
     for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
       limit_data = it_vec->GetFreLimit(msg_zero_cnt);
       if (limit_data >= 0) {
@@ -321,17 +321,15 @@ int32_t FlowCtrlRuleHandler::GetCurFreqLimitTime(int32_t msg_zero_cnt,
 void FlowCtrlRuleHandler::GetFilterCtrlItem(FlowCtrlItem& result) const {
   result.Clear();
   lock_guard<mutex> lck(config_lock_);
-  result = this->filter_ctrl_item_;
+  result = filter_ctrl_item_;
 }
 
 void FlowCtrlRuleHandler::GetFlowCtrlInfo(string& flowctrl_info) const {
   flowctrl_info.clear();
   lock_guard<mutex> lck(config_lock_);
-  flowctrl_info = this->flowctrl_info_;
+  flowctrl_info = flowctrl_info_;
 }
 
-
-
 bool FlowCtrlRuleHandler::compareDataLimitQueue(const FlowCtrlItem& o1, const FlowCtrlItem& o2) {
   if (o1.GetStartTime() >= o2.GetStartTime()) {
     return true;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.h b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.h
new file mode 100644
index 0000000..0131c6f
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.h
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_FLOW_CONTROL_H_
+#define TUBEMQ_CLIENT_FLOW_CONTROL_H_
+
+#include <rapidjson/document.h>
+#include <stdint.h>
+
+#include <algorithm>
+#include <list>
+#include <map>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "tubemq/tubemq_atomic.h"
+
+namespace tubemq {
+
+using std::map;
+using std::mutex;
+using std::string;
+using std::vector;
+
+class FlowCtrlResult {
+ public:
+  FlowCtrlResult();
+  FlowCtrlResult(int64_t datasize_limit, int32_t freqms_limit);
+  FlowCtrlResult& operator=(const FlowCtrlResult& target);
+  void SetDataDltAndFreqLimit(int64_t datasize_limit, int32_t freqms_limit);
+  void SetDataSizeLimit(int64_t datasize_limit);
+  void SetFreqMsLimit(int32_t freqms_limit);
+  int64_t GetDataSizeLimit();
+  int32_t GetFreqMsLimit();
+
+ private:
+  int64_t datasize_limit_;
+  int32_t freqms_limit_;
+};
+
+class FlowCtrlItem {
+ public:
+  FlowCtrlItem();
+  FlowCtrlItem(int32_t type, int32_t zero_cnt, int32_t freqms_limit);
+  FlowCtrlItem(int32_t type, int32_t datasize_limit, int32_t freqms_limit,
+               int32_t min_data_filter_freqms);
+  FlowCtrlItem(int32_t type, int32_t start_time, int32_t end_time, int64_t datadlt_m,
+               int64_t datasize_limit, int32_t freqms_limit);
+  FlowCtrlItem& operator=(const FlowCtrlItem& target);
+  void Clear();
+  void ResetFlowCtrlValue(int32_t type, int32_t datasize_limit, int32_t freqms_limit,
+                          int32_t min_data_filter_freqms);
+  int32_t GetFreLimit(int32_t msg_zero_cnt) const;
+  bool GetDataLimit(int64_t datadlt_m,
+    int32_t curr_time, FlowCtrlResult& flowctrl_result) const;
+  const int32_t GetType() const { return type_; }
+  const int32_t GetZeroCnt() const { return zero_cnt_; }
+  const int32_t GetStartTime() const { return start_time_; }
+  const int32_t GetEndTime() const { return end_time_; }
+  const int64_t GetDataSizeLimit() const { return datasize_limit_; }
+  const int32_t GetFreqMsLimit() const { return freqms_limit_; }
+  const int64_t GetDltInM() const { return datadlt_m_; }
+
+ private:
+  int32_t type_;
+  int32_t start_time_;
+  int32_t end_time_;
+  int64_t datadlt_m_;
+  int64_t datasize_limit_;
+  int32_t freqms_limit_;
+  int32_t zero_cnt_;
+};
+
+class FlowCtrlRuleHandler {
+ public:
+  FlowCtrlRuleHandler();
+  ~FlowCtrlRuleHandler();
+  void UpdateDefFlowCtrlInfo(bool is_default, int32_t qrypriority_id, int64_t flowctrl_id,
+                             const string& flowctrl_info);
+  bool GetCurDataLimit(int64_t last_datadlt, FlowCtrlResult& flowctrl_result) const;
+  int32_t GetCurFreqLimitTime(int32_t msg_zero_cnt, int32_t received_limit) const;
+  void GetFilterCtrlItem(FlowCtrlItem& result) const;
+  void GetFlowCtrlInfo(string& flowctrl_info) const;
+  int32_t GetMinZeroCnt() const { return this->min_zero_cnt_.Get(); }
+  int32_t GetQryPriorityId() const { return this->qrypriority_id_.Get(); }
+  void SetQryPriorityId(int32_t qrypriority_id) { this->qrypriority_id_.Set(qrypriority_id); }
+  const int64_t GetFlowCtrlId() const { return this->flowctrl_id_.Get(); }
+
+ private:
+  void initialStatisData();
+  void clearStatisData();
+  static bool compareFeqQueue(const FlowCtrlItem& queue1, const FlowCtrlItem& queue2);
+  static bool compareDataLimitQueue(const FlowCtrlItem& o1, const FlowCtrlItem& o2);
+  bool parseStringMember(string& err_info, const rapidjson::Value& root, const char* key,
+                         string& value, bool compare_value, string required_val);
+  bool parseLongMember(string& err_info, const rapidjson::Value& root, const char* key,
+                       int64_t& value, bool compare_value, int64_t required_val);
+  bool parseIntMember(string& err_info, const rapidjson::Value& root, const char* key,
+                      int32_t& value, bool compare_value, int32_t required_val);
+  bool parseFlowCtrlInfo(const string& flowctrl_info,
+                         map<int32_t, vector<FlowCtrlItem> >& flowctrl_info_map);
+  bool parseDataLimit(string& err_info, const rapidjson::Value& root,
+                      vector<FlowCtrlItem>& flowCtrlItems);
+  bool parseFreqLimit(string& err_info, const rapidjson::Value& root,
+                      vector<FlowCtrlItem>& flowctrl_items);
+  bool parseLowFetchLimit(string& err_info, const rapidjson::Value& root,
+                          vector<FlowCtrlItem>& flowctrl_items);
+  bool parseTimeMember(string& err_info, const rapidjson::Value& root, const char* key,
+                       int32_t& value);
+
+ private:
+  mutable mutex config_lock_;
+  string flowctrl_info_;
+  FlowCtrlItem filter_ctrl_item_;
+  map<int32_t, vector<FlowCtrlItem> > flowctrl_rules_;
+  int64_t last_update_time_;
+  AtomicLong flowctrl_id_;
+  AtomicInteger qrypriority_id_;
+  AtomicInteger min_zero_cnt_;
+  AtomicLong min_datadlt_limt_;
+  AtomicInteger datalimit_start_time_;
+  AtomicInteger datalimit_end_time_;
+};
+
+}  // namespace tubemq
+
+#endif  // TUBEMQ_CLIENT_FLOW_CONTROL_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/logger.cc b/tubemq-client-twins/tubemq-client-cpp/src/logger.cc
index f374d9b..2e51e25 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/logger.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/logger.cc
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-#include "tubemq/logger.h"
+#include "logger.h"
 
 #include <log4cplus/fileappender.h>
 #include <log4cplus/layout.h>
@@ -27,14 +27,14 @@
 
 #include <string>
 
-#include "tubemq/singleton.h"
+#include "singleton.h"
 
 namespace tubemq {
 
-Logger& GetLogger() { return Singleton<Logger>::Instance(); }
-
 static const uint32_t kMBSize = 1024 * 1024;
 
+Logger& GetLogger() { return Singleton<Logger>::Instance(); }
+
 bool Logger::Init(const std::string& path, Logger::Level level, uint32_t file_max_size,
                   uint32_t file_num) {
   base_path_ = path;
@@ -67,6 +67,7 @@ void Logger::setup() {
   bool immediate_fush = true;
   std::string pattern = "[%D{%Y-%m-%d %H:%M:%S.%q}][tid:%t]%m%n";
   auto logger_d = log4cplus::Logger::getInstance(instance_);
+  logger_d.removeAllAppenders();
   logger_d.setLogLevel(log4cplus::TRACE_LOG_LEVEL);
   log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> append_d(
       new log4cplus::RollingFileAppender(base_path_ + ".log", file_max_size_ * kMBSize, file_num_,
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/logger.h b/tubemq-client-twins/tubemq-client-cpp/src/logger.h
new file mode 100644
index 0000000..64933a6
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/logger.h
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _TUBEMQ_LOG_FILE_
+#define _TUBEMQ_LOG_FILE_
+
+#include <stdint.h>
+
+#include <string>
+#include <vector>
+
+namespace tubemq {
+class Logger;
+
+Logger& GetLogger();
+
+#define LOG_LEVEL(level, fmt, ...)                                                   \
+  {                                                                                  \
+    if (tubemq::GetLogger().IsEnable(level)) {                                       \
+      tubemq::GetLogger().Write("[%s:%d][%s]" fmt, __func__, __LINE__,               \
+                                tubemq::Logger::Level2String(level), ##__VA_ARGS__); \
+    }                                                                                \
+  }
+
+#define LOG_TRACE(fmt, ...) \
+  LOG_TUBEMQ(tubemq::GetLogger(), tubemq::Logger::kTrace, fmt, ##__VA_ARGS__)
+#define LOG_DEBUG(fmt, ...) \
+  LOG_TUBEMQ(tubemq::GetLogger(), tubemq::Logger::kDebug, fmt, ##__VA_ARGS__)
+#define LOG_INFO(fmt, ...) \
+  LOG_TUBEMQ(tubemq::GetLogger(), tubemq::Logger::kInfo, fmt, ##__VA_ARGS__)
+#define LOG_WARN(fmt, ...) \
+  LOG_TUBEMQ(tubemq::GetLogger(), tubemq::Logger::kWarn, fmt, ##__VA_ARGS__)
+#define LOG_ERROR(fmt, ...) \
+  LOG_TUBEMQ(tubemq::GetLogger(), tubemq::Logger::kError, fmt, ##__VA_ARGS__)
+
+#define LOG_TUBEMQ(logger, level, fmt, ...)                                                    \
+  {                                                                                            \
+    if (logger.IsEnable(level)) {                                                              \
+      logger.Write("[%s:%d][%s]" fmt, __func__, __LINE__, tubemq::Logger::Level2String(level), \
+                   ##__VA_ARGS__);                                                             \
+    }                                                                                          \
+  }
+
+class Logger {
+ public:
+  enum Level {
+    kTrace = 0,
+    kDebug = 1,
+    kInfo = 2,
+    kWarn = 3,
+    kError = 4,
+  };
+
+  // size: MB
+  Logger()
+      : file_max_size_(100),
+        file_num_(10),
+        level_(kError),
+        base_path_("tubemq"),
+        instance_("TubeMQ") {}
+
+  ~Logger(void) {}
+
+  // path example: ../log/tubemq
+  // size: MB
+  bool Init(const std::string& path, Level level, uint32_t file_max_size = 100,
+            uint32_t file_num = 10);
+
+  bool Write(const char* sFormat, ...) __attribute__((format(printf, 2, 3)));
+  inline bool WriteStream(const std::string& msg) { return writeStream(msg.c_str()); }
+
+  inline void SetInstance(const std::string& instance) { instance_ = instance; }
+  inline bool IsEnable(Level level) {
+    if (level_ <= level) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  static const char* Level2String(Level level) {
+    static const char* level_names[] = {
+        "TRACE", "DEBUG", "INFO", "WARN", "ERROR",
+    };
+    return level_names[level];
+  }
+
+ private:
+  void setup();
+  bool writeStream(const char* msg);
+
+ private:
+  uint32_t file_max_size_;
+  uint16_t file_num_;
+  uint8_t level_;
+
+  std::string base_path_;
+  std::string instance_;
+  std::string err_msg_;
+};
+}  // namespace tubemq
+#endif  // _TUBEMQ_LOG_FILE_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index 4f03dfc..38f6f46 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -17,16 +17,16 @@
  * under the License.
  */
 
-#include "tubemq/meta_info.h"
+#include "meta_info.h"
 
 #include <stdlib.h>
 
 #include <sstream>
 #include <vector>
 
-#include "tubemq/const_config.h"
+#include "const_config.h"
 #include "tubemq/tubemq_errcode.h"
-#include "tubemq/utils.h"
+#include "utils.h"
 
 namespace tubemq {
 
@@ -34,9 +34,9 @@ using std::stringstream;
 using std::vector;
 
 NodeInfo::NodeInfo() {
-  this->node_id_ = 0;
-  this->node_host_ = " ";
-  this->node_port_ = tb_config::kBrokerPortDef;
+  node_id_ = 0;
+  node_host_ = " ";
+  node_port_ = tb_config::kBrokerPortDef;
   buildStrInfo();
 }
 
@@ -45,34 +45,34 @@ NodeInfo::NodeInfo(bool is_broker, const string& node_info) {
   vector<string> result;
   Utils::Split(node_info, result, delimiter::kDelimiterColon);
   if (is_broker) {
-    this->node_id_ = atoi(result[0].c_str());
-    this->node_host_ = result[1];
-    this->node_port_ = tb_config::kBrokerPortDef;
+    node_id_ = atoi(result[0].c_str());
+    node_host_ = result[1];
+    node_port_ = tb_config::kBrokerPortDef;
     if (result.size() >= 3) {
-      this->node_port_ = atoi(result[2].c_str());
+      node_port_ = atoi(result[2].c_str());
     }
   } else {
-    this->node_id_ = 0;
-    this->node_host_ = result[0];
-    this->node_port_ = tb_config::kBrokerPortDef;
+    node_id_ = 0;
+    node_host_ = result[0];
+    node_port_ = tb_config::kBrokerPortDef;
     if (result.size() >= 2) {
-      this->node_port_ = atoi(result[1].c_str());
+      node_port_ = atoi(result[1].c_str());
     }
   }
   buildStrInfo();
 }
 
 NodeInfo::NodeInfo(const string& node_host, uint32_t node_port) {
-  this->node_id_ = tb_config::kInvalidValue;
-  this->node_host_ = node_host;
-  this->node_port_ = node_port;
+  node_id_ = tb_config::kInvalidValue;
+  node_host_ = node_host;
+  node_port_ = node_port;
   buildStrInfo();
 }
 
 NodeInfo::NodeInfo(int node_id, const string& node_host, uint32_t node_port) {
-  this->node_id_ = node_id;
-  this->node_host_ = node_host;
-  this->node_port_ = node_port;
+  node_id_ = node_id;
+  node_host_ = node_host;
+  node_port_ = node_port;
   buildStrInfo();
 }
 
@@ -82,11 +82,12 @@ NodeInfo::~NodeInfo() {
 
 NodeInfo& NodeInfo::operator=(const NodeInfo& target) {
   if (this != &target) {
-    this->node_id_ = target.node_id_;
-    this->node_host_ = target.node_host_;
-    this->node_port_ = target.node_port_;
-    this->addr_info_ = target.addr_info_;
-    this->node_info_ = target.node_info_;
+    node_id_ = target.node_id_;
+    node_host_ = target.node_host_;
+    node_port_ = target.node_port_;
+    addr_info_ = target.addr_info_;
+    node_info_ = target.node_info_;
+    buildStrInfo();
   }
   return *this;
 }
@@ -95,50 +96,50 @@ bool NodeInfo::operator==(const NodeInfo& target) {
   if (this == &target) {
     return true;
   }
-  if (this->node_info_ == target.node_info_) {
+  if (node_info_ == target.node_info_) {
     return true;
   }
   return false;
 }
 
 bool NodeInfo::operator<(const NodeInfo& target) const {
-  return this->node_info_ < target.node_info_;
+  return node_info_ < target.node_info_;
 }
-const uint32_t NodeInfo::GetNodeId() const { return this->node_id_; }
+const uint32_t NodeInfo::GetNodeId() const { return node_id_; }
 
-const string& NodeInfo::GetHost() const { return this->node_host_; }
+const string& NodeInfo::GetHost() const { return node_host_; }
 
-const uint32_t NodeInfo::GetPort() const { return this->node_port_; }
+const uint32_t NodeInfo::GetPort() const { return node_port_; }
 
-const string& NodeInfo::GetAddrInfo() const { return this->addr_info_; }
+const string& NodeInfo::GetAddrInfo() const { return addr_info_; }
 
-const string& NodeInfo::GetNodeInfo() const { return this->node_info_; }
+const string& NodeInfo::GetNodeInfo() const { return node_info_; }
 
 void NodeInfo::buildStrInfo() {
   stringstream ss1;
-  ss1 << this->node_host_;
+  ss1 << node_host_;
   ss1 << delimiter::kDelimiterColon;
-  ss1 << this->node_port_;
-  this->addr_info_ = ss1.str();
+  ss1 << node_port_;
+  addr_info_ = ss1.str();
 
   stringstream ss2;
-  ss2 << this->node_id_;
+  ss2 << node_id_;
   ss2 << delimiter::kDelimiterColon;
-  ss2 << this->addr_info_;
-  this->node_info_ = ss2.str();
+  ss2 << addr_info_;
+  node_info_ = ss2.str();
 }
 
 Partition::Partition() {
-  this->topic_ = " ";
-  this->partition_id_ = 0;
+  topic_ = " ";
+  partition_id_ = 0;
   buildPartitionKey();
 }
 
 // partition_info = broker_info#topic:partitionId
 Partition::Partition(const string& partition_info) {
   // initial process
-  this->topic_ = " ";
-  this->partition_id_ = 0;
+  topic_ = " ";
+  partition_id_ = 0;
   // parse partition_info string
   string::size_type pos = 0;
   string seg_key = delimiter::kDelimiterPound;
@@ -148,7 +149,8 @@ Partition::Partition(const string& partition_info) {
   if (pos != string::npos) {
     string broker_info = partition_info.substr(0, pos);
     broker_info = Utils::Trim(broker_info);
-    this->broker_info_ = NodeInfo(true, broker_info);
+    NodeInfo tmp_node(true, broker_info);
+    broker_info_ = tmp_node;
     string part_str = partition_info.substr(pos + seg_key.size(), partition_info.size());
     part_str = Utils::Trim(part_str);
     pos = part_str.find(token_key);
@@ -157,8 +159,8 @@ Partition::Partition(const string& partition_info) {
       string part_id_str = part_str.substr(pos + token_key.size(), part_str.size());
       topic_str = Utils::Trim(topic_str);
       part_id_str = Utils::Trim(part_id_str);
-      this->topic_ = topic_str;
-      this->partition_id_ = atoi(part_id_str.c_str());
+      topic_ = topic_str;
+      partition_id_ = atoi(part_id_str.c_str());
     }
   }
   buildPartitionKey();
@@ -167,21 +169,21 @@ Partition::Partition(const string& partition_info) {
 // part_str = topic:partition_id
 Partition::Partition(const NodeInfo& broker_info, const string& part_str) {
   vector<string> result;
-  this->topic_ = " ";
-  this->partition_id_ = 0;
-  this->broker_info_ = broker_info;
+  topic_ = " ";
+  partition_id_ = 0;
+  broker_info_ = broker_info;
   Utils::Split(part_str, result, delimiter::kDelimiterColon);
   if (result.size() >= 2) {
-    this->topic_ = result[0];
-    this->partition_id_ = atoi(result[1].c_str());
+    topic_ = result[0];
+    partition_id_ = atoi(result[1].c_str());
   }
   buildPartitionKey();
 }
 
 Partition::Partition(const NodeInfo& broker_info, const string& topic, uint32_t partition_id) {
-  this->topic_ = topic;
-  this->partition_id_ = partition_id;
-  this->broker_info_ = broker_info;
+  topic_ = topic;
+  partition_id_ = partition_id;
+  broker_info_ = broker_info;
   buildPartitionKey();
 }
 
@@ -191,11 +193,12 @@ Partition::~Partition() {
 
 Partition& Partition::operator=(const Partition& target) {
   if (this != &target) {
-    this->topic_ = target.topic_;
-    this->partition_id_ = target.partition_id_;
-    this->broker_info_ = target.broker_info_;
-    this->partition_key_ = target.partition_key_;
-    this->partition_info_ = target.partition_info_;
+    topic_ = target.topic_;
+    partition_id_ = target.partition_id_;
+    broker_info_ = target.broker_info_;
+    partition_key_ = target.partition_key_;
+    partition_info_ = target.partition_info_;
+    buildPartitionKey();
   }
   return *this;
 }
@@ -204,44 +207,44 @@ bool Partition::operator==(const Partition& target) {
   if (this == &target) {
     return true;
   }
-  if (this->partition_info_ == target.partition_info_) {
+  if (partition_info_ == target.partition_info_) {
     return true;
   }
   return false;
 }
 
-const uint32_t Partition::GetBrokerId() const { return this->broker_info_.GetNodeId(); }
+const uint32_t Partition::GetBrokerId() const { return broker_info_.GetNodeId(); }
 
-const string& Partition::GetBrokerHost() const { return this->broker_info_.GetHost(); }
+const string& Partition::GetBrokerHost() const { return broker_info_.GetHost(); }
 
-const uint32_t Partition::GetBrokerPort() const { return this->broker_info_.GetPort(); }
+const uint32_t Partition::GetBrokerPort() const { return broker_info_.GetPort(); }
 
-const string& Partition::GetPartitionKey() const { return this->partition_key_; }
+const string& Partition::GetPartitionKey() const { return partition_key_; }
 
-const string& Partition::GetTopic() const { return this->topic_; }
+const string& Partition::GetTopic() const { return topic_; }
 
-const NodeInfo& Partition::GetBrokerInfo() const { return this->broker_info_; }
+const NodeInfo& Partition::GetBrokerInfo() const { return broker_info_; }
 
-const uint32_t Partition::GetPartitionId() const { return this->partition_id_; }
+const uint32_t Partition::GetPartitionId() const { return partition_id_; }
 
-const string& Partition::ToString() const { return this->partition_info_; }
+const string& Partition::ToString() const { return partition_info_; }
 
 void Partition::buildPartitionKey() {
   stringstream ss1;
-  ss1 << this->broker_info_.GetNodeId();
+  ss1 << broker_info_.GetNodeId();
   ss1 << delimiter::kDelimiterColon;
-  ss1 << this->topic_;
+  ss1 << topic_;
   ss1 << delimiter::kDelimiterColon;
-  ss1 << this->partition_id_;
-  this->partition_key_ = ss1.str();
+  ss1 << partition_id_;
+  partition_key_ = ss1.str();
 
   stringstream ss2;
-  ss2 << this->broker_info_.GetNodeInfo();
+  ss2 << broker_info_.GetNodeInfo();
   ss2 << delimiter::kDelimiterPound;
-  ss2 << this->topic_;
+  ss2 << topic_;
   ss2 << delimiter::kDelimiterColon;
-  ss2 << this->partition_id_;
-  this->partition_info_ = ss2.str();
+  ss2 << partition_id_;
+  partition_info_ = ss2.str();
 }
 
 PartitionExt::PartitionExt() : Partition() {
@@ -266,24 +269,24 @@ PartitionExt& PartitionExt::operator=(const PartitionExt& target) {
     // parent class
     Partition::operator=(target);
     // child class
-    this->is_last_consumed_ = target.is_last_consumed_;
-    this->cur_flowctrl_ = target.cur_flowctrl_;
-    this->cur_freqctrl_ = target.cur_freqctrl_;
-    this->next_stage_updtime_ = target.next_stage_updtime_;
-    this->next_slice_updtime_ = target.next_slice_updtime_;
-    this->limit_slice_msgsize_ = target.limit_slice_msgsize_;
-    this->cur_stage_msgsize_ = target.cur_stage_msgsize_;
-    this->cur_slice_msgsize_ = target.cur_slice_msgsize_;
-    this->total_zero_cnt_ = target.total_zero_cnt_;
-    this->booked_time_ = target.booked_time_;
-    this->booked_errcode_ = target.booked_errcode_;
-    this->booked_esc_limit_ = target.booked_esc_limit_;
-    this->booked_msgsize_ = target.booked_msgsize_;
-    this->booked_dlt_limit_ = target.booked_dlt_limit_;
-    this->booked_curdata_dlt_ = target.booked_curdata_dlt_;
-    this->booked_require_slow_ = target.booked_require_slow_;
-    this->booked_errcode_ = target.booked_errcode_;
-    this->booked_errcode_ = target.booked_errcode_;
+    is_last_consumed_ = target.is_last_consumed_;
+    cur_flowctrl_ = target.cur_flowctrl_;
+    cur_freqctrl_ = target.cur_freqctrl_;
+    next_stage_updtime_ = target.next_stage_updtime_;
+    next_slice_updtime_ = target.next_slice_updtime_;
+    limit_slice_msgsize_ = target.limit_slice_msgsize_;
+    cur_stage_msgsize_ = target.cur_stage_msgsize_;
+    cur_slice_msgsize_ = target.cur_slice_msgsize_;
+    total_zero_cnt_ = target.total_zero_cnt_;
+    booked_time_ = target.booked_time_;
+    booked_errcode_ = target.booked_errcode_;
+    booked_esc_limit_ = target.booked_esc_limit_;
+    booked_msgsize_ = target.booked_msgsize_;
+    booked_dlt_limit_ = target.booked_dlt_limit_;
+    booked_curdata_dlt_ = target.booked_curdata_dlt_;
+    booked_require_slow_ = target.booked_require_slow_;
+    booked_errcode_ = target.booked_errcode_;
+    booked_errcode_ = target.booked_errcode_;
   }
   return *this;
 }
@@ -291,21 +294,21 @@ PartitionExt& PartitionExt::operator=(const PartitionExt& target) {
 
 void PartitionExt::BookConsumeData(int32_t errcode, int32_t msg_size,
   bool req_esc_limit, int64_t rsp_dlt_limit, int64_t last_datadlt, bool require_slow) {
-  this->booked_time_ = Utils::GetCurrentTimeMillis();
-  this->booked_errcode_ = errcode;
-  this->booked_esc_limit_ = req_esc_limit;
-  this->booked_msgsize_ = msg_size;
-  this->booked_dlt_limit_ = rsp_dlt_limit;
-  this->booked_curdata_dlt_ = last_datadlt;
-  this->booked_require_slow_ = require_slow;
+  booked_time_ = Utils::GetCurrentTimeMillis();
+  booked_errcode_ = errcode;
+  booked_esc_limit_ = req_esc_limit;
+  booked_msgsize_ = msg_size;
+  booked_dlt_limit_ = rsp_dlt_limit;
+  booked_curdata_dlt_ = last_datadlt;
+  booked_require_slow_ = require_slow;
 }
 
 int64_t PartitionExt::ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
   const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed) {
-  int64_t dlt_time = Utils::GetCurrentTimeMillis() - this->booked_time_;
+  int64_t dlt_time = Utils::GetCurrentTimeMillis() - booked_time_;
   return ProcConsumeResult(def_flowctrl_handler, group_flowctrl_handler, filter_consume,
-    last_consumed, this->booked_errcode_, this->booked_msgsize_, this->booked_esc_limit_,
-    this->booked_dlt_limit_, this->booked_curdata_dlt_, this->booked_require_slow_) - dlt_time;
+    last_consumed, booked_errcode_, booked_msgsize_, booked_esc_limit_,
+    booked_dlt_limit_, booked_curdata_dlt_, booked_require_slow_) - dlt_time;
 }
 
 int64_t PartitionExt::ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
@@ -314,7 +317,7 @@ int64_t PartitionExt::ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_
   int64_t last_datadlt, bool require_slow) {
   // #lizard forgives
   // record consume status
-  this->is_last_consumed_ = last_consumed;
+  is_last_consumed_ = last_consumed;
   // Update strategy data values
   updateStrategyData(def_flowctrl_handler, group_flowctrl_handler, msg_size, last_datadlt);
   // Perform different strategies based on error codes
@@ -322,36 +325,36 @@ int64_t PartitionExt::ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_
     case err_code::kErrNotFound:
     case err_code::kErrSuccess:
       if (msg_size == 0 && errcode != err_code::kErrSuccess) {
-        this->total_zero_cnt_ += 1;
+        total_zero_cnt_ += 1;
       } else {
-        this->total_zero_cnt_ = 0;
+        total_zero_cnt_ = 0;
       }
-      if (this->total_zero_cnt_ > 0) {
+      if (total_zero_cnt_ > 0) {
         if (group_flowctrl_handler.GetMinZeroCnt() != tb_config::kMaxIntValue) {
           return (int64_t)(group_flowctrl_handler.GetCurFreqLimitTime(
-            this->total_zero_cnt_, (int32_t)rsp_dlt_limit));
+            total_zero_cnt_, (int32_t)rsp_dlt_limit));
         } else {
           return (int64_t)def_flowctrl_handler.GetCurFreqLimitTime(
-            this->total_zero_cnt_, (int32_t)rsp_dlt_limit);
+            total_zero_cnt_, (int32_t)rsp_dlt_limit);
         }
       }
       if (req_esc_limit) {
         return 0;
       } else {
-        if (this->cur_stage_msgsize_ >= this->cur_flowctrl_.GetDataSizeLimit()
-          || this->cur_slice_msgsize_ >= this->limit_slice_msgsize_) {
-          return this->cur_flowctrl_.GetFreqMsLimit() > rsp_dlt_limit
-            ? this->cur_flowctrl_.GetFreqMsLimit() : rsp_dlt_limit;
+        if (cur_stage_msgsize_ >= cur_flowctrl_.GetDataSizeLimit()
+          || cur_slice_msgsize_ >= limit_slice_msgsize_) {
+          return cur_flowctrl_.GetFreqMsLimit() > rsp_dlt_limit
+            ? cur_flowctrl_.GetFreqMsLimit() : rsp_dlt_limit;
         }
         if (errcode == err_code::kErrSuccess) {
-          if (filter_consume && this->cur_freqctrl_.GetFreqMsLimit() >= 0) {
+          if (filter_consume && cur_freqctrl_.GetFreqMsLimit() >= 0) {
             if (require_slow) {
-              return this->cur_freqctrl_.GetZeroCnt();
+              return cur_freqctrl_.GetZeroCnt();
             } else {
-              return this->cur_freqctrl_.GetFreqMsLimit();
+              return cur_freqctrl_.GetFreqMsLimit();
             }
-          } else if (!filter_consume && this->cur_freqctrl_.GetDataSizeLimit() >=0) {
-            return this->cur_freqctrl_.GetDataSizeLimit();
+          } else if (!filter_consume && cur_freqctrl_.GetDataSizeLimit() >=0) {
+            return cur_freqctrl_.GetDataSizeLimit();
           }
         }
         return rsp_dlt_limit;
@@ -364,68 +367,68 @@ int64_t PartitionExt::ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_
 }
 
 void PartitionExt::SetLastConsumed(bool last_consumed) {
-  this->is_last_consumed_ = last_consumed;
+  is_last_consumed_ = last_consumed;
 }
 
-bool PartitionExt::IsLastConsumed() {
-  return this->is_last_consumed_;
+bool PartitionExt::IsLastConsumed() const {
+  return is_last_consumed_;
 }
 
 void PartitionExt::resetParameters() {
-  this->is_last_consumed_ = false;
-  this->cur_flowctrl_.SetDataDltAndFreqLimit(tb_config::kMaxLongValue, 20);
-  this->next_stage_updtime_ = 0;
-  this->next_slice_updtime_ = 0;
-  this->limit_slice_msgsize_ = 0;
-  this->cur_stage_msgsize_ = 0;
-  this->cur_slice_msgsize_ = 0;
-  this->total_zero_cnt_ = 0;
-  this->booked_time_ = 0;
-  this->booked_errcode_ = 0;
-  this->booked_esc_limit_ = false;
-  this->booked_msgsize_ = 0;
-  this->booked_dlt_limit_ = 0;
-  this->booked_curdata_dlt_ = 0;
-  this->booked_require_slow_ = false;
+  is_last_consumed_ = false;
+  cur_flowctrl_.SetDataDltAndFreqLimit(tb_config::kMaxLongValue, 20);
+  next_stage_updtime_ = 0;
+  next_slice_updtime_ = 0;
+  limit_slice_msgsize_ = 0;
+  cur_stage_msgsize_ = 0;
+  cur_slice_msgsize_ = 0;
+  total_zero_cnt_ = 0;
+  booked_time_ = 0;
+  booked_errcode_ = 0;
+  booked_esc_limit_ = false;
+  booked_msgsize_ = 0;
+  booked_dlt_limit_ = 0;
+  booked_curdata_dlt_ = 0;
+  booked_require_slow_ = false;
 }
 
 void PartitionExt::updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_handler,
   const FlowCtrlRuleHandler& group_flowctrl_handler, int32_t msg_size, int64_t last_datadlt) {
   bool result = false;
   // Accumulated data received
-  this->cur_stage_msgsize_ += msg_size;
-  this->cur_slice_msgsize_ += msg_size;
+  cur_stage_msgsize_ += msg_size;
+  cur_slice_msgsize_ += msg_size;
   int64_t curr_time = Utils::GetCurrentTimeMillis();
   // Update strategy data values
-  if (curr_time > this->next_stage_updtime_) {
-    this->cur_stage_msgsize_ = 0;
-    this->cur_slice_msgsize_ = 0;
+  if (curr_time > next_stage_updtime_) {
+    cur_stage_msgsize_ = 0;
+    cur_slice_msgsize_ = 0;
     if (last_datadlt >= 0) {
-      result = group_flowctrl_handler.GetCurDataLimit(last_datadlt, this->cur_flowctrl_);
+      result = group_flowctrl_handler.GetCurDataLimit(last_datadlt, cur_flowctrl_);
       if (!result) {
-        result = def_flowctrl_handler.GetCurDataLimit(last_datadlt, this->cur_flowctrl_);
+        result = def_flowctrl_handler.GetCurDataLimit(last_datadlt, cur_flowctrl_);
         if (!result) {
-          this->cur_flowctrl_.SetDataDltAndFreqLimit(tb_config::kMaxLongValue, 0);
+          cur_flowctrl_.SetDataDltAndFreqLimit(tb_config::kMaxLongValue, 0);
         }
       }
-      group_flowctrl_handler.GetFilterCtrlItem(this->cur_freqctrl_);
-      if (this->cur_freqctrl_.GetFreqMsLimit() < 0) {
-        def_flowctrl_handler.GetFilterCtrlItem(this->cur_freqctrl_);
+      group_flowctrl_handler.GetFilterCtrlItem(cur_freqctrl_);
+      if (cur_freqctrl_.GetFreqMsLimit() < 0) {
+        def_flowctrl_handler.GetFilterCtrlItem(cur_freqctrl_);
       }
       curr_time = Utils::GetCurrentTimeMillis();
     }
-    this->limit_slice_msgsize_ = this->cur_flowctrl_.GetDataSizeLimit() / 12;
-    this->next_stage_updtime_ = curr_time + 60000;
-    this->next_slice_updtime_ = curr_time + 5000;
-  } else if (curr_time > this->next_slice_updtime_) {
-    this->cur_slice_msgsize_ = 0;
-    this->next_slice_updtime_ = curr_time + 5000;
+    limit_slice_msgsize_ = cur_flowctrl_.GetDataSizeLimit() / 12;
+    next_stage_updtime_ = curr_time + 60000;
+    next_slice_updtime_ = curr_time + 5000;
+  } else if (curr_time > next_slice_updtime_) {
+    cur_slice_msgsize_ = 0;
+    next_slice_updtime_ = curr_time + 5000;
   }
 }
 
 SubscribeInfo::SubscribeInfo() {
-  this->consumer_id_ = " ";
-  this->group_ = " ";
+  consumer_id_ = " ";
+  group_ = " ";
   buildSubInfo();
 }
 
@@ -434,8 +437,8 @@ SubscribeInfo::SubscribeInfo(const string& sub_info) {
   string::size_type pos = 0;
   string seg_key = delimiter::kDelimiterPound;
   string at_key = delimiter::kDelimiterAt;
-  this->consumer_id_ = " ";
-  this->group_ = " ";
+  consumer_id_ = " ";
+  group_ = " ";
   // parse sub_info
   pos = sub_info.find(seg_key);
   if (pos != string::npos) {
@@ -443,109 +446,111 @@ SubscribeInfo::SubscribeInfo(const string& sub_info) {
     consumer_info = Utils::Trim(consumer_info);
     string partition_info = sub_info.substr(pos + seg_key.size(), sub_info.size());
     partition_info = Utils::Trim(partition_info);
-    this->partitionext_ = PartitionExt(partition_info);
+    PartitionExt tmp_part(partition_info);
+    partitionext_ = tmp_part;
     pos = consumer_info.find(at_key);
-    this->consumer_id_ = consumer_info.substr(0, pos);
-    this->consumer_id_ = Utils::Trim(this->consumer_id_);
-    this->group_ = consumer_info.substr(pos + at_key.size(), consumer_info.size());
-    this->group_ = Utils::Trim(this->group_);
+    consumer_id_ = consumer_info.substr(0, pos);
+    consumer_id_ = Utils::Trim(consumer_id_);
+    group_ = consumer_info.substr(pos + at_key.size(), consumer_info.size());
+    group_ = Utils::Trim(group_);
   }
   buildSubInfo();
 }
 
 SubscribeInfo::SubscribeInfo(const string& consumer_id,
         const string& group_name, const PartitionExt& partition_ext) {
-  this->consumer_id_ = consumer_id;
-  this->group_ = group_name;
-  this->partitionext_ = partition_ext;
+  consumer_id_ = consumer_id;
+  group_ = group_name;
+  partitionext_ = partition_ext;
   buildSubInfo();
 }
 
 SubscribeInfo& SubscribeInfo::operator=(const SubscribeInfo& target) {
   if (this != &target) {
-    this->consumer_id_ = target.consumer_id_;
-    this->group_ = target.group_;
-    this->partitionext_ = target.partitionext_;
+    consumer_id_ = target.consumer_id_;
+    group_ = target.group_;
+    partitionext_ = target.partitionext_;
+    buildSubInfo();
   }
   return *this;
 }
 
-const string& SubscribeInfo::GetConsumerId() const { return this->consumer_id_; }
+const string& SubscribeInfo::GetConsumerId() const { return consumer_id_; }
 
-const string& SubscribeInfo::GetGroup() const { return this->group_; }
+const string& SubscribeInfo::GetGroup() const { return group_; }
 
-const PartitionExt& SubscribeInfo::GetPartitionExt() const { return this->partitionext_; }
+const PartitionExt& SubscribeInfo::GetPartitionExt() const { return partitionext_; }
 
-const uint32_t SubscribeInfo::GgetBrokerId() const { return this->partitionext_.GetBrokerId(); }
+const uint32_t SubscribeInfo::GgetBrokerId() const { return partitionext_.GetBrokerId(); }
 
-const string& SubscribeInfo::GetBrokerHost() const { return this->partitionext_.GetBrokerHost(); }
+const string& SubscribeInfo::GetBrokerHost() const { return partitionext_.GetBrokerHost(); }
 
-const uint32_t SubscribeInfo::GetBrokerPort() const { return this->partitionext_.GetBrokerPort(); }
+const uint32_t SubscribeInfo::GetBrokerPort() const { return partitionext_.GetBrokerPort(); }
 
-const string& SubscribeInfo::GetTopic() const { return this->partitionext_.GetTopic(); }
+const string& SubscribeInfo::GetTopic() const { return partitionext_.GetTopic(); }
 
 const uint32_t SubscribeInfo::GetPartitionId() const {
-  return this->partitionext_.GetPartitionId();
+  return partitionext_.GetPartitionId();
 }
 
-const string& SubscribeInfo::ToString() const { return this->sub_info_; }
+const string& SubscribeInfo::ToString() const { return sub_info_; }
 
 void SubscribeInfo::buildSubInfo() {
   stringstream ss;
-  ss << this->consumer_id_;
+  ss << consumer_id_;
   ss << delimiter::kDelimiterAt;
-  ss << this->group_;
+  ss << group_;
   ss << delimiter::kDelimiterPound;
-  ss << this->partitionext_.ToString();
-  this->sub_info_ = ss.str();
+  ss << partitionext_.ToString();
+  sub_info_ = ss.str();
 }
 
 ConsumerEvent::ConsumerEvent() {
-  this->rebalance_id_ = tb_config::kInvalidValue;
-  this->event_type_ = tb_config::kInvalidValue;
-  this->event_status_ = tb_config::kInvalidValue;
+  rebalance_id_ = tb_config::kInvalidValue;
+  event_type_ = tb_config::kInvalidValue;
+  event_status_ = tb_config::kInvalidValue;
 }
 
 ConsumerEvent::ConsumerEvent(const ConsumerEvent& target) {
-  this->rebalance_id_ = target.rebalance_id_;
-  this->event_type_ = target.event_type_;
-  this->event_status_ = target.event_status_;
-  this->subscribe_list_ = target.subscribe_list_;
+  rebalance_id_ = target.rebalance_id_;
+  event_type_ = target.event_type_;
+  event_status_ = target.event_status_;
+  subscribe_list_ = target.subscribe_list_;
 }
 
 ConsumerEvent::ConsumerEvent(int64_t rebalance_id, int32_t event_type,
                              const list<SubscribeInfo>& subscribeInfo_lst, int32_t event_status) {
   list<SubscribeInfo>::const_iterator it;
-  this->rebalance_id_ = rebalance_id;
-  this->event_type_ = event_type;
-  this->event_status_ = event_status;
+  rebalance_id_ = rebalance_id;
+  event_type_ = event_type;
+  event_status_ = event_status;
   for (it = subscribeInfo_lst.begin(); it != subscribeInfo_lst.end(); ++it) {
-    this->subscribe_list_.push_back(*it);
+    subscribe_list_.push_back(*it);
   }
 }
 
 ConsumerEvent& ConsumerEvent::operator=(const ConsumerEvent& target) {
   if (this != &target) {
-    this->rebalance_id_ = target.rebalance_id_;
-    this->event_type_ = target.event_type_;
-    this->event_status_ = target.event_status_;
-    this->subscribe_list_ = target.subscribe_list_;
+    rebalance_id_ = target.rebalance_id_;
+    event_type_ = target.event_type_;
+    event_status_ = target.event_status_;
+    subscribe_list_ = target.subscribe_list_;
   }
   return *this;
 }
 
-const int64_t ConsumerEvent::GetRebalanceId() const { return this->rebalance_id_; }
+const int64_t ConsumerEvent::GetRebalanceId() const { return rebalance_id_; }
 
-const int32_t ConsumerEvent::GetEventType() const { return this->event_type_; }
+const int32_t ConsumerEvent::GetEventType() const { return event_type_; }
 
-const int32_t ConsumerEvent::GetEventStatus() const { return this->event_status_; }
+const int32_t ConsumerEvent::GetEventStatus() const { return event_status_; }
 
-void ConsumerEvent::SetEventType(int32_t event_type) { this->event_type_ = event_type; }
+void ConsumerEvent::SetEventType(int32_t event_type) { event_type_ = event_type; }
 
-void ConsumerEvent::SetEventStatus(int32_t event_status) { this->event_status_ = event_status; }
+void ConsumerEvent::SetEventStatus(int32_t event_status) { event_status_ = event_status; }
 
 const list<SubscribeInfo>& ConsumerEvent::GetSubscribeInfoList() const {
-  return this->subscribe_list_;
+  return subscribe_list_;
 }
 
 string ConsumerEvent::ToString() {
@@ -553,13 +558,13 @@ string ConsumerEvent::ToString() {
   stringstream ss;
   list<SubscribeInfo>::const_iterator it;
   ss << "ConsumerEvent [rebalanceId=";
-  ss << this->rebalance_id_;
+  ss << rebalance_id_;
   ss << ", type=";
-  ss << this->event_type_;
+  ss << event_type_;
   ss << ", status=";
-  ss << this->event_status_;
+  ss << event_status_;
   ss << ", subscribeInfoList=[";
-  for (it = this->subscribe_list_.begin(); it != this->subscribe_list_.end(); ++it) {
+  for (it = subscribe_list_.begin(); it != subscribe_list_.end(); ++it) {
     if (count++ > 0) {
       ss << ",";
     }
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.h b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.h
new file mode 100644
index 0000000..6117d65
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.h
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_META_INFO_H_
+#define TUBEMQ_CLIENT_META_INFO_H_
+
+#include <stdint.h>
+
+#include <list>
+#include <string>
+
+#include "flowctrl_def.h"
+
+namespace tubemq {
+
+using std::list;
+using std::map;
+using std::string;
+
+class NodeInfo {
+ public:
+  NodeInfo();
+  NodeInfo(bool is_broker, const string& node_info);
+  NodeInfo(const string& node_host, uint32_t node_port);
+  NodeInfo(int32_t node_id, const string& node_host, uint32_t node_port);
+  ~NodeInfo();
+  NodeInfo& operator=(const NodeInfo& target);
+  bool operator==(const NodeInfo& target);
+  bool operator<(const NodeInfo& target) const;
+  const uint32_t GetNodeId() const;
+  const string& GetHost() const;
+  const uint32_t GetPort() const;
+  const string& GetAddrInfo() const;
+  const string& GetNodeInfo() const;
+
+ private:
+  void buildStrInfo();
+
+ private:
+  uint32_t node_id_;
+  string node_host_;
+  uint32_t node_port_;
+  // ip:port
+  string addr_info_;
+  // id:ip:port
+  string node_info_;
+};
+
+class Partition {
+ public:
+  Partition();
+  explicit Partition(const string& partition_info);
+  Partition(const NodeInfo& broker_info, const string& part_str);
+  Partition(const NodeInfo& broker_info, const string& topic, uint32_t partition_id);
+  ~Partition();
+  Partition& operator=(const Partition& target);
+  bool operator==(const Partition& target);
+  const uint32_t GetBrokerId() const;
+  const string& GetBrokerHost() const;
+  const uint32_t GetBrokerPort() const;
+  const string& GetPartitionKey() const;
+  const string& GetTopic() const;
+  const NodeInfo& GetBrokerInfo() const;
+  const uint32_t GetPartitionId() const;
+  const string& ToString() const;
+
+ private:
+  void buildPartitionKey();
+
+ private:
+  string topic_;
+  NodeInfo broker_info_;
+  uint32_t partition_id_;
+  string partition_key_;
+  string partition_info_;
+};
+
+class PartitionExt : public Partition {
+ public:
+  PartitionExt();
+  explicit PartitionExt(const string& partition_info);
+  PartitionExt(const NodeInfo& broker_info, const string& part_str);
+  ~PartitionExt();
+  PartitionExt& operator=(const PartitionExt& target);
+  void BookConsumeData(int32_t errcode, int32_t msg_size, bool req_esc_limit,
+    int64_t rsp_dlt_limit, int64_t last_datadlt, bool require_slow);
+  int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
+    const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed);
+  int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
+    const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed,
+    int32_t errcode, int32_t msg_size, bool req_esc_limit, int64_t rsp_dlt_limit,
+    int64_t last_datadlt, bool require_slow);
+  void SetLastConsumed(bool last_consumed);
+  bool IsLastConsumed() const;
+
+ private:
+  void resetParameters();
+  void updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_handler,
+    const FlowCtrlRuleHandler& group_flowctrl_handler, int32_t msg_size, int64_t last_datadlt);
+
+ private:
+  bool is_last_consumed_;
+  FlowCtrlResult cur_flowctrl_;
+  FlowCtrlItem cur_freqctrl_;
+  int64_t next_stage_updtime_;
+  int64_t next_slice_updtime_;
+  int64_t limit_slice_msgsize_;
+  int64_t cur_stage_msgsize_;
+  int64_t cur_slice_msgsize_;
+  int32_t total_zero_cnt_;
+  int64_t booked_time_;
+  int32_t booked_errcode_;
+  bool    booked_esc_limit_;
+  int32_t booked_msgsize_;
+  int64_t booked_dlt_limit_;
+  int64_t booked_curdata_dlt_;
+  bool    booked_require_slow_;
+};
+
+class SubscribeInfo {
+ public:
+  SubscribeInfo();
+  explicit SubscribeInfo(const string& sub_info);
+  SubscribeInfo(const string& consumer_id,
+        const string& group_name, const PartitionExt& partition_ext);
+  SubscribeInfo& operator=(const SubscribeInfo& target);
+  const string& GetConsumerId() const;
+  const string& GetGroup() const;
+  const PartitionExt& GetPartitionExt() const;
+  const uint32_t GgetBrokerId() const;
+  const string& GetBrokerHost() const;
+  const uint32_t GetBrokerPort() const;
+  const string& GetTopic() const;
+  const uint32_t GetPartitionId() const;
+  const string& ToString() const;
+
+ private:
+  void buildSubInfo();
+
+ private:
+  string consumer_id_;
+  string group_;
+  PartitionExt partitionext_;
+  string sub_info_;
+};
+
+class ConsumerEvent {
+ public:
+  ConsumerEvent();
+  ConsumerEvent(const ConsumerEvent& target);
+  ConsumerEvent(int64_t rebalance_id, int32_t event_type,
+                const list<SubscribeInfo>& subscribeInfo_lst, int32_t event_status);
+  ConsumerEvent& operator=(const ConsumerEvent& target);
+  const int64_t GetRebalanceId() const;
+  const int32_t GetEventType() const;
+  const int32_t GetEventStatus() const;
+  void SetEventType(int32_t event_type);
+  void SetEventStatus(int32_t event_status);
+  const list<SubscribeInfo>& GetSubscribeInfoList() const;
+  string ToString();
+
+ private:
+  int64_t rebalance_id_;
+  int32_t event_type_;
+  int32_t event_status_;
+  list<SubscribeInfo> subscribe_list_;
+};
+
+
+}  // namespace tubemq
+
+#endif  // TUBEMQ_CLIENT_META_INFO_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/noncopyable.h b/tubemq-client-twins/tubemq-client-cpp/src/noncopyable.h
new file mode 100644
index 0000000..9afbf52
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/noncopyable.h
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _TUBUMQ_NONCOPYABLE_H
+#define _TUBUMQ_NONCOPYABLE_H
+
+namespace tubemq {
+
+class noncopyable {
+ public:
+  noncopyable(const noncopyable&) = delete;
+  void operator=(const noncopyable&) = delete;
+
+ protected:
+  noncopyable() = default;
+  ~noncopyable() = default;
+};
+
+}  // namespace tubemq
+
+#endif  // _TUBUMQ_NONCOPYABLE_H
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index f97535c..8bc9067 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -17,75 +17,68 @@
  * under the License.
  */
 
-#include "tubemq/rmt_data_cache.h"
+#include "rmt_data_cache.h"
 
 #include <stdlib.h>
-#include <string>
-
-#include "tubemq/client_service.h"
-#include "tubemq/const_config.h"
-#include "tubemq/meta_info.h"
-#include "tubemq/utils.h"
 
+#include <string>
 
+#include "client_service.h"
+#include "const_config.h"
+#include "logger.h"
+#include "meta_info.h"
+#include "utils.h"
 
 namespace tubemq {
 
 using std::lock_guard;
 using std::unique_lock;
-using namespace std::placeholders;
-
 
 RmtDataCacheCsm::RmtDataCacheCsm() {
   under_groupctrl_.Set(false);
   last_checktime_.Set(0);
+  cur_part_cnt_.Set(0);
 }
 
 RmtDataCacheCsm::~RmtDataCacheCsm() {
-  // 
+  //
 }
 
-void RmtDataCacheCsm::SetConsumerInfo(const string& client_id,
-                                            const string& group_name) {
+void RmtDataCacheCsm::SetConsumerInfo(const string& client_id, const string& group_name) {
   consumer_id_ = client_id;
   group_name_ = group_name;
 }
 
-void RmtDataCacheCsm::UpdateDefFlowCtrlInfo(int64_t flowctrl_id,
-                                                 const string& flowctrl_info) {
+void RmtDataCacheCsm::UpdateDefFlowCtrlInfo(int64_t flowctrl_id, const string& flowctrl_info) {
   if (flowctrl_id != def_flowctrl_handler_.GetFlowCtrlId()) {
-    def_flowctrl_handler_.UpdateDefFlowCtrlInfo(true,
-      tb_config::kInvalidValue, flowctrl_id, flowctrl_info);
+    def_flowctrl_handler_.UpdateDefFlowCtrlInfo(true, tb_config::kInvalidValue, flowctrl_id,
+                                                flowctrl_info);
   }
 }
-void RmtDataCacheCsm::UpdateGroupFlowCtrlInfo(int32_t qyrpriority_id,
-                             int64_t flowctrl_id, const string& flowctrl_info) {
+void RmtDataCacheCsm::UpdateGroupFlowCtrlInfo(int32_t qyrpriority_id, int64_t flowctrl_id,
+                                              const string& flowctrl_info) {
   if (flowctrl_id != group_flowctrl_handler_.GetFlowCtrlId()) {
-    group_flowctrl_handler_.UpdateDefFlowCtrlInfo(false,
-                qyrpriority_id, flowctrl_id, flowctrl_info);
+    group_flowctrl_handler_.UpdateDefFlowCtrlInfo(false, qyrpriority_id, flowctrl_id,
+                                                  flowctrl_info);
   }
   if (qyrpriority_id != group_flowctrl_handler_.GetQryPriorityId()) {
-    this->group_flowctrl_handler_.SetQryPriorityId(qyrpriority_id);
+    group_flowctrl_handler_.SetQryPriorityId(qyrpriority_id);
   }
   // update current if under group flowctrl
   int64_t cur_time = Utils::GetCurrentTimeMillis();
   if (cur_time - last_checktime_.Get() > 10000) {
     FlowCtrlResult flowctrl_result;
-    this->under_groupctrl_.Set(
-      group_flowctrl_handler_.GetCurDataLimit(
-        tb_config::kMaxLongValue, flowctrl_result));
+    under_groupctrl_.Set(
+        group_flowctrl_handler_.GetCurDataLimit(tb_config::kMaxLongValue, flowctrl_result));
     last_checktime_.Set(cur_time);
   }
 }
 
 const int64_t RmtDataCacheCsm::GetGroupQryPriorityId() const {
-  return this->group_flowctrl_handler_.GetQryPriorityId();
-}
-
-bool RmtDataCacheCsm::IsUnderGroupCtrl() {
-  return this->under_groupctrl_.Get();
+  return group_flowctrl_handler_.GetQryPriorityId();
 }
 
+bool RmtDataCacheCsm::IsUnderGroupCtrl() { return under_groupctrl_.Get(); }
 
 void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
   //
@@ -99,6 +92,7 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
   lock_guard<mutex> lck(meta_lock_);
   it_map = partitions_.find(partition_key);
   if (it_map == partitions_.end()) {
+    cur_part_cnt_.GetAndIncrement();
     partitions_[partition_key] = partition_ext;
     it_topic = topic_partition_.find(partition_ext.GetTopic());
     if (it_topic == topic_partition_.end()) {
@@ -126,8 +120,23 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
   resetIdlePartition(partition_key, true);
 }
 
-bool RmtDataCacheCsm::SelectPartition(string &err_info,
-                        PartitionExt& partition_ext, string& confirm_context) {
+int32_t RmtDataCacheCsm::GetCurConsumeStatus() {
+  lock_guard<mutex> lck(meta_lock_);
+  if (partitions_.empty()) {
+    return err_code::kErrNoPartAssigned;
+  }
+  if (index_partitions_.empty()) {
+    if (partition_useds_.empty()) {
+      return err_code::kErrAllPartInUse;
+    } else {
+      return err_code::kErrAllPartWaiting;
+    }
+  }
+  return err_code::kErrSuccess;
+}
+
+bool RmtDataCacheCsm::SelectPartition(int32_t& error_code, string& err_info,
+                                      PartitionExt& partition_ext, string& confirm_context) {
   bool result = false;
   int64_t booked_time = 0;
   string partition_key;
@@ -135,14 +144,22 @@ bool RmtDataCacheCsm::SelectPartition(string &err_info,
   // lock operate
   lock_guard<mutex> lck(meta_lock_);
   if (partitions_.empty()) {
+    error_code = err_code::kErrNoPartAssigned;
     err_info = "No partition info in local cache, please retry later!";
     result = false;
   } else {
     if (index_partitions_.empty()) {
-      err_info = "No idle partition to consume, please retry later!";
+      if (partition_useds_.empty()) {
+        error_code = err_code::kErrAllPartInUse;
+        err_info = "No idle partition to consume, please retry later!";
+      } else {
+        error_code = err_code::kErrAllPartWaiting;
+        err_info = "All partitions reach max position, please retry later!";
+      }
       result = false;
     } else {
       result = false;
+      error_code = err_code::kErrAllPartInUse;
       err_info = "No idle partition to consume data 2, please retry later!";
       booked_time = Utils::GetCurrentTimeMillis();
       partition_key = index_partitions_.front();
@@ -160,10 +177,19 @@ bool RmtDataCacheCsm::SelectPartition(string &err_info,
   return result;
 }
 
-void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key,
-                     int64_t curr_offset, int32_t err_code, bool esc_limit,
-                  int32_t msg_size, int64_t limit_dlt, int64_t cur_data_dlt,
-                     bool require_slow) {
+void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key, int64_t curr_offset) {
+  map<string, PartitionExt>::iterator it_part;
+  // book partition offset info
+  if (curr_offset >= 0) {
+    lock_guard<mutex> lck1(data_book_mutex_);
+    partition_offset_[partition_key] = curr_offset;
+  }
+}
+
+void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key, int64_t curr_offset,
+                                        int32_t error_code, bool esc_limit, int32_t msg_size,
+                                        int64_t limit_dlt, int64_t cur_data_dlt,
+                                        bool require_slow) {
   map<string, PartitionExt>::iterator it_part;
   // book partition offset info
   if (curr_offset >= 0) {
@@ -174,44 +200,53 @@ void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key,
   lock_guard<mutex> lck2(meta_lock_);
   it_part = partitions_.find(partition_key);
   if (it_part != partitions_.end()) {
-    it_part->second.BookConsumeData(err_code, msg_size,
-              esc_limit, limit_dlt, cur_data_dlt, require_slow);
+    it_part->second.BookConsumeData(error_code, msg_size, esc_limit, limit_dlt, cur_data_dlt,
+                                    require_slow);
+  }
+}
+
+bool RmtDataCacheCsm::IsPartitionInUse(string partition_key, int64_t used_time) {
+  map<string, int64_t>::iterator it_used;
+  lock_guard<mutex> lck(meta_lock_);
+  it_used = partition_useds_.find(partition_key);
+  if (it_used == partition_useds_.end() || it_used->second != used_time) {
+    return false;
   }
+  return true;
 }
 
 // success process release partition
-bool RmtDataCacheCsm::RelPartition(string &err_info, bool filter_consume,
-                                 const string& confirm_context, bool is_consumed) {
+bool RmtDataCacheCsm::RelPartition(string& err_info, bool filter_consume,
+                                   const string& confirm_context, bool is_consumed) {
   return inRelPartition(err_info, true, filter_consume, confirm_context, is_consumed);
 }
 
 // release partiton without response return
-bool RmtDataCacheCsm::RelPartition(string &err_info,
-                                 const string& confirm_context, bool is_consumed) {
+bool RmtDataCacheCsm::RelPartition(string& err_info, const string& confirm_context,
+                                   bool is_consumed) {
   return inRelPartition(err_info, true, false, confirm_context, is_consumed);
 }
 
 // release partiton with error response return
-bool RmtDataCacheCsm::RelPartition(string &err_info, bool filter_consume,
-                              const string& confirm_context, bool is_consumed,
-                              int64_t curr_offset, int32_t err_code, bool esc_limit,
-                              int32_t msg_size, int64_t limit_dlt, int64_t cur_data_dlt) {
+bool RmtDataCacheCsm::RelPartition(string& err_info, bool filter_consume,
+                                   const string& confirm_context, bool is_consumed,
+                                   int64_t curr_offset, int32_t error_code, bool esc_limit,
+                                   int32_t msg_size, int64_t limit_dlt, int64_t cur_data_dlt) {
   int64_t booked_time;
-  string  partition_key;
+  string partition_key;
   // parse confirm context
-  bool result = parseConfirmContext(err_info,
-                      confirm_context, partition_key, booked_time);
+  bool result = parseConfirmContext(err_info, confirm_context, partition_key, booked_time);
   if (!result) {
     return false;
   }
-  BookedPartionInfo(partition_key, curr_offset, err_code,
-            esc_limit, msg_size, limit_dlt, cur_data_dlt, false);
-  return inRelPartition(err_info, true,
-    filter_consume, confirm_context, is_consumed);
+  BookedPartionInfo(partition_key, curr_offset, error_code, esc_limit, msg_size, limit_dlt,
+                    cur_data_dlt, false);
+  return inRelPartition(err_info, true, filter_consume, confirm_context, is_consumed);
 }
 
 void RmtDataCacheCsm::FilterPartitions(const list<SubscribeInfo>& subscribe_info_lst,
-            list<PartitionExt>& subscribed_partitions, list<PartitionExt>& unsub_partitions) {
+                                       list<PartitionExt>& subscribed_partitions,
+                                       list<PartitionExt>& unsub_partitions) {
   //
   map<string, PartitionExt>::iterator it_part;
   list<SubscribeInfo>::const_iterator it_lst;
@@ -244,8 +279,7 @@ void RmtDataCacheCsm::GetSubscribedInfo(list<SubscribeInfo>& subscribe_info_lst)
   }
 }
 
-void RmtDataCacheCsm::GetAllBrokerPartitions(
-                    map<NodeInfo, list<PartitionExt> >& broker_parts) {
+void RmtDataCacheCsm::GetAllClosedBrokerParts(map<NodeInfo, list<PartitionExt> >& broker_parts) {
   map<string, PartitionExt>::iterator it_part;
   map<NodeInfo, list<PartitionExt> >::iterator it_broker;
 
@@ -287,7 +321,7 @@ void RmtDataCacheCsm::GetRegBrokers(list<NodeInfo>& brokers) {
 }
 
 void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info,
-                                            list<PartitionExt>& partition_list) {
+                                           list<PartitionExt>& partition_list) {
   set<string>::iterator it_key;
   map<NodeInfo, set<string> >::iterator it_broker;
   map<string, PartitionExt>::iterator it_part;
@@ -296,8 +330,7 @@ void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info,
   lock_guard<mutex> lck(meta_lock_);
   it_broker = broker_partition_.find(broker_info);
   if (it_broker != broker_partition_.end()) {
-    for (it_key = it_broker->second.begin();
-    it_key != it_broker->second.end(); it_key++) {
+    for (it_key = it_broker->second.begin(); it_key != it_broker->second.end(); it_key++) {
       it_part = partitions_.find(*it_key);
       if (it_part != partitions_.end()) {
         partition_list.push_back(it_part->second);
@@ -306,7 +339,6 @@ void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info,
   }
 }
 
-
 void RmtDataCacheCsm::GetCurPartitionOffsets(map<string, int64_t> part_offset_map) {
   map<string, int64_t>::iterator it;
 
@@ -317,16 +349,13 @@ void RmtDataCacheCsm::GetCurPartitionOffsets(map<string, int64_t> part_offset_ma
   }
 }
 
-
 //
-bool RmtDataCacheCsm::RemovePartition(string &err_info,
-                                  const string& confirm_context) {
+bool RmtDataCacheCsm::RemovePartition(string& err_info, const string& confirm_context) {
   int64_t booked_time;
-  string  partition_key;
+  string partition_key;
   set<string> partition_keys;
   // parse confirm context
-  bool result = parseConfirmContext(err_info,
-                      confirm_context, partition_key, booked_time);
+  bool result = parseConfirmContext(err_info, confirm_context, partition_key, booked_time);
   if (!result) {
     return false;
   }
@@ -360,7 +389,8 @@ void RmtDataCacheCsm::RemovePartition(const set<string>& partition_keys) {
 }
 
 void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe_infos,
-        bool is_processing_rollback, map<NodeInfo, list<PartitionExt> >& broker_parts) {
+                                            bool is_processing_rollback,
+                                            map<NodeInfo, list<PartitionExt> >& broker_parts) {
   //
   string part_key;
   list<SubscribeInfo>::const_iterator it;
@@ -398,29 +428,56 @@ void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe
   }
 }
 
+void RmtDataCacheCsm::handleExpiredPartitions(int64_t max_wait_period_ms) {
+  int64_t curr_time;
+  set<string> expired_keys;
+  set<string>::iterator it_lst;
+  map<string, int64_t>::iterator it_used;
+  map<string, PartitionExt>::iterator it_map;
+
+  lock_guard<mutex> lck(meta_lock_);
+  if (!partition_useds_.empty()) {
+    curr_time = Utils::GetCurrentTimeMillis();
+    for (it_used = partition_useds_.begin();
+      it_used != partition_useds_.end(); ++it_used) {
+      if (curr_time - it_used->second > max_wait_period_ms) {
+        expired_keys.insert(it_used->first);
+        it_map = partitions_.find(it_used->first);
+        if (it_map != partitions_.end()) {
+          it_map->second.SetLastConsumed(false);
+        }
+      }
+    }
+    if (!expired_keys.empty()) {
+      for (it_lst = expired_keys.begin();
+        it_lst != expired_keys.end(); it_lst++) {
+        resetIdlePartition(*it_lst, true);
+      }
+    }
+  }
+}
+
 
 
 bool RmtDataCacheCsm::IsPartitionFirstReg(const string& partition_key) {
-  bool result = false;
   map<string, bool>::iterator it;
-
   lock_guard<mutex> lck(data_book_mutex_);
   it = part_reg_booked_.find(partition_key);
   if (it == part_reg_booked_.end()) {
     part_reg_booked_[partition_key] = true;
   }
-  return result;
+  return part_reg_booked_[partition_key];
 }
 
 void RmtDataCacheCsm::OfferEvent(const ConsumerEvent& event) {
   unique_lock<mutex> lck(event_read_mutex_);
-  this->rebalance_events_.push_back(event);
+  rebalance_events_.push_back(event);
   event_read_cond_.notify_all();
 }
 
 void RmtDataCacheCsm::TakeEvent(ConsumerEvent& event) {
   unique_lock<mutex> lck(event_read_mutex_);
-  while (this->rebalance_events_.empty()) {
+  while (rebalance_events_.empty()) {
     event_read_cond_.wait(lck);
   }
   event = rebalance_events_.front();
@@ -434,41 +491,65 @@ void RmtDataCacheCsm::ClearEvent() {
 
 void RmtDataCacheCsm::OfferEventResult(const ConsumerEvent& event) {
   lock_guard<mutex> lck(event_write_mutex_);
-  this->rebalance_events_.push_back(event);
+  rebalance_results_.push_back(event);
 }
 
 bool RmtDataCacheCsm::PollEventResult(ConsumerEvent& event) {
   bool result = false;
   lock_guard<mutex> lck(event_write_mutex_);
-  if (!rebalance_events_.empty()) {
-    event = rebalance_events_.front();
-    rebalance_events_.pop_front();
+  if (!rebalance_results_.empty()) {
+    event = rebalance_results_.front();
+    rebalance_results_.pop_front();
     result = true;
   }
   return result;
 }
 
-void RmtDataCacheCsm::HandleTimeout(const string partition_key,
-                                          const asio::error_code& error) {
+void RmtDataCacheCsm::HandleTimeout(const string partition_key, const asio::error_code& error) {
   if (!error) {
     lock_guard<mutex> lck(meta_lock_);
     resetIdlePartition(partition_key, true);
   }
 }
 
+int RmtDataCacheCsm::IncrAndGetHBError(NodeInfo broker) {
+  int count = 0;
+  map<NodeInfo, int>::iterator it_map;
+  lock_guard<mutex> lck(status_mutex_);
+  it_map = broker_status_.find(broker);
+  if (it_map == broker_status_.end()) {
+    broker_status_[broker] = 1;
+    count = 1;
+  } else {
+    count = ++it_map->second;
+  }
+  return count;
+}
+
+void RmtDataCacheCsm::ResetHBError(NodeInfo broker) {
+  map<NodeInfo, int>::iterator it_map;
+  lock_guard<mutex> lck(status_mutex_);
+  it_map = broker_status_.find(broker);
+  if (it_map != broker_status_.end()) {
+    it_map->second = 0;
+  }
+}
+
+
 void RmtDataCacheCsm::addDelayTimer(const string& partition_key, int64_t delay_time) {
   // add timer
-  tuple<int64_t, SteadyTimerPtr> timer = 
-      std::make_tuple(Utils::GetCurrentTimeMillis(),
-      TubeMQService::Instance().GetTimerExecutorPool().Get()->CreateSteadyTimer());
+  tuple<int64_t, SteadyTimerPtr> timer = std::make_tuple(
+      Utils::GetCurrentTimeMillis(),
+      TubeMQService::Instance()->CreateTimer());
   std::get<1>(timer)->expires_after(std::chrono::milliseconds(delay_time));
-  std::get<1>(timer)->async_wait(std::bind(&RmtDataCacheCsm::HandleTimeout, this, partition_key, _1));
-  partition_timeouts_.insert(std::make_pair(partition_key, timer));          
+  std::get<1>(timer)->async_wait(
+      std::bind(&RmtDataCacheCsm::HandleTimeout, this, partition_key, std::placeholders::_1));
+  partition_timeouts_.insert(std::make_pair(partition_key, timer));
 }
 
 void RmtDataCacheCsm::resetIdlePartition(const string& partition_key, bool need_reuse) {
   map<string, PartitionExt>::iterator it_map;
-  map<string, tuple<int64_t, SteadyTimerPtr> >::iterator it_timeout; 
+  map<string, tuple<int64_t, SteadyTimerPtr> >::iterator it_timeout;
   partition_useds_.erase(partition_key);
   it_timeout = partition_timeouts_.find(partition_key);
   if (it_timeout != partition_timeouts_.end()) {
@@ -483,16 +564,16 @@ void RmtDataCacheCsm::resetIdlePartition(const string& partition_key, bool need_
   }
 }
 
-void RmtDataCacheCsm::buildConfirmContext(const string& partition_key,
-                                   int64_t booked_time, string& confirm_context) {
+void RmtDataCacheCsm::buildConfirmContext(const string& partition_key, int64_t booked_time,
+                                          string& confirm_context) {
   confirm_context.clear();
   confirm_context += partition_key;
   confirm_context += delimiter::kDelimiterAt;
   confirm_context += Utils::Long2str(booked_time);
 }
 
-bool RmtDataCacheCsm::parseConfirmContext(string &err_info,
-     const string& confirm_context, string& partition_key, int64_t& booked_time) {
+bool RmtDataCacheCsm::parseConfirmContext(string& err_info, const string& confirm_context,
+                                          string& partition_key, int64_t& booked_time) {
   //
   vector<string> result;
   Utils::Split(confirm_context, result, delimiter::kDelimiterAt);
@@ -528,19 +609,19 @@ void RmtDataCacheCsm::rmvMetaInfo(const string& partition_key) {
     }
     partitions_.erase(partition_key);
     part_subinfo_.erase(partition_key);
+    cur_part_cnt_.DecrementAndGet();
   }
 }
 
-bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
-                     bool filter_consume, const string& confirm_context, bool is_consumed) {
+bool RmtDataCacheCsm::inRelPartition(string& err_info, bool need_delay_check, bool filter_consume,
+                                     const string& confirm_context, bool is_consumed) {
   int64_t delay_time;
   int64_t booked_time;
-  string  partition_key;
+  string partition_key;
   map<string, PartitionExt>::iterator it_part;
   map<string, int64_t>::iterator it_used;
-  // parse confirm context  
-  bool result = parseConfirmContext(err_info,
-                      confirm_context, partition_key, booked_time);
+  // parse confirm context
+  bool result = parseConfirmContext(err_info, confirm_context, partition_key, booked_time);
   if (!result) {
     return false;
   }
@@ -565,8 +646,8 @@ bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
         index_partitions_.remove(partition_key);
         delay_time = 0;
         if (need_delay_check) {
-          delay_time = it_part->second.ProcConsumeResult(def_flowctrl_handler_,
-                        group_flowctrl_handler_, filter_consume, is_consumed);
+          delay_time = it_part->second.ProcConsumeResult(
+              def_flowctrl_handler_, group_flowctrl_handler_, filter_consume, is_consumed);
         }
         if (delay_time > 10) {
           addDelayTimer(partition_key, delay_time);
@@ -585,5 +666,4 @@ bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
   return result;
 }
 
-
 }  // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.h
new file mode 100644
index 0000000..5b77eb7
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.h
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
+#define TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
+
+#include <stdint.h>
+
+#include <condition_variable>
+#include <list>
+#include <map>
+#include <mutex>
+#include <set>
+#include <string>
+#include <tuple>
+
+#include "executor_pool.h"
+#include "flowctrl_def.h"
+#include "meta_info.h"
+#include "tubemq/tubemq_atomic.h"
+#include "tubemq/tubemq_errcode.h"
+
+
+
+
+
+namespace tubemq {
+
+using std::condition_variable;
+using std::map;
+using std::set;
+using std::list;
+using std::mutex;
+using std::string;
+using std::tuple;
+
+
+
+// consumer remote data cache
+class RmtDataCacheCsm {
+ public:
+  RmtDataCacheCsm();
+  ~RmtDataCacheCsm();
+  void SetConsumerInfo(const string& client_id, const string& group_name);
+  void UpdateDefFlowCtrlInfo(int64_t flowctrl_id,
+                                     const string& flowctrl_info);
+  void UpdateGroupFlowCtrlInfo(int32_t qyrpriority_id,
+                 int64_t flowctrl_id, const string& flowctrl_info);
+  const int64_t GetGroupQryPriorityId() const;
+  const int64_t GetDefFlowCtrlId() const { return def_flowctrl_handler_.GetFlowCtrlId(); }
+  const int64_t GetGroupFlowCtrlId() const { return group_flowctrl_handler_.GetFlowCtrlId(); }
+  bool IsUnderGroupCtrl();
+  int32_t GetCurConsumeStatus();
+  void handleExpiredPartitions(int64_t max_wait_period_ms);
+  int32_t GetCurPartCount() const { return cur_part_cnt_.Get(); }
+  bool IsPartitionInUse(string partition_key, int64_t used_time);
+  void AddNewPartition(const PartitionExt& partition_ext);
+  bool SelectPartition(int32_t& error_code, string &err_info,
+           PartitionExt& partition_ext, string& confirm_context);
+  void BookedPartionInfo(const string& partition_key, int64_t curr_offset);
+  void BookedPartionInfo(const string& partition_key, int64_t curr_offset,
+                            int32_t error_code, bool esc_limit, int32_t msg_size,
+                            int64_t limit_dlt, int64_t cur_data_dlt, bool require_slow);
+  bool RelPartition(string &err_info, bool filter_consume,
+                         const string& confirm_context, bool is_consumed);
+  bool RelPartition(string &err_info, const string& confirm_context, bool is_consumed);
+  bool RelPartition(string &err_info, bool filter_consume,
+                         const string& confirm_context, bool is_consumed,
+                         int64_t curr_offset, int32_t error_code, bool esc_limit,
+                         int32_t msg_size, int64_t limit_dlt, int64_t cur_data_dlt);
+  void FilterPartitions(const list<SubscribeInfo>& subscribe_info_lst,
+          list<PartitionExt>& subscribed_partitions, list<PartitionExt>& unsub_partitions);
+  void GetSubscribedInfo(list<SubscribeInfo>& subscribe_info_lst);
+  bool GetPartitionExt(const string& part_key, PartitionExt& partition_ext);
+  void GetRegBrokers(list<NodeInfo>& brokers);
+  void GetPartitionByBroker(const NodeInfo& broker_info,
+                                    list<PartitionExt>& partition_list);
+  void GetCurPartitionOffsets(map<string, int64_t> part_offset_map);
+  void GetAllClosedBrokerParts(map<NodeInfo, list<PartitionExt> >& broker_parts);
+  void RemovePartition(const list<PartitionExt>& partition_list);
+  void RemovePartition(const set<string>& partition_keys);
+  bool RemovePartition(string &err_info, const string& confirm_context);
+  void RemoveAndGetPartition(const list<SubscribeInfo>& subscribe_infos,
+        bool is_processing_rollback, map<NodeInfo, list<PartitionExt> >& broker_parts);
+  bool IsPartitionFirstReg(const string& partition_key);
+  void OfferEvent(const ConsumerEvent& event);
+  void TakeEvent(ConsumerEvent& event);
+  void ClearEvent();
+  void OfferEventResult(const ConsumerEvent& event);
+  bool PollEventResult(ConsumerEvent& event);
+  void HandleTimeout(const string partition_key, const asio::error_code& error);
+  int IncrAndGetHBError(NodeInfo broker);
+  void ResetHBError(NodeInfo broker);
+
+ private:
+  void addDelayTimer(const string& part_key, int64_t delay_time);
+  void resetIdlePartition(const string& partition_key, bool need_reuse);
+  void rmvMetaInfo(const string& partition_key);
+  void buildConfirmContext(const string& partition_key,
+                    int64_t booked_time, string& confirm_context);
+  bool parseConfirmContext(string &err_info,
+    const string& confirm_context, string& partition_key, int64_t& booked_time);
+  bool inRelPartition(string &err_info, bool need_delay_check,
+    bool filter_consume, const string& confirm_context, bool is_consumed);
+
+ private:
+  //
+  string consumer_id_;
+  string group_name_;
+  // flow ctrl
+  AtomicInteger cur_part_cnt_;
+  FlowCtrlRuleHandler group_flowctrl_handler_;
+  FlowCtrlRuleHandler def_flowctrl_handler_;
+  AtomicBoolean under_groupctrl_;
+  AtomicLong last_checktime_;
+  // meta info
+  mutable mutex meta_lock_;
+  // partiton allocated map
+  map<string, PartitionExt> partitions_;
+  // topic partiton map
+  map<string, set<string> > topic_partition_;
+  // broker parition map
+  map<NodeInfo, set<string> > broker_partition_;
+  map<string, SubscribeInfo>  part_subinfo_;
+  // for idle partitions occupy
+  list<string> index_partitions_;
+  // for partition used map
+  map<string, int64_t> partition_useds_;
+  // for partiton timer map
+  map<string, tuple<int64_t, SteadyTimerPtr> > partition_timeouts_;
+  // data book
+  mutable mutex data_book_mutex_;
+  // for partition offset cache
+  map<string, int64_t> partition_offset_;
+  // for partiton register booked
+  map<string, bool> part_reg_booked_;
+  // event
+  mutable mutex event_read_mutex_;
+  condition_variable event_read_cond_;
+  list<ConsumerEvent> rebalance_events_;
+  mutable mutex event_write_mutex_;
+  list<ConsumerEvent> rebalance_results_;
+  // status check
+  mutable mutex status_mutex_;
+  map<NodeInfo, int> broker_status_;
+};
+
+}  // namespace tubemq
+
+#endif  // TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h b/tubemq-client-twins/tubemq-client-cpp/src/singleton.h
similarity index 50%
copy from tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
copy to tubemq-client-twins/tubemq-client-cpp/src/singleton.h
index 1adcdf8..c945005 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/singleton.h
@@ -17,52 +17,46 @@
  * under the License.
  */
 
-#ifndef _TUBEMQ_THREAD_POOL_
-#define _TUBEMQ_THREAD_POOL_
+#ifndef _TUBEMQ_SINGLETON_H
+#define _TUBEMQ_SINGLETON_H
 
+#include <assert.h>
 #include <stdlib.h>
 
-#include <asio.hpp>
-#include <asio/ssl.hpp>
-#include <chrono>
-#include <functional>
-#include <memory>
 #include <mutex>
 #include <thread>
-#include <vector>
 
 #include "noncopyable.h"
 
 namespace tubemq {
-// ThreadPool use one io_context for thread pool
-class ThreadPool : noncopyable {
+
+template <typename T>
+class Singleton : noncopyable {
  public:
-  explicit ThreadPool(std::size_t size)
-      : io_context_(size), work_(asio::make_work_guard(io_context_)) {
-    for (size_t i = 0; i < size; ++i) {
-      workers_.emplace_back([this] { io_context_.run(); });
-    }
+  static T& Instance() {
+    std::call_once(once_, Singleton::init);
+    assert(value_ != nullptr);
+    return *value_;
   }
 
-  ~ThreadPool() {
-    work_.reset();
-    io_context_.stop();
-    for (std::thread &worker : workers_) {
-      worker.join();
-    }
-    workers_.clear();
-  }
+ protected:
+  Singleton() {}
+  ~Singleton() {}
 
-  template <class function>
-  void Post(function f) {
-    io_context_.post(f);
-  }
+ private:
+  static void init() { value_ = new T(); }
 
  private:
-  asio::io_context io_context_;
-  using io_context_work = asio::executor_work_guard<asio::io_context::executor_type>;
-  io_context_work work_;
-  std::vector<std::thread> workers_;
-};  // namespace tubemq
+  static std::once_flag once_;
+  static T* value_;
+};
+
+template <typename T>
+std::once_flag Singleton<T>::once_;
+
+template <typename T>
+T* Singleton<T>::value_ = nullptr;
+
 }  // namespace tubemq
-#endif  // _TUBEMQ_THREAD_POOL_
+
+#endif  // _TUBEMQ_SINGLETON_H
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h b/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
index 1adcdf8..803eb15 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
@@ -22,8 +22,7 @@
 
 #include <stdlib.h>
 
-#include <asio.hpp>
-#include <asio/ssl.hpp>
+
 #include <chrono>
 #include <functional>
 #include <memory>
@@ -31,6 +30,8 @@
 #include <thread>
 #include <vector>
 
+#include <asio.hpp>
+#include <asio/ssl.hpp>
 #include "noncopyable.h"
 
 namespace tubemq {
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_client.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_client.cc
new file mode 100644
index 0000000..9ba1aef
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_client.cc
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "tubemq/tubemq_client.h"
+
+#include <signal.h>
+#include <unistd.h>
+
+#include <sstream>
+
+#include "baseconsumer.h"
+#include "client_service.h"
+#include "const_config.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_errcode.h"
+
+
+
+namespace tubemq {
+
+using std::lock_guard;
+using std::stringstream;
+
+bool StartTubeMQService(string& err_info, const string& conf_file) {
+  signal(SIGPIPE, SIG_IGN);
+  return TubeMQService::Instance()->Start(err_info, conf_file);
+}
+
+bool StopTubeMQService(string& err_info) {
+  int32_t count = TubeMQService::Instance()->GetClientObjCnt();
+  if (count > 0) {
+    stringstream ss;
+    ss << "Check found ";
+    ss << count;
+    ss << " clients not shutdown, please shutdown clients first!";
+    err_info = ss.str();
+    return false;
+  }
+  return TubeMQService::Instance()->Stop(err_info);
+}
+
+
+TubeMQConsumer::TubeMQConsumer() {
+  client_id_ = tb_config::kInvalidValue;
+  status_.Set(0);
+}
+
+TubeMQConsumer::~TubeMQConsumer() {
+  ShutDown();
+}
+
+bool TubeMQConsumer::Start(string& err_info,
+  const ConsumerConfig& config) {
+  if (!TubeMQService::Instance()->IsRunning()) {
+    err_info = "TubeMQ Service not startted!";
+    return false;
+  }
+  // check status
+  if (!status_.CompareAndSet(0, 1)) {
+    err_info = "Duplicated call!";
+    return false;
+  }
+  BaseConsumer* rmt_client = new BaseConsumer();
+  if (rmt_client == NULL) {
+    err_info = "No memory for create CONSUMER remote object!";
+    return false;
+  }
+  if (!rmt_client->Start(err_info, config)) {
+    rmt_client->ShutDown();
+    delete rmt_client;
+    return false;
+  }
+  client_id_ = rmt_client->GetClientIndex();
+  status_.Set(2);
+  err_info = "Ok!";
+  return true;
+}
+
+void TubeMQConsumer::ShutDown() {
+  if (!status_.CompareAndSet(2, 0)) {
+    return;
+  }
+  if (client_id_ != tb_config::kInvalidValue) {
+    BaseConsumer* rmt_client =
+      (BaseConsumer *)TubeMQService::Instance()->GetClientObj(client_id_);
+    if ((rmt_client != NULL)
+      && (rmt_client->GetClientIndex() == client_id_)) {
+      rmt_client->ShutDown();
+      delete rmt_client;
+    }
+    client_id_ = tb_config::kInvalidValue;
+  }
+}
+
+bool TubeMQConsumer::GetMessage(ConsumerResult& result) {
+  if (!TubeMQService::Instance()->IsRunning()) {
+    result.SetFailureResult(err_code::kErrMQServiceStop,
+      "TubeMQ Service stopped!");
+    return false;
+  }
+  if (status_.Get() != 2) {
+    result.SetFailureResult(err_code::kErrClientStop,
+      "TubeMQ Service not startted!");
+    return false;
+  }
+  if (client_id_ == tb_config::kInvalidValue) {
+    result.SetFailureResult(err_code::kErrClientStop,
+      "Tube client not call init function, please initial first!");
+    return false;
+  }
+  BaseConsumer* rmt_client =
+    (BaseConsumer*)TubeMQService::Instance()->GetClientObj(client_id_);
+  if ((rmt_client == NULL)
+    || (rmt_client->GetClientIndex() != client_id_)) {
+    result.SetFailureResult(err_code::kErrBadRequest,
+      "Rmt client CB has been released, please re-start this client");
+    return false;
+  }
+  return rmt_client->GetMessage(result);
+}
+
+bool TubeMQConsumer::Confirm(const string& confirm_context,
+  bool is_consumed, ConsumerResult& result) {
+  if (!TubeMQService::Instance()->IsRunning()) {
+    result.SetFailureResult(err_code::kErrMQServiceStop,
+      "TubeMQ Service stopped!");
+    return false;
+  }
+  if (status_.Get() != 2) {
+    result.SetFailureResult(err_code::kErrClientStop,
+      "TubeMQ Service not startted!");
+    return false;
+  }
+  if (client_id_ == tb_config::kInvalidValue) {
+    result.SetFailureResult(err_code::kErrClientStop,
+      "Tube client not call init function, please initial first!");
+    return false;
+  }
+  BaseConsumer* rmt_client =
+    (BaseConsumer*)TubeMQService::Instance()->GetClientObj(client_id_);
+  if ((rmt_client == NULL)
+    || (rmt_client->GetClientIndex() != client_id_)) {
+    result.SetFailureResult(err_code::kErrBadRequest,
+      "Rmt client CB has been released, please re-start this client");
+    return false;
+  }
+  return rmt_client->Confirm(confirm_context, is_consumed, result);
+}
+
+bool TubeMQConsumer::GetCurConsumedInfo(map<string, ConsumeOffsetInfo>& consume_info_map) {
+  if (!TubeMQService::Instance()->IsRunning()) {
+    return false;
+  }
+  if ((status_.Get() != 2)
+    || (client_id_ == tb_config::kInvalidValue)) {
+    return false;
+  }
+  BaseConsumer* rmt_client =
+    (BaseConsumer*)TubeMQService::Instance()->GetClientObj(client_id_);
+  if ((rmt_client == NULL)
+    || (rmt_client->GetClientIndex() != client_id_)) {
+    return false;
+  }
+  return rmt_client->GetCurConsumedInfo(consume_info_map);
+}
+
+}  // namespace tubemq
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h
index 89759df..c186701 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h
@@ -185,9 +185,10 @@ class TubeMQCodec final : public CodecProtocol {
   // return code: -1 failed; 0-Unfinished; > 0 package buffer size
   virtual int32_t Check(BufferPtr &in, Any &out, uint32_t &request_id, bool &has_request_id,
                         size_t &package_length) {
+    LOG_TRACE("check in:%s", in->String().c_str());
     // check package is valid
     if (in->length() < 12) {
-      package_length = 12;
+      // package_length = 12;
       LOG_TRACE("Check: data's length < 12, is %ld, out", in->length());
       return 0;
     }
@@ -206,13 +207,11 @@ class TubeMQCodec final : public CodecProtocol {
     }
     // check data list
     uint32_t item_len = 0;
+    // package_length = 12;
     auto check_buf = in->Slice();
     for (uint32_t i = 0; i < list_size; i++) {
+      // package_length += 4;
       if (check_buf->length() < 4) {
-        package_length += 4;
-        if (i > 0) {
-          package_length += i * rpc_config::kRpcMaxBufferSize;
-        }
         LOG_TRACE("Check: buffer Remaining length < 4, is %ld, out", check_buf->length());
         return 0;
       }
@@ -226,11 +225,8 @@ class TubeMQCodec final : public CodecProtocol {
                   rpc_config::kRpcMaxBufferSize);
         return -1;
       }
+      // package_length += item_len;
       if (item_len > check_buf->length()) {
-        package_length += 4 + item_len;
-        if (i > 0) {
-          package_length += i * rpc_config::kRpcMaxBufferSize;
-        }
         LOG_TRACE("Check: item_len(%d) > remaining length(%ld), out", item_len,
                   check_buf->length());
         return 0;
@@ -248,8 +244,8 @@ class TubeMQCodec final : public CodecProtocol {
       in->Skip(item_len);
     }
     out = buf;
-    LOG_TRACE("Check: received message check finished, request_id=%d, readed_len:%d",
-      request_id, readed_len);
+    LOG_TRACE("Check: received message check finished, request_id=%d, readed_len:%d", request_id,
+              readed_len);
     return readed_len;
   }
 
@@ -310,8 +306,7 @@ class TubeMQCodec final : public CodecProtocol {
     }
     return block_cnt;
   }
-
- };
+};
 }  // namespace tubemq
 #endif
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc
index b976add..a2534ff 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc
@@ -22,9 +22,9 @@
 #include <sstream>
 #include <vector>
 
-#include "tubemq/const_config.h"
-#include "tubemq/const_rpc.h"
-#include "tubemq/utils.h"
+#include "const_config.h"
+#include "const_rpc.h"
+#include "utils.h"
 
 namespace tubemq {
 
@@ -33,17 +33,17 @@ using std::stringstream;
 using std::vector;
 
 BaseConfig::BaseConfig() {
-  this->master_addrinfo_ = "";
-  this->auth_enable_ = false;
-  this->auth_usrname_ = "";
-  this->auth_usrpassword_ = "";
-  this->tls_enabled_ = false;
-  this->tls_trust_store_path_ = "";
-  this->tls_trust_store_password_ = "";
-  this->rpc_read_timeout_sec_ = rpc_config::kRpcTimoutDefSec;
-  this->heartbeat_period_sec_ = tb_config::kHeartBeatPeriodDef;
-  this->max_heartbeat_retry_times_ = tb_config::kHeartBeatFailRetryTimesDef;
-  this->heartbeat_period_afterfail_sec_ = tb_config::kHeartBeatSleepPeriodDef;
+  master_addrinfo_ = "";
+  auth_enable_ = false;
+  auth_usrname_ = "";
+  auth_usrpassword_ = "";
+  tls_enabled_ = false;
+  tls_trust_store_path_ = "";
+  tls_trust_store_password_ = "";
+  rpc_read_timeout_ms_ = tb_config::kRpcTimoutDefMs;
+  heartbeat_period_ms_ = tb_config::kHeartBeatPeriodDefMs;
+  max_heartbeat_retry_times_ = tb_config::kHeartBeatFailRetryTimesDef;
+  heartbeat_period_afterfail_ms_ = tb_config::kHeartBeatSleepPeriodDefMs;
 }
 
 BaseConfig::~BaseConfig() {
@@ -52,17 +52,17 @@ BaseConfig::~BaseConfig() {
 
 BaseConfig& BaseConfig::operator=(const BaseConfig& target) {
   if (this != &target) {
-    this->master_addrinfo_ = target.master_addrinfo_;
-    this->auth_enable_ = target.auth_enable_;
-    this->auth_usrname_ = target.auth_usrname_;
-    this->auth_usrpassword_ = target.auth_usrpassword_;
-    this->tls_enabled_ = target.tls_enabled_;
-    this->tls_trust_store_path_ = target.tls_trust_store_path_;
-    this->tls_trust_store_password_ = target.tls_trust_store_password_;
-    this->rpc_read_timeout_sec_ = target.rpc_read_timeout_sec_;
-    this->heartbeat_period_sec_ = target.heartbeat_period_sec_;
-    this->max_heartbeat_retry_times_ = target.max_heartbeat_retry_times_;
-    this->heartbeat_period_afterfail_sec_ = target.heartbeat_period_afterfail_sec_;
+    master_addrinfo_ = target.master_addrinfo_;
+    auth_enable_ = target.auth_enable_;
+    auth_usrname_ = target.auth_usrname_;
+    auth_usrpassword_ = target.auth_usrpassword_;
+    tls_enabled_ = target.tls_enabled_;
+    tls_trust_store_path_ = target.tls_trust_store_path_;
+    tls_trust_store_password_ = target.tls_trust_store_password_;
+    rpc_read_timeout_ms_ = target.rpc_read_timeout_ms_;
+    heartbeat_period_ms_ = target.heartbeat_period_ms_;
+    max_heartbeat_retry_times_ = target.max_heartbeat_retry_times_;
+    heartbeat_period_afterfail_ms_ = target.heartbeat_period_afterfail_ms_;
   }
   return *this;
 }
@@ -91,14 +91,14 @@ bool BaseConfig::SetMasterAddrInfo(string& err_info, const string& master_addrin
     err_info = "Illegal parameter: master_addrinfo is blank!";
     return false;
   }
-  this->master_addrinfo_ = trimed_master_addr_info;
+  master_addrinfo_ = trimed_master_addr_info;
   err_info = "Ok";
   return true;
 }
 
 bool BaseConfig::SetTlsInfo(string& err_info, bool tls_enable, const string& trust_store_path,
                             const string& trust_store_password) {
-  this->tls_enabled_ = tls_enable;
+  tls_enabled_ = tls_enable;
   if (tls_enable) {
     string trimed_trust_store_path = Utils::Trim(trust_store_path);
     if (trimed_trust_store_path.empty()) {
@@ -110,11 +110,11 @@ bool BaseConfig::SetTlsInfo(string& err_info, bool tls_enable, const string& tru
       err_info = "Illegal parameter: trust_store_password is empty!";
       return false;
     }
-    this->tls_trust_store_path_ = trimed_trust_store_path;
-    this->tls_trust_store_password_ = trimed_trust_store_password;
+    tls_trust_store_path_ = trimed_trust_store_path;
+    tls_trust_store_password_ = trimed_trust_store_password;
   } else {
-    this->tls_trust_store_path_ = "";
-    this->tls_trust_store_password_ = "";
+    tls_trust_store_path_ = "";
+    tls_trust_store_password_ = "";
   }
   err_info = "Ok";
   return true;
@@ -122,7 +122,7 @@ bool BaseConfig::SetTlsInfo(string& err_info, bool tls_enable, const string& tru
 
 bool BaseConfig::SetAuthenticInfo(string& err_info, bool authentic_enable, const string& usr_name,
                                   const string& usr_password) {
-  this->auth_enable_ = authentic_enable;
+  auth_enable_ = authentic_enable;
   if (authentic_enable) {
     string trimed_usr_name = Utils::Trim(usr_name);
     if (trimed_usr_name.empty()) {
@@ -134,102 +134,103 @@ bool BaseConfig::SetAuthenticInfo(string& err_info, bool authentic_enable, const
       err_info = "Illegal parameter: usr_password is empty!";
       return false;
     }
-    this->auth_usrname_ = trimed_usr_name;
-    this->auth_usrpassword_ = trimed_usr_password;
+    auth_usrname_ = trimed_usr_name;
+    auth_usrpassword_ = trimed_usr_password;
   } else {
-    this->auth_usrname_ = "";
-    this->auth_usrpassword_ = "";
+    auth_usrname_ = "";
+    auth_usrpassword_ = "";
   }
   err_info = "Ok";
   return true;
 }
 
-const string& BaseConfig::GetMasterAddrInfo() const { return this->master_addrinfo_; }
+const string& BaseConfig::GetMasterAddrInfo() const { return master_addrinfo_; }
 
-bool BaseConfig::IsTlsEnabled() { return this->tls_enabled_; }
+bool BaseConfig::IsTlsEnabled() { return tls_enabled_; }
 
-const string& BaseConfig::GetTrustStorePath() const { return this->tls_trust_store_path_; }
+const string& BaseConfig::GetTrustStorePath() const { return tls_trust_store_path_; }
 
-const string& BaseConfig::GetTrustStorePassword() const { return this->tls_trust_store_password_; }
+const string& BaseConfig::GetTrustStorePassword() const { return tls_trust_store_password_; }
 
-bool BaseConfig::IsAuthenticEnabled() { return this->auth_enable_; }
+bool BaseConfig::IsAuthenticEnabled() { return auth_enable_; }
 
-const string& BaseConfig::GetUsrName() const { return this->auth_usrname_; }
+const string& BaseConfig::GetUsrName() const { return auth_usrname_; }
 
-const string& BaseConfig::GetUsrPassWord() const { return this->auth_usrpassword_; }
+const string& BaseConfig::GetUsrPassWord() const { return auth_usrpassword_; }
 
-void BaseConfig::SetRpcReadTimeoutSec(int rpc_read_timeout_sec) {
-  if (rpc_read_timeout_sec >= rpc_config::kRpcTimoutMaxSec) {
-    this->rpc_read_timeout_sec_ = rpc_config::kRpcTimoutMaxSec;
-  } else if (rpc_read_timeout_sec <= rpc_config::kRpcTimoutMinSec) {
-    this->rpc_read_timeout_sec_ = rpc_config::kRpcTimoutMinSec;
+void BaseConfig::SetRpcReadTimeoutMs(int rpc_read_timeout_ms) {
+  if (rpc_read_timeout_ms >= tb_config::kRpcTimoutMaxMs) {
+    rpc_read_timeout_ms_ = tb_config::kRpcTimoutMaxMs;
+  } else if (rpc_read_timeout_ms <= tb_config::kRpcTimoutMinMs) {
+    rpc_read_timeout_ms_ = tb_config::kRpcTimoutMinMs;
   } else {
-    this->rpc_read_timeout_sec_ = rpc_read_timeout_sec;
+    rpc_read_timeout_ms_ = rpc_read_timeout_ms;
   }
 }
 
-int32_t BaseConfig::GetRpcReadTimeoutSec() { return this->rpc_read_timeout_sec_; }
+int32_t BaseConfig::GetRpcReadTimeoutMs() { return rpc_read_timeout_ms_; }
 
-void BaseConfig::SetHeartbeatPeriodSec(int32_t heartbeat_period_sec) {
-  this->heartbeat_period_sec_ = heartbeat_period_sec;
+void BaseConfig::SetHeartbeatPeriodMs(int32_t heartbeat_period_ms) {
+  heartbeat_period_ms_ = heartbeat_period_ms;
 }
 
-int32_t BaseConfig::GetHeartbeatPeriodSec() { return this->heartbeat_period_sec_; }
+int32_t BaseConfig::GetHeartbeatPeriodMs() { return heartbeat_period_ms_; }
 
 void BaseConfig::SetMaxHeartBeatRetryTimes(int32_t max_heartbeat_retry_times) {
-  this->max_heartbeat_retry_times_ = max_heartbeat_retry_times;
+  max_heartbeat_retry_times_ = max_heartbeat_retry_times;
 }
 
-int32_t BaseConfig::GetMaxHeartBeatRetryTimes() { return this->max_heartbeat_retry_times_; }
+int32_t BaseConfig::GetMaxHeartBeatRetryTimes() { return max_heartbeat_retry_times_; }
 
-void BaseConfig::SetHeartbeatPeriodAftFailSec(int32_t heartbeat_period_afterfail_sec) {
-  this->heartbeat_period_afterfail_sec_ = heartbeat_period_afterfail_sec;
+void BaseConfig::SetHeartbeatPeriodAftFailMs(int32_t heartbeat_period_afterfail_ms) {
+  heartbeat_period_afterfail_ms_ = heartbeat_period_afterfail_ms;
 }
 
-int32_t BaseConfig::GetHeartbeatPeriodAftFailSec() { return this->heartbeat_period_afterfail_sec_; }
+int32_t BaseConfig::GetHeartbeatPeriodAftFailMs() { return heartbeat_period_afterfail_ms_; }
 
 string BaseConfig::ToString() {
   stringstream ss;
   ss << "BaseConfig={master_addrinfo_='";
-  ss << this->master_addrinfo_;
+  ss << master_addrinfo_;
   ss << "', authEnable=";
-  ss << this->auth_enable_;
+  ss << auth_enable_;
   ss << ", auth_usrname_='";
-  ss << this->auth_usrname_;
+  ss << auth_usrname_;
   ss << "', auth_usrpassword_='";
-  ss << this->auth_usrpassword_;
+  ss << auth_usrpassword_;
   ss << "', tls_enabled_=";
-  ss << this->tls_enabled_;
+  ss << tls_enabled_;
   ss << ", tls_trust_store_path_='";
-  ss << this->tls_trust_store_path_;
+  ss << tls_trust_store_path_;
   ss << "', tls_trust_store_password_='";
-  ss << this->tls_trust_store_password_;
-  ss << "', rpc_read_timeout_sec_=";
-  ss << this->rpc_read_timeout_sec_;
-  ss << ", heartbeat_period_sec_=";
-  ss << this->heartbeat_period_sec_;
+  ss << tls_trust_store_password_;
+  ss << "', rpc_read_timeout_ms_=";
+  ss << rpc_read_timeout_ms_;
+  ss << ", heartbeat_period_ms_=";
+  ss << heartbeat_period_ms_;
   ss << ", max_heartbeat_retry_times_=";
-  ss << this->max_heartbeat_retry_times_;
-  ss << ", heartbeat_period_afterfail_sec_=";
-  ss << this->heartbeat_period_afterfail_sec_;
+  ss << max_heartbeat_retry_times_;
+  ss << ", heartbeat_period_afterfail_ms_=";
+  ss << heartbeat_period_afterfail_ms_;
   ss << "}";
   return ss.str();
 }
 
 ConsumerConfig::ConsumerConfig() {
-  this->group_name_ = "";
-  this->is_bound_consume_ = false;
-  this->session_key_ = "";
-  this->source_count_ = 0;
-  this->is_select_big_ = true;
-  this->consume_position_ = kConsumeFromLatestOffset;
-  this->is_confirm_in_local_ = false;
-  this->is_rollback_if_confirm_timout_ = true;
-  this->max_subinfo_report_intvl_ = tb_config::kSubInfoReportMaxIntervalTimes;
-  this->msg_notfound_wait_period_ms_ = tb_config::kMsgNotfoundWaitPeriodMsDef;
-  this->reb_confirm_wait_period_ms_ = tb_config::kRebConfirmWaitPeriodMsDef;
-  this->max_confirm_wait_period_ms_ = tb_config::kConfirmWaitPeriodMsMax;
-  this->shutdown_reb_wait_period_ms_ = tb_config::kRebWaitPeriodWhenShutdownMs;
+  group_name_ = "";
+  is_bound_consume_ = false;
+  session_key_ = "";
+  source_count_ = 0;
+  is_select_big_ = true;
+  consume_position_ = kConsumeFromLatestOffset;
+  is_rollback_if_confirm_timout_ = true;
+  max_subinfo_report_intvl_ = tb_config::kSubInfoReportMaxIntervalTimes;
+  max_part_check_period_ms_ = tb_config::kMaxPartCheckPeriodMsDef;
+  part_check_slice_ms_ = tb_config::kPartCheckSliceMsDef;
+  msg_notfound_wait_period_ms_ = tb_config::kMsgNotfoundWaitPeriodMsDef;
+  reb_confirm_wait_period_ms_ = tb_config::kRebConfirmWaitPeriodMsDef;
+  max_confirm_wait_period_ms_ = tb_config::kConfirmWaitPeriodMsMax;
+  shutdown_reb_wait_period_ms_ = tb_config::kRebWaitPeriodWhenShutdownMs;
 }
 
 ConsumerConfig::~ConsumerConfig() {
@@ -241,21 +242,20 @@ ConsumerConfig& ConsumerConfig::operator=(const ConsumerConfig& target) {
     // parent class
     BaseConfig::operator=(target);
     // child class
-    this->group_name_ = target.group_name_;
-    this->sub_topic_and_filter_map_ = target.sub_topic_and_filter_map_;
-    this->is_bound_consume_ = target.is_bound_consume_;
-    this->session_key_ = target.session_key_;
-    this->source_count_ = target.source_count_;
-    this->is_select_big_ = target.is_select_big_;
-    this->part_offset_map_ = target.part_offset_map_;
-    this->consume_position_ = target.consume_position_;
-    this->max_subinfo_report_intvl_ = target.max_subinfo_report_intvl_;
-    this->msg_notfound_wait_period_ms_ = target.msg_notfound_wait_period_ms_;
-    this->is_confirm_in_local_ = target.is_confirm_in_local_;
-    this->is_rollback_if_confirm_timout_ = target.is_rollback_if_confirm_timout_;
-    this->reb_confirm_wait_period_ms_ = target.reb_confirm_wait_period_ms_;
-    this->max_confirm_wait_period_ms_ = target.max_confirm_wait_period_ms_;
-    this->shutdown_reb_wait_period_ms_ = target.shutdown_reb_wait_period_ms_;
+    group_name_ = target.group_name_;
+    sub_topic_and_filter_map_ = target.sub_topic_and_filter_map_;
+    is_bound_consume_ = target.is_bound_consume_;
+    session_key_ = target.session_key_;
+    source_count_ = target.source_count_;
+    is_select_big_ = target.is_select_big_;
+    part_offset_map_ = target.part_offset_map_;
+    consume_position_ = target.consume_position_;
+    max_subinfo_report_intvl_ = target.max_subinfo_report_intvl_;
+    msg_notfound_wait_period_ms_ = target.msg_notfound_wait_period_ms_;
+    is_rollback_if_confirm_timout_ = target.is_rollback_if_confirm_timout_;
+    reb_confirm_wait_period_ms_ = target.reb_confirm_wait_period_ms_;
+    max_confirm_wait_period_ms_ = target.max_confirm_wait_period_ms_;
+    shutdown_reb_wait_period_ms_ = target.shutdown_reb_wait_period_ms_;
   }
   return *this;
 }
@@ -285,9 +285,9 @@ bool ConsumerConfig::SetGroupConsumeTarget(string& err_info, const string& group
     set<string> tmp_filters;
     tmp_sub_map[topic_name] = tmp_filters;
   }
-  this->is_bound_consume_ = false;
-  this->group_name_ = tgt_group_name;
-  this->sub_topic_and_filter_map_ = tmp_sub_map;
+  is_bound_consume_ = false;
+  group_name_ = tgt_group_name;
+  sub_topic_and_filter_map_ = tmp_sub_map;
   err_info = "Ok";
   return true;
 }
@@ -376,15 +376,16 @@ bool ConsumerConfig::setGroupConsumeTarget(
   }
   // check if bound consume
   if (!is_bound_consume) {
-    this->is_bound_consume_ = false;
-    this->group_name_ = tgt_group_name;
-    this->sub_topic_and_filter_map_ = tmp_sub_map;
+    is_bound_consume_ = false;
+    group_name_ = tgt_group_name;
+    sub_topic_and_filter_map_ = tmp_sub_map;
     err_info = "Ok";
     return true;
   }
   // check session_key
   string tgt_session_key = Utils::Trim(session_key);
-  if (tgt_session_key.length() == 0 || tgt_session_key.length() > tb_config::kSessionKeyMaxLength) {
+  if (tgt_session_key.length() == 0
+    || tgt_session_key.length() > tb_config::kSessionKeyMaxLength) {
     if (tgt_session_key.length() == 0) {
       err_info = "Illegal parameter: session_key is empty!";
     } else {
@@ -410,7 +411,7 @@ bool ConsumerConfig::setGroupConsumeTarget(
       err_info = ss.str();
       return false;
     }
-    if (tmp_sub_map.find(result[1]) != tmp_sub_map.end()) {
+    if (tmp_sub_map.find(result[1]) == tmp_sub_map.end()) {
       stringstream ss;
       ss << "Illegal parameter: ";
       ss << it_part->first;
@@ -443,77 +444,96 @@ bool ConsumerConfig::setGroupConsumeTarget(
     tmp_parts_map[part_key] = it_part->second;
   }
   // set verified data
-  this->is_bound_consume_ = true;
-  this->group_name_ = tgt_group_name;
-  this->sub_topic_and_filter_map_ = tmp_sub_map;
-  this->session_key_ = tgt_session_key;
-  this->source_count_ = source_count;
-  this->is_select_big_ = is_select_big;
-  this->part_offset_map_ = tmp_parts_map;
+  is_bound_consume_ = true;
+  group_name_ = tgt_group_name;
+  sub_topic_and_filter_map_ = tmp_sub_map;
+  session_key_ = tgt_session_key;
+  source_count_ = source_count;
+  is_select_big_ = is_select_big;
+  part_offset_map_ = tmp_parts_map;
   err_info = "Ok";
   return true;
 }
 
-const string& ConsumerConfig::GetGroupName() const { return this->group_name_; }
+const string& ConsumerConfig::GetGroupName() const { return group_name_; }
 
 const map<string, set<string> >& ConsumerConfig::GetSubTopicAndFilterMap() const {
-  return this->sub_topic_and_filter_map_;
+  return sub_topic_and_filter_map_;
 }
 
 void ConsumerConfig::SetConsumePosition(ConsumePosition consume_from_where) {
-  this->consume_position_ = consume_from_where;
+  consume_position_ = consume_from_where;
 }
 
-const ConsumePosition ConsumerConfig::GetConsumePosition() const { return this->consume_position_; }
+const ConsumePosition ConsumerConfig::GetConsumePosition() const { return consume_position_; }
 
-const int ConsumerConfig::GetMsgNotFoundWaitPeriodMs() const {
-  return this->msg_notfound_wait_period_ms_;
+const int32_t ConsumerConfig::GetMsgNotFoundWaitPeriodMs() const {
+  return msg_notfound_wait_period_ms_;
 }
 
-void ConsumerConfig::SetMsgNotFoundWaitPeriodMs(int msg_notfound_wait_period_ms) {
-  this->msg_notfound_wait_period_ms_ = msg_notfound_wait_period_ms;
+void ConsumerConfig::SetMsgNotFoundWaitPeriodMs(int32_t msg_notfound_wait_period_ms) {
+  msg_notfound_wait_period_ms_ = msg_notfound_wait_period_ms;
 }
 
-const int ConsumerConfig::GetMaxSubinfoReportIntvl() const {
-  return this->max_subinfo_report_intvl_;
+const uint32_t ConsumerConfig::GetPartCheckSliceMs() const {
+  return part_check_slice_ms_;
 }
 
-void ConsumerConfig::SetMaxSubinfoReportIntvl(int max_subinfo_report_intvl) {
-  this->max_subinfo_report_intvl_ = max_subinfo_report_intvl;
+void ConsumerConfig::SetPartCheckSliceMs(uint32_t part_check_slice_ms) {
+  if (part_check_slice_ms >= 0
+    && part_check_slice_ms <= 1000) {
+    part_check_slice_ms_ = part_check_slice_ms;
+  }
+}
+
+const int32_t ConsumerConfig::GetMaxPartCheckPeriodMs() const {
+  return max_part_check_period_ms_;
 }
 
-bool ConsumerConfig::IsConfirmInLocal() { return this->is_confirm_in_local_; }
+void ConsumerConfig::SetMaxPartCheckPeriodMs(int32_t max_part_check_period_ms) {
+  max_part_check_period_ms_ = max_part_check_period_ms;
+}
+
+const int32_t ConsumerConfig::GetMaxSubinfoReportIntvl() const {
+  return max_subinfo_report_intvl_;
+}
 
-void ConsumerConfig::SetConfirmInLocal(bool confirm_in_local) {
-  this->is_confirm_in_local_ = confirm_in_local;
+void ConsumerConfig::SetMaxSubinfoReportIntvl(int32_t max_subinfo_report_intvl) {
+  max_subinfo_report_intvl_ = max_subinfo_report_intvl;
 }
 
-bool ConsumerConfig::IsRollbackIfConfirmTimeout() { return this->is_rollback_if_confirm_timout_; }
+bool ConsumerConfig::IsRollbackIfConfirmTimeout() {
+  return is_rollback_if_confirm_timout_; }
 
-void ConsumerConfig::setRollbackIfConfirmTimeout(bool is_rollback_if_confirm_timeout) {
-  this->is_rollback_if_confirm_timout_ = is_rollback_if_confirm_timeout;
+void ConsumerConfig::setRollbackIfConfirmTimeout(
+  bool is_rollback_if_confirm_timeout) {
+  is_rollback_if_confirm_timout_ = is_rollback_if_confirm_timeout;
 }
 
 const int ConsumerConfig::GetWaitPeriodIfConfirmWaitRebalanceMs() const {
-  return this->reb_confirm_wait_period_ms_;
+  return reb_confirm_wait_period_ms_;
 }
 
-void ConsumerConfig::SetWaitPeriodIfConfirmWaitRebalanceMs(int reb_confirm_wait_period_ms) {
-  this->reb_confirm_wait_period_ms_ = reb_confirm_wait_period_ms;
+void ConsumerConfig::SetWaitPeriodIfConfirmWaitRebalanceMs(
+  int32_t reb_confirm_wait_period_ms) {
+  reb_confirm_wait_period_ms_ = reb_confirm_wait_period_ms;
 }
 
-const int ConsumerConfig::GetMaxConfirmWaitPeriodMs() const { return max_confirm_wait_period_ms_; }
+const int32_t ConsumerConfig::GetMaxConfirmWaitPeriodMs() const {
+  return max_confirm_wait_period_ms_; }
 
-void ConsumerConfig::SetMaxConfirmWaitPeriodMs(int max_confirm_wait_period_ms) {
-  this->max_confirm_wait_period_ms_ = max_confirm_wait_period_ms;
+void ConsumerConfig::SetMaxConfirmWaitPeriodMs(
+  int32_t max_confirm_wait_period_ms) {
+  max_confirm_wait_period_ms_ = max_confirm_wait_period_ms;
 }
 
-const int ConsumerConfig::GetShutdownRebWaitPeriodMs() const {
-  return this->shutdown_reb_wait_period_ms_;
+const int32_t ConsumerConfig::GetShutdownRebWaitPeriodMs() const {
+  return shutdown_reb_wait_period_ms_;
 }
 
-void ConsumerConfig::SetShutdownRebWaitPeriodMs(int wait_period_when_shutdown_ms) {
-  this->shutdown_reb_wait_period_ms_ = wait_period_when_shutdown_ms;
+void ConsumerConfig::SetShutdownRebWaitPeriodMs(
+  int32_t wait_period_when_shutdown_ms) {
+  shutdown_reb_wait_period_ms_ = wait_period_when_shutdown_ms;
 }
 
 string ConsumerConfig::ToString() {
@@ -526,10 +546,10 @@ string ConsumerConfig::ToString() {
   ss << "ConsumerConfig = {";
   ss << BaseConfig::ToString();
   ss << ", group_name_='";
-  ss << this->group_name_;
+  ss << group_name_;
   ss << "', sub_topic_and_filter_map_={";
-  for (it_map = this->sub_topic_and_filter_map_.begin();
-       it_map != this->sub_topic_and_filter_map_.end(); ++it_map) {
+  for (it_map = sub_topic_and_filter_map_.begin();
+       it_map != sub_topic_and_filter_map_.end(); ++it_map) {
     if (i++ > 0) {
       ss << ",";
     }
@@ -549,16 +569,16 @@ string ConsumerConfig::ToString() {
     ss << "]";
   }
   ss << "}, is_bound_consume_=";
-  ss << this->is_bound_consume_;
+  ss << is_bound_consume_;
   ss << ", session_key_='";
-  ss << this->session_key_;
+  ss << session_key_;
   ss << "', source_count_=";
-  ss << this->source_count_;
+  ss << source_count_;
   ss << ", is_select_big_=";
-  ss << this->is_select_big_;
+  ss << is_select_big_;
   ss << ", part_offset_map_={";
   i = 0;
-  for (it = this->part_offset_map_.begin(); it != this->part_offset_map_.end(); ++it) {
+  for (it = part_offset_map_.begin(); it != part_offset_map_.end(); ++it) {
     if (i++ > 0) {
       ss << ",";
     }
@@ -568,21 +588,23 @@ string ConsumerConfig::ToString() {
     ss << it->second;
   }
   ss << "}, consume_position_=";
-  ss << this->consume_position_;
+  ss << consume_position_;
   ss << ", max_subinfo_report_intvl_=";
-  ss << this->max_subinfo_report_intvl_;
+  ss << max_subinfo_report_intvl_;
   ss << ", msg_notfound_wait_period_ms_=";
-  ss << this->msg_notfound_wait_period_ms_;
-  ss << ", is_confirm_in_local_=";
-  ss << this->is_confirm_in_local_;
+  ss << msg_notfound_wait_period_ms_;
+  ss << ", max_part_check_period_ms_=";
+  ss << max_part_check_period_ms_;
+  ss << ", part_check_slice_ms_=";
+  ss << part_check_slice_ms_;
   ss << ", is_rollback_if_confirm_timout_=";
-  ss << this->is_rollback_if_confirm_timout_;
+  ss << is_rollback_if_confirm_timout_;
   ss << ", reb_confirm_wait_period_ms_=";
-  ss << this->reb_confirm_wait_period_ms_;
+  ss << reb_confirm_wait_period_ms_;
   ss << ", max_confirm_wait_period_ms_=";
-  ss << this->max_confirm_wait_period_ms_;
+  ss << max_confirm_wait_period_ms_;
   ss << ", shutdown_reb_wait_period_ms_=";
-  ss << this->shutdown_reb_wait_period_ms_;
+  ss << shutdown_reb_wait_period_ms_;
   ss << "}";
   return ss.str();
 }
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_message.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_message.cc
index f914741..a12b199 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_message.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_message.cc
@@ -20,89 +20,88 @@
 #include "tubemq/tubemq_message.h"
 
 #include <string.h>
-
 #include <sstream>
 
-#include "tubemq/const_config.h"
-#include "tubemq/utils.h"
+#include "const_config.h"
+#include "utils.h"
 
 namespace tubemq {
 
 using std::stringstream;
 
-
-// message flag's properties settings
-static const int32_t kMsgFlagIncProperties = 0x01;
-// reserved property key Filter Item
-static const char kRsvPropKeyFilterItem[] = "$msgType$";
-// reserved property key message send time
-static const char kRsvPropKeyMsgTime[] = "$msgTime$";
-
 Message::Message() {
-  this->topic_ = "";
-  this->flag_ = 0;
-  this->message_id_ = tb_config::kInvalidValue;
-  this->data_ = NULL;
-  this->datalen_ = 0;
-  this->properties_.clear();
+  flag_ = 0;
+  message_id_ = tb_config::kInvalidValue;
+  data_ = NULL;
+  datalen_ = 0;
 }
 
 Message::Message(const Message& target) {
-  this->topic_ = target.topic_;
-  this->message_id_ = target.message_id_;
+  topic_ = target.topic_;
+  message_id_ = target.message_id_;
   copyData(target.data_, target.datalen_);
   copyProperties(target.properties_);
-  this->flag_ = target.flag_;
+  flag_ = target.flag_;
 }
 
 Message::Message(const string& topic, const char* data, uint32_t datalen) {
-  this->topic_ = topic;
-  this->flag_ = 0;
-  this->message_id_ = tb_config::kInvalidValue;
+  topic_ = topic;
+  flag_ = 0;
+  message_id_ = tb_config::kInvalidValue;
   copyData(data, datalen);
-  this->properties_.clear();
+  properties_.clear();
+}
+
+Message::Message(const string& topic, int32_t flag,
+  int64_t message_id, const char* data, uint32_t datalen,
+  const map<string, string>& properties) {
+  topic_ = topic;
+  flag_ = flag;
+  message_id_ = message_id;
+  copyData(data, datalen);
+  copyProperties(properties);
 }
 
 Message::~Message() { clearData(); }
 
 Message& Message::operator=(const Message& target) {
   if (this == &target) return *this;
-  this->topic_ = target.topic_;
-  this->message_id_ = target.message_id_;
+  topic_ = target.topic_;
+  message_id_ = target.message_id_;
   clearData();
   copyData(target.data_, target.datalen_);
   copyProperties(target.properties_);
-  this->flag_ = target.flag_;
+  flag_ = target.flag_;
   return *this;
 }
 
-const int64_t Message::GetMessageId() const { return this->message_id_; }
+const int64_t Message::GetMessageId() const { return message_id_; }
 
-void Message::SetMessageId(int64_t message_id) { this->message_id_ = message_id; }
+void Message::SetMessageId(int64_t message_id) { message_id_ = message_id; }
 
-const string& Message::GetTopic() const { return this->topic_; }
+const string& Message::GetTopic() const { return topic_; }
 
-void Message::SetTopic(const string& topic) { this->topic_ = topic; }
+void Message::SetTopic(const string& topic) { topic_ = topic; }
 
-const char* Message::GetData() const { return this->data_; }
+const char* Message::GetData() const { return data_; }
 
-uint32_t Message::GetDataLength() const { return this->datalen_; }
+uint32_t Message::GetDataLength() const { return datalen_; }
 
 void Message::setData(const char* data, uint32_t datalen) {
   clearData();
   copyData(data, datalen);
 }
 
-const int32_t Message::GetFlag() const { return this->flag_; }
+const int32_t Message::GetFlag() const { return flag_; }
 
-void Message::SetFlag(int32_t flag) { this->flag_ = flag; }
+void Message::SetFlag(int32_t flag) { flag_ = flag; }
 
-const map<string, string>& Message::GetProperties() const { return this->properties_; }
+const map<string, string>& Message::GetProperties() const { return properties_; }
 
 int32_t Message::GetProperties(string& attribute) {
   attribute.clear();
   map<string, string>::iterator it_map;
-  for (it_map = this->properties_.begin(); it_map != this->properties_.end(); ++it_map) {
+  for (it_map = properties_.begin(); it_map != properties_.end(); ++it_map) {
     if (!attribute.empty()) {
       attribute += delimiter::kDelimiterComma;
     }
@@ -117,8 +116,8 @@ bool Message::HasProperty(const string& key) {
   map<string, string>::iterator it_map;
   string trimed_key = Utils::Trim(key);
   if (!trimed_key.empty()) {
-    it_map = this->properties_.find(trimed_key);
-    if (it_map != this->properties_.end()) {
+    it_map = properties_.find(trimed_key);
+    if (it_map != properties_.end()) {
       return true;
     }
   }
@@ -129,8 +128,8 @@ bool Message::GetProperty(const string& key, string& value) {
   map<string, string>::iterator it_map;
   string trimed_key = Utils::Trim(key);
   if (!trimed_key.empty()) {
-    it_map = this->properties_.find(trimed_key);
-    if (it_map != this->properties_.end()) {
+    it_map = properties_.find(trimed_key);
+    if (it_map != properties_.end()) {
       value = it_map->second;
       return true;
     }
@@ -138,7 +137,9 @@ bool Message::GetProperty(const string& key, string& value) {
   return false;
 }
 
-bool Message::GetFilterItem(string& value) { return GetProperty(kRsvPropKeyFilterItem, value); }
+bool Message::GetFilterItem(string& value) {
+  return GetProperty(tb_config::kRsvPropKeyFilterItem, value);
+}
 
 bool Message::AddProperty(string& err_info, const string& key, const string& value) {
   string trimed_key = Utils::Trim(key);
@@ -169,53 +170,54 @@ bool Message::AddProperty(string& err_info, const string& key, const string& val
     err_info = ss.str();
     return false;
   }
-  if (trimed_key == kRsvPropKeyFilterItem || trimed_key == kRsvPropKeyMsgTime) {
+  if (trimed_key == tb_config::kRsvPropKeyFilterItem
+    || trimed_key == tb_config::kRsvPropKeyMsgTime) {
     stringstream ss;
     ss << "Reserved token '";
-    ss << kRsvPropKeyFilterItem;
+    ss << tb_config::kRsvPropKeyFilterItem;
     ss << "' or '";
-    ss << kRsvPropKeyMsgTime;
+    ss << tb_config::kRsvPropKeyMsgTime;
     ss << "' must not be used in parmeter key!";
     err_info = ss.str();
     return false;
   }
   // add key and value
-  this->properties_[trimed_key] = trimed_value;
-  if (!this->properties_.empty()) {
-    this->flag_ |= kMsgFlagIncProperties;
+  properties_[trimed_key] = trimed_value;
+  if (!properties_.empty()) {
+    flag_ |= tb_config::kMsgFlagIncProperties;
   }
   err_info = "Ok";
   return true;
 }
 
 void Message::clearData() {
-  if (this->data_ != NULL) {
-    delete[] this->data_;
-    this->data_ = NULL;
-    this->datalen_ = 0;
+  if (data_ != NULL) {
+    delete[] data_;
+    data_ = NULL;
+    datalen_ = 0;
   }
 }
 
 void Message::copyData(const char* data, uint32_t datalen) {
   if (data == NULL) {
-    this->data_ = NULL;
-    this->datalen_ = 0;
+    data_ = NULL;
+    datalen_ = 0;
   } else {
-    this->datalen_ = datalen;
-    this->data_ = new char[datalen];
-    memset(this->data_, 0, datalen);
-    memcpy(this->data_, data, datalen);
+    datalen_ = datalen;
+    data_ = new char[datalen];
+    memset(data_, 0, datalen);
+    memcpy(data_, data, datalen);
   }
 }
 
 void Message::copyProperties(const map<string, string>& properties) {
-  this->properties_.clear();
+  properties_.clear();
   map<string, string>::const_iterator it_map;
   for (it_map = properties.begin(); it_map != properties.end(); ++it_map) {
-    this->properties_[it_map->first] = it_map->second;
+    properties_[it_map->first] = it_map->second;
   }
-  if (!this->properties_.empty()) {
-    this->flag_ |= kMsgFlagIncProperties;
+  if (!properties_.empty()) {
+    flag_ |= tb_config::kMsgFlagIncProperties;
   }
 }
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_return.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_return.cc
index 5298606..ef79440 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_return.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_return.cc
@@ -18,7 +18,7 @@
  */
 
 #include "tubemq/tubemq_return.h"
-#include "tubemq/const_config.h"
+#include "const_config.h"
 
 
 
@@ -32,27 +32,52 @@ PeerInfo::PeerInfo() {
   curr_offset_ = tb_config::kInvalidValue;
 }
 
-PeerInfo::PeerInfo(const Partition& partition, int64_t offset) {
-  SetMsgSourceInfo(partition, offset);
+
+PeerInfo::PeerInfo(const string& broker_host, uint32_t partition_id,
+  const string& partiton_key, int64_t offset) {
+  broker_host_ = broker_host;
+  partition_id_ = partition_id;
+  partition_key_ = partiton_key;
+  curr_offset_ = offset;
 }
 
 PeerInfo& PeerInfo::operator=(const PeerInfo& target) {
   if (this != &target) {
-    this->partition_id_ = target.partition_id_;
-    this->broker_host_ = target.broker_host_;
-    this->partition_key_ = target.partition_key_;
-    this->curr_offset_ = target.curr_offset_;
+    partition_id_ = target.partition_id_;
+    broker_host_ = target.broker_host_;
+    partition_key_ = target.partition_key_;
+    curr_offset_ = target.curr_offset_;
   }
   return *this;
 }
 
-void PeerInfo::SetMsgSourceInfo(const Partition& partition, int64_t offset) {
-  partition_id_ = partition.GetPartitionId();
-  broker_host_ = partition.GetBrokerHost();
-  partition_key_ = partition.GetPartitionKey();
-  curr_offset_ = offset;
+ConsumeOffsetInfo::ConsumeOffsetInfo() {
+  partition_key_ = "";
+  curr_offset_ = tb_config::kInvalidValue;
+}
+
+ConsumeOffsetInfo::ConsumeOffsetInfo(
+  const string& part_key, int64_t curr_offset) {
+  partition_key_ = part_key;
+  curr_offset_ = curr_offset;
+}
+
+void ConsumeOffsetInfo::SetConsumeOffsetInfo(
+  const string& part_key, int64_t curr_offset) {
+  partition_key_ = part_key;
+  curr_offset_ = curr_offset;
 }
 
+ConsumeOffsetInfo& ConsumeOffsetInfo::operator=(
+  const ConsumeOffsetInfo& target) {
+  if (this != &target) {
+    partition_key_ = target.partition_key_;
+    curr_offset_ = target.curr_offset_;
+  }
+  return *this;
+}
+
+
 ConsumerResult::ConsumerResult() {
   success_ = false;
   err_code_ = tb_config::kInvalidValue;
@@ -62,25 +87,25 @@ ConsumerResult::ConsumerResult() {
 }
 
 ConsumerResult::ConsumerResult(const ConsumerResult& target) {
-  this->success_ = target.success_;
-  this->err_code_ = target.err_code_;
-  this->err_msg_ = target.err_msg_;
-  this->topic_name_ = target.topic_name_;
-  this->peer_info_ = target.peer_info_;
-  this->confirm_context_ = target.confirm_context_;
-  this->message_list_ = target.message_list_;
+  success_ = target.success_;
+  err_code_ = target.err_code_;
+  err_msg_ = target.err_msg_;
+  topic_name_ = target.topic_name_;
+  peer_info_ = target.peer_info_;
+  confirm_context_ = target.confirm_context_;
+  message_list_ = target.message_list_;
 }
 
-ConsumerResult::ConsumerResult(int32_t err_code, string err_msg) {
+ConsumerResult::ConsumerResult(int32_t error_code, string err_msg) {
   success_ = false;
-  err_code_ = err_code;
+  err_code_ = error_code;
   err_msg_ = err_msg;
   topic_name_ = "";
   confirm_context_ = "";
 }
 
 ConsumerResult::~ConsumerResult() {
-  this->message_list_.clear();
+  message_list_.clear();
   success_ = false;
   err_code_ = tb_config::kInvalidValue;
   err_msg_ = "";
@@ -90,52 +115,64 @@ ConsumerResult::~ConsumerResult() {
 
 ConsumerResult& ConsumerResult::operator=(const ConsumerResult& target) {
   if (this != &target) {
-    this->success_ = target.success_;
-    this->err_code_ = target.err_code_;
-    this->err_msg_ = target.err_msg_;
-    this->topic_name_ = target.topic_name_;
-    this->peer_info_ = target.peer_info_;
-    this->confirm_context_ = target.confirm_context_;
-    this->message_list_ = target.message_list_;
+    success_ = target.success_;
+    err_code_ = target.err_code_;
+    err_msg_ = target.err_msg_;
+    topic_name_ = target.topic_name_;
+    peer_info_ = target.peer_info_;
+    confirm_context_ = target.confirm_context_;
+    message_list_ = target.message_list_;
   }
   return *this;
 }
 
-void ConsumerResult::SetFailureResult(int32_t err_code, string err_msg) {
+void ConsumerResult::SetFailureResult(int32_t error_code, string err_msg) {
   success_ = false;
-  err_code_ = err_code;
+  err_code_ = error_code;
   err_msg_ = err_msg;
 }
 
-void ConsumerResult::SetFailureResult(int32_t err_code, string err_msg,
+void ConsumerResult::SetFailureResult(int32_t error_code, string err_msg,
                             const string& topic_name, const PeerInfo& peer_info) {
   success_ = false;
-  err_code_ = err_code;
+  err_code_ = error_code;
   err_msg_ = err_msg;
   topic_name_ = topic_name;
   peer_info_ = peer_info;
 }
 
-void ConsumerResult::SetSuccessResult(int32_t err_code,
+void ConsumerResult::SetSuccessResult(int32_t error_code,
+                                             const string& topic_name,
+                                             const PeerInfo& peer_info) {
+  success_ = true;
+  err_code_ = error_code;
+  err_msg_ = "Ok";
+  topic_name_ = topic_name;
+  peer_info_ = peer_info;
+  confirm_context_ = "";
+  message_list_.clear();
+}
+
+void ConsumerResult::SetSuccessResult(int32_t error_code,
                                              const string& topic_name,
                                              const PeerInfo& peer_info,
                                              const string& confirm_context,
                                              const list<Message>& message_list) {
-  this->success_ = true;
-  this->err_code_ = err_code;
-  this->err_msg_ = "Ok";
-  this->topic_name_ = topic_name;
-  this->peer_info_ = peer_info;
-  this->confirm_context_ = confirm_context;
-  this->message_list_ = message_list;
+  success_ = true;
+  err_code_ = error_code;
+  err_msg_ = "Ok";
+  topic_name_ = topic_name;
+  peer_info_ = peer_info;
+  confirm_context_ = confirm_context;
+  message_list_ = message_list;
 }
 
 const string& ConsumerResult::GetPartitionKey() const {
-  return this->peer_info_.GetPartitionKey();
+  return peer_info_.GetPartitionKey();
 }
 
 const int64_t ConsumerResult::GetCurrOffset() const {
-  return this->peer_info_.GetCurrOffset();
+  return peer_info_.GetCurrOffset();
 }
 
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/unique_seq_id.h b/tubemq-client-twins/tubemq-client-cpp/src/unique_seq_id.h
new file mode 100644
index 0000000..b24b627
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/unique_seq_id.h
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_UNIQUESEQID_H
+#define TUBEMQ_UNIQUESEQID_H
+
+#include <stdint.h>
+
+#include <atomic>
+
+namespace tubemq {
+
+class UniqueSeqId {
+ public:
+  UniqueSeqId() : id_(0) {}
+
+  uint32_t Next() { return id_.fetch_add(1, std::memory_order_relaxed); }
+
+ protected:
+  std::atomic<uint32_t> id_;
+};
+
+}  // namespace tubemq
+
+#endif
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
index 75fcba5..81ef56e 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
@@ -17,24 +17,27 @@
  * under the License.
  */
 
-#include "tubemq/utils.h"
+#include "utils.h"
 
 #include <arpa/inet.h>
+#include <ctype.h>
 #include <linux/if.h>
+#include <netdb.h>
 #include <netinet/in.h>
 #include <regex.h>
-#include <stdlib.h>
 #include <stdio.h>
+#include <stdlib.h>
 #include <string.h>
 #include <sys/ioctl.h>
 #include <sys/socket.h>
 #include <sys/time.h>
 #include <unistd.h>
-
 #include <sstream>
 #include <vector>
+#include "const_config.h"
+#include "const_rpc.h"
+
 
-#include "tubemq/const_config.h"
 
 namespace tubemq {
 
@@ -43,6 +46,64 @@ using std::stringstream;
 
 static const char kWhitespaceCharSet[] = " \n\r\t\f\v";
 
+/*
+ *  copy from https://web.mit.edu/freebsd/head/sys/libkern/crc32.c
+*/
+static const uint32_t crc32_tab[256] = {
+  0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f,
+  0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
+  0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2,
+  0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
+  0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9,
+  0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
+  0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, 0x35b5a8fa, 0x42b2986c,
+  0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
+  0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423,
+  0xcfba9599, 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
+  0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 0x76dc4190, 0x01db7106,
+  0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
+  0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d,
+  0x91646c97, 0xe6635c01, 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
+  0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950,
+  0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
+  0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, 0x4adfa541, 0x3dd895d7,
+  0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
+  0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa,
+  0xbe0b1010, 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
+  0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, 0x2eb40d81,
+  0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
+  0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, 0xe3630b12, 0x94643b84,
+  0x0d6d6a3e, 0x7a6a5aa8, 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
+  0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb,
+  0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
+  0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, 0xd6d6a3e8, 0xa1d1937e,
+  0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
+  0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55,
+  0x316e8eef, 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
+  0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, 0xb2bd0b28,
+  0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
+  0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, 0x9c0906a9, 0xeb0e363f,
+  0x72076785, 0x05005713, 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
+  0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242,
+  0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
+  0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, 0x8f659eff, 0xf862ae69,
+  0x616bffd3, 0x166ccf45, 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
+  0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc,
+  0x40df0b66, 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
+  0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, 0xcdd70693,
+  0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
+  0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d
+};
+
+int32_t Utils::Crc32(const string &buf) {
+  uint32_t crc = ~0U;
+  for (uint32_t i = 0; i < buf.size(); ++i) {
+    unsigned char c_data = buf[i];
+    crc = crc32_tab[(crc ^ c_data) & 0xFF] ^ (crc >> 8);
+  }
+  return ((~crc)& 0x7FFFFFFF);
+}
+
 string Utils::Trim(const string& source) {
   string target = source;
   if (!target.empty()) {
@@ -132,6 +193,55 @@ void Utils::Split(const string& source, map<string, int32_t>& result, const stri
   }
 }
 
+void Utils::Split(const string& source, map<string, string>& result,
+  const string& delimiter_step1, const string& delimiter_step2) {
+  string item_str;
+  string key_str;
+  string val_str;
+  string::size_type pos1 = 0;
+  string::size_type pos2 = 0;
+  string::size_type pos3 = 0;
+  if (!source.empty()) {
+    pos1 = 0;
+    pos2 = source.find(delimiter_step1);
+    while (string::npos != pos2) {
+      item_str = source.substr(pos1, pos2 - pos1);
+      item_str = Utils::Trim(item_str);
+      pos1 = pos2 + delimiter_step1.length();
+      pos2 = source.find(delimiter_step1, pos1);
+      if (item_str.empty()) {
+        continue;
+      }
+      pos3 = item_str.find(delimiter_step2);
+      if (string::npos == pos3) {
+        continue;
+      }
+      key_str = item_str.substr(0, pos3);
+      val_str = item_str.substr(pos3 + delimiter_step2.length());
+      key_str = Utils::Trim(key_str);
+      val_str = Utils::Trim(val_str);
+      if (key_str.empty()) {
+        continue;
+      }
+      result[key_str] = val_str;
+    }
+    if (pos1 != source.length()) {
+      item_str = source.substr(pos1);
+      item_str = Utils::Trim(item_str);
+      pos3 = item_str.find(delimiter_step2);
+      if (string::npos != pos3) {
+        key_str = item_str.substr(0, pos3);
+        val_str = item_str.substr(pos3 + delimiter_step2.length());
+        key_str = Utils::Trim(key_str);
+        val_str = Utils::Trim(val_str);
+        if (!key_str.empty()) {
+          result[key_str] = val_str;
+        }
+      }
+    }
+  }
+}
+
 void Utils::Join(const vector<string>& vec, const string& delimiter, string& target) {
   vector<string>::const_iterator it;
   target.clear();
@@ -143,6 +253,19 @@ void Utils::Join(const vector<string>& vec, const string& delimiter, string& tar
   }
 }
 
+void Utils::Join(const map<string, string>& source, string& target,
+  const string& delimiter_step1, const string& delimiter_step2) {
+  map<string, string>::const_iterator it;
+  target.clear();
+  int count = 0;
+  for (it = source.begin(); it != source.end(); ++it) {
+    if (count++ > 0) {
+      target += delimiter_step1;
+    }
+    target += it->first + delimiter_step2 + it->second;
+  }
+}
+
 bool Utils::ValidString(string& err_info, const string& source, bool allow_empty, bool pat_match,
                         bool check_max_length, unsigned int maxlen) {
   if (source.empty()) {
@@ -202,7 +325,8 @@ bool Utils::ValidGroupName(string& err_info, const string& group_name, string& t
   int cflags = REG_EXTENDED;
   regex_t reg;
   regmatch_t pmatch[1];
-  const char* patRule = "^[a-zA-Z][\\w-]+$";
+  //  const char* patRule = "^[a-zA-Z][\\w-]+$";
+  const char* patRule = "^[a-zA-Z]\\w+$";
   regcomp(&reg, patRule, cflags);
   int status = regexec(&reg, tgt_group_name.c_str(), 1, pmatch, 0);
   regfree(&reg);
@@ -210,8 +334,9 @@ bool Utils::ValidGroupName(string& err_info, const string& group_name, string& t
     stringstream ss;
     ss << "Illegal parameter: ";
     ss << group_name;
-    ss << " must begin with a letter,can only contain ";
-    ss << "characters,numbers,hyphen,and underscores";
+    ss << " must begin with a letter,can only contain characters,numbers,and underscores";
+    //  ss << " must begin with a letter,can only contain ";
+    //  ss << "characters,numbers,hyphen,and underscores";
     err_info = ss.str();
     return false;
   }
@@ -281,13 +406,13 @@ int64_t Utils::GetCurrentTimeMillis() {
 
 bool Utils::ValidConfigFile(string& err_info, const string& conf_file) {
   FILE *fp = NULL;
-  
+
   if (conf_file.length() == 0) {
     err_info = "Configure file is blank";
     return false;
-  }  
-  fp = fopen(conf_file.c_str(),"r");
-  if(fp == NULL) {
+  }
+  fp = fopen(conf_file.c_str(), "r");
+  if (fp == NULL) {
     err_info = "Open configure file Failed!";
     return false;
   }
@@ -321,7 +446,7 @@ bool Utils::GetLocalIPV4Address(string& err_info, string& localhost) {
     if (0 == strncmp(&ifreq->ifr_name[0], "lo", sizeof("lo"))) {
       continue;
     }
-    memcpy(&if_flag.ifr_name[0],&ifreq->ifr_name[0],sizeof(ifreq->ifr_name));
+    memcpy(&if_flag.ifr_name[0], &ifreq->ifr_name[0], sizeof(ifreq->ifr_name));
     if ((ioctl(sockfd, SIOCGIFFLAGS, (char *) &if_flag)) < 0) {
       continue;
     }
@@ -329,8 +454,8 @@ bool Utils::GetLocalIPV4Address(string& err_info, string& localhost) {
       || !(if_flag.ifr_flags & IFF_UP)) {
       continue;
     }
-    
-    if (!strncmp(inet_ntoa(((struct sockaddr_in*)&(ifreq->ifr_addr))->sin_addr), 
+
+    if (!strncmp(inet_ntoa(((struct sockaddr_in*)&(ifreq->ifr_addr))->sin_addr),
       "127.0.0.1", 7)) {
       continue;
     }
@@ -344,6 +469,91 @@ bool Utils::GetLocalIPV4Address(string& err_info, string& localhost) {
   return false;
 }
 
+int32_t Utils::GetServiceTypeByMethodId(int32_t method_id) {
+  switch (method_id) {
+    // broker write service
+    case rpc_config::kBrokerMethoddProducerRegister:
+    case rpc_config::kBrokerMethoddProducerHeatbeat:
+    case rpc_config::kBrokerMethoddProducerSendMsg:
+    case rpc_config::kBrokerMethoddProducerClose: {
+      return rpc_config::kBrokerWriteService;
+    }
+    // broker read service
+    case rpc_config::kBrokerMethoddConsumerRegister:
+    case rpc_config::kBrokerMethoddConsumerHeatbeat:
+    case rpc_config::kBrokerMethoddConsumerGetMsg:
+    case rpc_config::kBrokerMethoddConsumerCommit:
+    case rpc_config::kBrokerMethoddConsumerClose: {
+      return rpc_config::kBrokerReadService;
+    }
+    // master service
+    case rpc_config::kMasterMethoddProducerRegister:
+    case rpc_config::kMasterMethoddProducerHeatbeat:
+    case rpc_config::kMasterMethoddProducerClose:
+    case rpc_config::kMasterMethoddConsumerRegister:
+    case rpc_config::kMasterMethoddConsumerHeatbeat:
+    case rpc_config::kMasterMethoddConsumerClose:
+    default: {
+      return rpc_config::kMasterService;
+    }
+  }
+}
+
+void Utils::XfsAddrByDns(const map<string, int32_t>& orig_addr_map,
+  map<string, string>& target_addr_map) {
+  hostent* host = NULL;
+  map<string, int32_t>::const_iterator it;
+  for (it = orig_addr_map.begin(); it != orig_addr_map.end(); it++) {
+    char first_char =  it->first.c_str()[0];
+    if (isalpha(first_char)) {
+      host = gethostbyname(it->first.c_str());
+      if (host != NULL) {
+        switch (host->h_addrtype) {
+          case AF_INET:
+          case AF_INET6: {
+            char **pptr = NULL;
+            unsigned int addr = 0;
+            char temp_str[32];
+            memset(temp_str, 0, 32);
+            pptr = host->h_addr_list;
+            addr = ((unsigned int *) host->h_addr_list[0])[0];
+            if ((addr & 0xffff) == 0x0a0a) {
+              pptr++;
+              addr = ((unsigned int *) host->h_addr_list[0])[1];
+            }
+            inet_ntop(host->h_addrtype, *pptr, temp_str, sizeof(temp_str));
+            string tempIpaddr = temp_str;
+            if (tempIpaddr.length() > 0) {
+              target_addr_map[it->first] = tempIpaddr;
+            }
+          }
+          break;
+
+          default:
+            break;
+        }
+      }
+    } else {
+      target_addr_map[it->first] = it->first;
+    }
+  }
+}
+
+bool Utils::NeedDnsXfs(const string& masteraddr) {
+  if (masteraddr.length() > 0) {
+    char first_char = masteraddr.c_str()[0];
+    if (isalpha(first_char)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+string Utils::GenBrokerAuthenticateToken(const string& username,
+  const string& usrpassword) {
+  return "";
+}
+
 
 }  // namespace tubemq
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.h b/tubemq-client-twins/tubemq-client-cpp/src/utils.h
new file mode 100644
index 0000000..c61043b
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.h
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_UTILS_H_
+#define TUBEMQ_CLIENT_UTILS_H_
+
+#include <stdint.h>
+
+#include <map>
+#include <string>
+#include <vector>
+
+namespace tubemq {
+
+using std::map;
+using std::string;
+using std::vector;
+
+class Utils {
+ public:
+  // trim string info
+  static int32_t Crc32(const string &buf);
+  static string Trim(const string& source);
+  // split string to vector
+  static void Split(const string& source, vector<string>& result, const string& delimiter);
+  // split string to map<string, int>
+  static void Split(const string& source, map<string, int32_t>& result,
+                    const string& delimiter_step1, const string& delimiter_step2);
+  static void Split(const string& source, map<string, string>& result,
+                    const string& delimiter_step1, const string& delimiter_step2);
+  static void Join(const vector<string>& vec, const string& delimiter, string& target);
+  static void Join(const map<string, string>& source, string& target,
+    const string& delimiter_step1, const string& delimiter_step2);
+  static bool ValidString(string& err_info, const string& source, bool allow_empty, bool pat_match,
+                          bool check_max_length, unsigned int maxlen);
+  static bool ValidGroupName(string& err_info, const string& group_name, string& tgt_group_name);
+  static bool ValidFilterItem(string& err_info, const string& src_filteritem,
+                              string& tgt_filteritem);
+  static string Int2str(int32_t data);
+  static string Long2str(int64_t data);
+  static uint32_t IpToInt(const string& ipv4_addr);
+  static int64_t GetCurrentTimeMillis();
+  static bool ValidConfigFile(string& err_info, const string& conf_file);
+  static bool GetLocalIPV4Address(string& err_info, string& localhost);
+  static int32_t GetServiceTypeByMethodId(int32_t method_id);
+  static void XfsAddrByDns(const map<string, int32_t>& orig_addr_map,
+    map<string, string>& target_addr_map);
+  static bool NeedDnsXfs(const string& masteraddr);
+  static string GenBrokerAuthenticateToken(const string& username, const string& usrpassword);
+};
+
+}  // namespace tubemq
+
+#endif  // TUBEMQ_CLIENT_UTILS_H_
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/version.h b/tubemq-client-twins/tubemq-client-cpp/src/version.h
new file mode 100644
index 0000000..c479ede
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/version.h
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_VERSION_H_
+#define TUBEMQ_CLIENT_VERSION_H_
+
+#include <string>
+
+namespace tubemq {
+
+using std::string;
+
+static const char kTubeMQClientVersion[] = "0.1.0-0.5.0";
+
+}  // namespace tubemq
+
+#endif  // TUBEMQ_CLIENT_VERSION_H_