You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/09/15 08:51:52 UTC
[incubator-tubemq] branch tubemq-client-cpp updated:
[TUBEMQ-351]C++ SDK example&tests (#263)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
new fec786a [TUBEMQ-351]C++ SDK example&tests (#263)
fec786a is described below
commit fec786a3655c017db67cc13ff65569e7cf42f32d
Author: charlely <41...@users.noreply.github.com>
AuthorDate: Tue Sep 15 16:51:44 2020 +0800
[TUBEMQ-351]C++ SDK example&tests (#263)
Co-authored-by: charleli <ch...@tencent.com>
---
.gitmodules | 4 -
.../tubemq-client-cpp/CMakeLists.txt | 37 +---
tubemq-client-twins/tubemq-client-cpp/README.md | 7 +-
.../tubemq-client-cpp/build_linux.sh | 7 -
.../tubemq-client-cpp/example/CMakeLists.txt | 6 +-
.../example/{log => consumer}/CMakeLists.txt | 3 +-
.../example/consumer/test_consumer.cc | 137 ++++++++++++
.../example/consumer/test_multithread_pull.cc | 166 ++++++++++++++
.../{third_party => proto}/CMakeLists.txt | 9 +-
.../tubemq-client-cpp/src/baseconsumer.cc | 22 +-
.../tubemq-client-cpp/src/client_connection.cc | 38 +++-
.../tubemq-client-cpp/src/client_connection.h | 2 +-
.../tubemq-client-cpp/src/client_service.h | 242 ++++++++++-----------
.../tubemq-client-cpp/src/thread_pool.h | 2 +-
.../{example => tests}/CMakeLists.txt | 5 +-
.../{example/log/main.cc => tests/README.md} | 68 ++----
.../executor_pool/CMakeLists.txt | 2 +-
.../{example => tests}/executor_pool/main.cc | 9 +-
.../{example => tests}/log/CMakeLists.txt | 2 +-
.../{example => tests}/log/main.cc | 32 ++-
.../log => tests/thread_pool}/CMakeLists.txt | 2 +-
.../{example/log => tests/thread_pool}/main.cc | 16 +-
.../tubemq-client-cpp/third_party/CMakeLists.txt | 62 ++++--
.../tubemq-client-cpp/third_party/log4cplus | 1 -
.../tubemq-client-cpp/third_party/rapidjson | 1 -
25 files changed, 581 insertions(+), 301 deletions(-)
diff --git a/.gitmodules b/.gitmodules
index c80f44d..417049c 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -2,7 +2,3 @@
path = tubemq-client-twins/tubemq-client-cpp/third_party/rapidjson
url = https://github.com/Tencent/rapidjson.git
tag = v1.1.0
-[submodule "tubemq-client-twins/tubemq-client-cpp/third_party/log4cplus"]
- path = tubemq-client-twins/tubemq-client-cpp/third_party/log4cplus
- url = https://github.com/log4cplus/log4cplus
- tag = REL_2_0_5
diff --git a/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt
index 984a28a..94ac9f1 100644
--- a/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt
@@ -22,36 +22,23 @@ cmake_minimum_required (VERSION 3.1)
project (TubeMQ)
-set (CXX_FLAGS
- -g
- -fPIC
- -Wall
- -D__STDC_FORMAT_MACROS
- -Wno-unused-parameter
- -Wno-unused-function
- -Wunused-variable
- -Wunused-value
- -Wshadow
- -Wcast-qual
- -Wcast-align
- -Wwrite-strings
- -Wsign-compare
- -Winvalid-pch
- -fms-extensions
- -Wfloat-equal
- -Wextra
- -std=c++11
- )
+find_package(Protobuf REQUIRED)
+
+SET(CMAKE_CXX_FLAGS "-std=c++11 -O2 -g -Wall -Werror -Wsign-compare -Wfloat-equal -fno-strict-aliasing -fPIC -DASIO_STANDALONE")
+
+INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/proto)
INCLUDE_DIRECTORIES(include)
+INCLUDE_DIRECTORIES(src)
-INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/third_party/rapidjson/include)
-INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/third_party/log4cplus/include)
-INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/third_party/log4cplus/include)
-LINK_DIRECTORIES(${CMAKE_BINARY_DIR}/third_party/log4cplus/lib)
+INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/third_party/rapidjson/src/rapidjson/include)
+INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/third_party/include)
+LINK_DIRECTORIES(${CMAKE_BINARY_DIR}/third_party/lib)
-ADD_SUBDIRECTORY(src)
ADD_SUBDIRECTORY(third_party)
+ADD_SUBDIRECTORY(proto)
+ADD_SUBDIRECTORY(src)
ADD_SUBDIRECTORY(example)
+ADD_SUBDIRECTORY(tests)
diff --git a/tubemq-client-twins/tubemq-client-cpp/README.md b/tubemq-client-twins/tubemq-client-cpp/README.md
index 0f566d5..c2c1960 100644
--- a/tubemq-client-twins/tubemq-client-cpp/README.md
+++ b/tubemq-client-twins/tubemq-client-cpp/README.md
@@ -30,9 +30,8 @@
* [Log4cplus](https://github.com/log4cplus/log4cplus.git)
* [Rapidjson](https://github.com/Tencent/rapidjson.git)
-## Install ASIO
- * ./autogen.sh
- * ./configure CFLAGS=-std=c++11 CPPFLAGS=-std=c++11 CXXFLAGS=-std=c++11
- * make && make install
+## Step to build
+ * install protobuf
+ * ./build_linux.sh
diff --git a/tubemq-client-twins/tubemq-client-cpp/build_linux.sh b/tubemq-client-twins/tubemq-client-cpp/build_linux.sh
index 7664d5b..fd2dfdb 100755
--- a/tubemq-client-twins/tubemq-client-cpp/build_linux.sh
+++ b/tubemq-client-twins/tubemq-client-cpp/build_linux.sh
@@ -20,13 +20,6 @@
#!/bin/bash
-cd ../../
-git submodule init
-git submodule update
-git submodule foreach --recursive git submodule init
-git submodule foreach --recursive git submodule update
-cd -
-
mkdir build
cd build
cmake ../
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt
index 9d729c2..14c0ec2 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt
@@ -26,8 +26,8 @@ function(tubemq_add_example _name)
set(_srcs ${ARGN})
message (STATUS "${_name} sources: ${_srcs}")
add_executable (${_name} ${_srcs})
- TARGET_LINK_LIBRARIES (${_name} tubemq ssl crypto log4cplus pthread)
+ TARGET_LINK_LIBRARIES (${_name} tubemq ssl crypto liblog4cplus.a libprotobuf.a tubemq_proto pthread rt)
endfunction()
-add_subdirectory (log)
-add_subdirectory (executor_pool)
+add_subdirectory (consumer)
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/example/consumer/CMakeLists.txt
similarity index 88%
copy from tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt
copy to tubemq-client-twins/tubemq-client-cpp/example/consumer/CMakeLists.txt
index b828bf8..8f8b7f2 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/example/consumer/CMakeLists.txt
@@ -18,4 +18,5 @@
#
-tubemq_add_example(log main.cc)
+tubemq_add_example(consumer test_consumer.cc)
+tubemq_add_example(multi_pull test_multithread_pull.cc)
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc b/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc
new file mode 100644
index 0000000..bd1b487
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <time.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <libgen.h>
+#include <sys/time.h>
+#include <chrono>
+#include <set>
+#include <string>
+#include <thread>
+#include "tubemq/tubemq_client.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_errcode.h"
+#include "tubemq/tubemq_message.h"
+#include "tubemq/tubemq_return.h"
+
+
+using namespace std;
+using namespace tubemq;
+
+using std::set;
+using std::string;
+using tubemq::ConsumerConfig;
+using tubemq::ConsumerResult;
+using tubemq::TubeMQConsumer;
+
+
+
+
+
+
+int main(int argc, char* argv[]) {
+ bool result;
+ string err_info;
+ string conf_file = "../conf/client.conf";
+ string group_name = "test_c_v8";
+ string master_addr = "10.215.128.83:8000,10.215.128.83:8000";
+ TubeMQConsumer consumer_1;
+
+ set<string> topic_list;
+ topic_list.insert("test_1");
+ ConsumerConfig consumer_config;
+
+ consumer_config.SetRpcReadTimeoutMs(20000);
+ result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
+ if (!result) {
+ printf("\n Set Master AddrInfo failure: %s", err_info.c_str());
+ return -1;
+ }
+ result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
+ if (!result) {
+ printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
+ return -1;
+ }
+ result = StartTubeMQService(err_info, conf_file);
+ if (!result) {
+ printf("\n StartTubeMQService failure: %s", err_info.c_str());
+ return -1;
+ }
+
+ result = consumer_1.Start(err_info, consumer_config);
+ if (!result) {
+ printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
+ return -2;
+ }
+
+ ConsumerResult gentRet;
+ ConsumerResult confirm_result;
+ int64_t start_time = time(NULL);
+ do {
+ // 1. get Message;
+ result = consumer_1.GetMessage(gentRet);
+ if (result) {
+ // 2.1.1 if success, process message
+ list<Message> msgs = gentRet.GetMessageList();
+ printf("\n GetMessage success, msssage count =%ld ", msgs.size());
+ // 2.1.2 confirm message result
+ consumer_1.Confirm(gentRet.GetConfirmContext(), true, confirm_result);
+ } else {
+ // 2.2.1 if failure, check error code
+ // print error message if errcode not in
+ // [no partitions assigned, all partitions in use,
+ // or all partitons idle, reach max position]
+ if (!(gentRet.GetErrCode() == err_code::kErrNotFound
+ || gentRet.GetErrCode() == err_code::kErrNoPartAssigned
+ || gentRet.GetErrCode() == err_code::kErrAllPartInUse
+ || gentRet.GetErrCode() == err_code::kErrAllPartWaiting)) {
+ printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
+ gentRet.GetErrCode(), gentRet.GetErrMessage().c_str());
+ }
+ }
+ // used for test, consume 10 minutes only
+ if (time(NULL) - start_time > 10 * 60) {
+ break;
+ }
+ } while (true);
+
+ getchar(); // for test hold the test thread
+ consumer_1.ShutDown();
+
+ getchar(); // for test hold the test thread
+ result = StopTubeMQService(err_info);
+ if (!result) {
+ printf("\n *** StopTubeMQService failure, reason is %s ", err_info.c_str());
+ }
+
+ printf("\n finishe test exist ");
+ return 0;
+}
+
+
+
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc b/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc
new file mode 100644
index 0000000..fc0bf8f
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <errno.h>
+#include <fcntl.h>
+#include <libgen.h>
+#include <signal.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <time.h>
+#include <string>
+#include <chrono>
+#include <set>
+#include <thread>
+#include "tubemq/tubemq_atomic.h"
+#include "tubemq/tubemq_client.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_errcode.h"
+#include "tubemq/tubemq_message.h"
+#include "tubemq/tubemq_return.h"
+
+
+using namespace std;
+using namespace tubemq;
+
+TubeMQConsumer consumer_1;
+
+
+
+AtomicLong last_print_time(0);
+AtomicLong last_msg_count(0);
+AtomicLong last_print_count(0);
+
+
+
+void calc_message_count(int64_t msg_count) {
+ int64_t last_time = last_print_time.Get();
+ int64_t cur_count = last_msg_count.AddAndGet(msg_count);
+ int64_t cur_time = time(NULL);
+ if (cur_count - last_print_count.Get() >= 50000
+ || cur_time - last_time > 90) {
+ if (last_print_time.CompareAndSet(last_time, cur_time)) {
+ printf("\n %ld Current message count=%ld", cur_time, last_msg_count.Get());
+ last_print_count.Set(cur_count);
+ }
+ }
+}
+
+
+void thread_task_pull(int32_t thread_no) {
+ bool result;
+ int64_t msg_count = 0;
+ ConsumerResult gentRet;
+ ConsumerResult confirm_result;
+ printf("\n thread_task_pull start: %d", thread_no);
+ do {
+ msg_count = 0;
+ // 1. get Message;
+ result = consumer_1.GetMessage(gentRet);
+ if (result) {
+ // 2.1.1 if success, process message
+ list<Message> msgs = gentRet.GetMessageList();
+ msg_count = msgs.size();
+ // 2.1.2 confirm message result
+ consumer_1.Confirm(gentRet.GetConfirmContext(), true, confirm_result);
+ } else {
+ // 2.2.1 if failure, check error code
+ // print error message if errcode not in
+ // [no partitions assigned, all partitions in use,
+ // or all partitons idle, reach max position]
+ if (!(gentRet.GetErrCode() == err_code::kErrNotFound
+ || gentRet.GetErrCode() == err_code::kErrNoPartAssigned
+ || gentRet.GetErrCode() == err_code::kErrAllPartInUse
+ || gentRet.GetErrCode() == err_code::kErrAllPartWaiting)) {
+ if (gentRet.GetErrCode() == err_code::kErrMQServiceStop
+ || gentRet.GetErrCode() == err_code::kErrClientStop) {
+ break;
+ }
+ printf("\n GetMessage failure, err_code=%d, err_msg is: %s ",
+ gentRet.GetErrCode(), gentRet.GetErrMessage().c_str());
+ }
+ }
+ calc_message_count(msg_count);
+ } while (true);
+ printf("\n thread_task_pull finished: %d", thread_no);
+}
+
+
+int main(int argc, char* argv[]) {
+ bool result;
+ string err_info;
+ string conf_file = "../conf/client.conf";
+ string group_name = "test_c_v8";
+ string master_addr = "10.215.128.83:8000,10.215.128.83:8000";
+ int32_t thread_num = 15;
+ set<string> topic_list;
+ topic_list.insert("test_1");
+ ConsumerConfig consumer_config;
+
+ consumer_config.SetRpcReadTimeoutMs(20000);
+ result = consumer_config.SetMasterAddrInfo(err_info, master_addr);
+ if (!result) {
+ printf("\n Set Master AddrInfo failure: %s ", err_info.c_str());
+ return -1;
+ }
+ result = consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
+ if (!result) {
+ printf("\n Set GroupConsume Target failure: %s", err_info.c_str());
+ return -1;
+ }
+ result = StartTubeMQService(err_info, conf_file);
+ if (!result) {
+ printf("\n StartTubeMQService failure: %s", err_info.c_str());
+ return -1;
+ }
+
+ result = consumer_1.Start(err_info, consumer_config);
+ if (!result) {
+ printf("\n Initial consumer failure, error is: %s ", err_info.c_str());
+ return -2;
+ }
+
+ std::thread pull_threads[thread_num];
+ for (int32_t i = 0; i < thread_num; i++) {
+ pull_threads[i] = std::thread(thread_task_pull, i);
+ }
+
+ getchar(); // for test hold the test thread
+ consumer_1.ShutDown();
+ //
+ for (int32_t i = 0; i < thread_num; i++) {
+ if (pull_threads[i].joinable()) {
+ pull_threads[i].join();
+ }
+ }
+
+ getchar(); // for test hold the test thread
+ result = StopTubeMQService(err_info);
+ if (!result) {
+ printf("\n *** StopTubeMQService failure, reason is %s", err_info.c_str());
+ }
+
+ printf("\n finishe test exist");
+ return 0;
+}
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/third_party/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/proto/CMakeLists.txt
similarity index 74%
copy from tubemq-client-twins/tubemq-client-cpp/third_party/CMakeLists.txt
copy to tubemq-client-twins/tubemq-client-cpp/proto/CMakeLists.txt
index 7dc5e88..c10ad25 100644
--- a/tubemq-client-twins/tubemq-client-cpp/third_party/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/proto/CMakeLists.txt
@@ -17,9 +17,10 @@
# under the License.
#
+cmake_minimum_required (VERSION 3.1)
-CMAKE_MINIMUM_REQUIRED(VERSION 3.1)
-PROJECT(TubeMQThirdParty)
-
-ADD_SUBDIRECTORY(log4cplus)
+file(GLOB ProtoFiles "${CMAKE_CURRENT_SOURCE_DIR}/*.proto")
+PROTOBUF_GENERATE_CPP(ProtoSources ProtoHeaders ${ProtoFiles})
+add_library(tubemq_proto STATIC ${ProtoSources} ${ProtoHeaders})
+target_link_libraries(tubemq_proto libprotobuf.a)
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc b/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
index 714b353..46d103b 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
@@ -146,9 +146,7 @@ bool BaseConsumer::GetMessage(ConsumerResult& result) {
result.SetFailureResult(error_code, err_info);
return false;
}
-
int64_t curr_offset = tb_config::kInvalidValue;
-
bool filter_consume = sub_info_.IsFilterConsume(partition_ext.GetTopic());
PeerInfo peer_info(partition_ext.GetBrokerHost(), partition_ext.GetPartitionId(),
partition_ext.GetPartitionKey(), curr_offset);
@@ -208,11 +206,9 @@ bool BaseConsumer::IsConsumeReady(ConsumerResult& result) {
if (err_code::kErrSuccess == ret_code) {
return true;
}
-
if ((config_.GetMaxPartCheckPeriodMs() >= 0)
&& (Utils::GetCurrentTimeMillis() - start_time
>= config_.GetMaxPartCheckPeriodMs())) {
-
switch (ret_code) {
case err_code::kErrNoPartAssigned: {
result.SetFailureResult(ret_code,
@@ -281,9 +277,7 @@ bool BaseConsumer::Confirm(const string& confirm_context, bool is_consumed,
string part_key = Utils::Trim(confirm_context.substr(0, pos1));
string booked_time_str =
Utils::Trim(confirm_context.substr(pos1 + token1.size(), confirm_context.size()));
-
int64_t booked_time = atol(booked_time_str.c_str());
-
pos1 = part_key.find(token2);
if (string::npos == pos1) {
result.SetFailureResult(err_code::kErrBadRequest,
@@ -311,9 +305,7 @@ bool BaseConsumer::Confirm(const string& confirm_context, bool is_consumed,
"Not found the partition by confirm_context!");
return false;
}
-
int64_t curr_offset = tb_config::kInvalidValue;
-
PeerInfo peer_info(partition_ext.GetBrokerHost(), partition_ext.GetPartitionId(),
partition_ext.GetPartitionKey(), curr_offset);
auto request = std::make_shared<RequestContext>();
@@ -352,9 +344,7 @@ bool BaseConsumer::Confirm(const string& confirm_context, bool is_consumed,
if (rsp->success_) {
CommitOffsetResponseB2C rsp_b2c;
ret_result = rsp_b2c.ParseFromArray(rsp->rsp_body_.data().c_str(),
-
(int32_t)(rsp->rsp_body_.data().length()));
-
if (ret_result) {
if (rsp_b2c.success()) {
curr_offset = rsp_b2c.curroffset();
@@ -449,7 +439,6 @@ bool BaseConsumer::register2Master(int32_t& error_code, string& err_info, bool n
error_code = error.Value();
err_info = error.Message();
}
-
if (error_code == err_code::kErrConsumeGroupForbidden
|| error_code == err_code::kErrConsumeContentForbidden) {
// set regist process status to existed
@@ -816,8 +805,7 @@ void BaseConsumer::processHeartBeat2Broker(NodeInfo broker_info) {
if (rsp->success_) {
HeartBeatResponseB2C rsp_b2c;
bool result = rsp_b2c.ParseFromArray(rsp->rsp_body_.data().c_str(),
- (int32_t)(rsp->rsp_body_.data().length()));
-
+ (int32_t)(rsp->rsp_body_.data().length()));
if (result) {
set<string> partition_keys;
if (rsp_b2c.success()) {
@@ -1079,7 +1067,6 @@ bool BaseConsumer::processRegisterResponseM2C(int32_t& error_code, string& err_i
RegisterResponseM2C rsp_m2c;
bool result = rsp_m2c.ParseFromArray(rsp_protocol->rsp_body_.data().c_str(),
(int32_t)(rsp_protocol->rsp_body_.data().length()));
-
if (!result) {
error_code = err_code::kErrParseFailure;
err_info = "Parse RegisterResponseM2C response failure!";
@@ -1237,7 +1224,6 @@ bool BaseConsumer::processRegResponseB2C(int32_t& error_code, string& err_info,
RegisterResponseB2C rsp_b2c;
bool result = rsp_b2c.ParseFromArray(rsp_protocol->rsp_body_.data().c_str(),
(int32_t)(rsp_protocol->rsp_body_.data().length()));
-
if (!result) {
error_code = err_code::kErrParseFailure;
err_info = "Parse RegisterResponseB2C response failure!";
@@ -1332,7 +1318,6 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
bool filter_consume, const PartitionExt& partition_ext,
const string& confirm_context,
const TubeMQCodec::RspProtocolPtr& rsp) {
-
// #lizard forgives
string err_info;
@@ -1348,13 +1333,11 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
GetMessageResponseB2C rsp_b2c;
bool ret_result = rsp_b2c.ParseFromArray(
rsp->rsp_body_.data().c_str(), (int32_t)(rsp->rsp_body_.data().length()));
-
if (!ret_result) {
rmtdata_cache_.RelPartition(err_info, filter_consume, confirm_context, false);
result.SetFailureResult(err_code::kErrServerError,
"Parse GetMessageResponseB2C response failure!",
partition_ext.GetTopic(), peer_info);
-
LOG_TRACE("[CONSUMER] processGetMessageRspB2C parse failure, client=%s", client_uuid_.c_str());
return false;
}
@@ -1362,12 +1345,10 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
switch (rsp_b2c.errcode()) {
case err_code::kErrSuccess: {
bool esc_limit = (rsp_b2c.has_escflowctrl() && rsp_b2c.escflowctrl());
-
int64_t data_dltval =
rsp_b2c.has_currdatadlt() ? rsp_b2c.currdatadlt() : tb_config::kInvalidValue;
int64_t curr_offset = rsp_b2c.has_curroffset() ?
rsp_b2c.curroffset() : tb_config::kInvalidValue;
-
bool req_slow = rsp_b2c.has_requireslow() ? rsp_b2c.requireslow() : false;
int msg_size = 0;
list<Message> message_list;
@@ -1392,7 +1373,6 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
case err_code::kErrConsumeSpeedLimit: {
// Process with server side speed limit
-
int64_t def_dlttime = rsp_b2c.has_minlimittime() ? rsp_b2c.minlimittime()
: config_.GetMsgNotFoundWaitPeriodMs();
rmtdata_cache_.RelPartition(err_info, filter_consume, confirm_context, false,
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
index c795d35..e6ae503 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
@@ -150,7 +150,7 @@ void ClientConnection::asyncRead() {
}
recv_buffer_->EnsureWritableBytes(rpc_config::kRpcEnsureWriteableBytes);
auto self = shared_from_this();
- socket_->async_read_some(
+ socket_->async_receive(
asio::buffer(recv_buffer_->WriteBegin(), recv_buffer_->WritableBytes()),
[self, this](std::error_code ec, std::size_t len) {
if (ec) {
@@ -166,21 +166,36 @@ void ClientConnection::asyncRead() {
}
recv_time_ = std::time(nullptr);
recv_buffer_->WriteBytes(len);
- checkPackageDone();
- LOG_TRACE("[%s]async read done, len:%ld, package_length_:%ld, recvbuffer:%s",
- ToString().c_str(), len, package_length_, recv_buffer_->String().c_str());
+ std::error_code error;
+ size_t availsize = socket_->available(error);
+ LOG_TRACE("[%s]async read done, len:%ld, package_length_:%ld, availsize:%ld, recvbuffer:%s",
+ ToString().c_str(), len, package_length_, availsize,
+ recv_buffer_->String().c_str());
+ if (availsize > 0 && !error) {
+ recv_buffer_->EnsureWritableBytes(availsize);
+ size_t rlen = socket_->receive(asio::buffer(recv_buffer_->WriteBegin(), availsize));
+ if (rlen > 0) {
+ recv_buffer_->WriteBytes(rlen);
+ }
+ LOG_TRACE("[%s]syncread done, receivelen:%ld, recvbuffer:%s", ToString().c_str(), rlen,
+ recv_buffer_->String().c_str());
+ }
+ while (checkPackageDone() > 0 && recv_buffer_->length() > 0) {
+ LOG_TRACE("[%s]recheck packagedone package_length_:%ld, recvbuffer:%s",
+ ToString().c_str(), package_length_, recv_buffer_->String().c_str());
+ }
asyncRead();
});
}
-void ClientConnection::checkPackageDone() {
+int ClientConnection::checkPackageDone() {
if (check_ == nullptr) {
recv_buffer_->Reset();
LOG_ERROR("check codec func not set");
- return;
+ return -1;
}
if (package_length_ > recv_buffer_->length()) {
- return;
+ return 0;
}
uint32_t request_id = 0;
bool has_request_id = false;
@@ -192,11 +207,11 @@ void ClientConnection::checkPackageDone() {
package_length_ = 0;
LOG_ERROR("%s, check codec package result:%d", ToString().c_str(), result);
close();
- return;
+ return -1;
}
if (result == 0) {
package_length_ = package_length;
- return;
+ return 0;
}
++read_package_number_;
package_length_ = 0;
@@ -205,12 +220,13 @@ void ClientConnection::checkPackageDone() {
auto it = request_list_.begin();
if (it == request_list_.end()) {
LOG_ERROR("%s, not find request", ToString().c_str());
- return;
+ return 1;
}
requestCallback(it->first, nullptr, &check_out);
- return;
+ return 1;
}
requestCallback(request_id, nullptr, &check_out);
+ return 1;
}
void ClientConnection::requestCallback(uint32_t request_id, ErrorCode* err, Any* check_out) {
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.h b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.h
index b6c7a36..2000e21 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.h
@@ -78,7 +78,7 @@ class ClientConnection : public Connection, public std::enable_shared_from_this<
void checkDeadline(const std::error_code& ec);
void contextString();
void asyncRead();
- void checkPackageDone();
+ int checkPackageDone();
void requestCallback(uint32_t request_id, ErrorCode* err = nullptr, Any* check_out = nullptr);
TransportRequest* nextTransportRequest();
void asyncWrite();
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_service.h b/tubemq-client-twins/tubemq-client-cpp/src/client_service.h
index 11f5b42..7271ca0 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_service.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_service.h
@@ -1,121 +1,121 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-#ifndef TUBEMQ_CLIENT_BASE_CLIENT_H_
-#define TUBEMQ_CLIENT_BASE_CLIENT_H_
-
-#include <stdint.h>
-
-#include <map>
-#include <mutex>
-#include <string>
-#include <thread>
-
-#include "connection_pool.h"
-#include "file_ini.h"
-#include "noncopyable.h"
-#include "rmt_data_cache.h"
-#include "thread_pool.h"
-#include "tubemq/tubemq_atomic.h"
-#include "tubemq/tubemq_config.h"
-#include "tubemq/tubemq_message.h"
-#include "tubemq/tubemq_return.h"
-
-namespace tubemq {
-
-using std::map;
-using std::mutex;
-using std::string;
-using std::thread;
-
-class BaseClient {
- public:
- explicit BaseClient(bool is_producer);
- virtual ~BaseClient();
- virtual void ShutDown() {}
- void SetClientIndex(int32_t client_index) { client_index_ = client_index; }
- bool IsProducer() { return is_producer_; }
- const int32_t GetClientIndex() { return client_index_; }
-
- protected:
- bool is_producer_;
- int32_t client_index_;
-};
-
-class TubeMQService : public noncopyable {
- public:
- static TubeMQService* Instance();
- bool Start(string& err_info, string conf_file = "../conf/tubemqclient.conf");
- bool Stop(string& err_info);
- bool IsRunning();
- const int32_t GetServiceStatus() const { return service_status_.Get(); }
- int32_t GetClientObjCnt();
- bool AddClientObj(string& err_info, BaseClient* client_obj);
- BaseClient* GetClientObj(int32_t client_index) const;
- void RmvClientObj(BaseClient* client_obj);
- const string& GetLocalHost() const { return local_host_; }
- ExecutorPoolPtr GetTimerExecutorPool() { return timer_executor_; }
- SteadyTimerPtr CreateTimer() { return timer_executor_->Get()->CreateSteadyTimer(); }
- ExecutorPoolPtr GetNetWorkExecutorPool() { return network_executor_; }
- ConnectionPoolPtr GetConnectionPool() { return connection_pool_; }
- template <class function>
- void Post(function f) {
- if (thread_pool_ != nullptr) {
- thread_pool_->Post(f);
- }
- }
- bool AddMasterAddress(string& err_info, const string& master_info);
- void GetXfsMasterAddress(const string& source, string& target);
-
- protected:
- void updMasterAddrByDns();
-
- private:
- TubeMQService();
- ~TubeMQService();
- void iniLogger(const Fileini& fileini, const string& sector);
- void iniPoolThreads(const Fileini& fileini, const string& sector);
- void iniXfsThread(const Fileini& fileini, const string& sector);
- void thread_task_dnsxfs(int dns_xfs_period_ms);
- void shutDownClinets() const;
- bool hasXfsTask(map<string, int32_t>& src_addr_map);
- bool addNeedDnsXfsAddr(map<string, int32_t>& src_addr_map);
-
- private:
- static TubeMQService* _instance;
- string local_host_;
- AtomicInteger service_status_;
- AtomicInteger client_index_base_;
- mutable mutex mutex_;
- map<int32_t, BaseClient*> clients_map_;
- ExecutorPoolPtr timer_executor_;
- ExecutorPoolPtr network_executor_;
- ConnectionPoolPtr connection_pool_;
- std::shared_ptr<ThreadPool> thread_pool_;
- thread dns_xfs_thread_;
- mutable mutex dns_mutex_;
- int64_t last_check_time_;
- map<string, int32_t> master_source_;
- map<string, string> master_target_;
-};
-
-} // namespace tubemq
-
-#endif // TUBEMQ_CLIENT_BASE_CLIENT_H_
\ No newline at end of file
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_BASE_CLIENT_H_
+#define TUBEMQ_CLIENT_BASE_CLIENT_H_
+
+#include <stdint.h>
+
+#include <map>
+#include <mutex>
+#include <string>
+#include <thread>
+
+#include "connection_pool.h"
+#include "file_ini.h"
+#include "noncopyable.h"
+#include "rmt_data_cache.h"
+#include "thread_pool.h"
+#include "tubemq/tubemq_atomic.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_message.h"
+#include "tubemq/tubemq_return.h"
+
+namespace tubemq {
+
+using std::map;
+using std::mutex;
+using std::string;
+using std::thread;
+
+class BaseClient {
+ public:
+ explicit BaseClient(bool is_producer);
+ virtual ~BaseClient();
+ virtual void ShutDown() {}
+ void SetClientIndex(int32_t client_index) { client_index_ = client_index; }
+ bool IsProducer() { return is_producer_; }
+ const int32_t GetClientIndex() { return client_index_; }
+
+ protected:
+ bool is_producer_;
+ int32_t client_index_;
+};
+
+class TubeMQService : public noncopyable {
+ public:
+ static TubeMQService* Instance();
+ bool Start(string& err_info, string conf_file = "../conf/tubemqclient.conf");
+ bool Stop(string& err_info);
+ bool IsRunning();
+ const int32_t GetServiceStatus() const { return service_status_.Get(); }
+ int32_t GetClientObjCnt();
+ bool AddClientObj(string& err_info, BaseClient* client_obj);
+ BaseClient* GetClientObj(int32_t client_index) const;
+ void RmvClientObj(BaseClient* client_obj);
+ const string& GetLocalHost() const { return local_host_; }
+ ExecutorPoolPtr GetTimerExecutorPool() { return timer_executor_; }
+ SteadyTimerPtr CreateTimer() { return timer_executor_->Get()->CreateSteadyTimer(); }
+ ExecutorPoolPtr GetNetWorkExecutorPool() { return network_executor_; }
+ ConnectionPoolPtr GetConnectionPool() { return connection_pool_; }
+ template <class function>
+ void Post(function f) {
+ if (thread_pool_ != nullptr) {
+ thread_pool_->Post(f);
+ }
+ }
+ bool AddMasterAddress(string& err_info, const string& master_info);
+ void GetXfsMasterAddress(const string& source, string& target);
+
+ protected:
+ void updMasterAddrByDns();
+
+ private:
+ TubeMQService();
+ ~TubeMQService();
+ void iniLogger(const Fileini& fileini, const string& sector);
+ void iniPoolThreads(const Fileini& fileini, const string& sector);
+ void iniXfsThread(const Fileini& fileini, const string& sector);
+ void thread_task_dnsxfs(int dns_xfs_period_ms);
+ void shutDownClinets() const;
+ bool hasXfsTask(map<string, int32_t>& src_addr_map);
+ bool addNeedDnsXfsAddr(map<string, int32_t>& src_addr_map);
+
+ private:
+ static TubeMQService* _instance;
+ string local_host_;
+ AtomicInteger service_status_;
+ AtomicInteger client_index_base_;
+ mutable mutex mutex_;
+ map<int32_t, BaseClient*> clients_map_;
+ ExecutorPoolPtr timer_executor_;
+ ExecutorPoolPtr network_executor_;
+ ConnectionPoolPtr connection_pool_;
+ std::shared_ptr<ThreadPool> thread_pool_;
+ thread dns_xfs_thread_;
+ mutable mutex dns_mutex_;
+ int64_t last_check_time_;
+ map<string, int32_t> master_source_;
+ map<string, string> master_target_;
+};
+
+} // namespace tubemq
+
+#endif // TUBEMQ_CLIENT_BASE_CLIENT_H_
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h b/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
index 803eb15..6406d97 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
@@ -22,7 +22,6 @@
#include <stdlib.h>
-
#include <chrono>
#include <functional>
#include <memory>
@@ -34,6 +33,7 @@
#include <asio/ssl.hpp>
#include "noncopyable.h"
+
namespace tubemq {
// ThreadPool use one io_context for thread pool
class ThreadPool : noncopyable {
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/tests/CMakeLists.txt
similarity index 87%
copy from tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt
copy to tubemq-client-twins/tubemq-client-cpp/tests/CMakeLists.txt
index 9d729c2..c1866ea 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/CMakeLists.txt
@@ -22,12 +22,13 @@
# unit_tests test is not set up using this function because it does not like
# the additional argument on commmand line and consequently does not run any
# test.
-function(tubemq_add_example _name)
+function(tubemq_add_tests _name)
set(_srcs ${ARGN})
message (STATUS "${_name} sources: ${_srcs}")
add_executable (${_name} ${_srcs})
- TARGET_LINK_LIBRARIES (${_name} tubemq ssl crypto log4cplus pthread)
+ TARGET_LINK_LIBRARIES (${_name} tubemq ssl crypto liblog4cplus.a libprotobuf.a tubemq_proto pthread rt)
endfunction()
add_subdirectory (log)
add_subdirectory (executor_pool)
+add_subdirectory (thread_pool)
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/log/main.cc b/tubemq-client-twins/tubemq-client-cpp/tests/README.md
similarity index 65%
copy from tubemq-client-twins/tubemq-client-cpp/example/log/main.cc
copy to tubemq-client-twins/tubemq-client-cpp/tests/README.md
index 9bb1685..4998b5f 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/log/main.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/README.md
@@ -1,46 +1,22 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <chrono>
-#include <exception>
-#include <iostream>
-#include <string>
-#include <thread>
-
-#include "tubemq/logger.h"
-
-using namespace std;
-using namespace tubemq;
-
-void log() {
- int i = 0;
- while (1) {
- LOG_ERROR("threadid:%ld, i:%d", std::this_thread::get_id(), i++);
- }
-}
-
-int main() {
- std::thread t1(log);
- std::thread t2(log);
- std::thread t3(log);
- std::thread t4(log);
- t1.join();
- return 0;
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#Example#
+
+tubemq-client-cpp example.
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/tests/executor_pool/CMakeLists.txt
similarity index 94%
rename from tubemq-client-twins/tubemq-client-cpp/example/executor_pool/CMakeLists.txt
rename to tubemq-client-twins/tubemq-client-cpp/tests/executor_pool/CMakeLists.txt
index bef5bbb..dac0999 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/executor_pool/CMakeLists.txt
@@ -18,4 +18,4 @@
#
-tubemq_add_example(executor_pool main.cc)
+tubemq_add_tests(executor_pool main.cc)
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/main.cc b/tubemq-client-twins/tubemq-client-cpp/tests/executor_pool/main.cc
similarity index 84%
rename from tubemq-client-twins/tubemq-client-cpp/example/executor_pool/main.cc
rename to tubemq-client-twins/tubemq-client-cpp/tests/executor_pool/main.cc
index 8c5ca25..04c6f48 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/main.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/executor_pool/main.cc
@@ -24,7 +24,7 @@
#include <string>
#include <thread>
-#include "tubemq/executor_pool.h"
+#include "executor_pool.h"
using namespace std;
using namespace tubemq;
@@ -40,7 +40,7 @@ int main() {
using namespace std::placeholders; // for _1, _2, _3...
ExecutorPool pool(4);
auto timer = pool.Get()->CreateSteadyTimer();
- timer->expires_after(std::chrono::seconds(5));
+ timer->expires_after(std::chrono::seconds(1));
std::cout << "startwait" << endl;
timer->wait();
std::cout << "endwait" << endl;
@@ -49,7 +49,10 @@ int main() {
std::cout << "startsyncwait" << endl;
timer->async_wait(std::bind(handler, 5, _1));
std::cout << "endsyncwait" << endl;
- std::this_thread::sleep_for(5s);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ timer->expires_after(std::chrono::milliseconds(100));
+ timer->async_wait(std::bind(handler, 6, _1));
+ std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/tests/log/CMakeLists.txt
similarity index 96%
copy from tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt
copy to tubemq-client-twins/tubemq-client-cpp/tests/log/CMakeLists.txt
index b828bf8..a7a7fe9 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/log/CMakeLists.txt
@@ -18,4 +18,4 @@
#
-tubemq_add_example(log main.cc)
+tubemq_add_tests(log main.cc)
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/log/main.cc b/tubemq-client-twins/tubemq-client-cpp/tests/log/main.cc
similarity index 60%
copy from tubemq-client-twins/tubemq-client-cpp/example/log/main.cc
copy to tubemq-client-twins/tubemq-client-cpp/tests/log/main.cc
index 9bb1685..d012c32 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/log/main.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/log/main.cc
@@ -23,23 +23,39 @@
#include <string>
#include <thread>
-#include "tubemq/logger.h"
+#include "buffer.h"
+#include "logger.h"
+#include "tubemq/tubemq_atomic.h"
using namespace std;
using namespace tubemq;
-void log() {
- int i = 0;
+AtomicInteger ati;
+
+void logfunc() {
while (1) {
- LOG_ERROR("threadid:%ld, i:%d", std::this_thread::get_id(), i++);
+ LOG_TRACE("atomic:%d", ati.IncrementAndGet());
}
}
int main() {
- std::thread t1(log);
- std::thread t2(log);
- std::thread t3(log);
- std::thread t4(log);
+ {
+ auto buf = std::make_shared<Buffer>();
+ std::string data = "abcdef";
+ buf->Write(data.data(), data.size());
+ auto buf2 = buf->Slice();
+ buf2->ReadUint32();
+ buf->Write(data.data(), data.size());
+ printf("%s\n", buf->String().c_str());
+ printf("%s\n", buf2->String().c_str());
+ }
+
+ ati.GetAndSet(1);
+ GetLogger().Init("./tubemq", tubemq::Logger::Level(0));
+ std::thread t1(logfunc);
+ std::thread t2(logfunc);
+ std::thread t3(logfunc);
+ std::thread t4(logfunc);
t1.join();
return 0;
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/tests/thread_pool/CMakeLists.txt
similarity index 95%
rename from tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt
rename to tubemq-client-twins/tubemq-client-cpp/tests/thread_pool/CMakeLists.txt
index b828bf8..4fe33bb 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/log/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/thread_pool/CMakeLists.txt
@@ -18,4 +18,4 @@
#
-tubemq_add_example(log main.cc)
+tubemq_add_tests(thread_pool main.cc)
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/log/main.cc b/tubemq-client-twins/tubemq-client-cpp/tests/thread_pool/main.cc
similarity index 79%
rename from tubemq-client-twins/tubemq-client-cpp/example/log/main.cc
rename to tubemq-client-twins/tubemq-client-cpp/tests/thread_pool/main.cc
index 9bb1685..afeef04 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/log/main.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/tests/thread_pool/main.cc
@@ -23,24 +23,14 @@
#include <string>
#include <thread>
-#include "tubemq/logger.h"
+#include "thread_pool.h"
using namespace std;
using namespace tubemq;
-void log() {
- int i = 0;
- while (1) {
- LOG_ERROR("threadid:%ld, i:%d", std::this_thread::get_id(), i++);
- }
-}
-
int main() {
- std::thread t1(log);
- std::thread t2(log);
- std::thread t3(log);
- std::thread t4(log);
- t1.join();
+ ThreadPool pool(std::thread::hardware_concurrency());
+ pool.Post([] { std::cout << "abcd" << endl; });
return 0;
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/third_party/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/third_party/CMakeLists.txt
index 7dc5e88..c223f9b 100644
--- a/tubemq-client-twins/tubemq-client-cpp/third_party/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/third_party/CMakeLists.txt
@@ -1,25 +1,45 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-
CMAKE_MINIMUM_REQUIRED(VERSION 3.1)
+
PROJECT(TubeMQThirdParty)
-ADD_SUBDIRECTORY(log4cplus)
+SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error")
+
+include(ExternalProject)
+
+ExternalProject_Add(
+ rapidjson
+ PREFIX "rapidjson"
+ GIT_REPOSITORY "https://github.com/Tencent/rapidjson.git"
+ GIT_TAG f54b0e47a08782a6131cc3d60f94d038fa6e0a51
+ TIMEOUT 20
+ CMAKE_ARGS
+ -DRAPIDJSON_BUILD_TESTS=OFF
+ -DRAPIDJSON_BUILD_DOC=OFF
+ -DRAPIDJSON_BUILD_EXAMPLES=OFF
+ CONFIGURE_COMMAND ""
+ BUILD_COMMAND ""
+ INSTALL_COMMAND ""
+ UPDATE_COMMAND ""
+ )
+
+ExternalProject_Add(
+ log4cplus_proj
+ URL https://github.com/log4cplus/log4cplus/releases/download/REL_2_0_5/log4cplus-2.0.5.tar.gz
+ CONFIGURE_COMMAND ./configure --prefix=<INSTALL_DIR> --disable-shared --with-pic CFLAGS=-O2\ CXXFLAGS=-O2\ -fPIC
+ TEST_BEFORE_INSTALL 0
+ BUILD_IN_SOURCE 1
+ INSTALL_DIR ${CMAKE_BINARY_DIR}/third_party/
+ )
+
+ExternalProject_Add(
+ asio_proj
+ URL https://github.com/chriskohlhoff/asio/archive/asio-1-18-0.tar.gz
+ INSTALL_DIR ${CMAKE_BINARY_DIR}/third_party/
+ CONFIGURE_COMMAND cd ../asio_proj/asio && ./autogen.sh && ./configure --prefix=<INSTALL_DIR> CFLAGS=-std=c++11 CPPFLAGS=-std=c++11 CXXFLAGS=-std=c++11 && make install
+ TEST_BEFORE_INSTALL 0
+ BUILD_IN_SOURCE 0
+ BUILD_COMMAND ""
+ INSTALL_COMMAND ""
+ )
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/third_party/log4cplus b/tubemq-client-twins/tubemq-client-cpp/third_party/log4cplus
deleted file mode 160000
index 9d00f7d..0000000
--- a/tubemq-client-twins/tubemq-client-cpp/third_party/log4cplus
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 9d00f7d10f2507f68f9ab5fea8b842735d9c6cfe
diff --git a/tubemq-client-twins/tubemq-client-cpp/third_party/rapidjson b/tubemq-client-twins/tubemq-client-cpp/third_party/rapidjson
deleted file mode 160000
index f54b0e4..0000000
--- a/tubemq-client-twins/tubemq-client-cpp/third_party/rapidjson
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit f54b0e47a08782a6131cc3d60f94d038fa6e0a51