You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/26 11:02:56 UTC
[inlong] branch master updated: [INLONG-5642][SDK] Change SDK(cpp) logger framework (#5715)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 143aade8d [INLONG-5642][SDK] Change SDK(cpp) logger framework (#5715)
143aade8d is described below
commit 143aade8d64391aade1b9af00016fd06793553da
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Fri Aug 26 19:02:51 2022 +0800
[INLONG-5642][SDK] Change SDK(cpp) logger framework (#5715)
---
.../dataproxy-sdk-cpp/README.md | 3 +-
.../release/conf/config_example.json | 2 +-
.../dataproxy-sdk-cpp/release/inc/client_config.h | 9 +-
.../dataproxy-sdk-cpp/release/inc/tc_api.h | 4 +-
.../dataproxy-sdk-cpp/src/base/client_config.cc | 48 +--
.../dataproxy-sdk-cpp/src/base/logger.cc | 372 +++++++++++++++++----
.../dataproxy-sdk-cpp/src/base/logger.h | 276 ++++++++++-----
.../dataproxy-sdk-cpp/src/base/pack_queue.cc | 42 +--
.../dataproxy-sdk-cpp/src/base/proxylist_config.cc | 26 +-
.../dataproxy-sdk-cpp/src/base/sdk_constant.h | 24 +-
.../dataproxy-sdk-cpp/src/base/sdk_core.cc | 96 +++---
.../dataproxy-sdk-cpp/src/base/sdk_core.h | 2 +-
.../dataproxy-sdk-cpp/src/base/utils.cc | 9 +-
.../dataproxy-sdk-cpp/src/net/buffer_pool.cc | 52 +--
.../src/net/executor_thread_pool.cc | 2 +-
.../dataproxy-sdk-cpp/src/net/socket_connection.cc | 8 +-
.../test/base/client_config_test.cc | 21 +-
.../dataproxy-sdk-cpp/test/base/pack_queue_test.cc | 27 +-
.../test/base/proxylist_config_test.cc | 23 +-
.../dataproxy-sdk-cpp/test/base/test_log.cc | 2 +-
.../dataproxy-sdk-cpp/test/base/test_utils.cc | 2 -
.../dataproxy-sdk-cpp/test/main/init_exit_test.cc | 27 ++
.../dataproxy-sdk-cpp/test/net/big_pack_test.cc | 3 +-
.../dataproxy-sdk-cpp/test/net/buffer_pool_test.cc | 7 +-
.../dataproxy-sdk-cpp/test/net/executor_test.cc | 2 -
.../dataproxy-sdk-cpp/test/net/send_buffer_test.cc | 2 -
.../test/net/socket_connection_test.cc | 9 +-
.../dataproxy-sdk-cpp/third_party/CMakeLists.txt | 9 -
28 files changed, 705 insertions(+), 404 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/README.md b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/README.md
index 526eabd91..644186ea0 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/README.md
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/README.md
@@ -26,7 +26,6 @@ dataproxy-sdk cpp version, used for sending data to dataproxy
## Prerequisites
* CMake 3.1+
-* log4cplus
* snappy
* curl
* rapidjson
@@ -75,7 +74,7 @@ Refer to `release/conf/config_example.json`.
1. First, init dataproxy-sdk, there are two ways you can choose:
- A) `int32_t tc_api_init(const char* config_file)`. Here, `config_file` is the path of your config file, and absolute path is recommended. Note that only once called is needed in one process.
-- B) `int32_t tc_api_init(ClientConfig* client_config)`. Here, `client_config` is the pointer of a `ClientConfig` object.
+- B) `int32_t tc_api_init(ClientConfig& client_config)`. Here, `client_config` is a reference to a `ClientConfig` object.
2. Then, send data: `int32_t tc_api_send(const char* inlong_group_id, const char* inlong_stream_id, const char* msg, int32_t msg_len, UserCallBack call_back = NULL)`. If you set `call_back`, it will be callbacked if your data failed to send. See the signature of `UserCallBack` in `release/inc/user_msg.h`.
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/conf/config_example.json b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/conf/config_example.json
index 0f8c279ec..39353f8a5 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/conf/config_example.json
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/conf/config_example.json
@@ -24,7 +24,7 @@
"enable_hb": true,
"hb_interval": 60,
"proxy_update_interval": 10,
- "proxy_cfg_preurl": "http://127.0.0.1:8099/api/inlong/manager/openapi/dataproxy/getIpList",
+ "proxy_cfg_preurl": "http://127.0.0.1:8099/inlong/manager/openapi/dataproxy/getIpList",
"need_auth": true,
"auth_id": "admin",
"auth_key": "87haw3VYTPqK5fK0"
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/client_config.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/client_config.h
index ed8eaad5d..81458227b 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/client_config.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/client_config.h
@@ -30,7 +30,6 @@ namespace dataproxy_sdk
{
private:
std::string config_path_;
- bool user_config_err_=false;
int32_t buf_num_; //sendbuf num of each bufpool, max_buf_pool_/(ext_pack_size_+400)
public:
@@ -93,11 +92,11 @@ namespace dataproxy_sdk
std::string auth_id_;
std::string auth_key_;
- ClientConfig(const std::string config_path) : config_path_(config_path) {}
- ClientConfig(const std::string& proxy_url, bool need_auth, const std::string& auth_id, const std::string& auth_key);
-
- bool parseConfig(); // return false if parse failed
void defaultInit();
+
+ ClientConfig() { defaultInit(); }
+
+ bool parseConfig(const std::string& config_path); // return false if parse failed
void showClientConfig();
void updateBufSize();
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/tc_api.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/tc_api.h
index 5518d9b4b..8be16cba0 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/tc_api.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/tc_api.h
@@ -56,7 +56,7 @@ int32_t tc_api_init(const char* config_file);
* @description: tc_api_init ext function
* @return 0 if success
* @param {char*} config_file - user configfile, prefer using absolute path
- * @param {int32_t} use_def - is use_def isn't 0,
+ * @param {int32_t} use_def - if use_def is zero, return immediately when parsing config_file fails, which means this init fails
*/
int32_t tc_api_init_ext(const char* config_file, int32_t use_def);
@@ -65,7 +65,7 @@ int32_t tc_api_init_ext(const char* config_file, int32_t use_def);
* @return 0 if success
* @param {ClientConfig&} refer to client_config.h
*/
-int32_t tc_api_init(ClientConfig* client_config);
+int32_t tc_api_init(ClientConfig& client_config);
/**
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.cc
index 0aad0f6d8..678e20b76 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.cc
@@ -27,46 +27,9 @@
namespace dataproxy_sdk
{
- ClientConfig::ClientConfig(const std::string& proxy_url, bool need_auth, const std::string& auth_id, const std::string& auth_key):
- proxy_URL_(proxy_url),
- need_auth_(need_auth),
- auth_id_(auth_id),
- auth_key_(auth_key),
- thread_nums_(constants::kThreadNums),
- shared_buf_nums_(constants::kSharedBufferNums),
- enable_groupId_isolation_(constants::kEnableGroupidIsolation),
- buffer_num_per_groupId_(constants::kBufferNumPerGroupid),
- net_tag_(constants::kNetTag),
- enable_pack_(constants::kEnablePack),
- pack_size_(constants::kPackSize),
- pack_timeout_(constants::kPackTimeout),
- ext_pack_size_(constants::kExtPackSize),
- enable_zip_(constants::kEnableZip),
- min_zip_len_(constants::kMinZipLen),
- enable_retry_(constants::kEnableRetry),
- retry_interval_(constants::kRetryInterval),
- retry_num_(constants::kRetryNum),
- log_num_(constants::kLogNum),
- log_size_(constants::kLogSize),
- log_level_(constants::kLogLevel),
- log_file_type_(constants::kLogFileType),
- log_path_(constants::kLogPath),
- log_enable_limit_(constants::kLogEnableLimit),
- enable_proxy_URL_from_cluster_(constants::kEnableProxyURLFromCluster),
- proxy_cluster_URL_(constants::kProxyClusterURL),
- proxy_update_interval_(constants::kProxyUpdateInterval),
- proxy_URL_timeout_(constants::kProxyURLTimeout),
- max_active_proxy_num_(constants::kMaxActiveProxyNum),
- ser_ip_(constants::kSerIP),
- max_buf_pool_(constants::kMaxBufPool),
- msg_type_(constants::kMsgType),
- enable_TCP_nagle_(constants::kEnableTCPNagle),
- mask_cpu_affinity_(constants::kMaskCPUAffinity),
- is_from_DC_(constants::kIsFromDC),
- extend_field_(constants::kExtendField) {}
-
- bool ClientConfig::parseConfig()
+ bool ClientConfig::parseConfig(const std::string& config_path)
{
+ config_path_ = config_path;
std::string file_content;
if (!Utils::readFile(config_path_, file_content))
{
@@ -644,7 +607,6 @@ namespace dataproxy_sdk
}
void ClientConfig::defaultInit(){
- user_config_err_=true;
thread_nums_=constants::kThreadNums;
shared_buf_nums_=constants::kSharedBufferNums;
@@ -690,16 +652,14 @@ namespace dataproxy_sdk
is_from_DC_=constants::kIsFromDC;
extend_field_=constants::kExtendField;
+ need_auth_=constants::kNeedAuth;
+
buf_size_ = ext_pack_size_ + 400;
buf_num_ = max_buf_pool_ / (buf_size_);
}
void ClientConfig::showClientConfig()
{
- if(user_config_err_){
- LOG_ERROR("dataproxy_sdk_cpp init user config err, then use default config values");
- }
-
LOG_WARN("thread_num: %d", thread_nums_);
LOG_WARN("shared_buf_num: %d", shared_buf_nums_);
LOG_WARN("inlong_group_ids: <%s>", Utils::getVectorStr(inlong_group_ids_).c_str());
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.cc
index a4fa43e6b..28e26483b 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.cc
@@ -19,79 +19,317 @@
#include "logger.h"
-#include <log4cplus/consoleappender.h>
-#include <log4cplus/fileappender.h>
-#include <log4cplus/layout.h>
-#include <log4cplus/logger.h>
-#include <log4cplus/loggingmacros.h>
+#include <sys/time.h>
+#include <time.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <string.h>
+#include <pthread.h>
#include <stdarg.h>
-
-#include <string>
-
-#include "singleton.h"
+#include <stddef.h>
+#include <unistd.h>
+#include <errno.h>
namespace dataproxy_sdk
{
-Logger& getLogger() { return Singleton<Logger>::instance(); }
-
-bool Logger::init(uint32_t file_max_size,
- uint32_t file_num,
- uint8_t level,
- uint8_t output_type,
- bool enable_limit,
- const std::string& base_path,
- const std::string& logname)
-{
- file_max_size_ = file_max_size;
- file_num_ = file_num;
- level_ = level;
- output_type_ = output_type;
- enable_limit_ = enable_limit;
- base_path_ = base_path;
- log_name_ = logname;
- setUp();
- return true;
-}
-
-bool Logger::write(const char* format, ...)
-{
- char buf[8192];
- va_list ap;
- va_start(ap, format);
- vsnprintf(buf, sizeof(buf) - 1, format, ap);
- va_end(ap);
- return writeCharStream(buf);
-}
-
-bool Logger::writeCharStream(const char* log)
-{
- auto logger = log4cplus::Logger::getInstance(instance_);
- logger.forcedLog(log4cplus::TRACE_LOG_LEVEL, log);
- return true;
-}
+ // log config
+ debug_log_cfg gst_log_cfg;
-void Logger::setUp()
-{
- bool immediate_flush = true;
- std::string pattern = "[%D{%Y-%m-%d %H:%M:%S.%q}]%m%n";
- auto logger_d = log4cplus::Logger::getInstance(instance_);
- logger_d.removeAllAppenders();
- logger_d.setLogLevel(log4cplus::TRACE_LOG_LEVEL);
-
- if (output_type_ == 2)
- { //file
- log4cplus::SharedAppenderPtr fileAppender(new log4cplus::RollingFileAppender(
- base_path_ + log_name_, file_max_size_ * kMBSize, file_num_, immediate_flush, true));
- std::unique_ptr<log4cplus::Layout> layout(new log4cplus::PatternLayout(pattern));
- fileAppender->setLayout(std::move(layout));
- logger_d.addAppender(fileAppender);
+ // log info
+ debug_log_file gst_log_file;
+
+ // log run info
+ debug_log_run gst_log_run = {
+ //.log_type =
+ _LOG_PTY,
+ //.log_change_min =
+ 10,
+ };
+
+ pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+
+ void debug_show_logcfg()
+ {
+ debug_log_cfg *log_cfg = &gst_log_cfg;
+
+ LOG_ERROR("debug log config info. level(%d).", log_cfg->level);
+
+ return;
+ }
+
+ void debug_get_date(char *logTime)
+ {
+ time_t mytime = time(NULL);
+ struct tm curr;
+ localtime_r(&mytime, &curr);
+
+ if (curr.tm_year > 50)
+ sprintf(logTime, "%04d-%02d-%02d %02d:%02d:%02d",
+ curr.tm_year + 1900, curr.tm_mon + 1, curr.tm_mday,
+ curr.tm_hour, curr.tm_min, curr.tm_sec);
+ else
+ sprintf(logTime, "%04d-%02d-%02d %02d:%02d:%02d",
+ curr.tm_year + 2000, curr.tm_mon + 1, curr.tm_mday,
+ curr.tm_hour, curr.tm_min, curr.tm_sec);
+
+ return;
+ }
+
+ // update log file
+ int32_t debug_shift_file(int32_t loglevel)
+ {
+ int32_t i = 0;
+ int32_t ret = 0;
+ char file_name[NAME_LEN];
+ char new_name[NAME_LEN];
+ struct stat file_stat;
+ debug_log_file *log_file = &gst_log_file;
+
+ memset(&file_name[0], 0x0, NAME_LEN);
+ snprintf(&file_name[0], NAME_LEN - 1, "%s", &log_file->file_list[loglevel][0]);
+
+ // get log file stat
+ ret = stat(file_name, &file_stat);
+ if (ret < 0)
+ {
+ printf("get log file:%s stat err.", file_name);
+ return 1;
+ }
+
+ switch (log_file->shift_type)
+ {
+ case _LOG_SHIFT_SIZE:
+ if (file_stat.st_size < log_file->max_size)
+ {
+ return 0;
+ }
+ break;
+
+ case _LOG_SHIFT_COUNT:
+ default:
+ if (log_file->log_count < log_file->max_count)
+ {
+ return 0;
+ }
+ log_file->log_count = 0;
+ break;
+ }
+
+ // create new log file
+ for (i = log_file->max_lognum; i >= 0; i--)
+ {
+ if (i == 0)
+ {
+ snprintf(&file_name[0], NAME_LEN - 1, "%s",
+ &log_file->file_list[loglevel][0]);
+ }
+ else
+ {
+ snprintf(&file_name[0], NAME_LEN - 1, "%s.%d",
+ &log_file->file_list[loglevel][0], i);
+ }
+
+ if (0 == access(file_name, F_OK))
+ {
+ snprintf(&new_name[0], NAME_LEN - 1, "%s.%d",
+ &log_file->file_list[loglevel][0], i + 1);
+ if (rename(file_name, new_name) < 0)
+ {
+ printf("log file rename err(%d).", errno);
+ return 1;
+ }
+ }
+ }
+
+ return 0;
+ }
+
+ // init log file
+ int32_t debug_init_logfile(debug_log_file *log_file, debug_log_cfg *log_cfg)
+ {
+ const char *log_pre[_LOG_MAX_FILE] = {"error", "warn", "info", "debug", "trace", "stat"};
+
+ memset(log_file, 0x0, sizeof(debug_log_file));
+
+ for (int i = 0; i < _LOG_MAX_FILE; i++)
+ {
+ snprintf(&(log_file->file_list[i][0]), NAME_LEN - 1, "%ssdk_cpp_%s.log",
+ &log_cfg->path[0], log_pre[i]);
+ //printf("log will write to file:%s.\n", &log_file->file_list[i][0]);
+ }
+
+ log_file->shift_type = log_cfg->shift_type;
+ log_file->max_lognum = log_cfg->num;
+ log_file->max_size = log_cfg->size * 1024 * 1024;
+ log_file->max_count = log_cfg->size * 1024 * 1024;
+ log_file->log_count = log_cfg->size * 1024 * 1024;
+
+ return 0;
}
- else
- { //console
- log4cplus::SharedAppenderPtr consoleAppender(new log4cplus::ConsoleAppender());
- consoleAppender->setLayout(std::unique_ptr<log4cplus::Layout>(new log4cplus::SimpleLayout()));
- logger_d.addAppender(consoleAppender);
+
+ static time_t last_write_log_time[_LOG_MAX_FILE] = {0};
+ static int log_limit_cnt[_LOG_MAX_FILE] = {0};
+
+ int32_t log_print(int32_t loglevel, int32_t log_time, char *format, ...)
+ {
+ va_list ap;
+ char date[50];
+ FILE *file_id;
+ struct timeval tm;
+ debug_log_file *log_file = &gst_log_file;
+
+ if ((loglevel > (int32_t)SDK_CFG_LOG_LEVEL) && (loglevel != _LOG_STAT))
+ {
+ return 0;
+ }
+
+ #if 1
+ time_t now = time(NULL);
+ if (loglevel != _LOG_STAT && now - last_write_log_time[loglevel] < 10)
+ {
+ if (log_limit_cnt[loglevel] > 200)
+ {
+ return 0;
+ }
+
+ log_limit_cnt[loglevel]++;
+ }
+ else
+ {
+ last_write_log_time[loglevel] = now;
+ log_limit_cnt[loglevel] = 0;
+ }
+ #endif
+
+ debug_get_date(date);
+ pthread_mutex_lock(&log_mutex);
+ // open log file
+ file_id = fopen(&(log_file->file_list[loglevel][0]), "a+");
+ if (NULL == file_id)
+ {
+ printf("open file:%s err, errno:%d.\n", &(log_file->file_list[loglevel][0]), errno);
+ pthread_mutex_unlock(&log_mutex);
+ return 1;
+ }
+
+ va_start(ap, format);
+ if (1 == log_time)
+ {
+ fprintf(file_id, "[%s]", date);
+ }
+ else
+ {
+ gettimeofday(&tm, NULL);
+ fprintf(file_id, "[%s.%.6d]", date,
+ (int32_t)tm.tv_usec);
+ }
+
+ vfprintf(file_id, format, ap);
+ fprintf(file_id, "\n");
+ va_end(ap);
+ log_file->log_count++;
+ fflush(file_id);
+ fclose(file_id);
+ debug_shift_file(loglevel);
+ pthread_mutex_unlock(&log_mutex);
+
+ return 0;
+ }
+
+ static int create_multi_dir(const char *path)
+ {
+ int i, len;
+
+ len = strlen(path);
+ char dir_path[len + 1];
+ dir_path[len] = '\0';
+
+ strncpy(dir_path, path, len);
+
+ for (i = 0; i < len; i++)
+ {
+ if (dir_path[i] == '/' && i > 0)
+ {
+ dir_path[i] = '\0';
+ if (access(dir_path, F_OK) < 0)
+ {
+ if (mkdir(dir_path, 0755) < 0)
+ {
+ printf("mkdir=%s:msg=%s\n", dir_path, strerror(errno));
+ return -1;
+ }
+ }
+ dir_path[i] = '/';
+ }
+ }
+
+ return 0;
}
-}
+
+ int32_t check_path(const char *path)
+ {
+ struct stat st_stat = {0};
+ int ret = stat(path, &st_stat);
+
+ if (ret && errno != ENOENT)
+ {
+ fprintf(stderr, "Check directory error: %s\n", strerror(errno));
+ return 1;
+ }
+
+ if ((ret && errno == ENOENT) || (!ret && !S_ISDIR(st_stat.st_mode)))
+ {
+ // create dir, rwxr-xr-x
+ if (mkdir(path, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH))
+ {
+ fprintf(stderr, "Crate directory error: %s\n", strerror(errno));
+
+ return 1;
+ }
+ }
+
+ return 0;
+ }
+
+ int32_t debug_init_log()
+ {
+ int32_t path_len = strlen(SDK_CFG_LOG_PATH);
+ debug_log_cfg *log_cfg = &gst_log_cfg;
+ debug_log_file *log_file = &gst_log_file;
+ debug_log_run *log_run = &gst_log_run;
+
+ log_cfg->level = SDK_CFG_LOG_LEVEL;
+ log_cfg->num = SDK_CFG_LOG_NUM;
+ log_cfg->size = SDK_CFG_LOG_SIZE;
+ log_cfg->shift_type = _LOG_SHIFT_SIZE;
+ log_cfg->file_type = 2;
+
+ if (SDK_CFG_LOG_PATH[path_len] != '/')
+ {
+ snprintf(&log_cfg->path[0], NAME_LEN - 1,
+ "%s%s", SDK_CFG_LOG_PATH, "/");
+ }
+ else
+ {
+ snprintf(&log_cfg->path[0], NAME_LEN - 1,
+ "%s", SDK_CFG_LOG_PATH);
+ }
+
+ create_multi_dir(SDK_CFG_LOG_PATH);
+
+ check_path(SDK_CFG_LOG_PATH);
+
+ debug_init_logfile(log_file, log_cfg);
+
+ log_run->log_pid = getpid();
+ log_run->log_type = log_cfg->file_type;
+ log_run->log_level = log_cfg->level;
+
+ return 0;
+ }
+
} // namespace dataproxy_sdk
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.h
index b2bd25783..0aa10a406 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.h
@@ -24,104 +24,220 @@
#include <string.h>
#include <string>
#include <unistd.h>
+#include <sys/types.h>
#include <vector>
#include "sdk_constant.h"
+#include "sdk_core.h"
namespace dataproxy_sdk
{
-static const uint32_t kMBSize = 1024 * 1024;
-const uint32_t kPid = getpid();
+ #ifndef SDK_CFG_LOG_LEVEL
+ #define SDK_CFG_LOG_LEVEL g_config.log_level_
+ #endif
-class Logger;
+ #ifndef SDK_CFG_LOG_NUM
+ #define SDK_CFG_LOG_NUM g_config.log_num_
+ #endif
-Logger& getLogger();
+ #ifndef SDK_CFG_LOG_SIZE
+ #define SDK_CFG_LOG_SIZE g_config.log_size_
+ #endif
-// only show fileName
-#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
+ #ifndef SDK_CFG_LOG_PATH
+ #define SDK_CFG_LOG_PATH g_config.log_path_.c_str()
+ #endif
-#define LOG_LEVEL(level, fmt, ...) \
- { \
- if (dataproxy_sdk::getLogger().enableLevel(level)) \
- { \
- dataproxy_sdk::getLogger().write("[pid:%d][%s][%s:%s:%d]" fmt, kPid, dataproxy_sdk::Logger::level2String(level), \
- __FILENAME__, __func__, __LINE__, ##__VA_ARGS__); \
- } \
- }
+ #define NAME_LEN 512
-#define LOG_TRACE(fmt, ...) LOG_SDKCPP(dataproxy_sdk::getLogger(), dataproxy_sdk::Logger::kLogTrace, fmt, ##__VA_ARGS__)
-#define LOG_DEBUG(fmt, ...) LOG_SDKCPP(dataproxy_sdk::getLogger(), dataproxy_sdk::Logger::kLogDebug, fmt, ##__VA_ARGS__)
-#define LOG_INFO(fmt, ...) LOG_SDKCPP(dataproxy_sdk::getLogger(), dataproxy_sdk::Logger::kLogInfo, fmt, ##__VA_ARGS__)
-#define LOG_WARN(fmt, ...) LOG_SDKCPP(dataproxy_sdk::getLogger(), dataproxy_sdk::Logger::kLogWarn, fmt, ##__VA_ARGS__)
-#define LOG_ERROR(fmt, ...) LOG_SDKCPP(dataproxy_sdk::getLogger(), dataproxy_sdk::Logger::kLogError, fmt, ##__VA_ARGS__)
-
-#define LOG_SDKCPP(logger, level, fmt, ...) \
- { \
- if (logger.enableLevel(level)) \
- { \
- logger.write("[pid:%d][%s][%s:%s:%d]" fmt, kPid, dataproxy_sdk::Logger::level2String(level), __FILENAME__, __func__, \
- __LINE__, ##__VA_ARGS__); \
- } \
- }
+ // only show fileName
+ #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
-class Logger
-{
- public:
- enum Level {
- kLogError = 0,
- kLogWarn = 1,
- kLogInfo = 2,
- kLogDebug = 3,
- kLogTrace = 4,
+ enum _LOG_LEVEL_
+ {
+ _LOG_ERROR = 0x00,
+ _LOG_WARN,
+ _LOG_INFO,
+ _LOG_DEBUG,
+ _LOG_TRACE,
+ _LOG_STAT,
+ _LOG_MAX_FILE
};
- private:
- uint32_t file_max_size_;
- uint32_t file_num_;
- uint8_t level_;
- uint8_t output_type_; //2->file, 1->console
- bool enable_limit_;
- std::string base_path_;
- std::string log_name_;
-
- std::string instance_;
-
- public:
- Logger()
- : file_max_size_(10)
- , file_num_(10)
- , level_(kLogInfo)
- , output_type_(2)
- , enable_limit_(true)
- , base_path_("./logs/")
- , instance_("DataProxySDK")
+ enum _LOG_TYPE_
{
- }
+ _LOG_PTY = 0x01,
+ _LOG_FILE = 0x02,
+ };
- ~Logger() {}
-
- bool init(uint32_t file_max_size,
- uint32_t file_num,
- uint8_t level,
- uint8_t output_type,
- bool enable_limit,
- const std::string& base_path,
- const std::string& logname = constants::kLogName);
- bool write(const char* sFormat, ...) __attribute__((format(printf, 2, 3)));
- inline bool writeStream(const std::string& msg) { return writeCharStream(msg.c_str()); }
- inline bool enableLevel(Level level) { return ((level <= level_) ? true : false); }
- static const char* level2String(Level level)
+ enum _LOG_SHIFT_TYPE_
{
- static const char* level_names[] = {
- "ERROR", "WARN", "INFO", "DEBUG", "TRACE",
- };
- return level_names[level];
- }
+ _LOG_SHIFT_SIZE = 0, /*shift by size*/
+ _LOG_SHIFT_COUNT = 1, /*shift by log count*/
+ };
+
+ /* CFG debug info */
+ enum SDK_LOG_DEBUG_EN {
+ SDK_LOG_CHANGE_LEVEL,
+ };
- private:
- void setUp();
- bool writeCharStream(const char* msg);
-};
+ typedef struct tag_debug_log_cfg
+ {
+ int32_t level;
+ int32_t num;
+ int32_t size;
+ int32_t shift_type;
+ int32_t file_type;
+ char path[NAME_LEN];
+ } debug_log_cfg;
+
+ typedef struct tag_debug_log_file {
+ char file_list[_LOG_MAX_FILE][NAME_LEN];
+ int32_t shift_type;
+ int32_t max_lognum;
+ int32_t max_size;
+ int32_t max_count;
+ int32_t log_count;
+ } debug_log_file;
+
+ typedef struct tag_debug_log_run {
+ pid_t log_pid;
+ uint8_t log_type;
+ uint8_t log_level;
+ int32_t log_change_min;
+ } debug_log_run;
+
+ #define _log_pid_ gst_log_run.log_pid
+ #define _log_type_ gst_log_run.log_type
+ #define _log_level_ gst_log_run.log_level
+
+
+ #define LOG_STAT(fmt, args...) \
+ do{ \
+ char log_time[50] = {0}; \
+ if (_log_type_ & _LOG_FILE) \
+ { \
+ log_print(_LOG_STAT, 1, (char *)"PID[%d]STAT:%s:%.3d<%s>: "fmt,\
+ _log_pid_, __FILENAME__, __LINE__, __FUNCTION__, ##args); \
+ } \
+ if (_log_type_ & _LOG_PTY) \
+ { \
+ debug_get_date(log_time); \
+ fprintf(stderr, (char *)"[%s]PID[%d]STAT:%s:%.3d<%s>: "fmt"\n",\
+ log_time, _log_pid_, __FILENAME__, __LINE__, \
+ __FUNCTION__, ##args); \
+ } \
+ }while(0)
+
+ #define LOG_ERROR(fmt, args...) \
+ do{ \
+ char log_time[50] = {0}; \
+ if (_log_type_ & _LOG_FILE) \
+ { \
+ log_print(_LOG_ERROR, 1, (char *)"PID[%d]ERR:%s:%.3d<%s>: "fmt,\
+ _log_pid_, __FILENAME__, __LINE__, __FUNCTION__, ##args); \
+ } \
+ if (_log_type_ & _LOG_PTY) \
+ { \
+ debug_get_date(log_time); \
+ fprintf(stderr, (char *)"[%s]PID[%d]ERR:%s:%.3d<%s>: "fmt"\n", \
+ log_time, _log_pid_, __FILENAME__, __LINE__, \
+ __FUNCTION__, ##args); \
+ } \
+ }while(0)
+
+ #define LOG_WARN(fmt, args...) \
+ do{ \
+ char log_time[50] = {0}; \
+ if (_log_level_ < _LOG_WARN) \
+ {break;} \
+ if (_log_type_ & _LOG_FILE) \
+ { \
+ log_print(_LOG_WARN, 1, (char *)"PID[%d]WARN:%s:%.3d<%s>: "fmt,\
+ _log_pid_, __FILENAME__, __LINE__, __FUNCTION__, ##args); \
+ } \
+ if (_log_type_ & _LOG_PTY) \
+ { \
+ debug_get_date(log_time); \
+ fprintf(stderr, (char *)"[%s]PID[%d]WARN:%s:%.3d<%s>: "fmt"\n",\
+ log_time, _log_pid_, __FILENAME__, __LINE__, \
+ __FUNCTION__, ##args); \
+ } \
+ }while(0)
+
+ #define LOG_INFO(fmt, args...) \
+ do{ \
+ char log_time[50] = {0}; \
+ if (_log_level_ < _LOG_INFO) \
+ {break;} \
+ if (_log_type_ & _LOG_FILE) \
+ { \
+ log_print(_LOG_INFO, 1, (char *)"PID[%d]INFO:%s:%.3d<%s>: "fmt,\
+ _log_pid_, __FILENAME__, __LINE__, __FUNCTION__, ##args); \
+ } \
+ if (_log_type_ & _LOG_PTY) \
+ { \
+ debug_get_date(log_time); \
+ fprintf(stderr, (char *)"[%s]PID[%d]INFO:%s:%.3d<%s>: "fmt"\n",\
+ log_time, _log_pid_, __FILENAME__, __LINE__, \
+ __FUNCTION__, ##args); \
+ } \
+ }while(0)
+
+ #define LOG_DEBUG(fmt, args...) \
+ do{ \
+ char log_time[50] = {0}; \
+ if (_log_level_ < _LOG_DEBUG) \
+ {break;} \
+ if (_log_type_ & _LOG_FILE) \
+ { \
+ log_print(_LOG_DEBUG, 1, (char *)"PID[%d]DEBUG:%s:%.3d<%s>: "fmt,\
+ _log_pid_, __FILENAME__, __LINE__, __FUNCTION__, ##args); \
+ } \
+ if (_log_type_ & _LOG_PTY) \
+ { \
+ debug_get_date(log_time); \
+ fprintf(stderr, (char *)"[%s]PID[%d]DEBUG:%s:%.3d<%s>: "fmt"\n",\
+ log_time, _log_pid_, __FILENAME__, __LINE__, \
+ __FUNCTION__, ##args); \
+ } \
+ }while(0)
+
+ #define LOG_TRACE(fmt, args...) \
+ do{ \
+ char log_time[50] = {0}; \
+ if (_log_level_ < _LOG_TRACE) \
+ {break;} \
+ if (_log_type_ & _LOG_FILE) \
+ { \
+ log_print(_LOG_TRACE, 1, (char *)"PID[%d]TRACE:%s:%.3d<%s>: "fmt, \
+ _log_pid_, __FILENAME__, __LINE__, __FUNCTION__, ##args); \
+ } \
+ if (_log_type_ & _LOG_PTY) \
+ { \
+ debug_get_date(log_time); \
+ fprintf(stderr, (char *)"[%s]PID[%d]TRACE:%s:%.3d<%s>: "fmt"\n",\
+ log_time, _log_pid_, __FILENAME__, __LINE__, \
+ __FUNCTION__, ##args); \
+ } \
+ }while(0)
+
+ #ifdef __cplusplus
+ extern "C" {
+ #endif
+
+ extern debug_log_run gst_log_run;
+ extern debug_log_cfg gst_log_cfg;
+ extern debug_log_file gst_log_file;
+ extern void debug_get_date(char *logTime);
+
+ int32_t log_print(int32_t log_level, int32_t log_time, char *format, ...);
+ int32_t debug_init_log();
+ void tc_log_level_restore(void);
+
+ #ifdef __cplusplus
+ }
+ #endif
} // namespace dataproxy_sdk
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.cc
index 09d70282a..9a575f1f6 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.cc
@@ -36,7 +36,7 @@
namespace dataproxy_sdk
{
PackQueue::PackQueue(const std::string &inlong_group_id, const std::string &inlong_stream_id)
- : cur_len_(0), inlong_group_id_(inlong_group_id), inlong_stream_id_(inlong_stream_id), groupId_num_(0), streamId_num_(0), msg_type_(g_config->msg_type_), data_capacity_(g_config->buf_size_)
+ : cur_len_(0), inlong_group_id_(inlong_group_id), inlong_stream_id_(inlong_stream_id), groupId_num_(0), streamId_num_(0), msg_type_(g_config.msg_type_), data_capacity_(g_config.buf_size_)
{
data_ = new char[data_capacity_];
memset(data_, 0x0, data_capacity_);
@@ -84,7 +84,7 @@ namespace dataproxy_sdk
}
//if uneable_pack, single msg is written to packqueue and directly sent to buf
- if (!g_config->enable_pack_)
+ if (!g_config.enable_pack_)
{
int32_t res = writeToBuf();
if (res)
@@ -173,9 +173,9 @@ namespace dataproxy_sdk
int32_t PackQueue::appendMsg(const std::string &msg, std::string client_ip, int64_t report_time, UserCallBack call_back)
{
//too long msg
- if (msg.size() > g_config->ext_pack_size_)
+ if (msg.size() > g_config.ext_pack_size_)
{
- LOG_ERROR("msg len (%d) more than ext_pack_size (%d)", msg.size(), g_config->ext_pack_size_);
+ LOG_ERROR("msg len (%d) more than ext_pack_size (%d)", msg.size(), g_config.ext_pack_size_);
return SDKInvalidResult::kMsgTooLong;
}
@@ -203,11 +203,11 @@ namespace dataproxy_sdk
cur_len_ += msg.size() + 1; // '\n' using one byte
- if (g_config->isNormalDataPackFormat())
+ if (g_config.isNormalDataPackFormat())
{
cur_len_ += 4;
}
- if (g_config->isAttrDataPackFormat())
+ if (g_config.isAttrDataPackFormat())
{
cur_len_ += constants::kAttrLen + 8;
}
@@ -241,7 +241,7 @@ namespace dataproxy_sdk
{
time_trigger = true;
}
- if (msg_len + cur_len_ > g_config->pack_size_)
+ if (msg_len + cur_len_ > g_config.pack_size_)
{
len_trigger = true;
}
@@ -278,7 +278,7 @@ namespace dataproxy_sdk
idx += static_cast<uint32_t>(it->msg.size());
//add attrlen|attr
- if (g_config->isAttrDataPackFormat())
+ if (g_config.isAttrDataPackFormat())
{
*(uint32_t *)(&data_[idx]) = htonl(it->data_pack_format_attr.size());
idx += sizeof(uint32_t);
@@ -333,7 +333,7 @@ namespace dataproxy_sdk
uint32_t char_groupid_flag = 0;
std::string groupid_streamid_char;
uint16_t groupid_num = 0, streamid_num = 0;
- if (g_config->enableCharGroupid() || groupId_num_ == 0 || streamId_num_ == 0) //using string groupid and streamid
+ if (g_config.enableCharGroupid() || groupId_num_ == 0 || streamId_num_ == 0) //using string groupid and streamid
{
groupid_num = 0;
streamid_num = 0;
@@ -345,17 +345,17 @@ namespace dataproxy_sdk
groupid_num = groupId_num_;
streamid_num = streamId_num_;
}
- uint16_t ext_field = (g_config->extend_field_ | char_groupid_flag);
+ uint16_t ext_field = (g_config.extend_field_ | char_groupid_flag);
uint32_t data_time = data_time_ / 1000;
// attr
std::string attr;
- if (g_config->enableTraceIP())
+ if (g_config.enableTraceIP())
{
if (groupid_streamid_char.empty())
- attr = "node1ip=" + g_config->ser_ip_ + "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
+ attr = "node1ip=" + g_config.ser_ip_ + "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
else
- attr = groupid_streamid_char + "&node1ip=" + g_config->ser_ip_ + "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
+ attr = groupid_streamid_char + "&node1ip=" + g_config.ser_ip_ + "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
}
else
{
@@ -427,9 +427,9 @@ namespace dataproxy_sdk
attr += "&cp=snappy";
attr += "&cnt=" + std::to_string(cnt);
attr += "&sid=" + std::string(Utils::getSnowflakeId());
- if (g_config->is_from_DC_)
+ if (g_config.is_from_DC_)
{ //&__addcol1_reptime=yyyymmddHHMMSS&__addcol2__ip=BBB&f=dc
- attr += "&__addcol1_reptime=" + Utils::getFormatTime(Utils::getCurrentMsTime()) + "&__addcol2__ip=" + g_config->ser_ip_ + "&f=dc";
+ attr += "&__addcol1_reptime=" + Utils::getFormatTime(Utils::getCurrentMsTime()) + "&__addcol2__ip=" + g_config.ser_ip_ + "&f=dc";
}
// attrlen
@@ -459,7 +459,7 @@ namespace dataproxy_sdk
*/
bool PackQueue::isZipAndOperate(std::string &res, uint32_t real_cur_len)
{
- if (g_config->enable_zip_ && real_cur_len > g_config->min_zip_len_)
+ if (g_config.enable_zip_ && real_cur_len > g_config.min_zip_len_)
{
LOG_TRACE("start snappy.");
Utils::zipData(data_, real_cur_len, res);
@@ -474,7 +474,7 @@ namespace dataproxy_sdk
if (cur_len_ == 0 || msg_set_.empty())
return;
//no timeout, and it isn't last packing
- if (Utils::getCurrentMsTime() - first_use_ < g_config->pack_timeout_ && !isLastPack)// FIXME:should use first_use instead of last_use?
+ if (Utils::getCurrentMsTime() - first_use_ < g_config.pack_timeout_ && !isLastPack)// FIXME:should use first_use instead of last_use?
return;
LOG_TRACE("start auto pack, inlong_group_id:%s, inlong_stream_id:%s", inlong_group_id_.c_str(), inlong_stream_id_.c_str());
@@ -544,7 +544,7 @@ namespace dataproxy_sdk
return;
for (auto &it : queues_)
{
- LOG_WARN("-------> dataproxy_sdk_cpp #local:%s#%s#success send msg:%d", g_config->ser_ip_.c_str(), it.second->topicDesc().c_str(),
+ LOG_STAT("dataproxy_sdk_cpp #local:%s#%s#success send msg:%d", g_config.ser_ip_.c_str(), it.second->topicDesc().c_str(),
it.second->success_num_.getAndSet(0));
}
}
@@ -555,7 +555,7 @@ namespace dataproxy_sdk
return;
for (auto &it : queues_)
{
- LOG_WARN("-------> dataproxy_sdk_cpp #local:%s#%s#total success msg:%d", g_config->ser_ip_.c_str(), it.second->topicDesc().c_str(),
+ LOG_STAT("dataproxy_sdk_cpp #local:%s#%s#total success msg:%d", g_config.ser_ip_.c_str(), it.second->topicDesc().c_str(),
it.second->total_success_num_.get());
}
}
@@ -569,9 +569,9 @@ namespace dataproxy_sdk
{
uint32_t pack = it.second->pack_num_.getAndSet(0);
total_pack += pack;
- LOG_DEBUG("------->toipc:%s, pack_num:%d", it.second->topicDesc().c_str(), pack);
+ LOG_DEBUG("toipc:%s, pack_num:%d", it.second->topicDesc().c_str(), pack);
}
- LOG_DEBUG("------->total_pack:%d", total_pack);
+ LOG_DEBUG("total_pack:%d", total_pack);
g_pools->showState();
g_executors->showState();
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.cc
index 065dd0e3b..7e6dcdf1e 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.cc
@@ -48,7 +48,7 @@ namespace dataproxy_sdk
bool ClusterProxyList::isNeedLoadBalance()
{
// #if 0
- if (!g_config->enable_heart_beat_ || !g_config->heart_beat_interval_)
+ if (!g_config.enable_heart_beat_ || !g_config.heart_beat_interval_)
{
return false;
}
@@ -403,10 +403,10 @@ namespace dataproxy_sdk
{ clearInvalidConn(ec); });
// whether need do balance
- if (g_config->enable_heart_beat_ && g_config->heart_beat_interval_ > 0)
+ if (g_config.enable_heart_beat_ && g_config.heart_beat_interval_ > 0)
{
keepAlive_timer_ = timer_worker_->createSteadyTimer();
- keepAlive_timer_->expires_after(std::chrono::seconds(g_config->heart_beat_interval_));
+ keepAlive_timer_->expires_after(std::chrono::seconds(g_config.heart_beat_interval_));
keepAlive_timer_->async_wait([this](const std::error_code &ec)
{ keepConnAlive(ec); });
}
@@ -449,7 +449,7 @@ namespace dataproxy_sdk
int32_t GlobalCluster::initBuslistAndCreateConns()
{
- for (auto &inlong_group_id : g_config->inlong_group_ids_)
+ for (auto &inlong_group_id : g_config.inlong_group_ids_)
{
groupid2cluster_map_[inlong_group_id] = -1;
}
@@ -518,7 +518,7 @@ namespace dataproxy_sdk
while (true)
{
std::unique_lock<std::mutex> con_lck(cond_mutex_);
- if (cond_.wait_for(con_lck, std::chrono::minutes(g_config->proxy_update_interval_), [this]()
+ if (cond_.wait_for(con_lck, std::chrono::minutes(g_config.proxy_update_interval_), [this]()
{ return update_flag_; }))
{
if (exit_flag_)
@@ -530,7 +530,7 @@ namespace dataproxy_sdk
}
else
{
- LOG_INFO("proxy update interval is %d mins, update proxylist", g_config->proxy_update_interval_);
+ LOG_INFO("proxy update interval is %d mins, update proxylist", g_config.proxy_update_interval_);
doUpdate();
}
}
@@ -584,13 +584,13 @@ namespace dataproxy_sdk
{
//拼接tdm请求的url
std::string url;
- if (g_config->enable_proxy_URL_from_cluster_)
- url = g_config->proxy_cluster_URL_;
+ if (g_config.enable_proxy_URL_from_cluster_)
+ url = g_config.proxy_cluster_URL_;
else
{
- url = g_config->proxy_URL_ + "/" + groupid2cluster.first;
+ url = g_config.proxy_URL_ + "/" + groupid2cluster.first;
}
- std::string post_data = "ip=" + g_config->ser_ip_ + "&version=" + constants::kTDBusCAPIVersion;
+ std::string post_data = "ip=" + g_config.ser_ip_ + "&version=" + constants::kTDBusCAPIVersion;
LOG_WARN("get inlong_group_id:%s proxy cfg url:%s, post_data:%s", groupid2cluster.first.c_str(), url.c_str(), post_data.c_str());
// request proxylist from mananer, if failed multi-times, read from local cache file
@@ -598,7 +598,7 @@ namespace dataproxy_sdk
int32_t ret;
for (int i = 0; i < constants::kMaxRequestTDMTimes; i++)
{
- HttpRequest request = {url, g_config->proxy_URL_timeout_, g_config->need_auth_, g_config->auth_id_, g_config->auth_key_, post_data};
+ HttpRequest request = {url, g_config.proxy_URL_timeout_, g_config.need_auth_, g_config.auth_id_, g_config.auth_key_, post_data};
ret = Utils::requestUrl(meta_data, &request);
if (!ret)
{
@@ -882,7 +882,7 @@ namespace dataproxy_sdk
proxylist_config->initUnusedBus();
//set active_proxy_num and backup_proxy_num
- proxylist_config->setActiveAndBackupBusNum(g_config->max_active_proxy_num_);
+ proxylist_config->setActiveAndBackupBusNum(g_config.max_active_proxy_num_);
return 0;
}
@@ -930,7 +930,7 @@ namespace dataproxy_sdk
}
}
- keepAlive_timer_->expires_after(std::chrono::seconds(g_config->heart_beat_interval_));
+ keepAlive_timer_->expires_after(std::chrono::seconds(g_config.heart_beat_interval_));
keepAlive_timer_->async_wait([this](const std::error_code &ec)
{ keepConnAlive(ec); });
}
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_constant.h
index 9b48ded52..b95c916ea 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_constant.h
@@ -35,8 +35,8 @@ namespace dataproxy_sdk
static const int32_t kMaxRequestTDMTimes = 4;
static const int32_t kMaxRetryConnection = 20; //create conn failed more than 20 times, start sendbuf callback
- static const std::string kAttrFormat = "__addcol1__reptime=yyyymmddHHMMSS&__addcol2_ip=xxx.xxx.xxx.xxx"; // msg_type 7 body's attr format
- static const int32_t kAttrLen = kAttrFormat.size();
+ static const char kAttrFormat[] = "__addcol1__reptime=yyyymmddHHMMSS&__addcol2_ip=xxx.xxx.xxx.xxx"; // msg_type 7 body's attr format
+ static const int32_t kAttrLen = strlen(kAttrFormat);
static const char kTDBusCAPIVersion[] = "dataproxy_sdk_cpp-v2.0.0";
static const char kLogName[] = "dataproxy_cpp.log";
@@ -61,20 +61,20 @@ namespace dataproxy_sdk
static const uint32_t kRetryNum = 3;
static const uint32_t kLogNum = 10;
static const uint32_t kLogSize = 10;
- static const uint8_t kLogLevel = 1;
+ static const uint8_t kLogLevel = 2;
static const uint8_t kLogFileType = 2;
- static const std::string kLogPath = "./logs/";
+ static const char kLogPath[] = "./sdklogs/";
static const bool kLogEnableLimit = true;
- static const std::string kProxyURL = "http://127.0.0.1:8099/api/dataproxy_ip_v2";
+ static const char kProxyURL[] = "http://127.0.0.1:8099/inlong/manager/openapi/dataproxy/getIpList";
static const bool kEnableProxyURLFromCluster = false;
- static const std::string kProxyClusterURL =
+ static const char kProxyClusterURL[] =
"http://127.0.0.1:8099/heartbeat/dataproxy_ip_v2?cluster_id=0&net_tag=normal";
static const uint32_t kProxyUpdateInterval = 10;
static const uint32_t kProxyURLTimeout = 2;
static const uint32_t kMaxActiveProxyNum = 3;
- static const std::string kSerIP = "127.0.0.1";
+ static const char kSerIP[] = "127.0.0.1";
static const uint32_t kMaxBufPool = 50 * 1024 * 1024;
static const uint32_t kMsgType = 7;
@@ -85,15 +85,15 @@ namespace dataproxy_sdk
static const uint32_t kMaskCPUAffinity = 0xff;
static const bool kIsFromDC = false;
static const uint16_t kExtendField = 0;
- static const std::string kNetTag = "all";
+ static const char kNetTag[] = "all";
static const bool kNeedAuth = false;
// http basic auth
- static const std::string kBasicAuthHeader = "Authorization:";
- static const std::string kBasicAuthPrefix = "Basic";
- static const std::string kBasicAuthSeparator = " ";
- static const std::string kBasicAuthJoiner = ":";
+ static const char kBasicAuthHeader[] = "Authorization:";
+ static const char kBasicAuthPrefix[] = "Basic";
+ static const char kBasicAuthSeparator[] = " ";
+ static const char kBasicAuthJoiner[] = ":";
} // namespace constants
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.cc
index 3e3a03188..a29ebbdd4 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.cc
@@ -44,31 +44,29 @@ namespace dataproxy_sdk
AtomicInt user_exit_flag{0};
AtomicUInt g_send_msgid{0}; // msg uuid
AtomicInt g_buf_full{0}; // used for buf full log limit
- ClientConfig *g_config = nullptr;
+ ClientConfig g_config;
GlobalCluster *g_clusters = nullptr;
GlobalQueues *g_queues = nullptr;
TotalPools *g_pools = nullptr;
ExecutorThreadPool *g_executors = nullptr;
- int32_t g_use_default = 0; // whether use default config to init sdk if user's config file error
int32_t init_helper ()
{
- getLogger().init(g_config->log_size_, g_config->log_num_, Logger::Level(g_config->log_level_), g_config->log_file_type_,
- g_config->log_enable_limit_, g_config->log_path_);
-
+ debug_init_log();
+
LOG_WARN("dataproxy_sdk_cpp start init, version:%s", constants::kTDBusCAPIVersion);
- g_config->showClientConfig();
+ g_config.showClientConfig();
// get local ip
- if (!Utils::getFirstIpAddr(g_config->ser_ip_))
+ if (!Utils::getFirstIpAddr(g_config.ser_ip_))
{
- LOG_WARN("not found the localHost in local OS, use user's ser_ip(%s) in config", g_config->ser_ip_.c_str());
+ LOG_WARN("not found the localHost in local OS, use user's ser_ip(%s) in config", g_config.ser_ip_.c_str());
}
- if (g_config->enable_setaffinity_)
+ if (g_config.enable_setaffinity_)
{
- Utils::bindCPU(g_config->mask_cpu_affinity_);
+ Utils::bindCPU(g_config.mask_cpu_affinity_);
}
signal(SIGPIPE, SIG_IGN);
@@ -108,7 +106,7 @@ namespace dataproxy_sdk
LOG_INFO("read cache proxylist for disaster tolerance");
g_clusters->readCacheBuslist();
- if (!g_config->inlong_group_ids_.empty())
+ if (!g_config.inlong_group_ids_.empty())
{
int32_t ret = g_clusters->initBuslistAndCreateConns();
// FIXME: improve, return ret to user?
@@ -128,7 +126,6 @@ namespace dataproxy_sdk
int32_t tc_api_init(const char *config_file)
{
- getLogger().init(5, 15, Logger::Level(3), 2, true, "./", ".cpplog");
// one process is only initialized once
if (!init_flag.compareAndSwap(0, 1))
@@ -138,61 +135,38 @@ namespace dataproxy_sdk
}
user_exit_flag.getAndSet(0);
- g_config = new ClientConfig(config_file);
- if (!g_config){
- LOG_ERROR("dataproxy_sdk_cpp init error");
- return SDKInvalidResult::kErrorInit;
+ if (!g_config.parseConfig(config_file)){
+ LOG_ERROR("dataproxy_sdk_cpp init error, something configs use default value");
}
- bool res = g_config->parseConfig();
-
- if (!res){
- // init error and not allow default init
- if (g_use_default)
- {
- g_config->defaultInit();
- }
- else
- {
- LOG_ERROR("dataproxy_sdk_cpp init error");
- return SDKInvalidResult::kErrorInit;
- }
- }
-
- remove("./.cpplog");
return init_helper();
-
}
- int32_t tc_api_init(ClientConfig* client_config)
+ int32_t tc_api_init(ClientConfig& client_config)
{
if (!init_flag.compareAndSwap(0, 1))
{
return SDKInvalidResult::kMultiInit;
}
-
- if (!client_config)
- {
- return SDKInvalidResult::kErrorInit;
-
- }
// check and proxy url
- if (client_config->proxy_URL_.empty())
+ if (client_config.proxy_URL_.empty())
{
+ init_flag.compareAndSwap(1, 0);
return SDKInvalidResult::kErrorInit;
}
// check auth setting
- if (client_config->need_auth_ && (client_config->auth_id_.empty() || client_config->auth_key_.empty()))
+ if (client_config.need_auth_ && (client_config.auth_id_.empty() || client_config.auth_key_.empty()))
{
+ init_flag.compareAndSwap(1, 0);
return SDKInvalidResult::kErrorAuthInfo;
}
g_config = client_config;
- g_config->updateBufSize();
+ g_config.updateBufSize();
return init_helper();
@@ -200,8 +174,25 @@ namespace dataproxy_sdk
int32_t tc_api_init_ext(const char *config_file, int32_t use_def)
{
- g_use_default = use_def;
- return tc_api_init(config_file);
+ if (!init_flag.compareAndSwap(0, 1))
+ {
+ LOG_ERROR("dataproxy_sdk_cpp has been initialized before!");
+ return SDKInvalidResult::kMultiInit;
+ }
+ user_exit_flag.getAndSet(0);
+ if (!g_config.parseConfig(config_file)){
+ LOG_ERROR("dataproxy_sdk_cpp init error");
+
+ // don't use default, return directly
+ if (!use_def)
+ {
+ LOG_ERROR("not using default config, dataproxy_sdk_cpp should be inited again!");
+ init_flag.compareAndSwap(1, 0);
+ return SDKInvalidResult::kErrorInit;
+ }
+
+ }
+ return init_helper();
}
int32_t sendBaseMsg(const std::string msg,
@@ -226,14 +217,14 @@ namespace dataproxy_sdk
}
// msg len check
- if (msg.size() > g_config->ext_pack_size_)
+ if (msg.size() > g_config.ext_pack_size_)
{
- LOG_ERROR("msg len is too long, cur msg_len:%d, ext_pack_size:%d", msg.size(), g_config->ext_pack_size_);
+ LOG_ERROR("msg len is too long, cur msg_len:%d, ext_pack_size:%d", msg.size(), g_config.ext_pack_size_);
return SDKInvalidResult::kMsgTooLong;
}
- if(g_config->enable_groupId_isolation_){
- if(std::find(g_config->inlong_group_ids_.begin(),g_config->inlong_group_ids_.end(),inlong_group_id)==g_config->inlong_group_ids_.end()){
+ if(g_config.enable_groupId_isolation_){
+ if(std::find(g_config.inlong_group_ids_.begin(),g_config.inlong_group_ids_.end(),inlong_group_id)==g_config.inlong_group_ids_.end()){
LOG_ERROR("inlong_group_id:%s is not specified in config file, check it", inlong_group_id.c_str());
return SDKInvalidResult::kInvalidGroupId;
}
@@ -316,7 +307,10 @@ namespace dataproxy_sdk
std::this_thread::sleep_for(std::chrono::milliseconds(max_waitms));
- g_queues->printTotalAck(); // pring ack msg count of each groupid+streamid
+ if(g_queues)
+ {
+ g_queues->printTotalAck(); // pring ack msg count of each groupid+streamid
+ }
delete g_queues;
g_queues=nullptr;
@@ -327,8 +321,6 @@ namespace dataproxy_sdk
g_pools=nullptr;
delete g_clusters;
g_clusters=nullptr;
- delete g_config;
- g_config=nullptr;
// std::this_thread::sleep_for(std::chrono::seconds(5));
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.h
index 14dafb3a5..dccf2ce1d 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.h
@@ -53,7 +53,7 @@ using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>;
using io_context_work = asio::executor_work_guard<asio::io_context::executor_type>;
extern AtomicUInt g_send_msgid;
-extern ClientConfig* g_config;
+extern ClientConfig g_config;
extern GlobalCluster* g_clusters;
extern GlobalQueues* g_queues;
extern TotalPools* g_pools;
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.cc
index 435f064ee..33473c413 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.cc
@@ -281,7 +281,10 @@ std::string Utils::base64_encode(const std::string& data)
std::string Utils::genBasicAuthCredential(const std::string& id, const std::string& key)
{
std::string credential = id + constants::kBasicAuthJoiner + key;
- return constants::kBasicAuthPrefix + constants::kBasicAuthSeparator + base64_encode(credential);
+ std::string result = constants::kBasicAuthPrefix;
+ result.append(constants::kBasicAuthSeparator);
+ result.append(base64_encode(credential));
+ return result;
}
int32_t Utils::requestUrl(std::string& res, const HttpRequest* request)
@@ -303,7 +306,9 @@ int32_t Utils::requestUrl(std::string& res, const HttpRequest* request)
if ( request->need_auth && !request->auth_id.empty() && !request->auth_key.empty())
{
// Authorization: Basic xxxxxxxx
- std::string auth = constants::kBasicAuthHeader + constants::kBasicAuthSeparator + genBasicAuthCredential(request->auth_id, request->auth_key);
+ std::string auth = constants::kBasicAuthHeader;
+ auth.append(constants::kBasicAuthSeparator);
+ auth.append(genBasicAuthCredential(request->auth_id, request->auth_key));
LOG_INFO("request manager, auth-header:%s", auth.c_str());
list = curl_slist_append(list, auth.c_str());
}
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.cc
index aaa6ce85b..55ed55f45 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.cc
@@ -93,12 +93,12 @@ namespace dataproxy_sdk
std::lock_guard<std::mutex> buf_lck(send_buf->mutex_);
auto self = shared_from_this();
- if (g_config->retry_num_ > 0 && g_config->retry_interval_ > 0) //resend if need
+ if (g_config.retry_num_ > 0 && g_config.retry_interval_ > 0) //resend if need
{
executor_->postTask([self, this, send_buf]
{
send_buf->timeout_timer_ = executor_->createSteadyTimer();
- send_buf->timeout_timer_->expires_after(std::chrono::milliseconds(g_config->retry_interval_));
+ send_buf->timeout_timer_->expires_after(std::chrono::milliseconds(g_config.retry_interval_));
send_buf->timeout_timer_->async_wait(std::bind(&BufferPool::RetryHandler, shared_from_this(), std::placeholders::_1, send_buf)); });
}
if (send_buf->target()->isStop())
@@ -137,7 +137,7 @@ namespace dataproxy_sdk
{
if (ec) // timer is cancelled, two cases: 1.ackbuf->sendbuf.reset;2.msg_type=2,conn.doWrite->cancel
{
- if (g_config->msg_type_ == 2)
+ if (g_config.msg_type_ == 2)
{
LOG_TRACE("msg_type is 2, no need ackmsg, clear buf(uid:%d) directly", buf->uniqId());
ackBuf(buf->uniqId());
@@ -155,19 +155,19 @@ namespace dataproxy_sdk
if (buf->getAlreadySend() == 2)
{
LOG_INFO("buf(id:%d, inlong_group_id:%s, inlong_stream_id:%s) ackmsg timeout, send %d times(max retry_num:%d)", buf->uniqId(), buf->inlong_group_id().c_str(),
- buf->inlong_stream_id().c_str(), buf->getAlreadySend(), g_config->retry_num_);
+ buf->inlong_stream_id().c_str(), buf->getAlreadySend(), g_config.retry_num_);
}
else
{
LOG_DEBUG("buf(id:%d, inlong_group_id:%s, inlong_stream_id:%s) ackmsg timeout, send %d times(max retry_num:%d)", buf->uniqId(), buf->inlong_group_id().c_str(),
- buf->inlong_stream_id().c_str(), buf->getAlreadySend(), g_config->retry_num_);
+ buf->inlong_stream_id().c_str(), buf->getAlreadySend(), g_config.retry_num_);
}
// max_retry_num, usercallback
- if (buf->getAlreadySend() >= g_config->retry_num_ || buf->fail_create_conn_.get() >= constants::kMaxRetryConnection)
+ if (buf->getAlreadySend() >= g_config.retry_num_ || buf->fail_create_conn_.get() >= constants::kMaxRetryConnection)
{
LOG_WARN("fail to send buf(id:%d, inlong_group_id:%s, inlong_stream_id:%s), has send max_retry_num(%d) times, start usercallback", buf->uniqId(), buf->inlong_group_id().c_str(),
- buf->inlong_stream_id().c_str(), g_config->retry_num_);
+ buf->inlong_stream_id().c_str(), g_config.retry_num_);
buf->doUserCallBack();
buf->reset();
}
@@ -181,7 +181,7 @@ namespace dataproxy_sdk
{
LOG_INFO("fail to create new conn to send buf, inlong_group_id:%s, inlong_stream_id:%s", buf->inlong_group_id().c_str(), buf->inlong_stream_id().c_str());
buf->fail_create_conn_.increment();
- buf->timeout_timer_->expires_after(std::chrono::milliseconds(g_config->retry_interval_));
+ buf->timeout_timer_->expires_after(std::chrono::milliseconds(g_config.retry_interval_));
buf->timeout_timer_->async_wait(std::bind(&BufferPool::RetryHandler, shared_from_this(), std::placeholders::_1, buf));
return;
}
@@ -190,7 +190,7 @@ namespace dataproxy_sdk
buf->target()->sendBuf(buf);
buf->increaseRetryNum();
- buf->timeout_timer_->expires_after(std::chrono::milliseconds(g_config->retry_interval_));
+ buf->timeout_timer_->expires_after(std::chrono::milliseconds(g_config.retry_interval_));
buf->timeout_timer_->async_wait(std::bind(&BufferPool::RetryHandler, shared_from_this(), std::placeholders::_1, buf));
}
}
@@ -230,13 +230,13 @@ namespace dataproxy_sdk
{
if (inlong_group_id.empty())
{
- LOG_INFO("STATE|pool_id:%d, current_use:%d(total:%d), invoke_send_buf:%d, has_ack:%d, waiting_ack:%d", pool_id_, currentUse(),
- g_config->bufNum(), has_send_buf_.get(), has_ack_.get(), waiting_ack_.get());
+ LOG_STAT("STATE|pool_id:%d, current_use:%d(total:%d), invoke_send_buf:%d, has_ack:%d, waiting_ack:%d", pool_id_, currentUse(),
+ g_config.bufNum(), has_send_buf_.get(), has_ack_.get(), waiting_ack_.get());
}
else
{
- LOG_INFO("STATE|inlong_group_id:%s, pool_id:%d, current_use:%d(total:%d), invoke_send_buf:%d, has_ack:%d, waiting_ack:%d", inlong_group_id.c_str(), pool_id_, currentUse(),
- g_config->bufNum(), has_send_buf_.get(), has_ack_.get(), waiting_ack_.get());
+ LOG_STAT("STATE|inlong_group_id:%s, pool_id:%d, current_use:%d(total:%d), invoke_send_buf:%d, has_ack:%d, waiting_ack:%d", inlong_group_id.c_str(), pool_id_, currentUse(),
+ g_config.bufNum(), has_send_buf_.get(), has_ack_.get(), waiting_ack_.get());
}
}
@@ -251,34 +251,34 @@ namespace dataproxy_sdk
TotalPools::TotalPools() : next_(0), mutex_()
{
- if (g_config->enable_groupId_isolation_) // different groupid data use different bufpool
+ if (g_config.enable_groupId_isolation_) // different groupid data use different bufpool
{
- for (int32_t i = 0; i < g_config->inlong_group_ids_.size(); i++) // create a bufpool for ervery groupidS
+ for (int32_t i = 0; i < g_config.inlong_group_ids_.size(); i++) // create a bufpool for ervery groupidS
{
std::vector<BufferPoolPtr> groupid_pool;
- groupid_pool.reserve(g_config->buffer_num_per_groupId_);
- for (int32_t j = 0; j < g_config->buffer_num_per_groupId_; j++)
+ groupid_pool.reserve(g_config.buffer_num_per_groupId_);
+ for (int32_t j = 0; j < g_config.buffer_num_per_groupId_; j++)
{
- groupid_pool.push_back(std::make_shared<BufferPool>(j, g_config->bufNum(), g_config->buf_size_));
+ groupid_pool.push_back(std::make_shared<BufferPool>(j, g_config.bufNum(), g_config.buf_size_));
}
- groupid2pool_map_[g_config->inlong_group_ids_[i]] = groupid_pool;
- groupid2next_[g_config->inlong_group_ids_[i]] = 0;
+ groupid2pool_map_[g_config.inlong_group_ids_[i]] = groupid_pool;
+ groupid2next_[g_config.inlong_group_ids_[i]] = 0;
}
}
else // round-robin
{
- pools_.reserve(g_config->shared_buf_nums_);
- for (int i = 0; i < g_config->shared_buf_nums_; i++)
+ pools_.reserve(g_config.shared_buf_nums_);
+ for (int i = 0; i < g_config.shared_buf_nums_; i++)
{
- pools_.push_back(std::make_shared<BufferPool>(i, g_config->bufNum(), g_config->buf_size_));
+ pools_.push_back(std::make_shared<BufferPool>(i, g_config.bufNum(), g_config.buf_size_));
}
}
}
BufferPoolPtr TotalPools::getPool(const std::string &inlong_group_id)
{
- if (g_config->enable_groupId_isolation_) // groupid isolate
+ if (g_config.enable_groupId_isolation_) // groupid isolate
{
auto groupid_pool = groupid2pool_map_.find(inlong_group_id);
if (groupid_pool == groupid2pool_map_.end() || groupid_pool->second.empty())
@@ -332,7 +332,7 @@ namespace dataproxy_sdk
bool TotalPools::isPoolAvailable(const std::string &inlong_group_id)
{
- if (g_config->enable_groupId_isolation_) // groupid_isolation
+ if (g_config.enable_groupId_isolation_) // groupid_isolation
{
auto groupid_pool = groupid2pool_map_.find(inlong_group_id);
if (groupid_pool == groupid2pool_map_.end())
@@ -381,7 +381,7 @@ namespace dataproxy_sdk
void TotalPools::showState()
{
- if (g_config->enable_groupId_isolation_)
+ if (g_config.enable_groupId_isolation_)
{
for (auto &groupid_pool : groupid2pool_map_)
{
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.cc
index 8d5653ba9..4d6068f60 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.cc
@@ -65,7 +65,7 @@ namespace dataproxy_sdk
ExecutorThreadPool::ExecutorThreadPool() : next_idx_(0)
{
- for (int i = 0; i < g_config->thread_nums_; i++)
+ for (int i = 0; i < g_config.thread_nums_; i++)
{
executors_.emplace_back(std::make_shared<ExecutorThread>(i));
}
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.cc
index 88e6c6ca9..f4fe6c3af 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.cc
@@ -50,7 +50,7 @@ namespace dataproxy_sdk
status_ = kConnecting;
timer_->expires_after(std::chrono::milliseconds(kConnectTimeout));
timer_->async_wait(std::bind(&Connection::connectHandler, this, std::placeholders::_1));
- if (g_config->enable_TCP_nagle_ == false)
+ if (g_config.enable_TCP_nagle_ == false)
{
socket_->set_option(asio::ip::tcp::no_delay(true));
} // close nagle
@@ -140,10 +140,10 @@ namespace dataproxy_sdk
remote_info_.c_str(), send_err_nums_.get(), ec.message().c_str());
}
- if (retry_hb_.get() > g_config->retry_num_) //close and create new conn
+ if (retry_hb_.get() > g_config.retry_num_) //close and create new conn
{
LOG_ERROR("conn l:%s->r:%s send_error_num:%d, more than max_retry_num:%d, this conn will close", local_info_.c_str(),
- remote_info_.c_str(), retry_hb_.get(), g_config->retry_num_);
+ remote_info_.c_str(), retry_hb_.get(), g_config.retry_num_);
doClose(&ec);
} });
}
@@ -205,7 +205,7 @@ namespace dataproxy_sdk
length, curBuf->len(), curBuf->uniqId());
//ack buf
- if (g_config->msg_type_ == 2) { curBuf->timeout_timer_->cancel(); }
+ if (g_config.msg_type_ == 2) { curBuf->timeout_timer_->cancel(); }
}
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/client_config_test.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/client_config_test.cc
index 695f0685e..cecfbb1b5 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/client_config_test.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/client_config_test.cc
@@ -32,23 +32,24 @@ using namespace dataproxy_sdk;
TEST(clientBase, test1)
{
- ClientConfig client = ClientConfig("config.json");
- EXPECT_EQ(client.parseConfig(), true);
+ ClientConfig client;
+ EXPECT_EQ(client.parseConfig("config.json"), true);
// EXPECT_EQ(client.bufNum(), 1204);
- ClientConfig client2 = ClientConfig("nochconfig.json");
- EXPECT_EQ(client2.parseConfig(), false);
+ EXPECT_EQ(client.parseConfig("nochconfig.json"), false);
}
TEST(client, test2)
{
- ClientConfig client = ClientConfig("emptyconfig.json");
- EXPECT_EQ(client.parseConfig(), true);
+ ClientConfig client;
+ EXPECT_EQ(client.parseConfig("emptyconfig.json"), true);
}
TEST(client, init)
{
- ClientConfig client = ClientConfig("proxy_url", false, "", "key");
+ ClientConfig client;
+ client.proxy_URL_ = "proxy_url";
+ client.auth_key_ = "key";
EXPECT_EQ(client.proxy_URL_, "proxy_url");
EXPECT_EQ(client.need_auth_, false);
EXPECT_EQ(client.auth_id_, "");
@@ -58,9 +59,9 @@ TEST(client, init)
TEST(sdk, init)
{
- ClientConfig client = ClientConfig("proxy_url", false, "", "key");
+ ClientConfig client;
int32_t init_first = tc_api_init("./release/conf/config_example.json");
- int32_t init_second = tc_api_init(&client);
+ int32_t init_second = tc_api_init(client);
EXPECT_EQ(init_first, 0);
EXPECT_EQ(init_second, SDKInvalidResult::kMultiInit);
EXPECT_EQ(tc_api_close(1000), 0);
@@ -69,8 +70,6 @@ TEST(sdk, init)
int main(int argc, char* argv[])
{
- getLogger().init(5, 15, Logger::Level(4), 2, true, "./newlogs/");
-
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
\ No newline at end of file
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/pack_queue_test.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/pack_queue_test.cc
index 4f5fd661a..4823aa858 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/pack_queue_test.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/pack_queue_test.cc
@@ -24,9 +24,9 @@ using namespace testing;
// #if 0
TEST(packqueue, basetest)
{
- g_config->msg_type_ = 3;
- g_config->retry_num_ = 100;
- g_config->enable_pack_ = false;
+ g_config.msg_type_ = 3;
+ g_config.retry_num_ = 100;
+ g_config.enable_pack_ = false;
ExecutorThreadPtr th1 = make_shared<ExecutorThread>(1);
ProxyInfoPtr proxy = make_shared<ProxyInfo>(1, "127.0.0.1", 4000);
@@ -70,9 +70,9 @@ TEST(packqueue, basetest)
// #endif
TEST(packzip, test)
{
- g_config->msg_type_ = 3;
- g_config->enable_pack_ = false;
- g_config->enable_zip_ = false;
+ g_config.msg_type_ = 3;
+ g_config.enable_pack_ = false;
+ g_config.enable_zip_ = false;
string msg2(1024, 'a');
msg2 += "end";
@@ -91,10 +91,10 @@ TEST(packzip, test)
TEST(packqueue, packtest)
{
- g_config->msg_type_ = 5;
- g_config->retry_num_ = 100;
- g_config->enable_pack_ = false;
- g_config->enable_zip_ = false;
+ g_config.msg_type_ = 5;
+ g_config.retry_num_ = 100;
+ g_config.enable_pack_ = false;
+ g_config.enable_zip_ = false;
string inlong_group_id = "inlong_groupid_1";
string inlong_stream_id = "inlong_streamid_1";
@@ -116,16 +116,13 @@ TEST(packqueue, packtest)
buf->setLen(out_len);
LOG_DEBUG("buf content is%d", buf->content());
delete buf;
- delete g_config;
}
int main(int argc, char* argv[])
{
- getLogger().init(5, 15, Logger::Level(4), 1, true, "./");
- g_config = new ClientConfig("config.json");
- g_config->parseConfig();
- Utils::getFirstIpAddr(g_config->ser_ip_);
+ g_config.parseConfig("config.json");
+ Utils::getFirstIpAddr(g_config.ser_ip_);
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/proxylist_config_test.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/proxylist_config_test.cc
index a176bef1b..41d437669 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/proxylist_config_test.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/proxylist_config_test.cc
@@ -27,9 +27,8 @@ TEST(businfo, basetest)
TEST(clusterProxylist, readCachelist) //.bus_list.ini文件中缓存有groupid_test1和groupid_test2
{
- g_config = new ClientConfig("config.json");
- EXPECT_EQ(g_config->parseConfig(), true);
- EXPECT_EQ(Utils::getFirstIpAddr(g_config->ser_ip_), true);
+ EXPECT_EQ(g_config.parseConfig("config.json"), true);
+ EXPECT_EQ(Utils::getFirstIpAddr(g_config.ser_ip_), true);
GlobalClusterMock *m_clusters = new GlobalClusterMock();
@@ -49,13 +48,11 @@ TEST(clusterProxylist, readCachelist) //.bus_list.ini文件中缓存有groupid_t
delete g_clusters;
delete g_executors;
- delete g_config;
}
TEST(clusterProxylist, basetest)
{
- g_config = new ClientConfig("config.json");
- EXPECT_EQ(g_config->parseConfig(), true);
+ EXPECT_EQ(g_config.parseConfig("config.json"), true);
ClusterProxyListPtr cluster = make_shared<ClusterProxyList>();
GlobalCluster *g_cluster = new GlobalCluster();
string meta_info;
@@ -78,13 +75,11 @@ TEST(clusterProxylist, basetest)
EXPECT_EQ(cluster2->load(), 20);
EXPECT_EQ(cluster2->isNeedLoadBalance(), false);
delete g_cluster;
- delete g_config;
}
TEST(clusterProxylist, addBuslistAndUpdate1)
{
- g_config = new ClientConfig("config.json");
- EXPECT_EQ(g_config->parseConfig(), true);
+ EXPECT_EQ(g_config.parseConfig("config.json"), true);
g_clusters = new GlobalCluster();
g_executors = new ExecutorThreadPool();
this_thread::sleep_for(std::chrono::seconds(5));
@@ -99,8 +94,6 @@ TEST(clusterProxylist, addBuslistAndUpdate1)
delete g_clusters;
delete g_executors;
- delete g_config;
-
// this_thread::sleep_for(std::chrono::seconds(10));
}
@@ -121,9 +114,8 @@ void addExistBidFunc(GlobalCluster *gcluster)
TEST(clusterProxylist, connInit)
{
- g_config = new ClientConfig("config.json");
- EXPECT_EQ(g_config->parseConfig(), true);
- EXPECT_EQ(Utils::getFirstIpAddr(g_config->ser_ip_), true);
+ EXPECT_EQ(g_config.parseConfig("config.json"), true);
+ EXPECT_EQ(Utils::getFirstIpAddr(g_config.ser_ip_), true);
g_clusters = new GlobalCluster();
g_executors = new ExecutorThreadPool();
@@ -144,15 +136,12 @@ TEST(clusterProxylist, connInit)
delete g_clusters;
delete g_executors;
- delete g_config;
this_thread::sleep_for(std::chrono::minutes(1));
}
int main(int argc, char *argv[])
{
- getLogger().init(5, 15, Logger::Level(4), 2, true, "./");
-
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
\ No newline at end of file
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/test_log.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/test_log.cc
index cd9cd4424..5b207f54d 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/test_log.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/test_log.cc
@@ -45,7 +45,7 @@ void logfunc()
int main(int argc, char const* argv[])
{
- getLogger().init(5, 15, Logger::Level(4), 2, true, "./log/");
+ debug_init_log();
LOG_INFO("this is info log %d", 1);
std::string str = "tttttt";
LOG_TRACE("this is trace log:%s", str.c_str());
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/test_utils.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/test_utils.cc
index d7684b9b1..33e23de28 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/test_utils.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/base/test_utils.cc
@@ -199,8 +199,6 @@ TEST(splitStr, test)
int main(int argc, char* argv[])
{
- getLogger().init(5, 15, Logger::Level(4), 1, true, "./logs/");
-
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
\ No newline at end of file
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/main/init_exit_test.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/main/init_exit_test.cc
index 64d2f2b34..8a74cf493 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/main/init_exit_test.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/main/init_exit_test.cc
@@ -19,6 +19,19 @@
#include "../common.h"
+// Worker body used by the multiThreadInit test: calls tc_api_init with a
+// default-constructed ClientConfig, emits 1000 rounds of error/stat/info
+// log lines, then calls tc_api_close.
+void subroutine()
+{
+ ClientConfig client_config;
+ tc_api_init(client_config);
+ // The counter is an int so that its type matches the %d conversion used
+ // in the log format strings below.
+ for (int i = 0; i < 1000; i++)
+ {
+ LOG_ERROR("log error %d", i);
+ LOG_STAT("log stat %d", i);
+ LOG_INFO("log info %d", i);
+ }
+ tc_api_close(100);
+}
+
TEST(initAndclose, test)
{
EXPECT_EQ(tc_api_init("config.json"), 0);
@@ -28,6 +41,20 @@ TEST(initAndclose, test)
EXPECT_EQ(tc_api_close(100), SDKInvalidResult::kMultiExits);
}
+// Starts 20 threads that each run subroutine() and joins every one of them
+// before the test returns.
+TEST(multiThreadInit, test)
+{
+ thread workers[20];
+ for (auto &worker : workers)
+ {
+ worker = thread(subroutine);
+ }
+ for (auto &worker : workers)
+ {
+ worker.join();
+ }
+}
+
int main(int argc, char* argv[])
{
testing::InitGoogleTest(&argc, argv);
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/big_pack_test.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/big_pack_test.cc
index 6d76036a9..714fb7091 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/big_pack_test.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/big_pack_test.cc
@@ -141,7 +141,7 @@ class BigSizeConn : public enable_shared_from_this<BigSizeConn>
}
else
{
- LOG_ERROR("write error:%s", ec.message().c_str())
+ LOG_ERROR("write error:%s", ec.message().c_str());
doClose(&ec);
}
});
@@ -166,7 +166,6 @@ char data[capacity] = {0};
int main(int argc, char* argv[])
{
- getLogger().init(5, 15, Logger::Level(4), 2, true, "./");
// g_config = new ClientConfig("config.json");
int runtimes = 10;
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/buffer_pool_test.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/buffer_pool_test.cc
index cf96c66c5..e531b8617 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/buffer_pool_test.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/buffer_pool_test.cc
@@ -40,9 +40,8 @@ using namespace dataproxy_sdk;
TEST(bufpool, basetest)
{
- g_config = new ClientConfig("config.json");
- EXPECT_EQ(g_config->parseConfig(), true);
- cout << g_config->bufNum() << endl;
+ EXPECT_EQ(g_config.parseConfig("config.json"), true);
+ cout << g_config.bufNum() << endl;
g_pools = new TotalPools();
EXPECT_NE(g_pools->getPool("groupid_1"), nullptr);
@@ -57,8 +56,6 @@ TEST(bufpool, basetest)
int main(int argc, char* argv[])
{
- getLogger().init(5, 15, Logger::Level(4), 2, true, "./");
-
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
\ No newline at end of file
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/executor_test.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/executor_test.cc
index 656dd5194..bf256cc52 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/executor_test.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/executor_test.cc
@@ -166,8 +166,6 @@ TEST(executor, repeattimer)
int main(int argc, char* argv[])
{
- getLogger().init(5, 15, Logger::Level(4), 1, true, "./newlogs/");
-
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
\ No newline at end of file
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/send_buffer_test.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/send_buffer_test.cc
index 3ea1f33b3..7138bb5cc 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/send_buffer_test.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/send_buffer_test.cc
@@ -82,8 +82,6 @@ TEST(senBuffer, mutextest) { SendBuffer* buf = new SendBuffer(1024); }
int main(int argc, char* argv[])
{
- getLogger().init(5, 15, Logger::Level(4), 1, true, "./logs/");
-
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
\ No newline at end of file
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/socket_connection_test.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/socket_connection_test.cc
index e1778ee1a..c628c0176 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/socket_connection_test.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/test/net/socket_connection_test.cc
@@ -25,9 +25,9 @@ ConnectionPtr conn1 = make_shared<Connection>(th1, proxy);
TEST(connection, sendBufTest1)
{
- g_config->msg_type_ = 3;
- g_config->retry_num_ = 100;
- g_config->enable_pack_ = false;
+ g_config.msg_type_ = 3;
+ g_config.retry_num_ = 100;
+ g_config.enable_pack_ = false;
EXPECT_EQ(conn1->getThreadId(), 1);
EXPECT_EQ(conn1->getWaitingSend(), 0);
@@ -56,8 +56,7 @@ TEST(connection, sendBufTest1)
int main(int argc, char* argv[])
{
- getLogger().init(5, 15, Logger::Level(4), 1, true, "./logs/");
- g_config = new ClientConfig("config.json");
+ g_config.parseConfig("config.json");
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/third_party/CMakeLists.txt b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/third_party/CMakeLists.txt
index 05c6ad757..305c773af 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/third_party/CMakeLists.txt
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/third_party/CMakeLists.txt
@@ -25,15 +25,6 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error")
include(ExternalProject)
-ExternalProject_Add(
- log4cplus_proj
- URL https://github.com/log4cplus/log4cplus/releases/download/REL_2_0_5/log4cplus-2.0.5.tar.gz
- CONFIGURE_COMMAND ./configure --prefix=<INSTALL_DIR> --disable-shared --with-pic CFLAGS=-O2\ CXXFLAGS=-O2\ -fPIC
- TEST_BEFORE_INSTALL 0
- BUILD_IN_SOURCE 1
- INSTALL_DIR ${CMAKE_BINARY_DIR}/third_party/
- )
-
ExternalProject_Add(
snappy_proj
URL https://github.com/google/snappy/archive/1.1.8.tar.gz