You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/09/15 08:51:52 UTC

[incubator-tubemq] branch tubemq-client-cpp updated: [TUBEMQ-351]C++ SDK example&tests (#263)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
     new fec786a  [TUBEMQ-351]C++ SDK example&tests (#263)
fec786a is described below

commit fec786a3655c017db67cc13ff65569e7cf42f32d
Author: charlely <41...@users.noreply.github.com>
AuthorDate: Tue Sep 15 16:51:44 2020 +0800

    [TUBEMQ-351]C++ SDK example&tests (#263)
    
    Co-authored-by: charleli <ch...@tencent.com>
---
 .gitmodules                                        |   4 -
 .../tubemq-client-cpp/CMakeLists.txt               |  37 +---
 tubemq-client-twins/tubemq-client-cpp/README.md    |   7 +-
 .../tubemq-client-cpp/build_linux.sh               |   7 -
 .../tubemq-client-cpp/example/CMakeLists.txt       |   6 +-
 .../example/{log => consumer}/CMakeLists.txt       |   3 +-
 .../example/consumer/test_consumer.cc              | 137 ++++++++++++
 .../example/consumer/test_multithread_pull.cc      | 166 ++++++++++++++
 .../{third_party => proto}/CMakeLists.txt          |   9 +-
 .../tubemq-client-cpp/src/baseconsumer.cc          |  22 +-
 .../tubemq-client-cpp/src/client_connection.cc     |  38 +++-
 .../tubemq-client-cpp/src/client_connection.h      |   2 +-
 .../tubemq-client-cpp/src/client_service.h         | 242 ++++++++++-----------
 .../tubemq-client-cpp/src/thread_pool.h            |   2 +-
 .../{example => tests}/CMakeLists.txt              |   5 +-
 .../{example/log/main.cc => tests/README.md}       |  68 ++----
 .../executor_pool/CMakeLists.txt                   |   2 +-
 .../{example => tests}/executor_pool/main.cc       |   9 +-
 .../{example => tests}/log/CMakeLists.txt          |   2 +-
 .../{example => tests}/log/main.cc                 |  32 ++-
 .../log => tests/thread_pool}/CMakeLists.txt       |   2 +-
 .../{example/log => tests/thread_pool}/main.cc     |  16 +-
 .../tubemq-client-cpp/third_party/CMakeLists.txt   |  62 ++++--
 .../tubemq-client-cpp/third_party/log4cplus        |   1 -
 .../tubemq-client-cpp/third_party/rapidjson        |   1 -
 25 files changed, 581 insertions(+), 301 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index c80f44d..417049c 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -2,7 +2,3 @@
 	path = tubemq-client-twins/tubemq-client-cpp/third_party/rapidjson
 	url = https://github.com/Tencent/rapidjson.git
 	tag = v1.1.0
-[submodule "tubemq-client-twins/tubemq-client-cpp/third_party/log4cplus"]
-	path = tubemq-client-twins/tubemq-client-cpp/third_party/log4cplus
-	url = https://github.com/log4cplus/log4cplus
-	tag = REL_2_0_5
diff --git a/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt
index 984a28a..94ac9f1 100644
--- a/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt
@@ -22,36 +22,23 @@ cmake_minimum_required (VERSION 3.1)
 
 project (TubeMQ)
 
-set (CXX_FLAGS
-    -g
-    -fPIC
-    -Wall
-    -D__STDC_FORMAT_MACROS
-    -Wno-unused-parameter
-    -Wno-unused-function
-    -Wunused-variable
-    -Wunused-value
-    -Wshadow
-    -Wcast-qual
-    -Wcast-align
-    -Wwrite-strings
-    -Wsign-compare
-    -Winvalid-pch
-    -fms-extensions
-    -Wfloat-equal
-    -Wextra
-    -std=c++11
-    )
+find_package(Protobuf REQUIRED)
+
+SET(CMAKE_CXX_FLAGS "-std=c++11 -O2 -g -Wall -Werror -Wsign-compare -Wfloat-equal -fno-strict-aliasing -fPIC -DASIO_STANDALONE")
+
+INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/proto)
 
 INCLUDE_DIRECTORIES(include)
+INCLUDE_DIRECTORIES(src)
 
-INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/third_party/rapidjson/include)
-INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/third_party/log4cplus/include)
-INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/third_party/log4cplus/include)
-LINK_DIRECTORIES(${CMAKE_BINARY_DIR}/third_party/log4cplus/lib)
+INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/third_party/rapidjson/src/rapidjson/include)
+INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/third_party/include)
+LINK_DIRECTORIES(${CMAKE_BINARY_DIR}/third_party/lib)
 
-ADD_SUBDIRECTORY(src)
 ADD_SUBDIRECTORY(third_party)
+ADD_SUBDIRECTORY(proto)
+ADD_SUBDIRECTORY(src)
 ADD_SUBDIRECTORY(example)
+ADD_SUBDIRECTORY(tests)
 
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/README.md b/tubemq-client-twins/tubemq-client-cpp/README.md
index 0f566d5..c2c1960 100644
--- a/tubemq-client-twins/tubemq-client-cpp/README.md
+++ b/tubemq-client-twins/tubemq-client-cpp/README.md
@@ -30,9 +30,8 @@
  * [Log4cplus](https://github.com/log4cplus/log4cplus.git)
  * [Rapidjson](https://github.com/Tencent/rapidjson.git)
 
-## Install ASIO
-  * ./autogen.sh
-  * ./configure CFLAGS=-std=c++11 CPPFLAGS=-std=c++11 CXXFLAGS=-std=c++11
-  * make && make install
+## Step to build
+  * install protobuf
+  * ./build_linux.sh
 
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/build_linux.sh b/tubemq-client-twins/tubemq-client-cpp/build_linux.sh
index 7664d5b..fd2dfdb 100755
--- a/tubemq-client-twins/tubemq-client-cpp/build_linux.sh
+++ b/tubemq-client-twins/tubemq-client-cpp/build_linux.sh
@@ -20,13 +20,6 @@
 
 #!/bin/bash
 
-cd ../../
-git submodule init
-git submodule update
-git submodule foreach --recursive git submodule init 
-git submodule foreach --recursive git submodule update 
-cd -
-
 mkdir build
 cd build
 cmake ../
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt
index 9d729c2..14c0ec2 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt
@@ -26,8 +26,8 @@ function(tubemq_add_example _name)
   set(_srcs ${ARGN})
   message (STATUS "${_name} sources: ${_srcs}")
   add_executable (${_name} ${_srcs})
-  TARGET_LINK_LIBRARIES (${_name} tubemq ssl crypto log4cplus pthread)
+  TARGET_LINK_LIBRARIES (${_name} tubemq ssl crypto liblog4cplus.a libprotobuf.a tubemq_proto pthread rt)
 endfunction()
 
-add_subdirectory (log)
-add_subdirectory (executor_pool)
+add_subdirectory (consumer)
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/example/consumer/CMakeLists.txt
similarity index 88%
copy from tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt
copy to tubemq-client-twins/tubemq-client-cpp/example/consumer/CMakeLists.txt
index b828bf8..8f8b7f2 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/example/consumer/CMakeLists.txt
@@ -18,4 +18,5 @@
 #
 
 
-tubemq_add_example(log main.cc)
+tubemq_add_example(consumer test_consumer.cc)
+tubemq_add_example(multi_pull test_multithread_pull.cc)
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc b/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc
new file mode 100644
index 0000000..bd1b487
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <time.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <libgen.h>
+#include <sys/time.h>
+#include <chrono>
+#include <set>
+#include <string>
+#include <thread>
+#include "tubemq/tubemq_client.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_errcode.h"
+#include "tubemq/tubemq_message.h"
+#include "tubemq/tubemq_return.h"
+
+
+using namespace std;
+using namespace tubemq;
+
+using std::set;
+using std::string;
+using tubemq::ConsumerConfig;
+using tubemq::ConsumerResult;
+using tubemq::TubeMQConsumer;
+
+
+
+
+
+
+int main(int argc, char* argv[]) {
+  bool result;
+  string err_info;
+  string conf_file = "../conf/client.conf";
+  string group_name = "test_c_v8";
+  string master_addr = "10.215.128.83:8000,10.215.128.83:8000";
+  TubeMQConsumer consumer_1;
+
+  set<string> topic_list;
+  topic_list.insert("test_1");
+  ConsumerConfig consumer_config;
+
+  consumer_config.SetRpcReadTimeoutMs(20000);
+  result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
+  if (!result) {
+    printf("\n Set Master AddrInfo failure: %s", err_info.c_str());
+    return -1;
+  }
+  result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
+  if (!result) {
+    printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
+    return -1;
+  }
+  result = StartTubeMQService(err_info, conf_file);
+  if (!result) {
+    printf("\n StartTubeMQService failure: %s", err_info.c_str());
+    return -1;
+  }
+
+  result = consumer_1.Start(err_info, consumer_config);
+  if (!result) {
+    printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
+    return -2;
+  }
+
+  ConsumerResult gentRet;
+  ConsumerResult confirm_result;
+  int64_t start_time = time(NULL);
+  do {
+    // 1. get Message;
+    result = consumer_1.GetMessage(gentRet);
+    if (result) {
+      // 2.1.1  if success, process message
+      list<Message> msgs = gentRet.GetMessageList();
+      printf("\n GetMessage success, msssage count =%ld ", msgs.size());
+      // 2.1.2 confirm message result
+      consumer_1.Confirm(gentRet.GetConfirmContext(), true, confirm_result);
+    } else {
+      // 2.2.1 if failure, check error code
+      // print error message if errcode not in
+      // [no partitions assigned, all partitions in use,
+      //    or all partitons idle, reach max position]
+      if (!(gentRet.GetErrCode() == err_code::kErrNotFound
+        || gentRet.GetErrCode() == err_code::kErrNoPartAssigned
+        || gentRet.GetErrCode() == err_code::kErrAllPartInUse
+        || gentRet.GetErrCode() == err_code::kErrAllPartWaiting)) {
+        printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
+          gentRet.GetErrCode(), gentRet.GetErrMessage().c_str());
+      }
+    }
+    // used for test, consume 10 minutes only
+    if (time(NULL) - start_time > 10 * 60) {
+      break;
+    }
+  } while (true);
+
+  getchar();  // for test hold the test thread
+  consumer_1.ShutDown();
+
+  getchar();  // for test hold the test thread
+  result = StopTubeMQService(err_info);
+  if (!result) {
+    printf("\n *** StopTubeMQService failure, reason is %s ", err_info.c_str());
+  }
+
+  printf("\n finishe test exist ");
+  return 0;
+}
+
+
+
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc b/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc
new file mode 100644
index 0000000..fc0bf8f
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <errno.h>
+#include <fcntl.h>
+#include <libgen.h>
+#include <signal.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <time.h>
+#include <string>
+#include <chrono>
+#include <set>
+#include <thread>
+#include "tubemq/tubemq_atomic.h"
+#include "tubemq/tubemq_client.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_errcode.h"
+#include "tubemq/tubemq_message.h"
+#include "tubemq/tubemq_return.h"
+
+
+using namespace std;
+using namespace tubemq;
+
+TubeMQConsumer consumer_1;
+
+
+
+AtomicLong last_print_time(0);
+AtomicLong last_msg_count(0);
+AtomicLong last_print_count(0);
+
+
+
+void calc_message_count(int64_t msg_count) {
+  int64_t last_time = last_print_time.Get();
+  int64_t cur_count = last_msg_count.AddAndGet(msg_count);
+  int64_t cur_time = time(NULL);
+  if (cur_count - last_print_count.Get() >= 50000
+    || cur_time - last_time > 90) {
+    if (last_print_time.CompareAndSet(last_time, cur_time)) {
+      printf("\n %ld Current message count=%ld", cur_time, last_msg_count.Get());
+      last_print_count.Set(cur_count);
+    }
+  }
+}
+
+
+void thread_task_pull(int32_t thread_no) {
+  bool result;
+  int64_t msg_count = 0;
+  ConsumerResult gentRet;
+  ConsumerResult confirm_result;
+  printf("\n thread_task_pull start: %d", thread_no);
+  do {
+    msg_count = 0;
+    // 1. get Message;
+    result = consumer_1.GetMessage(gentRet);
+    if (result) {
+      // 2.1.1  if success, process message
+      list<Message> msgs = gentRet.GetMessageList();
+      msg_count = msgs.size();
+      // 2.1.2 confirm message result
+      consumer_1.Confirm(gentRet.GetConfirmContext(), true, confirm_result);
+    } else {
+      // 2.2.1 if failure, check error code
+      // print error message if errcode not in
+      // [no partitions assigned, all partitions in use,
+      //    or all partitons idle, reach max position]
+      if (!(gentRet.GetErrCode() == err_code::kErrNotFound
+        || gentRet.GetErrCode() == err_code::kErrNoPartAssigned
+        || gentRet.GetErrCode() == err_code::kErrAllPartInUse
+        || gentRet.GetErrCode() == err_code::kErrAllPartWaiting)) {
+        if (gentRet.GetErrCode() == err_code::kErrMQServiceStop
+          || gentRet.GetErrCode() == err_code::kErrClientStop) {
+          break;
+        }
+        printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
+          gentRet.GetErrCode(), gentRet.GetErrMessage().c_str());
+      }
+    }
+    calc_message_count(msg_count);
+  } while (true);
+  printf("\n thread_task_pull finished: %d", thread_no);
+}
+
+
+int main(int argc, char* argv[]) {
+  bool result;
+  string err_info;
+  string conf_file = "../conf/client.conf";
+  string group_name = "test_c_v8";
+  string master_addr = "10.215.128.83:8000,10.215.128.83:8000";
+  int32_t thread_num = 15;
+  set<string> topic_list;
+  topic_list.insert("test_1");
+  ConsumerConfig consumer_config;
+
+  consumer_config.SetRpcReadTimeoutMs(20000);
+  result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
+  if (!result) {
+    printf("\n Set Master AddrInfo failure: %s ", err_info.c_str());
+    return -1;
+  }
+  result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
+  if (!result) {
+    printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
+    return -1;
+  }
+  result = StartTubeMQService(err_info, conf_file);
+  if (!result) {
+    printf("\n StartTubeMQService failure: %s", err_info.c_str());
+    return -1;
+  }
+
+  result = consumer_1.Start(err_info, consumer_config);
+  if (!result) {
+    printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
+    return -2;
+  }
+
+  std::thread pull_threads[thread_num];
+  for (int32_t i = 0; i < thread_num; i++) {
+    pull_threads[i] = std::thread(thread_task_pull, i);
+  }
+
+  getchar();  // for test hold the test thread
+  consumer_1.ShutDown();
+  //
+  for (int32_t i = 0; i < thread_num; i++) {
+    if (pull_threads[i].joinable()) {
+      pull_threads[i].join();
+    }
+  }
+
+  getchar();  // for test hold the test thread
+  result = StopTubeMQService(err_info);
+  if (!result) {
+    printf("\n *** StopTubeMQService failure, reason is %s", err_info.c_str());
+  }
+
+  printf("\n finishe test exist");
+  return 0;
+}
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/third_party/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/proto/CMakeLists.txt
similarity index 74%
copy from tubemq-client-twins/tubemq-client-cpp/third_party/CMakeLists.txt
copy to tubemq-client-twins/tubemq-client-cpp/proto/CMakeLists.txt
index 7dc5e88..c10ad25 100644
--- a/tubemq-client-twins/tubemq-client-cpp/third_party/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/proto/CMakeLists.txt
@@ -17,9 +17,10 @@
 # under the License.
 #
 
+cmake_minimum_required (VERSION 3.1)
 
-CMAKE_MINIMUM_REQUIRED(VERSION 3.1)                                             
-PROJECT(TubeMQThirdParty)
-
-ADD_SUBDIRECTORY(log4cplus) 
 
+file(GLOB ProtoFiles "${CMAKE_CURRENT_SOURCE_DIR}/*.proto")
+PROTOBUF_GENERATE_CPP(ProtoSources ProtoHeaders ${ProtoFiles})
+add_library(tubemq_proto STATIC ${ProtoSources} ${ProtoHeaders})
+target_link_libraries(tubemq_proto libprotobuf.a)
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc b/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
index 714b353..46d103b 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
@@ -146,9 +146,7 @@ bool BaseConsumer::GetMessage(ConsumerResult& result) {
     result.SetFailureResult(error_code, err_info);
     return false;
   }
-
   int64_t curr_offset = tb_config::kInvalidValue;
-
   bool filter_consume = sub_info_.IsFilterConsume(partition_ext.GetTopic());
   PeerInfo peer_info(partition_ext.GetBrokerHost(), partition_ext.GetPartitionId(),
     partition_ext.GetPartitionKey(), curr_offset);
@@ -208,11 +206,9 @@ bool BaseConsumer::IsConsumeReady(ConsumerResult& result) {
     if (err_code::kErrSuccess == ret_code) {
       return true;
     }
-
     if ((config_.GetMaxPartCheckPeriodMs() >= 0)
       && (Utils::GetCurrentTimeMillis() - start_time
       >= config_.GetMaxPartCheckPeriodMs())) {
-
       switch (ret_code) {
         case err_code::kErrNoPartAssigned: {
           result.SetFailureResult(ret_code,
@@ -281,9 +277,7 @@ bool BaseConsumer::Confirm(const string& confirm_context, bool is_consumed,
   string part_key = Utils::Trim(confirm_context.substr(0, pos1));
   string booked_time_str =
       Utils::Trim(confirm_context.substr(pos1 + token1.size(), confirm_context.size()));
-
   int64_t booked_time = atol(booked_time_str.c_str());
-
   pos1 = part_key.find(token2);
   if (string::npos == pos1) {
     result.SetFailureResult(err_code::kErrBadRequest,
@@ -311,9 +305,7 @@ bool BaseConsumer::Confirm(const string& confirm_context, bool is_consumed,
                             "Not found the partition by confirm_context!");
     return false;
   }
-
   int64_t curr_offset = tb_config::kInvalidValue;
-
   PeerInfo peer_info(partition_ext.GetBrokerHost(), partition_ext.GetPartitionId(),
     partition_ext.GetPartitionKey(), curr_offset);
   auto request = std::make_shared<RequestContext>();
@@ -352,9 +344,7 @@ bool BaseConsumer::Confirm(const string& confirm_context, bool is_consumed,
     if (rsp->success_) {
       CommitOffsetResponseB2C rsp_b2c;
       ret_result = rsp_b2c.ParseFromArray(rsp->rsp_body_.data().c_str(),
-
                                           (int32_t)(rsp->rsp_body_.data().length()));
-
       if (ret_result) {
         if (rsp_b2c.success()) {
           curr_offset = rsp_b2c.curroffset();
@@ -449,7 +439,6 @@ bool BaseConsumer::register2Master(int32_t& error_code, string& err_info, bool n
       error_code = error.Value();
       err_info = error.Message();
     }
-
     if (error_code == err_code::kErrConsumeGroupForbidden
       || error_code == err_code::kErrConsumeContentForbidden) {
       // set regist process status to existed
@@ -816,8 +805,7 @@ void BaseConsumer::processHeartBeat2Broker(NodeInfo broker_info) {
           if (rsp->success_) {
             HeartBeatResponseB2C rsp_b2c;
             bool result = rsp_b2c.ParseFromArray(rsp->rsp_body_.data().c_str(),
-                                                (int32_t)(rsp->rsp_body_.data().length()));
-
+                                                 (int32_t)(rsp->rsp_body_.data().length()));
             if (result) {
               set<string> partition_keys;
               if (rsp_b2c.success()) {
@@ -1079,7 +1067,6 @@ bool BaseConsumer::processRegisterResponseM2C(int32_t& error_code, string& err_i
   RegisterResponseM2C rsp_m2c;
   bool result = rsp_m2c.ParseFromArray(rsp_protocol->rsp_body_.data().c_str(),
                                        (int32_t)(rsp_protocol->rsp_body_.data().length()));
-
   if (!result) {
     error_code = err_code::kErrParseFailure;
     err_info = "Parse RegisterResponseM2C response failure!";
@@ -1237,7 +1224,6 @@ bool BaseConsumer::processRegResponseB2C(int32_t& error_code, string& err_info,
   RegisterResponseB2C rsp_b2c;
   bool result = rsp_b2c.ParseFromArray(rsp_protocol->rsp_body_.data().c_str(),
                                        (int32_t)(rsp_protocol->rsp_body_.data().length()));
-
   if (!result) {
     error_code = err_code::kErrParseFailure;
     err_info = "Parse RegisterResponseB2C response failure!";
@@ -1332,7 +1318,6 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
                                              bool filter_consume, const PartitionExt& partition_ext,
                                              const string& confirm_context,
                                              const TubeMQCodec::RspProtocolPtr& rsp) {
-
   // #lizard forgives
   string err_info;
 
@@ -1348,13 +1333,11 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
   GetMessageResponseB2C rsp_b2c;
   bool ret_result = rsp_b2c.ParseFromArray(
     rsp->rsp_body_.data().c_str(), (int32_t)(rsp->rsp_body_.data().length()));
-
   if (!ret_result) {
     rmtdata_cache_.RelPartition(err_info, filter_consume, confirm_context, false);
     result.SetFailureResult(err_code::kErrServerError,
                             "Parse GetMessageResponseB2C response failure!",
                             partition_ext.GetTopic(), peer_info);
-
     LOG_TRACE("[CONSUMER] processGetMessageRspB2C parse failure, client=%s", client_uuid_.c_str());
     return false;
   }
@@ -1362,12 +1345,10 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
   switch (rsp_b2c.errcode()) {
     case err_code::kErrSuccess: {
       bool esc_limit = (rsp_b2c.has_escflowctrl() && rsp_b2c.escflowctrl());
-
       int64_t data_dltval =
           rsp_b2c.has_currdatadlt() ? rsp_b2c.currdatadlt() : tb_config::kInvalidValue;
       int64_t curr_offset = rsp_b2c.has_curroffset() ?
         rsp_b2c.curroffset() : tb_config::kInvalidValue;
-
       bool req_slow = rsp_b2c.has_requireslow() ? rsp_b2c.requireslow() : false;
       int msg_size = 0;
       list<Message> message_list;
@@ -1392,7 +1373,6 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
 
     case err_code::kErrConsumeSpeedLimit: {
       // Process with server side speed limit
-
       int64_t def_dlttime = rsp_b2c.has_minlimittime() ? rsp_b2c.minlimittime()
                                                     : config_.GetMsgNotFoundWaitPeriodMs();
       rmtdata_cache_.RelPartition(err_info, filter_consume, confirm_context, false,
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
index c795d35..e6ae503 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
@@ -150,7 +150,7 @@ void ClientConnection::asyncRead() {
   }
   recv_buffer_->EnsureWritableBytes(rpc_config::kRpcEnsureWriteableBytes);
   auto self = shared_from_this();
-  socket_->async_read_some(
+  socket_->async_receive(
       asio::buffer(recv_buffer_->WriteBegin(), recv_buffer_->WritableBytes()),
       [self, this](std::error_code ec, std::size_t len) {
         if (ec) {
@@ -166,21 +166,36 @@ void ClientConnection::asyncRead() {
         }
         recv_time_ = std::time(nullptr);
         recv_buffer_->WriteBytes(len);
-        checkPackageDone();
-        LOG_TRACE("[%s]async read done, len:%ld, package_length_:%ld, recvbuffer:%s",
-                  ToString().c_str(), len, package_length_, recv_buffer_->String().c_str());
+        std::error_code error;
+        size_t availsize = socket_->available(error);
+        LOG_TRACE("[%s]async read done, len:%ld, package_length_:%ld, availsize:%ld, recvbuffer:%s",
+                  ToString().c_str(), len, package_length_, availsize,
+                  recv_buffer_->String().c_str());
+        if (availsize > 0 && !error) {
+          recv_buffer_->EnsureWritableBytes(availsize);
+          size_t rlen = socket_->receive(asio::buffer(recv_buffer_->WriteBegin(), availsize));
+          if (rlen > 0) {
+            recv_buffer_->WriteBytes(rlen);
+          }
+          LOG_TRACE("[%s]syncread done, receivelen:%ld, recvbuffer:%s", ToString().c_str(), rlen,
+                    recv_buffer_->String().c_str());
+        }
+        while (checkPackageDone() > 0 && recv_buffer_->length() > 0) {
+          LOG_TRACE("[%s]recheck packagedone package_length_:%ld, recvbuffer:%s",
+                    ToString().c_str(), package_length_, recv_buffer_->String().c_str());
+        }
         asyncRead();
       });
 }
 
-void ClientConnection::checkPackageDone() {
+int ClientConnection::checkPackageDone() {
   if (check_ == nullptr) {
     recv_buffer_->Reset();
     LOG_ERROR("check codec func not set");
-    return;
+    return -1;
   }
   if (package_length_ > recv_buffer_->length()) {
-    return;
+    return 0;
   }
   uint32_t request_id = 0;
   bool has_request_id = false;
@@ -192,11 +207,11 @@ void ClientConnection::checkPackageDone() {
     package_length_ = 0;
     LOG_ERROR("%s, check codec package result:%d", ToString().c_str(), result);
     close();
-    return;
+    return -1;
   }
   if (result == 0) {
     package_length_ = package_length;
-    return;
+    return 0;
   }
   ++read_package_number_;
   package_length_ = 0;
@@ -205,12 +220,13 @@ void ClientConnection::checkPackageDone() {
     auto it = request_list_.begin();
     if (it == request_list_.end()) {
       LOG_ERROR("%s, not find request", ToString().c_str());
-      return;
+      return 1;
     }
     requestCallback(it->first, nullptr, &check_out);
-    return;
+    return 1;
   }
   requestCallback(request_id, nullptr, &check_out);
+  return 1;
 }
 
 void ClientConnection::requestCallback(uint32_t request_id, ErrorCode* err, Any* check_out) {
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.h b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.h
index b6c7a36..2000e21 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.h
@@ -78,7 +78,7 @@ class ClientConnection : public Connection, public std::enable_shared_from_this<
   void checkDeadline(const std::error_code& ec);
   void contextString();
   void asyncRead();
-  void checkPackageDone();
+  int checkPackageDone();
   void requestCallback(uint32_t request_id, ErrorCode* err = nullptr, Any* check_out = nullptr);
   TransportRequest* nextTransportRequest();
   void asyncWrite();
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_service.h b/tubemq-client-twins/tubemq-client-cpp/src/client_service.h
index 11f5b42..7271ca0 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_service.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_service.h
@@ -1,121 +1,121 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-#ifndef TUBEMQ_CLIENT_BASE_CLIENT_H_
-#define TUBEMQ_CLIENT_BASE_CLIENT_H_
-
-#include <stdint.h>
-
-#include <map>
-#include <mutex>
-#include <string>
-#include <thread>
-
-#include "connection_pool.h"
-#include "file_ini.h"
-#include "noncopyable.h"
-#include "rmt_data_cache.h"
-#include "thread_pool.h"
-#include "tubemq/tubemq_atomic.h"
-#include "tubemq/tubemq_config.h"
-#include "tubemq/tubemq_message.h"
-#include "tubemq/tubemq_return.h"
-
-namespace tubemq {
-
-using std::map;
-using std::mutex;
-using std::string;
-using std::thread;
-
-class BaseClient {
- public:
-  explicit BaseClient(bool is_producer);
-  virtual ~BaseClient();
-  virtual void ShutDown() {}
-  void SetClientIndex(int32_t client_index) { client_index_ = client_index; }
-  bool IsProducer() { return is_producer_; }
-  const int32_t GetClientIndex() { return client_index_; }
-
- protected:
-  bool is_producer_;
-  int32_t client_index_;
-};
-
-class TubeMQService : public noncopyable {
- public:
-  static TubeMQService* Instance();
-  bool Start(string& err_info, string conf_file = "../conf/tubemqclient.conf");
-  bool Stop(string& err_info);
-  bool IsRunning();
-  const int32_t GetServiceStatus() const { return service_status_.Get(); }
-  int32_t GetClientObjCnt();
-  bool AddClientObj(string& err_info, BaseClient* client_obj);
-  BaseClient* GetClientObj(int32_t client_index) const;
-  void RmvClientObj(BaseClient* client_obj);
-  const string& GetLocalHost() const { return local_host_; }
-  ExecutorPoolPtr GetTimerExecutorPool() { return timer_executor_; }
-  SteadyTimerPtr CreateTimer() { return timer_executor_->Get()->CreateSteadyTimer(); }
-  ExecutorPoolPtr GetNetWorkExecutorPool() { return network_executor_; }
-  ConnectionPoolPtr GetConnectionPool() { return connection_pool_; }
-  template <class function>
-  void Post(function f) {
-    if (thread_pool_ != nullptr) {
-      thread_pool_->Post(f);
-    }
-  }
-  bool AddMasterAddress(string& err_info, const string& master_info);
-  void GetXfsMasterAddress(const string& source, string& target);
-
- protected:
-  void updMasterAddrByDns();
-
- private:
-  TubeMQService();
-  ~TubeMQService();
-  void iniLogger(const Fileini& fileini, const string& sector);
-  void iniPoolThreads(const Fileini& fileini, const string& sector);
-  void iniXfsThread(const Fileini& fileini, const string& sector);
-  void thread_task_dnsxfs(int dns_xfs_period_ms);
-  void shutDownClinets() const;
-  bool hasXfsTask(map<string, int32_t>& src_addr_map);
-  bool addNeedDnsXfsAddr(map<string, int32_t>& src_addr_map);
-
- private:
-  static TubeMQService* _instance;
-  string local_host_;
-  AtomicInteger service_status_;
-  AtomicInteger client_index_base_;
-  mutable mutex mutex_;
-  map<int32_t, BaseClient*> clients_map_;
-  ExecutorPoolPtr timer_executor_;
-  ExecutorPoolPtr network_executor_;
-  ConnectionPoolPtr connection_pool_;
-  std::shared_ptr<ThreadPool> thread_pool_;
-  thread dns_xfs_thread_;
-  mutable mutex dns_mutex_;
-  int64_t last_check_time_;
-  map<string, int32_t> master_source_;
-  map<string, string> master_target_;
-};
-
-}  // namespace tubemq
-
-#endif  // TUBEMQ_CLIENT_BASE_CLIENT_H_
\ No newline at end of file
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_BASE_CLIENT_H_
+#define TUBEMQ_CLIENT_BASE_CLIENT_H_
+
+#include <stdint.h>
+
+#include <map>
+#include <mutex>
+#include <string>
+#include <thread>
+
+#include "connection_pool.h"
+#include "file_ini.h"
+#include "noncopyable.h"
+#include "rmt_data_cache.h"
+#include "thread_pool.h"
+#include "tubemq/tubemq_atomic.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_message.h"
+#include "tubemq/tubemq_return.h"
+
+namespace tubemq {
+
+using std::map;
+using std::mutex;
+using std::string;
+using std::thread;
+
+class BaseClient {
+ public:
+  explicit BaseClient(bool is_producer);
+  virtual ~BaseClient();
+  virtual void ShutDown() {}
+  void SetClientIndex(int32_t client_index) { client_index_ = client_index; }
+  bool IsProducer() { return is_producer_; }
+  const int32_t GetClientIndex() { return client_index_; }
+
+ protected:
+  bool is_producer_;
+  int32_t client_index_;
+};
+
+class TubeMQService : public noncopyable {
+ public:
+  static TubeMQService* Instance();
+  bool Start(string& err_info, string conf_file = "../conf/tubemqclient.conf");
+  bool Stop(string& err_info);
+  bool IsRunning();
+  const int32_t GetServiceStatus() const { return service_status_.Get(); }
+  int32_t GetClientObjCnt();
+  bool AddClientObj(string& err_info, BaseClient* client_obj);
+  BaseClient* GetClientObj(int32_t client_index) const;
+  void RmvClientObj(BaseClient* client_obj);
+  const string& GetLocalHost() const { return local_host_; }
+  ExecutorPoolPtr GetTimerExecutorPool() { return timer_executor_; }
+  SteadyTimerPtr CreateTimer() { return timer_executor_->Get()->CreateSteadyTimer(); }
+  ExecutorPoolPtr GetNetWorkExecutorPool() { return network_executor_; }
+  ConnectionPoolPtr GetConnectionPool() { return connection_pool_; }
+  template <class function>
+  void Post(function f) {
+    if (thread_pool_ != nullptr) {
+      thread_pool_->Post(f);
+    }
+  }
+  bool AddMasterAddress(string& err_info, const string& master_info);
+  void GetXfsMasterAddress(const string& source, string& target);
+
+ protected:
+  void updMasterAddrByDns();
+
+ private:
+  TubeMQService();
+  ~TubeMQService();
+  void iniLogger(const Fileini& fileini, const string& sector);
+  void iniPoolThreads(const Fileini& fileini, const string& sector);
+  void iniXfsThread(const Fileini& fileini, const string& sector);
+  void thread_task_dnsxfs(int dns_xfs_period_ms);
+  void shutDownClinets() const;
+  bool hasXfsTask(map<string, int32_t>& src_addr_map);
+  bool addNeedDnsXfsAddr(map<string, int32_t>& src_addr_map);
+
+ private:
+  static TubeMQService* _instance;
+  string local_host_;
+  AtomicInteger service_status_;
+  AtomicInteger client_index_base_;
+  mutable mutex mutex_;
+  map<int32_t, BaseClient*> clients_map_;
+  ExecutorPoolPtr timer_executor_;
+  ExecutorPoolPtr network_executor_;
+  ConnectionPoolPtr connection_pool_;
+  std::shared_ptr<ThreadPool> thread_pool_;
+  thread dns_xfs_thread_;
+  mutable mutex dns_mutex_;
+  int64_t last_check_time_;
+  map<string, int32_t> master_source_;
+  map<string, string> master_target_;
+};
+
+}  // namespace tubemq
+
+#endif  // TUBEMQ_CLIENT_BASE_CLIENT_H_
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h b/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
index 803eb15..6406d97 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
@@ -22,7 +22,6 @@
 
 #include <stdlib.h>
 
-
 #include <chrono>
 #include <functional>
 #include <memory>
@@ -34,6 +33,7 @@
 #include <asio/ssl.hpp>
 #include "noncopyable.h"
 
+
 namespace tubemq {
 // ThreadPool use one io_context for thread pool
 class ThreadPool : noncopyable {
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/tests/CMakeLists.txt
similarity index 87%
copy from tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt
copy to tubemq-client-twins/tubemq-client-cpp/tests/CMakeLists.txt
index 9d729c2..c1866ea 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/CMakeLists.txt
@@ -22,12 +22,13 @@
 # unit_tests test is not set up using this function because it does not like
 # the additional argument on commmand line and consequently does not run any
 # test.
-function(tubemq_add_example _name)
+function(tubemq_add_tests _name)
   set(_srcs ${ARGN})
   message (STATUS "${_name} sources: ${_srcs}")
   add_executable (${_name} ${_srcs})
-  TARGET_LINK_LIBRARIES (${_name} tubemq ssl crypto log4cplus pthread)
+  TARGET_LINK_LIBRARIES (${_name} tubemq ssl crypto liblog4cplus.a libprotobuf.a tubemq_proto pthread rt)
 endfunction()
 
 add_subdirectory (log)
 add_subdirectory (executor_pool)
+add_subdirectory (thread_pool)
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/log/main.cc b/tubemq-client-twins/tubemq-client-cpp/tests/README.md
similarity index 65%
copy from tubemq-client-twins/tubemq-client-cpp/example/log/main.cc
copy to tubemq-client-twins/tubemq-client-cpp/tests/README.md
index 9bb1685..4998b5f 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/log/main.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/README.md
@@ -1,46 +1,22 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <chrono>
-#include <exception>
-#include <iostream>
-#include <string>
-#include <thread>
-
-#include "tubemq/logger.h"
-
-using namespace std;
-using namespace tubemq;
-
-void log() {
-  int i = 0;
-  while (1) {
-    LOG_ERROR("threadid:%ld, i:%d", std::this_thread::get_id(), i++);
-  }
-}
-
-int main() {
-  std::thread t1(log);
-  std::thread t2(log);
-  std::thread t3(log);
-  std::thread t4(log);
-  t1.join();
-  return 0;
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#Example#
+
+tubemq-client-cpp example. 
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/tests/executor_pool/CMakeLists.txt
similarity index 94%
rename from tubemq-client-twins/tubemq-client-cpp/example/executor_pool/CMakeLists.txt
rename to tubemq-client-twins/tubemq-client-cpp/tests/executor_pool/CMakeLists.txt
index bef5bbb..dac0999 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/executor_pool/CMakeLists.txt
@@ -18,4 +18,4 @@
 #
 
 
-tubemq_add_example(executor_pool main.cc)
+tubemq_add_tests(executor_pool main.cc)
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/main.cc b/tubemq-client-twins/tubemq-client-cpp/tests/executor_pool/main.cc
similarity index 84%
rename from tubemq-client-twins/tubemq-client-cpp/example/executor_pool/main.cc
rename to tubemq-client-twins/tubemq-client-cpp/tests/executor_pool/main.cc
index 8c5ca25..04c6f48 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/main.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/executor_pool/main.cc
@@ -24,7 +24,7 @@
 #include <string>
 #include <thread>
 
-#include "tubemq/executor_pool.h"
+#include "executor_pool.h"
 
 using namespace std;
 using namespace tubemq;
@@ -40,7 +40,7 @@ int main() {
   using namespace std::placeholders;  // for _1, _2, _3...
   ExecutorPool pool(4);
   auto timer = pool.Get()->CreateSteadyTimer();
-  timer->expires_after(std::chrono::seconds(5));
+  timer->expires_after(std::chrono::seconds(1));
   std::cout << "startwait" << endl;
   timer->wait();
   std::cout << "endwait" << endl;
@@ -49,7 +49,10 @@ int main() {
   std::cout << "startsyncwait" << endl;
   timer->async_wait(std::bind(handler, 5, _1));
   std::cout << "endsyncwait" << endl;
-  std::this_thread::sleep_for(5s);
+  std::this_thread::sleep_for(std::chrono::seconds(1));
+  timer->expires_after(std::chrono::milliseconds(100));
+  timer->async_wait(std::bind(handler, 6, _1));
+  std::this_thread::sleep_for(std::chrono::seconds(1));
   return 0;
 }
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/tests/log/CMakeLists.txt
similarity index 96%
copy from tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt
copy to tubemq-client-twins/tubemq-client-cpp/tests/log/CMakeLists.txt
index b828bf8..a7a7fe9 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/log/CMakeLists.txt
@@ -18,4 +18,4 @@
 #
 
 
-tubemq_add_example(log main.cc)
+tubemq_add_tests(log main.cc)
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/log/main.cc b/tubemq-client-twins/tubemq-client-cpp/tests/log/main.cc
similarity index 60%
copy from tubemq-client-twins/tubemq-client-cpp/example/log/main.cc
copy to tubemq-client-twins/tubemq-client-cpp/tests/log/main.cc
index 9bb1685..d012c32 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/log/main.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/log/main.cc
@@ -23,23 +23,39 @@
 #include <string>
 #include <thread>
 
-#include "tubemq/logger.h"
+#include "buffer.h"
+#include "logger.h"
+#include "tubemq/tubemq_atomic.h"
 
 using namespace std;
 using namespace tubemq;
 
-void log() {
-  int i = 0;
+AtomicInteger ati;
+
+void logfunc() {
   while (1) {
-    LOG_ERROR("threadid:%ld, i:%d", std::this_thread::get_id(), i++);
+    LOG_TRACE("atomic:%d", ati.IncrementAndGet());
   }
 }
 
 int main() {
-  std::thread t1(log);
-  std::thread t2(log);
-  std::thread t3(log);
-  std::thread t4(log);
+  {
+    auto buf = std::make_shared<Buffer>();
+    std::string data = "abcdef";
+    buf->Write(data.data(), data.size());
+    auto buf2 = buf->Slice();
+    buf2->ReadUint32();
+    buf->Write(data.data(), data.size());
+    printf("%s\n", buf->String().c_str());
+    printf("%s\n", buf2->String().c_str());
+  }
+
+  ati.GetAndSet(1);
+  GetLogger().Init("./tubemq", tubemq::Logger::Level(0));
+  std::thread t1(logfunc);
+  std::thread t2(logfunc);
+  std::thread t3(logfunc);
+  std::thread t4(logfunc);
   t1.join();
   return 0;
 }
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/tests/thread_pool/CMakeLists.txt
similarity index 95%
rename from tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt
rename to tubemq-client-twins/tubemq-client-cpp/tests/thread_pool/CMakeLists.txt
index b828bf8..4fe33bb 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/thread_pool/CMakeLists.txt
@@ -18,4 +18,4 @@
 #
 
 
-tubemq_add_example(log main.cc)
+tubemq_add_tests(thread_pool main.cc)
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/log/main.cc b/tubemq-client-twins/tubemq-client-cpp/tests/thread_pool/main.cc
similarity index 79%
rename from tubemq-client-twins/tubemq-client-cpp/example/log/main.cc
rename to tubemq-client-twins/tubemq-client-cpp/tests/thread_pool/main.cc
index 9bb1685..afeef04 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/log/main.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/thread_pool/main.cc
@@ -23,24 +23,14 @@
 #include <string>
 #include <thread>
 
-#include "tubemq/logger.h"
+#include "thread_pool.h"
 
 using namespace std;
 using namespace tubemq;
 
-void log() {
-  int i = 0;
-  while (1) {
-    LOG_ERROR("threadid:%ld, i:%d", std::this_thread::get_id(), i++);
-  }
-}
-
 int main() {
-  std::thread t1(log);
-  std::thread t2(log);
-  std::thread t3(log);
-  std::thread t4(log);
-  t1.join();
+  ThreadPool pool(std::thread::hardware_concurrency());
+  pool.Post([] { std::cout << "abcd" << endl; });
   return 0;
 }
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/third_party/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/third_party/CMakeLists.txt
index 7dc5e88..c223f9b 100644
--- a/tubemq-client-twins/tubemq-client-cpp/third_party/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/third_party/CMakeLists.txt
@@ -1,25 +1,45 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-
 CMAKE_MINIMUM_REQUIRED(VERSION 3.1)                                             
+
 PROJECT(TubeMQThirdParty)
 
-ADD_SUBDIRECTORY(log4cplus) 
+SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error")
+
+include(ExternalProject)
+
+ExternalProject_Add(
+    rapidjson
+    PREFIX "rapidjson"
+    GIT_REPOSITORY "https://github.com/Tencent/rapidjson.git"
+    GIT_TAG f54b0e47a08782a6131cc3d60f94d038fa6e0a51
+    TIMEOUT 20
+    CMAKE_ARGS
+    -DRAPIDJSON_BUILD_TESTS=OFF
+    -DRAPIDJSON_BUILD_DOC=OFF
+    -DRAPIDJSON_BUILD_EXAMPLES=OFF
+    CONFIGURE_COMMAND ""
+    BUILD_COMMAND ""
+    INSTALL_COMMAND ""
+    UPDATE_COMMAND ""
+    )
+
+ExternalProject_Add(
+    log4cplus_proj
+    URL https://github.com/log4cplus/log4cplus/releases/download/REL_2_0_5/log4cplus-2.0.5.tar.gz
+    CONFIGURE_COMMAND ./configure --prefix=<INSTALL_DIR> --disable-shared --with-pic CFLAGS=-O2\  CXXFLAGS=-O2\ -fPIC
+    TEST_BEFORE_INSTALL 0
+    BUILD_IN_SOURCE 1
+    INSTALL_DIR ${CMAKE_BINARY_DIR}/third_party/
+    )
+
+ExternalProject_Add(
+    asio_proj
+    URL https://github.com/chriskohlhoff/asio/archive/asio-1-18-0.tar.gz 
+    INSTALL_DIR ${CMAKE_BINARY_DIR}/third_party/
+    CONFIGURE_COMMAND cd ../asio_proj/asio && ./autogen.sh && ./configure --prefix=<INSTALL_DIR> CFLAGS=-std=c++11 CPPFLAGS=-std=c++11 CXXFLAGS=-std=c++11 && make install
+    TEST_BEFORE_INSTALL 0
+    BUILD_IN_SOURCE 0
+    BUILD_COMMAND "" 
+    INSTALL_COMMAND ""
+    )
+
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/third_party/log4cplus b/tubemq-client-twins/tubemq-client-cpp/third_party/log4cplus
deleted file mode 160000
index 9d00f7d..0000000
--- a/tubemq-client-twins/tubemq-client-cpp/third_party/log4cplus
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 9d00f7d10f2507f68f9ab5fea8b842735d9c6cfe
diff --git a/tubemq-client-twins/tubemq-client-cpp/third_party/rapidjson b/tubemq-client-twins/tubemq-client-cpp/third_party/rapidjson
deleted file mode 160000
index f54b0e4..0000000
--- a/tubemq-client-twins/tubemq-client-cpp/third_party/rapidjson
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit f54b0e47a08782a6131cc3d60f94d038fa6e0a51