Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/29 09:43:03 UTC

[inlong] branch master updated: [INLONG-5256][SDK] DataProxy SDK(cpp) network operation and management (#5257)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e2975096e [INLONG-5256][SDK] DataProxy SDK(cpp) network operation and management (#5257)
e2975096e is described below

commit e2975096e4ede764866aeda36c42b9eb5a85ce17
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Fri Jul 29 17:42:59 2022 +0800

    [INLONG-5256][SDK] DataProxy SDK(cpp) network operation and management (#5257)
---
 .../src/net/executor_thread_pool.cc                | 122 ++++++
 .../src/net/executor_thread_pool.h                 |  84 ++++
 .../dataproxy-sdk-cpp/src/net/socket_connection.cc | 485 +++++++++++++++++++++
 .../dataproxy-sdk-cpp/src/net/socket_connection.h  | 126 ++++++
 4 files changed, 817 insertions(+)
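
For orientation, a minimal sketch of how the new pool is meant to be used (illustrative only; it assumes the SDK has already loaded g_config, whose thread_nums_ decides how many io_context threads the pool spawns):

    #include "executor_thread_pool.h"   // added by this commit

    using namespace dataproxy_sdk;

    void sketch()
    {
        ExecutorThreadPool pool;                     // spawns g_config->thread_nums_ io_context threads
        ExecutorThreadPtr ex = pool.nextExecutor();  // round-robin pick, e.g. for a new connection
        if (ex != nullptr)
        {
            ex->postTask([]() { /* runs on that executor's io_context thread */ });
        }
        pool.close();                                // stops each io_context and joins its thread
    }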

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.cc
new file mode 100644
index 000000000..c7c0010fb
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.cc
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "executor_thread_pool.h"
+
+#include "sdk_core.h"
+
+namespace dataproxy_sdk
+{
+    ExecutorThread::ExecutorThread(int32_t id)
+        : io_context_(std::make_shared<asio::io_context>()), work_(asio::make_work_guard(*io_context_)), worker_(std::bind(&ExecutorThread::startWorker, this)), thread_id_(id)
+    {
+    }
+
+    ExecutorThread::~ExecutorThread()
+    {
+        LOG_DEBUG("thread:%d destructor", thread_id_);
+        close();
+        if (worker_.joinable())
+        {
+            worker_.detach();
+        }
+    }
+
+    void ExecutorThread::close()
+    {
+        LOG_DEBUG("thread:%d close() --> io_context stop", thread_id_);
+        io_context_->stop();
+        if (std::this_thread::get_id() != worker_.get_id() && worker_.joinable())
+        {
+            LOG_DEBUG("join thread %d", thread_id_);
+            worker_.join();
+        }
+    }
+
+    void ExecutorThread::startWorker() // run the io_context event loop
+    {
+        LOG_DEBUG("thread:%d start io_context run", thread_id_);
+        io_context_->run();
+    }
+
+    TcpSocketPtr ExecutorThread::createTcpSocket() //create socket based on io_context
+    {
+        return std::make_shared<asio::ip::tcp::socket>(*io_context_);
+    }
+
+    SteadyTimerPtr ExecutorThread::createSteadyTimer() //create timer based on io_context
+    {
+        return std::make_shared<asio::steady_timer>(*io_context_);
+    }
+
+    ExecutorThreadPool::ExecutorThreadPool() : next_idx_(0)
+    {
+        for (int i = 0; i < g_config->thread_nums_; i++)
+        {
+            executors_.emplace_back(std::make_shared<ExecutorThread>(i));
+        }
+        
+    }
+
+    ExecutorThreadPtr ExecutorThreadPool::nextExecutor()
+    {
+        if (executors_.empty())
+        {
+            LOG_ERROR("fail to find executor thread pool");
+            return nullptr;
+        }
+        int32_t idx = (next_idx_++) % executors_.size();
+        return executors_[idx];       
+    }
+
+    ExecutorThreadPtr ExecutorThreadPool::getExecutor(int32_t id)
+    {
+        if (id < 0 || id >= executors_.size() || executors_.empty())
+        {
+            LOG_ERROR("fail to get network_thread<id:%d>, max id is %d", id, executors_.size() - 1);
+            return nullptr;
+        }
+        return executors_[id];
+        
+    }
+
+    void ExecutorThreadPool::showState()
+    {
+        for (auto &executor : executors_)
+        {
+            executor->showState();
+        }
+        
+    }
+
+    void ExecutorThreadPool::close()
+    {
+        for (auto it : executors_)
+        {
+            if (it != nullptr)
+            {
+                it->close();
+                LOG_DEBUG("thread_id:%d, shared_count:%d", it->threadId(), it.use_count());
+            }
+            it.reset();
+        }
+        executors_.clear();
+        
+    }
+} // namespace dataproxy_sdk
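
The class above follows the usual asio work-guard pattern: the guard keeps io_context::run() from returning while the queue is empty, and close() stops the context and joins the worker. A standalone sketch of the same pattern, using only asio and the standard library (independent of the SDK types; the function name is illustrative):

    #include <asio.hpp>
    #include <thread>

    void work_guard_sketch()
    {
        asio::io_context ctx;
        auto guard = asio::make_work_guard(ctx);      // keeps ctx.run() alive while idle, like work_
        std::thread worker([&ctx]() { ctx.run(); });  // event-loop thread, like startWorker()

        asio::post(ctx, []() { /* queued work runs on the worker thread */ });

        ctx.stop();                                   // like ExecutorThread::close()
        if (worker.joinable())
            worker.join();
    }
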
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.h
new file mode 100644
index 000000000..e8f7013b9
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/executor_thread_pool.h
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CPAI_NET_EXECUTOR_THREAD_POOL_H_
+#define CPAI_NET_EXECUTOR_THREAD_POOL_H_
+
+#include <asio.hpp>
+#include <functional>
+#include <memory>
+#include <stdint.h>
+#include <thread>
+
+#include "sdk_core.h"
+#include "client_config.h"
+#include "logger.h"
+#include "noncopyable.h"
+
+namespace dataproxy_sdk
+{
+  class ExecutorThread : noncopyable
+  {
+  private:
+    std::shared_ptr<asio::io_context> io_context_;
+    io_context_work work_; // keeps io_context::run() from returning when there is no pending work
+    std::thread worker_;
+    int32_t thread_id_;
+
+  public:
+    AtomicUInt waiting_send_;
+
+  public:
+    ExecutorThread(int32_t id);
+    virtual ~ExecutorThread();
+
+    inline void postTask(std::function<void(void)> task) { io_context_->post(task); }
+    std::shared_ptr<asio::io_context> getIoContext() { return io_context_; }
+    TcpSocketPtr createTcpSocket();
+    SteadyTimerPtr createSteadyTimer();
+    inline void showState() { LOG_DEBUG("------->thread_id:%d, waiting_send:%d", thread_id_, waiting_send_.get()); }
+    void close();
+
+    int32_t threadId() const { return thread_id_; }
+    
+  private:
+    void startWorker();
+  };
+
+  //network threads
+  class ExecutorThreadPool : noncopyable
+  {
+  private:
+    // round-robin
+    std::vector<ExecutorThreadPtr> executors_;
+    int32_t next_idx_;
+
+  public:
+    explicit ExecutorThreadPool();
+    virtual ~ExecutorThreadPool() { close(); }
+
+    ExecutorThreadPtr nextExecutor(); // round-robin; pick an executor thread for creating a new connection
+    ExecutorThreadPtr getExecutor(int32_t id);
+    void showState();
+    void close();
+  };
+
+} // namespace dataproxy_sdk
+
+#endif // CPAI_NET_EXECUTOR_THREAD_POOL_H_
\ No newline at end of file
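
The factory helpers in this header tie each socket and timer to one executor's io_context, so all I/O for a given connection completes on a single thread. An illustrative use of the timer factory (the 10-second interval and the callback body are placeholders, not SDK behaviour):

    #include <chrono>

    #include "executor_thread_pool.h"

    using namespace dataproxy_sdk;

    void arm_timer(ExecutorThreadPool& pool)
    {
        ExecutorThreadPtr ex = pool.getExecutor(0);      // valid ids are 0 .. thread_nums_-1
        if (ex == nullptr) return;

        SteadyTimerPtr timer = ex->createSteadyTimer();  // bound to this executor's io_context
        timer->expires_after(std::chrono::seconds(10));
        timer->async_wait([timer](const std::error_code& ec)
                          { if (!ec) { /* e.g. trigger a heartbeat here */ } });
    }
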
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.cc
new file mode 100644
index 000000000..f350d2f68
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.cc
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "socket_connection.h"
+
+#include <chrono>
+#include <mutex>
+#include <stdint.h>
+
+#include "buffer_pool.h"
+#include "proxylist_config.h"
+#include "sdk_constant.h"
+#include "client_config.h"
+#include "send_buffer.h"
+#include "utils.h"
+
+namespace dataproxy_sdk
+{
+    Connection::Connection(ExecutorThreadPtr &executor, ProxyInfoPtr &proxyinfo)
+        : executor_(executor), thread_id_(executor_->threadId()), socket_(std::move(executor_->createTcpSocket())), status_(kConnecting), timer_(std::move(executor->createSteadyTimer())), proxyinfo_(proxyinfo), remote_info_(proxyinfo_->getString()), recv_buf_(std::make_shared<RecvBuffer>()), loads_(30), total_send_(0), total_read_(0), waiting_send_(0), send_err_nums_(0), next_load_idx_(0)
+    {
+        asio::ip::tcp::endpoint ep(asio::ip::address::from_string(proxyinfo_->ip()), static_cast<uint16_t>(proxyinfo_->portNum()));
+        doConnect(ep);
+    }
+
+    Connection::~Connection() {}
+
+    void Connection::doConnect(const asio::ip::tcp::endpoint &ep)
+    {
+        if (isStop())
+        {
+            LOG_WARN("fail to doConnect %s, connection status is disconnected", remote_info_.c_str());
+            return;
+        }
+        status_ = kConnecting;
+        timer_->expires_after(std::chrono::milliseconds(kConnectTimeout));
+        timer_->async_wait(std::bind(&Connection::connectHandler, this, std::placeholders::_1));
+        if (g_config->enable_TCP_nagle_ == false)
+        {
+            socket_->set_option(asio::ip::tcp::no_delay(true));
+        } // disable Nagle's algorithm
+        socket_->async_connect(ep, [this](const std::error_code &ec)
+                               {
+        timer_->cancel();
+        if (ec)
+        {
+            status_ = kDisconnected;
+            LOG_ERROR("%s async connect error: %s,%s", remote_info_.c_str(), ec.message().c_str(), ec.category().name());
+            doClose();
+            return;
+        }
+        status_ = kConnected;
+        socket_->set_option(asio::ip::tcp::no_delay(true));
+        setLocalInfo();
+        LOG_INFO("l:%s->r:%s is connected", local_info_.c_str(), remote_info_.c_str());
+
+        doRead();
+        doWrite(); });
+    }
+
+    void Connection::sendBuf(SendBuffer *buf)
+    {
+        auto self = shared_from_this();
+        executor_->postTask([self, this, buf]()
+                            {
+        if (std::find(write_queue_.begin(), write_queue_.end(), buf) != write_queue_.end())
+        {
+            LOG_ERROR("send_buf (uid:%d) is repeat in connection write_queue", buf->uniqId());
+            return;
+        }
+
+        bool queue_empty = write_queue_.empty();
+        write_queue_.push_back(buf);
+        waiting_send_.increment();
+        executor_->waiting_send_.increment();
+        LOG_TRACE("send_buf(uid:%d) is added to connection send queue, conn r:%s", buf->uniqId(), remote_info_.c_str());
+        if (isConnected() && queue_empty) { doWrite(); } });
+    }
+
+    void Connection::sendHB(bool isBinHB)
+    {
+        if (isStop())
+        {
+            return;
+        }
+        if (!write_queue_.empty())
+        {
+            LOG_INFO("conn l:%s->r:%s has cache msg to send, heartbeat will try next time", local_info_.c_str(), remote_info_.c_str());
+            return;
+        }
+        binHB_.total_len = htonl(sizeof(BinaryHB) - 4);
+        binHB_.msg_type = 8;
+        binHB_.data_time = htonl(static_cast<uint32_t>(Utils::getCurrentMsTime() / 1000));
+        binHB_.body_ver = 1;
+        binHB_.body_len = 0;
+        binHB_.attr_len = 0;
+        binHB_.magic = htons(constants::kBinaryMagic);
+
+        char *hb;
+        uint32_t hb_len = 0;
+        if (isBinHB)
+        {
+            hb_len = sizeof(binHB_);
+            hb = (char *)&binHB_;
+        }
+        else
+        {
+            hb_len = sizeof(msgHB);
+            hb = (char *)msgHB;
+        }
+        LOG_DEBUG("conn l:%s->r:%s send %s", local_info_.c_str(), remote_info_.c_str(), isBinHB ? "binaryHB" : "msgHB");
+        auto self = shared_from_this();
+        asio::async_write(*socket_, asio::buffer(hb, hb_len), [self, this, hb_len, isBinHB](const std::error_code &ec, std::size_t reslen)
+                          {
+        if (!ec && reslen == hb_len)  // send success
+        {
+            LOG_DEBUG("conn l:%s->r:%s send %s successfully", local_info_.c_str(), remote_info_.c_str(), isBinHB ? "binaryHB" : "msgHB");
+            send_err_nums_.getAndSet(0);
+            retry_hb_.increment();
+        }
+        else 
+        {
+            send_err_nums_.increment();
+            LOG_ERROR("conn l:%s->r:%s send heartbeat error, this conn send_error_num:%d, error message:%s", local_info_.c_str(),
+                      remote_info_.c_str(), send_err_nums_.get(), ec.message().c_str());
+        }
+
+        if (retry_hb_.get() > g_config->retry_num_)  //close and create new conn
+        {
+            LOG_ERROR("conn l:%s->r:%s send_error_num:%d, more than max_retry_num:%d, this conn will close", local_info_.c_str(),
+                      remote_info_.c_str(), retry_hb_.get(), g_config->retry_num_);
+            doClose(&ec);
+        } });
+    }
+
+    void Connection::connClose()
+    {
+        auto self = shared_from_this();
+        executor_->postTask([self, this]()
+                            { doClose(); });
+        LOG_DEBUG("post close request: conn l:%s->r:%s", local_info_.c_str(), remote_info_.c_str());
+    }
+
+    void Connection::doClose(const std::error_code *err)
+    {
+        if (isStop())
+        {
+            return;
+        }
+
+        status_ = kDisconnected;
+        LOG_WARN("close conn, l:%s->r:%s", local_info_.c_str(), remote_info_.c_str());
+        socket_->close();
+
+        //clean
+        write_queue_.clear();
+        recv_buf_->Reset();
+    }
+
+    void Connection::doWrite()
+    {
+        if (isStop())
+        {
+            return;
+        }
+        if (write_queue_.empty())
+        {
+            return;
+        }
+
+        auto self = shared_from_this();
+        auto curBuf = write_queue_.front();
+
+        std::lock_guard<std::mutex> buf_lck(curBuf->mutex_);
+        asio::async_write(*socket_, asio::buffer(curBuf->content(), curBuf->len()), [self, this, curBuf](const std::error_code &ec, std::size_t length)
+                          {
+        write_queue_.pop_front();
+
+        if (!ec)  //send success
+        {
+            
+            {  // lock sendbuf
+                std::lock_guard<std::mutex> buf_lck(curBuf->mutex_);
+
+                total_send_.increment();
+                waiting_send_.decrement();
+                executor_->waiting_send_.decrement();
+                send_err_nums_.getAndSet(0);
+                LOG_TRACE("l:%s->r:%s async write data success(len:%d), content_len:%d,  buf_id:%d", local_info_.c_str(), remote_info_.c_str(),
+                          length, curBuf->len(), curBuf->uniqId());
+
+                //ack buf
+                if (g_config->msg_type_ == 2) { curBuf->timeout_timer_->cancel(); }
+
+            }
+
+            doWrite();
+        }
+        else 
+        {
+            std::lock_guard<std::mutex> buf_lck(curBuf->mutex_);
+            send_err_nums_.increment();
+            LOG_ERROR("l:%s->r:%s async write data error, buf_id:%d, error message:%s", local_info_.c_str(), remote_info_.c_str(), curBuf->uniqId(),
+                      ec.message().c_str());
+            doClose(&ec);
+
+            auto new_conn = g_clusters->createActiveConn(curBuf->inlong_group_id(), thread_id_);
+            if (!new_conn) { curBuf->fail_create_conn_.increment(); }
+            curBuf->setTarget(new_conn);
+            return;
+        } });
+    }
+
+    void Connection::doRead()
+    {
+        if (isStop())
+        {
+            return;
+        }
+
+        if (recv_buf_->length() == 0)
+            recv_buf_->Reset();
+
+        recv_buf_->EnsureWritableBytes(1024);
+        auto self = shared_from_this();
+        socket_->async_receive(
+            asio::buffer(recv_buf_->WriteBegin(), recv_buf_->WritableBytes()), [self, this](const std::error_code &ec, std::size_t len)
+            {
+            if (ec)
+            {
+                LOG_ERROR("async read data error, l:%s->r:%s, error message:%s", local_info_.c_str(), remote_info_.c_str(), ec.message().c_str());
+                doClose(&ec);
+                return;
+            }
+            if (len == 0)
+            {
+                LOG_ERROR("async read 0 bytes, l:%s->r:%s", local_info_.c_str(), remote_info_.c_str());
+                doClose(&ec);
+                return;
+            }
+            recv_buf_->WriteBytes(len);
+
+            //read the rest content
+            std::error_code tmp_error;
+            size_t left_size = socket_->available(tmp_error);
+            if (left_size > 0 && !tmp_error)
+            {
+                recv_buf_->EnsureWritableBytes(left_size);
+                size_t rlen = socket_->receive(asio::buffer(recv_buf_->WriteBegin(), left_size));
+                if (rlen > 0) { recv_buf_->WriteBytes(rlen); }
+            }
+            LOG_TRACE("conn read %d bytes,  l:%s->r:%s", recv_buf_->length(), local_info_.c_str(), remote_info_.c_str());
+            // parse ack package
+            doParse();
+            doRead(); });
+    }
+
+    void Connection::connectHandler(const std::error_code &ec)
+    {
+        if (ec)
+            return;
+        if (isStop())
+            return;
+        LOG_ERROR("connect timeout, %s", remote_info_.c_str());
+        doClose();
+    }
+
+    int32_t Connection::doParse()
+    {
+        while (true)
+        {
+            if (recv_buf_->length() < 5)
+            {
+                return 0;
+            }
+
+            if (recv_buf_->length() < recv_buf_->PeekUint32() + 4)
+            {
+                return 0;
+            }
+            //read ack package
+            uint32_t total_len = recv_buf_->ReadUint32();
+            uint8_t msg_type = recv_buf_->ReadUint8();
+            if (msg_type == 3 || msg_type == 5 || msg_type == 6 || (msg_type & 0x1F) == 7)
+            {
+                if ((msg_type & 0x1F) != 7)
+                {
+                    bool ret = parseProtocolAck(total_len);
+                    LOG_TRACE("parseProtocolAck success? %d, %s", ret, remote_info_.c_str());
+                }
+                else
+                {
+                    bool ret = parseBinaryAck(total_len);
+                    LOG_TRACE("parseBinaryAck success? %d, %s", ret, remote_info_.c_str());
+                }
+            }
+            else if (msg_type == 1 && total_len == 0x1) 
+            {
+                retry_hb_.getAndSet(0);
+                LOG_TRACE("success to parse a msghb_ack from %s", remote_info_.c_str());
+            }
+            else if (msg_type == 8) //binary hb
+            {
+                retry_hb_.getAndSet(0);
+                bool ret = parseBinaryHB(total_len);
+                LOG_TRACE("parseBinaryHB success? %d,%s", ret, remote_info_.c_str());
+            }
+            else
+            {
+                //wrong msg_type 
+                LOG_ERROR("parse ack, and get wrong msgtype: %d, proxy info%s", msg_type, remote_info_.c_str());
+                return 1;
+                // FIXME: need add other handler, such as close conn?
+            }
+        }
+        return 0;
+    }
+
+    bool Connection::parseProtocolAck(uint32_t total_len)
+    {
+        uint32_t body_len = recv_buf_->ReadUint32();
+        if (body_len > recv_buf_->length())
+        {
+            LOG_ERROR("body_len is %d, more than recv_buf left len:%d ", body_len, recv_buf_->length());
+            if (total_len < 4)
+            {
+                LOG_ERROR("total_len is less than 4, this should be check");
+            }
+            recv_buf_->Skip(total_len - 4);
+            return false;
+        }
+        recv_buf_->Skip(body_len);
+        LOG_TRACE("body_len is %d, and skip body");
+        uint32_t attr_len = recv_buf_->ReadUint32();
+        char attr[attr_len + 1];
+        memset(attr, 0x0, attr_len + 1);
+        strncpy(attr, recv_buf_->data(), attr_len);
+        recv_buf_->Skip(attr_len);
+        LOG_TRACE("attr_len is %d, attr info: %s", attr_len, attr);
+        uint32_t buf_uniqId = parseAttr(attr, attr_len);
+
+        auto bpr=g_pools->getUidBufPool(buf_uniqId);
+        if(bpr!=nullptr){
+            bpr->ackBuf(buf_uniqId);
+            return true;
+        }
+
+        return false;
+    }
+
+    bool Connection::parseBinaryAck(uint32_t total_len)
+    {
+        uint32_t uniq = recv_buf_->ReadUint32();
+        uint16_t attr_len = recv_buf_->ReadUint16();
+        recv_buf_->Skip(attr_len);
+        uint16_t magic = recv_buf_->ReadUint16();
+
+        if (total_len + 4 != 13 + attr_len)
+        {
+            LOG_ERROR("failed to parse binary ack, total_len(%d) + 4 != attr_len(%d) + 13", total_len, attr_len);
+            return false;
+        }
+        if (magic != constants::kBinaryMagic)
+        {
+            LOG_ERROR("failed to parse binary ack, get error magic: %d", magic);
+            return false;
+        }
+
+        auto bpr=g_pools->getUidBufPool(uniq);
+        if(bpr!=nullptr){
+            bpr->ackBuf(uniq);
+        }
+
+        return true;
+    }
+
+    bool Connection::parseBinaryHB(uint32_t total_len)
+    {
+        uint32_t data_time = recv_buf_->ReadUint32();
+        uint8_t body_ver = recv_buf_->ReadUint8();
+        uint32_t body_len = recv_buf_->ReadUint32();
+        uint16_t load = recv_buf_->PeekUint16(); // proxy load
+        recv_buf_->Skip(body_len);
+        uint16_t attr_len = recv_buf_->ReadUint16();
+        recv_buf_->Skip(attr_len);
+        uint16_t magic = recv_buf_->ReadUint16();
+
+        if (total_len + 4 != 18 + attr_len + body_len)
+        {
+            LOG_ERROR("failed to parse binary heartbeat ack, total_len(%d) + 4 != 18 + attr_len(%d) + body_len(%d)", total_len, attr_len, body_len);
+            return false;
+        }
+
+        if (magic != constants::kBinaryMagic)
+        {
+            LOG_ERROR("failed to parse binary heartbeat ack, get error magic: %d", magic);
+            return false;
+        }
+
+        std::lock_guard<std::mutex> lck(load_mutex_);
+        if (body_ver == 1 && body_len == 2)
+        {
+            loads_[next_load_idx_ % 30] = load;
+            LOG_TRACE("update proxy%s load, cur_load:%d, cur_idx:%d", remote_info_.c_str(), load, next_load_idx_);
+        }
+        else
+        {
+            loads_[next_load_idx_ % 30] = 0;
+            LOG_TRACE("update proxy%s loads, cur_load:%d, cur_idx:%d", remote_info_.c_str(), load, next_load_idx_);
+        }
+        ++next_load_idx_;
+
+        return true;
+    }
+
+    uint32_t Connection::parseAttr(char *attr, int32_t attr_len)
+    {
+        char *mid = nullptr;
+        LOG_TRACE("ack attr:%s", attr);
+
+        mid = strstr(attr, "mid=");
+        if (!mid)
+        {
+            if (attr[attr_len - 1] != '\0')
+            {
+                attr[attr_len - 1] = '\0';
+                LOG_ERROR("force show len(%d) attr:%s.", attr_len, attr);
+            }
+            else
+            {
+                LOG_ERROR("show len(%d) attr:%s.", attr_len, attr);
+            }
+
+            return -1;
+        }
+
+        uint32_t buf_uniqId = atoi(&mid[4]);
+        LOG_TRACE("parse ack and get buf uid:%d", buf_uniqId);
+
+        return buf_uniqId;
+    }
+
+    int32_t Connection::getAvgLoad()
+    {
+        if (isStop())
+            return -1;
+
+        std::lock_guard<std::mutex> lck(load_mutex_);
+        int32_t numerator = 0;   
+        int32_t denominator = 0;
+        for (int i = 0; i < loads_.size(); i++)
+        {
+            if (loads_[i] > 0)
+            {
+                numerator += loads_[i] * constants::kWeight[i];
+                denominator += constants::kWeight[i];
+            }
+        }
+        if (0 == denominator)
+            return 0;
+        return numerator / denominator;
+    }
+
+    void Connection::setLocalInfo()
+    {
+        local_info_ = "[ip:" + socket_->local_endpoint().address().to_string() + ", port:" + std::to_string(socket_->local_endpoint().port()) + "]";
+    }
+
+} // namespace dataproxy_sdk
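
For reference, a small test-style helper (hypothetical, not part of the SDK) that builds the frame parseBinaryAck() above expects; the field order is reconstructed from doParse()/parseBinaryAck() in this file, and the real magic value is constants::kBinaryMagic from sdk_constant.h (not shown in this diff):

    #include <arpa/inet.h>   // htonl / htons
    #include <cstdint>
    #include <cstring>
    #include <string>
    #include <vector>

    std::vector<uint8_t> buildBinaryAck(uint32_t uniq, const std::string& attr, uint16_t magic)
    {
        // total_len counts every byte after the total_len field itself.
        const uint32_t total_len = static_cast<uint32_t>(1 + 4 + 2 + attr.size() + 2);
        std::vector<uint8_t> frame(4 + total_len);
        uint8_t* p = frame.data();

        uint32_t n32 = htonl(total_len);
        std::memcpy(p, &n32, 4); p += 4;
        *p++ = 0x07;                                   // msg_type: (type & 0x1F) == 7 -> binary ack
        n32 = htonl(uniq);
        std::memcpy(p, &n32, 4); p += 4;               // uniq id of the acked SendBuffer
        uint16_t n16 = htons(static_cast<uint16_t>(attr.size()));
        std::memcpy(p, &n16, 2); p += 2;
        std::memcpy(p, attr.data(), attr.size()); p += attr.size();
        n16 = htons(magic);
        std::memcpy(p, &n16, 2);                       // trailing magic, checked by parseBinaryAck()

        // Invariant checked above: total_len + 4 == 13 + attr_len.
        return frame;
    }
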
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.h
new file mode 100644
index 000000000..7eaf8a602
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/socket_connection.h
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_NET_CONNECTION_H_
+#define DATAPROXY_SDK_NET_CONNECTION_H_
+
+#include <asio.hpp>
+#include <deque>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <system_error>
+#include <unordered_map>
+
+#include "atomic.h"
+#include "sdk_core.h"
+#include "executor_thread_pool.h"
+#include "logger.h"
+#include "noncopyable.h"
+#include "recv_buffer.h"
+#include "msg_protocol.h"
+
+namespace dataproxy_sdk
+{
+  class SendBuffer;
+  class Connection : noncopyable, public std::enable_shared_from_this<Connection>
+  {
+  public:
+    enum Status
+    {
+      kConnecting,
+      kConnected,
+      kDisconnected
+    };
+
+  private:
+    ExecutorThreadPtr executor_; //network sending thread, executor:socket=1:n
+    TcpSocketPtr socket_;
+    std::atomic<Status> status_;
+    SteadyTimerPtr timer_;
+    ProxyInfoPtr proxyinfo_;
+    std::string local_info_; //local ip+port
+    std::string remote_info_;
+
+    std::deque<SendBuffer *> write_queue_; //waiting for sending 
+    RecvBufferPtr recv_buf_;
+
+    BinaryHB binHB_ = {0};               //binary hb
+    char msgHB[5] = {0, 0, 0, 0x1, 0x1}; 
+
+    std::vector<int32_t> loads_; // proxy load values reported by the last 30 heartbeat acks
+    int32_t next_load_idx_;
+
+    int32_t thread_id_;
+
+    enum
+    {
+      kConnectTimeout = 1000 * 20, //ms //FIXME: improve, need fix?
+    };
+
+    AtomicUInt send_err_nums_; 
+    AtomicUInt total_send_;    //total send buf
+    AtomicUInt total_read_;    //total read ack package
+    AtomicInt waiting_send_;
+    AtomicInt retry_hb_; // heartbeat retry count, reset when an ack is received
+
+    mutable std::mutex load_mutex_; // loads lock
+
+  public:
+    Connection(ExecutorThreadPtr &executor, ProxyInfoPtr &proxyinfo);
+    ~Connection();
+
+    void sendBuf(SendBuffer* buf);
+    void sendHB(bool isBinHB);
+    void connClose();
+    Status status() const { return status_; }
+    inline bool isStop() const { return status_ == kDisconnected; }
+    inline bool isConnected() const { return status_ == kConnected; }
+    int32_t getThreadId() const { return thread_id_; }
+    inline int32_t getWaitingSend() { return waiting_send_.get(); }
+    inline void decreaseWaiting()
+    {
+      if (waiting_send_.get() > 0)
+      {
+        waiting_send_.decrement();
+        executor_->waiting_send_.decrement();
+      }
+    }
+    std::string getRemoteInfo() const { return remote_info_; }
+    ProxyInfoPtr getBusInfo() const { return proxyinfo_; }
+
+    int32_t getAvgLoad();
+
+  private:
+    void doConnect(const asio::ip::tcp::endpoint &ep);
+    void doWrite(); //async send data
+    void doRead();
+    void connectHandler(const std::error_code &ec);
+    void setLocalInfo();
+    void doClose(const std::error_code *err = nullptr);
+    int32_t doParse();                         
+    bool parseProtocolAck(uint32_t total_len); 
+    bool parseBinaryAck(uint32_t total_len);   
+    bool parseBinaryHB(uint32_t total_len);
+    uint32_t parseAttr(char *attr, int32_t attr_len);
+  };
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_NET_CONNECTION_H_
\ No newline at end of file
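
As a closing cross-check of the heartbeat framing handled by Connection::parseBinaryHB() in socket_connection.cc: by the time it runs, doParse() has already consumed total_len (4 bytes) and msg_type (1 byte), leaving data_time(4) + body_ver(1) + body_len(4) + body(body_len) + attr_len(2) + attr(attr_len) + magic(2). A compile-time restatement of that length invariant (illustration only; the helper name is not part of the SDK):

    #include <cstdint>

    // total_len counts every byte after the total_len field itself, so a well-formed
    // binary heartbeat ack satisfies: total_len + 4 == 18 + body_len + attr_len.
    constexpr bool binaryHBLenOk(uint32_t total_len, uint32_t body_len, uint16_t attr_len)
    {
        return total_len + 4 == 18u + body_len + attr_len;
    }

    // Example: a 2-byte load body (the body_ver == 1 path above) with no attributes
    // gives total_len == 16.
    static_assert(binaryHBLenOk(16, 2, 0), "heartbeat length invariant");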