You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/29 09:43:03 UTC
[inlong] branch master updated: [INLONG-5256][SDK] DataProxy SDK(cpp) network operation and management (#5257)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e2975096e [INLONG-5256][SDK] DataProxy SDK(cpp) network operation and management (#5257)
e2975096e is described below
commit e2975096e4ede764866aeda36c42b9eb5a85ce17
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Fri Jul 29 17:42:59 2022 +0800
[INLONG-5256][SDK] DataProxy SDK(cpp) network operation and management (#5257)
---
.../src/net/executor_thread_pool.cc | 122 ++++++
.../src/net/executor_thread_pool.h | 84 ++++
.../dataproxy-sdk-cpp/src/net/socket_connection.cc | 485 +++++++++++++++++++++
.../dataproxy-sdk-cpp/src/net/socket_connection.h | 126 ++++++
4 files changed, 817 insertions(+)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.cc
new file mode 100644
index 000000000..c7c0010fb
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.cc
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "executor_thread_pool.h"
+
+#include "sdk_core.h"
+
+namespace dataproxy_sdk
+{
+ /**
+  * Creates the io_context, keeps it alive with a work guard and starts the worker thread.
+  *
+  * @param id identifier stored in thread_id_ and used in log output.
+  *
+  * The worker is started from the constructor body rather than the initializer list:
+  * worker_ is declared before thread_id_, so starting it in the initializer list would let
+  * startWorker() read thread_id_ before it has been initialized.
+  */
+ ExecutorThread::ExecutorThread(int32_t id)
+     : io_context_(std::make_shared<asio::io_context>()), work_(asio::make_work_guard(*io_context_)), thread_id_(id)
+ {
+     worker_ = std::thread(&ExecutorThread::startWorker, this);
+ }
+
+ /**
+  * Stops the executor via close() and, if the worker thread is still joinable
+  * afterwards, detaches it so the std::thread member can be destroyed.
+  */
+ ExecutorThread::~ExecutorThread()
+ {
+     LOG_DEBUG("thread:%d destructor", thread_id_);
+     close();
+
+     // close() does not join when it is invoked from the worker thread itself.
+     const bool still_attached = worker_.joinable();
+     if (still_attached)
+     {
+         worker_.detach();
+     }
+ }
+
+ /**
+  * Stops the io_context and waits for the worker thread to finish,
+  * unless the caller is the worker thread or the thread is not joinable.
+  */
+ void ExecutorThread::close()
+ {
+     LOG_DEBUG("thread:%d close() --> io_context stop", thread_id_);
+     io_context_->stop();
+
+     // A thread cannot join itself, and a non-joinable thread has nothing to wait for.
+     const bool called_from_worker = (std::this_thread::get_id() == worker_.get_id());
+     if (called_from_worker || !worker_.joinable())
+     {
+         return;
+     }
+
+     LOG_DEBUG("join thread %d", thread_id_);
+     worker_.join();
+ }
+
+ // Entry point of the worker thread: logs the thread id, then runs the io_context event loop.
+ void ExecutorThread::startWorker() //start listen
+ {
+ LOG_DEBUG("thread:%d start io_context run", thread_id_);
+ io_context_->run(); // close() calls io_context_->stop() to end this loop
+ }
+
+ /**
+  * Builds a new TCP socket bound to this executor's io_context.
+  * @return shared pointer to the freshly created socket.
+  */
+ TcpSocketPtr ExecutorThread::createTcpSocket()
+ {
+     TcpSocketPtr socket = std::make_shared<asio::ip::tcp::socket>(*io_context_);
+     return socket;
+ }
+
+ /**
+  * Builds a new steady timer bound to this executor's io_context.
+  * @return shared pointer to the freshly created timer.
+  */
+ SteadyTimerPtr ExecutorThread::createSteadyTimer()
+ {
+     SteadyTimerPtr timer = std::make_shared<asio::steady_timer>(*io_context_);
+     return timer;
+ }
+
+ /**
+  * Creates g_config->thread_nums_ executor threads with ids 0..n-1.
+  * Round-robin selection starts at index 0.
+  */
+ ExecutorThreadPool::ExecutorThreadPool() : next_idx_(0)
+ {
+     int id = 0;
+     while (id < g_config->thread_nums_)
+     {
+         executors_.emplace_back(std::make_shared<ExecutorThread>(id));
+         ++id;
+     }
+ }
+
+ /**
+  * Returns the next executor in round-robin order.
+  *
+  * @return the selected executor, or nullptr when the pool is empty.
+  *
+  * next_idx_ is kept inside [0, executors_.size()) so that the counter never grows
+  * without bound (incrementing an int32_t forever ends in signed overflow).
+  */
+ ExecutorThreadPtr ExecutorThreadPool::nextExecutor()
+ {
+     if (executors_.empty())
+     {
+         LOG_ERROR("fail to find executor thread pool");
+         return nullptr;
+     }
+     const auto count = executors_.size();
+     const auto idx = static_cast<size_t>(next_idx_) % count;
+     next_idx_ = static_cast<int32_t>((idx + 1) % count); // wrap here instead of relying on overflow
+     return executors_[idx];
+ }
+
+ /**
+  * Looks up an executor by its index in the pool.
+  *
+  * @param id index of the executor, expected in [0, pool size).
+  * @return the executor, or nullptr when id is out of range (always the case for an empty pool).
+  */
+ ExecutorThreadPtr ExecutorThreadPool::getExecutor(int32_t id)
+ {
+     // Compare as unsigned only after ruling out negative ids; this also covers the empty pool.
+     if (id < 0 || static_cast<size_t>(id) >= executors_.size())
+     {
+         // Cast before subtracting so an empty pool reports -1 and the argument matches %d.
+         LOG_ERROR("fail to get network_thread<id:%d>, max id is %d", id, static_cast<int32_t>(executors_.size()) - 1);
+         return nullptr;
+     }
+     return executors_[id];
+ }
+
+ /**
+  * Asks every executor in the pool to log its current state.
+  */
+ void ExecutorThreadPool::showState()
+ {
+     for (auto it = executors_.begin(); it != executors_.end(); ++it)
+     {
+         (*it)->showState();
+     }
+ }
+
+ /**
+  * Closes every executor and empties the pool.
+  *
+  * Elements are visited by reference: iterating by value copied each shared_ptr, which
+  * inflated the logged use_count by one and made reset() act on the temporary copy only.
+  */
+ void ExecutorThreadPool::close()
+ {
+     for (auto &executor : executors_)
+     {
+         if (executor != nullptr)
+         {
+             executor->close();
+             // use_count() returns long; cast so the argument matches %d.
+             LOG_DEBUG("thread_id:%d, shared_count:%d", executor->threadId(), static_cast<int>(executor.use_count()));
+         }
+         executor.reset();
+     }
+     executors_.clear();
+ }
+} // namespace dataproxy_sdk
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.h
new file mode 100644
index 000000000..e8f7013b9
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.h
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CPAI_NET_EXECUTOR_THREAD_POOL_H_
+#define CPAI_NET_EXECUTOR_THREAD_POOL_H_
+
+#include <asio.hpp>
+#include <functional>
+#include <memory>
+#include <stdint.h>
+#include <thread>
+
+#include "sdk_core.h"
+#include "client_config.h"
+#include "logger.h"
+#include "noncopyable.h"
+
+namespace dataproxy_sdk
+{
+ // One network worker: owns an asio::io_context and the std::thread that runs it.
+ // Sockets and timers created through this class are bound to that io_context.
+ class ExecutorThread : noncopyable
+ {
+ private:
+ // NOTE: members are initialized in declaration order; worker_ is declared before thread_id_.
+ std::shared_ptr<asio::io_context> io_context_;
+ io_context_work work_; //guarantee io_context.run() don't exit
+ std::thread worker_; // runs startWorker()
+ int32_t thread_id_; // id passed to the constructor, used in log output
+
+ public:
+ AtomicUInt waiting_send_; // public counter, incremented/decremented by Connection
+
+ public:
+ ExecutorThread(int32_t id);
+ virtual ~ExecutorThread();
+
+ // Posts a task onto this executor's io_context.
+ inline void postTask(std::function<void(void)> task) { io_context_->post(task); }
+ std::shared_ptr<asio::io_context> getIoContext() { return io_context_; }
+ TcpSocketPtr createTcpSocket();
+ SteadyTimerPtr createSteadyTimer();
+ // Logs the thread id and the current waiting_send_ value.
+ inline void showState(){LOG_DEBUG("------->thread_id:%d, waiting_send:%d", thread_id_, waiting_send_.get());}
+ // Stops the io_context and joins the worker (unless called from the worker itself).
+ void close();
+
+ int32_t threadId() const { return thread_id_; }
+
+ private:
+ // Thread entry point: runs io_context_->run().
+ void startWorker();
+ };
+
+ //network threads
+ // Fixed-size pool of ExecutorThread instances; the size is read from g_config->thread_nums_
+ // in the constructor.
+ class ExecutorThreadPool : noncopyable
+ {
+ private:
+ // round-robin
+ std::vector<ExecutorThreadPtr> executors_;
+ int32_t next_idx_; // index used by nextExecutor() for the next selection
+
+ public:
+ explicit ExecutorThreadPool();
+ virtual ~ExecutorThreadPool() { close(); }
+
+ ExecutorThreadPtr nextExecutor(); // round-robin, get executor thread, for new conn creating
+ ExecutorThreadPtr getExecutor(int32_t id); // returns nullptr when id is out of range
+ void showState(); // logs the state of every executor
+ void close(); // closes every executor and clears the pool
+ };
+
+} // namespace dataproxy_sdk
+
+#endif // CPAI_NET_EXECUTOR_THREAD_POOL_H_
\ No newline at end of file
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.cc
new file mode 100644
index 000000000..f350d2f68
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.cc
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "socket_connection.h"
+
+#include <chrono>
+#include <mutex>
+#include <stdint.h>
+
+#include "buffer_pool.h"
+#include "proxylist_config.h"
+#include "sdk_constant.h"
+#include "client_config.h"
+#include "send_buffer.h"
+#include "utils.h"
+
+namespace dataproxy_sdk
+{
+ /**
+  * Creates a connection to the given proxy on the given executor and starts connecting at once.
+  *
+  * @param executor  executor whose io_context drives the socket and the connect timer.
+  * @param proxyinfo proxy address (ip and port) to connect to.
+  *
+  * The initializer list follows the member declaration order (the order in which the
+  * members are really initialized), and the factory results are used directly: wrapping
+  * a temporary in std::move adds nothing.
+  * NOTE(review): asio::ip::address::from_string throws on a malformed ip, so this
+  * constructor can throw - confirm callers expect that.
+  */
+ Connection::Connection(ExecutorThreadPtr &executor, ProxyInfoPtr &proxyinfo)
+     : executor_(executor),
+       socket_(executor_->createTcpSocket()),
+       status_(kConnecting),
+       timer_(executor_->createSteadyTimer()),
+       proxyinfo_(proxyinfo),
+       remote_info_(proxyinfo_->getString()),
+       recv_buf_(std::make_shared<RecvBuffer>()),
+       loads_(30),
+       next_load_idx_(0),
+       thread_id_(executor_->threadId()),
+       send_err_nums_(0),
+       total_send_(0),
+       total_read_(0),
+       waiting_send_(0)
+ {
+     asio::ip::tcp::endpoint ep(asio::ip::address::from_string(proxyinfo_->ip()), static_cast<uint16_t>(proxyinfo_->portNum()));
+     doConnect(ep);
+ }
+
+ // Nothing to release explicitly; members are destroyed by their own destructors.
+ Connection::~Connection() {}
+
+ /**
+  * Starts an asynchronous connect to ep, guarded by a kConnectTimeout timer.
+  *
+  * On success the connection becomes kConnected and the read/write loops are started;
+  * on failure the connection is closed through doClose().
+  *
+  * @param ep remote endpoint to connect to.
+  *
+  * Fixes over the previous version:
+  *  - socket options can only be set on an open socket, so the socket is opened before the
+  *    pre-connect no_delay call, and the non-throwing overloads are used;
+  *  - status_ is no longer set to kDisconnected before doClose(): doClose() returns early
+  *    when the connection is already stopped, so the socket was never actually closed;
+  *  - the no_delay call inside the completion handler no longer throws into io_context::run().
+  */
+ void Connection::doConnect(const asio::ip::tcp::endpoint &ep)
+ {
+     if (isStop())
+     {
+         LOG_WARN("fail to doConnect %s, connection status is disconnected", remote_info_.c_str());
+         return;
+     }
+     status_ = kConnecting;
+     timer_->expires_after(std::chrono::milliseconds(kConnectTimeout));
+     timer_->async_wait(std::bind(&Connection::connectHandler, this, std::placeholders::_1));
+     if (g_config->enable_TCP_nagle_ == false)
+     {
+         std::error_code opt_ec;
+         if (!socket_->is_open())
+         {
+             socket_->open(ep.protocol(), opt_ec);
+         }
+         if (!opt_ec)
+         {
+             socket_->set_option(asio::ip::tcp::no_delay(true), opt_ec);
+         }
+         if (opt_ec)
+         {
+             LOG_WARN("%s set no_delay before connect error: %s", remote_info_.c_str(), opt_ec.message().c_str());
+         }
+     } // close nagle
+     // The handler captures a raw `this`: shared_from_this() is not usable while the
+     // constructor (the caller of doConnect) is still running.
+     socket_->async_connect(ep, [this](const std::error_code &ec)
+     {
+         timer_->cancel();
+         if (ec)
+         {
+             LOG_ERROR("%s async connect error: %s,%s", remote_info_.c_str(), ec.message().c_str(), ec.category().name());
+             doClose(); // sets kDisconnected and closes the socket
+             return;
+         }
+         status_ = kConnected;
+         std::error_code opt_ec;
+         socket_->set_option(asio::ip::tcp::no_delay(true), opt_ec);
+         if (opt_ec)
+         {
+             LOG_WARN("%s set no_delay after connect error: %s", remote_info_.c_str(), opt_ec.message().c_str());
+         }
+         setLocalInfo();
+         LOG_INFO("l:%s->r:%s is connected", local_info_.c_str(), remote_info_.c_str());
+
+         doRead();
+         doWrite(); });
+ }
+
+ /**
+  * Queues buf for sending. The work is posted to the executor so that write_queue_
+  * is modified from the executor's io_context.
+  *
+  * @param buf buffer to send; a buffer that is already queued is rejected with an error log.
+  */
+ void Connection::sendBuf(SendBuffer *buf)
+ {
+     auto keep_alive = shared_from_this();
+     executor_->postTask([keep_alive, this, buf]()
+     {
+         const bool already_queued = std::find(write_queue_.begin(), write_queue_.end(), buf) != write_queue_.end();
+         if (already_queued)
+         {
+             LOG_ERROR("send_buf (uid:%d) is repeat in connection write_queue", buf->uniqId());
+             return;
+         }
+
+         const bool was_idle = write_queue_.empty();
+         write_queue_.push_back(buf);
+         waiting_send_.increment();
+         executor_->waiting_send_.increment();
+         LOG_TRACE("send_buf(uid:%d) is added to connection send queue, conn r:%s", buf->uniqId(), remote_info_.c_str());
+
+         // Only kick off a write when connected and the queue was empty before this push.
+         if (!isConnected() || !was_idle)
+         {
+             return;
+         }
+         doWrite(); });
+ }
+
+ /**
+  * Sends one heartbeat on this connection, unless it is stopped or still has
+  * buffered data waiting to be written.
+  *
+  * @param isBinHB true to send the binary heartbeat (binHB_), false to send msgHB.
+  *
+  * retry_hb_ is incremented for every heartbeat written successfully; once it exceeds
+  * g_config->retry_num_ the connection is closed.
+  */
+ void Connection::sendHB(bool isBinHB)
+ {
+     if (isStop())
+     {
+         return;
+     }
+     if (!write_queue_.empty())
+     {
+         LOG_INFO("conn l:%s->r:%s has cache msg to send, heartbeat will try next time", local_info_.c_str(), remote_info_.c_str());
+         return;
+     }
+
+     // The binary heartbeat is refreshed on every call, whichever kind is sent.
+     binHB_.total_len = htonl(sizeof(BinaryHB) - 4);
+     binHB_.msg_type = 8;
+     binHB_.data_time = htonl(static_cast<uint32_t>(Utils::getCurrentMsTime() / 1000));
+     binHB_.body_ver = 1;
+     binHB_.body_len = 0;
+     binHB_.attr_len = 0;
+     binHB_.magic = htons(constants::kBinaryMagic);
+
+     char *payload = isBinHB ? (char *)&binHB_ : (char *)msgHB;
+     const uint32_t hb_len = isBinHB ? sizeof(binHB_) : sizeof(msgHB);
+
+     LOG_DEBUG("conn l:%s->r:%s send %s", local_info_.c_str(), remote_info_.c_str(), isBinHB ? "binaryHB" : "msgHB");
+     auto keep_alive = shared_from_this();
+     asio::async_write(*socket_, asio::buffer(payload, hb_len), [keep_alive, this, hb_len, isBinHB](const std::error_code &ec, std::size_t reslen)
+     {
+         const bool sent_ok = !ec && reslen == hb_len;
+         if (!sent_ok)
+         {
+             send_err_nums_.increment();
+             LOG_ERROR("conn l:%s->r:%s send heartbeat error, this conn send_error_num:%d, error message:%s", local_info_.c_str(),
+                       remote_info_.c_str(), send_err_nums_.get(), ec.message().c_str());
+         }
+         else
+         {
+             LOG_DEBUG("conn l:%s->r:%s send %s successfully", local_info_.c_str(), remote_info_.c_str(), isBinHB ? "binaryHB" : "msgHB");
+             send_err_nums_.getAndSet(0);
+             retry_hb_.increment();
+         }
+
+         const bool too_many_unacked = retry_hb_.get() > g_config->retry_num_;
+         if (too_many_unacked) //close and create new conn
+         {
+             LOG_ERROR("conn l:%s->r:%s send_error_num:%d, more than max_retry_num:%d, this conn will close", local_info_.c_str(),
+                       remote_info_.c_str(), retry_hb_.get(), g_config->retry_num_);
+             doClose(&ec);
+         } });
+ }
+
+ /**
+  * Requests an asynchronous close: doClose() is posted to the executor rather than run inline.
+  */
+ void Connection::connClose()
+ {
+     auto keep_alive = shared_from_this();
+     std::function<void(void)> close_task = [keep_alive, this]()
+     { doClose(); };
+     executor_->postTask(close_task);
+     LOG_DEBUG("post close request: conn l:%s->r:%s", local_info_.c_str(), remote_info_.c_str());
+ }
+
+ /**
+  * Marks the connection as disconnected, closes the socket and drops all pending state.
+  * Calling it on an already stopped connection is a no-op.
+  *
+  * @param err error that triggered the close; currently unused.
+  *
+  * The socket is closed with the non-throwing overload: doClose() runs inside asio
+  * completion handlers, where an exception would escape from io_context::run().
+  */
+ void Connection::doClose(const std::error_code *err)
+ {
+     (void)err;
+     if (isStop())
+     {
+         return;
+     }
+
+     status_ = kDisconnected;
+     LOG_WARN("close conn, l:%s->r:%s", local_info_.c_str(), remote_info_.c_str());
+     std::error_code close_ec;
+     socket_->close(close_ec);
+     if (close_ec)
+     {
+         LOG_WARN("close socket error, l:%s->r:%s, error message:%s", local_info_.c_str(), remote_info_.c_str(), close_ec.message().c_str());
+     }
+
+     //clean
+     write_queue_.clear();
+     recv_buf_->Reset();
+ }
+
+ /**
+  * Writes the buffer at the head of write_queue_ asynchronously. On success the next
+  * queued buffer is written; on failure the connection is closed and the buffer is
+  * retargeted to a newly created connection (or marked when none could be created).
+  *
+  * The completion handler only pops the queue when its own buffer is still at the head:
+  * doClose() clears write_queue_, and when that happens while a write is in flight an
+  * unconditional pop_front() would run on an empty deque.
+  */
+ void Connection::doWrite()
+ {
+     if (isStop())
+     {
+         return;
+     }
+     if (write_queue_.empty())
+     {
+         return;
+     }
+
+     auto self = shared_from_this();
+     auto curBuf = write_queue_.front();
+
+     std::lock_guard<std::mutex> buf_lck(curBuf->mutex_);
+     asio::async_write(*socket_, asio::buffer(curBuf->content(), curBuf->len()), [self, this, curBuf](const std::error_code &ec, std::size_t length)
+     {
+         if (!write_queue_.empty() && write_queue_.front() == curBuf)
+         {
+             write_queue_.pop_front();
+         }
+
+         if (!ec) //send success
+         {
+
+             { // lock sendbuf
+                 std::lock_guard<std::mutex> buf_lck(curBuf->mutex_);
+
+                 total_send_.increment();
+                 waiting_send_.decrement();
+                 executor_->waiting_send_.decrement();
+                 send_err_nums_.getAndSet(0);
+                 // length is std::size_t; cast so the argument matches %d.
+                 LOG_TRACE("l:%s->r:%s async write data success(len:%d), content_len:%d, buf_id:%d", local_info_.c_str(), remote_info_.c_str(),
+                           static_cast<int>(length), curBuf->len(), curBuf->uniqId());
+
+                 //ack buf
+                 if (g_config->msg_type_ == 2) { curBuf->timeout_timer_->cancel(); }
+
+             }
+
+             doWrite();
+         }
+         else
+         {
+             std::lock_guard<std::mutex> buf_lck(curBuf->mutex_);
+             send_err_nums_.increment();
+             LOG_ERROR("l:%s->r:%s async write data error, buf_id:%d, error message:%s", local_info_.c_str(), remote_info_.c_str(), curBuf->uniqId(),
+                       ec.message().c_str());
+             doClose(&ec);
+
+             auto new_conn = g_clusters->createActiveConn(curBuf->inlong_group_id(), thread_id_);
+             if (!new_conn) { curBuf->fail_create_conn_.increment(); }
+             curBuf->setTarget(new_conn);
+             return;
+         } });
+ }
+
+ /**
+  * Starts one asynchronous receive into recv_buf_. When data arrives, any bytes that are
+  * already available are drained synchronously, the buffer is parsed with doParse() and
+  * the next read is started. A read error or a zero-length read closes the connection.
+  *
+  * The synchronous drain uses the non-throwing receive overload: a throwing call inside
+  * the completion handler would let the exception escape from io_context::run(). A failed
+  * drain is simply skipped here.
+  */
+ void Connection::doRead()
+ {
+     if (isStop())
+     {
+         return;
+     }
+
+     if (recv_buf_->length() == 0)
+         recv_buf_->Reset();
+
+     recv_buf_->EnsureWritableBytes(1024);
+     auto self = shared_from_this();
+     socket_->async_receive(
+         asio::buffer(recv_buf_->WriteBegin(), recv_buf_->WritableBytes()), [self, this](const std::error_code &ec, std::size_t len)
+         {
+             if (ec)
+             {
+                 LOG_ERROR("async read data error, l:%s->r:%s, error message:%s", local_info_.c_str(), remote_info_.c_str(), ec.message().c_str());
+                 doClose(&ec);
+                 return;
+             }
+             if (len == 0)
+             {
+                 LOG_ERROR("async read 0 bytes, l:%s->r:%s", local_info_.c_str(), remote_info_.c_str());
+                 doClose(&ec);
+                 return;
+             }
+             recv_buf_->WriteBytes(len);
+
+             //read the rest content
+             std::error_code tmp_error;
+             size_t left_size = socket_->available(tmp_error);
+             if (left_size > 0 && !tmp_error)
+             {
+                 recv_buf_->EnsureWritableBytes(left_size);
+                 std::error_code recv_error;
+                 size_t rlen = socket_->receive(asio::buffer(recv_buf_->WriteBegin(), left_size), 0, recv_error);
+                 if (!recv_error && rlen > 0) { recv_buf_->WriteBytes(rlen); }
+             }
+             LOG_TRACE("conn read %d bytes, l:%s->r:%s", recv_buf_->length(), local_info_.c_str(), remote_info_.c_str());
+             // parse ack package
+             doParse();
+             doRead(); });
+ }
+
+ /**
+  * Completion handler of the connect-timeout timer.
+  *
+  * @param ec set when the wait was cancelled; in that case nothing happens.
+  *
+  * The timeout only applies while the connection is still kConnecting. A timer that has
+  * already expired cannot be cancelled any more, so this handler can run with a success
+  * code right after the connect completed; closing a connected socket in that case
+  * would be wrong.
+  */
+ void Connection::connectHandler(const std::error_code &ec)
+ {
+     if (ec)
+         return;
+     if (status_ != kConnecting)
+         return;
+     LOG_ERROR("connect timeout, %s", remote_info_.c_str());
+     doClose();
+ }
+
+ /**
+  * Parses every complete package currently held in recv_buf_.
+  *
+  * A package starts with a 4-byte length followed by a 1-byte message type; the type
+  * selects the ack / heartbeat parser.
+  *
+  * @return 0 when the buffer holds no further complete package,
+  *         1 when a package with an unknown message type was met.
+  *
+  * The completeness check is done in 64-bit arithmetic: PeekUint32() + 4 computed in
+  * 32 bits wraps around for lengths close to UINT32_MAX and would let a truncated
+  * package through.
+  */
+ int32_t Connection::doParse()
+ {
+     while (true)
+     {
+         if (recv_buf_->length() < 5)
+         {
+             return 0;
+         }
+
+         const uint64_t package_len = static_cast<uint64_t>(recv_buf_->PeekUint32()) + 4;
+         if (recv_buf_->length() < package_len)
+         {
+             return 0;
+         }
+         //read ack package
+         uint32_t total_len = recv_buf_->ReadUint32();
+         uint8_t msg_type = recv_buf_->ReadUint8();
+         if (msg_type == 3 || msg_type == 5 || msg_type == 6)
+         {
+             bool ret = parseProtocolAck(total_len);
+             LOG_TRACE("parseProtocolAck success? %d, %s", ret, remote_info_.c_str());
+         }
+         else if ((msg_type & 0x1F) == 7)
+         {
+             bool ret = parseBinaryAck(total_len);
+             LOG_TRACE("parseBinaryAck success? %d, %s", ret, remote_info_.c_str());
+         }
+         else if (msg_type == 1 && total_len == 0x1)
+         {
+             retry_hb_.getAndSet(0);
+             LOG_TRACE("success to parse a msghb_ack from %s", remote_info_.c_str());
+         }
+         else if (msg_type == 8) //binary hb
+         {
+             retry_hb_.getAndSet(0);
+             bool ret = parseBinaryHB(total_len);
+             LOG_TRACE("parseBinaryHB success? %d,%s", ret, remote_info_.c_str());
+         }
+         else
+         {
+             //wrong msg_type
+             LOG_ERROR("parse ack, and get wrong msgtype: %d, proxy info%s", msg_type, remote_info_.c_str());
+             return 1;
+             // FIXME: need add other handler, such as close conn?
+         }
+     }
+ }
+
+ /**
+  * Parses the remainder of a protocol ack (body length, body, attribute length, attributes)
+  * and acknowledges the send buffer whose id is carried in the "mid=" attribute.
+  *
+  * @param total_len length field of the package, as read by doParse().
+  * @return true when a buffer pool was found for the parsed id and the buffer was acked.
+  *
+  * Fixes over the previous version:
+  *  - the "body_len is %d" trace now passes its argument;
+  *  - the attribute copy uses a std::string instead of a variable-length array whose size
+  *    came from the network;
+  *  - attr_len is checked against the bytes left in the buffer before copying.
+  */
+ bool Connection::parseProtocolAck(uint32_t total_len)
+ {
+     uint32_t body_len = recv_buf_->ReadUint32();
+     if (body_len > recv_buf_->length())
+     {
+         LOG_ERROR("body_len is %d, more than recv_buf left len:%d ", body_len, recv_buf_->length());
+         if (total_len < 4)
+         {
+             LOG_ERROR("total_len is less than 4, this should be check");
+         }
+         // NOTE(review): total_len - 4 wraps when total_len < 4, and 5 bytes of the package
+         // (type + body_len) were already consumed - confirm the intended skip length.
+         recv_buf_->Skip(total_len - 4);
+         return false;
+     }
+     recv_buf_->Skip(body_len);
+     LOG_TRACE("body_len is %d, and skip body", body_len);
+     uint32_t attr_len = recv_buf_->ReadUint32();
+     if (attr_len > recv_buf_->length())
+     {
+         LOG_ERROR("attr_len is %d, more than recv_buf left len:%d ", attr_len, recv_buf_->length());
+         return false;
+     }
+     // One extra zeroed byte keeps the copy NUL-terminated for the C string helpers below.
+     std::string attr(static_cast<size_t>(attr_len) + 1, '\0');
+     strncpy(&attr[0], recv_buf_->data(), attr_len);
+     recv_buf_->Skip(attr_len);
+     LOG_TRACE("attr_len is %d, attr info: %s", attr_len, attr.c_str());
+     uint32_t buf_uniqId = parseAttr(&attr[0], attr_len);
+
+     auto bpr = g_pools->getUidBufPool(buf_uniqId);
+     if (bpr != nullptr)
+     {
+         bpr->ackBuf(buf_uniqId);
+         return true;
+     }
+
+     return false;
+ }
+
+ /**
+  * Parses the remainder of a binary ack: buffer id, attribute length, attributes, magic.
+  * The fields are consumed first and validated afterwards.
+  *
+  * @param total_len length field of the package, as read by doParse().
+  * @return false when the length or the magic does not match, true otherwise
+  *         (also when no buffer pool is found for the id).
+  */
+ bool Connection::parseBinaryAck(uint32_t total_len)
+ {
+     const uint32_t uniq_id = recv_buf_->ReadUint32();
+     const uint16_t attr_len = recv_buf_->ReadUint16();
+     recv_buf_->Skip(attr_len);
+     const uint16_t magic = recv_buf_->ReadUint16();
+
+     const bool length_mismatch = (total_len + 4 != 13 + attr_len);
+     if (length_mismatch)
+     {
+         LOG_ERROR("failed to parse binary ack, total_len(%d) + 4 != attr_len(%d) + 13", total_len, attr_len);
+         return false;
+     }
+
+     const bool magic_mismatch = (magic != constants::kBinaryMagic);
+     if (magic_mismatch)
+     {
+         LOG_ERROR("failed to parse binary ack, get error magic: %d", magic);
+         return false;
+     }
+
+     auto pool = g_pools->getUidBufPool(uniq_id);
+     if (pool != nullptr)
+     {
+         pool->ackBuf(uniq_id);
+     }
+     return true;
+ }
+
+ /**
+  * Parses the remainder of a binary heartbeat ack and records the proxy load it carries
+  * in the next slot of loads_ (a ring of 30 entries).
+  *
+  * @param total_len length field of the package, as read by doParse().
+  * @return false when the length or the magic does not match, true otherwise.
+  */
+ bool Connection::parseBinaryHB(uint32_t total_len)
+ {
+     // Fields are consumed in wire order; validation happens afterwards.
+     uint32_t data_time = recv_buf_->ReadUint32();
+     (void)data_time; // read to advance the buffer, value not used
+     const uint8_t body_ver = recv_buf_->ReadUint8();
+     const uint32_t body_len = recv_buf_->ReadUint32();
+     const uint16_t load = recv_buf_->PeekUint16(); // proxy load
+     recv_buf_->Skip(body_len);
+     const uint16_t attr_len = recv_buf_->ReadUint16();
+     recv_buf_->Skip(attr_len);
+     const uint16_t magic = recv_buf_->ReadUint16();
+
+     const bool length_mismatch = (total_len + 4 != 18 + attr_len + body_len);
+     if (length_mismatch)
+     {
+         LOG_ERROR("failed to parse binary heartbeat ack, total_len(%d) + 4 != 18 + attr_len(%d) + body_len(%d)", total_len, attr_len, body_len);
+         return false;
+     }
+
+     const bool magic_mismatch = (magic != constants::kBinaryMagic);
+     if (magic_mismatch)
+     {
+         LOG_ERROR("failed to parse binary heartbeat ack, get error magic: %d", magic);
+         return false;
+     }
+
+     std::lock_guard<std::mutex> guard(load_mutex_);
+     const bool carries_load = (body_ver == 1) && (body_len == 2);
+     if (!carries_load)
+     {
+         loads_[next_load_idx_ % 30] = 0;
+         LOG_TRACE("update proxy%s loads, cur_load:%d, cur_idx:%d", remote_info_.c_str(), load, next_load_idx_);
+     }
+     else
+     {
+         loads_[next_load_idx_ % 30] = load;
+         LOG_TRACE("update proxy%s load, cur_load:%d, cur_idx:%d", remote_info_.c_str(), load, next_load_idx_);
+     }
+     ++next_load_idx_;
+
+     return true;
+ }
+
+ /**
+  * Extracts the buffer id from the "mid=<id>" entry of an ack attribute string.
+  *
+  * @param attr     NUL-terminated attribute string (may be modified for logging).
+  * @param attr_len length of the attribute data, without the terminator.
+  * @return the parsed id, or static_cast<uint32_t>(-1) when attr is null or has no "mid=" entry.
+  *
+  * Fixes over the previous version:
+  *  - attr[attr_len - 1] is only touched when attr_len > 0 (an empty attribute
+  *    string used to index attr[-1]);
+  *  - the id is parsed with strtoul, since atoi is undefined for values above INT_MAX
+  *    while the id is a uint32_t.
+  */
+ uint32_t Connection::parseAttr(char *attr, int32_t attr_len)
+ {
+     if (attr == nullptr)
+     {
+         LOG_ERROR("show len(%d) attr:%s.", attr_len, "");
+         return static_cast<uint32_t>(-1);
+     }
+     LOG_TRACE("ack attr:%s", attr);
+
+     char *mid = strstr(attr, "mid=");
+     if (!mid)
+     {
+         if (attr_len > 0 && attr[attr_len - 1] != '\0')
+         {
+             attr[attr_len - 1] = '\0';
+             LOG_ERROR("force show len(%d) attr:%s.", attr_len, attr);
+         }
+         else
+         {
+             LOG_ERROR("show len(%d) attr:%s.", attr_len, attr);
+         }
+
+         return static_cast<uint32_t>(-1);
+     }
+
+     uint32_t buf_uniqId = static_cast<uint32_t>(strtoul(&mid[4], nullptr, 10));
+     LOG_TRACE("parse ack and get buf uid:%d", buf_uniqId);
+
+     return buf_uniqId;
+ }
+
+ /**
+  * Computes the weighted average of the recorded proxy loads, using
+  * constants::kWeight as per-slot weights and skipping slots whose load is not positive.
+  *
+  * @return -1 when the connection is stopped, 0 when no slot holds a positive load,
+  *         otherwise the integer weighted average.
+  */
+ int32_t Connection::getAvgLoad()
+ {
+     if (isStop())
+     {
+         return -1;
+     }
+
+     std::lock_guard<std::mutex> guard(load_mutex_);
+     int32_t weighted_sum = 0;
+     int32_t weight_total = 0;
+     for (size_t slot = 0; slot < loads_.size(); ++slot)
+     {
+         if (loads_[slot] <= 0)
+         {
+             continue;
+         }
+         weighted_sum += loads_[slot] * constants::kWeight[slot];
+         weight_total += constants::kWeight[slot];
+     }
+     return (0 == weight_total) ? 0 : weighted_sum / weight_total;
+ }
+
+ /**
+  * Caches the socket's local endpoint as "[ip:<address>, port:<port>]" in local_info_.
+  */
+ void Connection::setLocalInfo()
+ {
+     const auto local_ep = socket_->local_endpoint();
+     const std::string ip_text = local_ep.address().to_string();
+     const std::string port_text = std::to_string(local_ep.port());
+     local_info_ = "[ip:" + ip_text + ", port:" + port_text + "]";
+ }
+
+} // namespace dataproxy_sdk
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.h
new file mode 100644
index 000000000..7eaf8a602
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.h
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_NET_CONNECTION_H_
+#define DATAPROXY_SDK_NET_CONNECTION_H_
+
+#include <asio.hpp>
+#include <deque>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <system_error>
+#include <unordered_map>
+
+#include "atomic.h"
+#include "sdk_core.h"
+#include "executor_thread_pool.h"
+#include "logger.h"
+#include "noncopyable.h"
+#include "recv_buffer.h"
+#include "msg_protocol.h"
+
+namespace dataproxy_sdk
+{
+ class SendBuffer;
+ // One TCP connection to a proxy, driven by a single ExecutorThread.
+ // Queues send buffers, sends heartbeats, parses acks and tracks the proxy load
+ // reported in binary heartbeat acks.
+ class Connection : noncopyable, public std::enable_shared_from_this<Connection>
+ {
+ public:
+ // Connection life cycle; isStop() is true only for kDisconnected.
+ enum Status
+ {
+ kConnecting,
+ kConnected,
+ kDisconnected
+ };
+
+ private:
+ ExecutorThreadPtr executor_; //network sending thread, executor:socket=1:n
+ TcpSocketPtr socket_;
+ std::atomic<Status> status_;
+ SteadyTimerPtr timer_; // connect-timeout timer armed in doConnect()
+ ProxyInfoPtr proxyinfo_;
+ std::string local_info_; //local ip+port
+ std::string remote_info_; // proxyinfo_->getString(), used in log output
+
+ std::deque<SendBuffer *> write_queue_; //waiting for sending
+ RecvBufferPtr recv_buf_;
+
+ BinaryHB binHB_ = {0}; //binary hb
+ char msgHB[5] = {0, 0, 0, 0x1, 0x1}; // heartbeat sent by sendHB() when isBinHB is false
+
+ std::vector<int32_t> loads_; // connection loads in last 30 times
+ int32_t next_load_idx_; // loads_ slot is next_load_idx_ % 30
+
+ int32_t thread_id_; // executor_->threadId(), cached in the constructor
+
+ enum
+ {
+ kConnectTimeout = 1000 * 20, //ms //FIXME: improve, need fix?
+ };
+
+ AtomicUInt send_err_nums_;
+ AtomicUInt total_send_; //total send buf
+ AtomicUInt total_read_; //total read ack package
+ AtomicInt waiting_send_;
+ AtomicInt retry_hb_; //hb retry times, reset while receiving ack
+
+ mutable std::mutex load_mutex_; // loads lock
+
+ public:
+ // Starts connecting to the proxy immediately.
+ Connection(ExecutorThreadPtr &executor, ProxyInfoPtr &proxyinfo);
+ ~Connection();
+
+ void sendBuf(SendBuffer* buf); // queues buf via the executor
+ void sendHB(bool isBinHB); // sends a binary or msg heartbeat
+ void connClose(); // posts doClose() to the executor
+ Status status() const { return status_; }
+ inline bool isStop() const { return status_ == kDisconnected; }
+ inline bool isConnected() const { return status_ == kConnected; }
+ int32_t getThreadId() const { return thread_id_; }
+ inline int32_t getWaitingSend() { return waiting_send_.get(); }
+ // Decrements this connection's and the executor's waiting counters, but only
+ // while this connection's counter is still positive.
+ inline void decreaseWaiting()
+ {
+ if (waiting_send_.get() > 0)
+ {
+ waiting_send_.decrement();
+ executor_->waiting_send_.decrement();
+ }
+ }
+ std::string getRemoteInfo() const { return remote_info_; }
+ ProxyInfoPtr getBusInfo() const { return proxyinfo_; }
+
+ // Weighted average of loads_; -1 when the connection is stopped.
+ int32_t getAvgLoad();
+
+ private:
+ void doConnect(const asio::ip::tcp::endpoint &ep);
+ void doWrite(); //async send data
+ void doRead();
+ void connectHandler(const std::error_code &ec); // connect-timeout timer handler
+ void setLocalInfo();
+ void doClose(const std::error_code *err = nullptr);
+ int32_t doParse(); // parses complete packages held in recv_buf_
+ bool parseProtocolAck(uint32_t total_len);
+ bool parseBinaryAck(uint32_t total_len);
+ bool parseBinaryHB(uint32_t total_len);
+ uint32_t parseAttr(char *attr, int32_t attr_len); // extracts the id after "mid="
+ };
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_NET_CONNECTION_H_
\ No newline at end of file