You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/11/05 01:45:50 UTC

[incubator-doris] branch master updated: Added bprc stub cache check and reset api, used to test whether the bprc stub cache is available, and reset the bprc stub cache (#6916)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 760fc02  Added bprc stub cache check and reset api, used to test whether the bprc stub cache is available, and reset the bprc stub cache (#6916)
760fc02 is described below

commit 760fc02bfe3a0d0a978f1e1a2e3e6489968e51cf
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Fri Nov 5 09:45:37 2021 +0800

    Added bprc stub cache check and reset api, used to test whether the bprc stub cache is available, and reset the bprc stub cache (#6916)
    
    Added bprc stub cache check and reset api, used to test whether the bprc stub cache is available, and reset the bprc stub cache
    add a config used for auto check and reset bprc stub
---
 be/src/exec/tablet_sink.cpp                        |  40 ++++---
 be/src/exec/tablet_sink.h                          |  26 ++---
 be/src/exprs/runtime_filter_rpc.cpp                |  11 +-
 be/src/http/CMakeLists.txt                         |  23 +----
 be/src/http/action/check_rpc_channel_action.cpp    | 101 ++++++++++++++++++
 .../check_rpc_channel_action.h}                    |  25 ++---
 be/src/http/{ => action}/download_action.cpp       |   2 +-
 be/src/http/{ => action}/download_action.h         |   0
 be/src/http/{ => action}/monitor_action.cpp        |   2 +-
 be/src/http/{ => action}/monitor_action.h          |   0
 be/src/http/action/reset_rpc_channel_action.cpp    |  70 +++++++++++++
 .../reset_rpc_channel_action.h}                    |  25 ++---
 be/src/runtime/data_stream_sender.cpp              |  15 +--
 be/src/runtime/data_stream_sender.h                |  36 +++----
 be/src/runtime/routine_load/data_consumer.cpp      |  32 +++---
 be/src/runtime/runtime_filter_mgr.cpp              |  11 +-
 be/src/service/http_service.cpp                    |  33 ++++--
 be/src/service/internal_service.cpp                |  82 ++++++++++++++-
 be/src/service/internal_service.h                  |  10 ++
 be/src/util/brpc_stub_cache.cpp                    |   9 +-
 be/src/util/brpc_stub_cache.h                      | 115 ++++++++++++++++++---
 be/src/util/string_util.h                          |  67 +++++++++++-
 be/test/util/string_util_test.cpp                  | 111 ++++++++++++++++++++
 build.sh                                           |  40 +++++--
 docs/.vuepress/sidebar/en.js                       |   1 +
 docs/.vuepress/sidebar/zh-CN.js                    |   1 +
 docs/en/administrator-guide/config/be_config.md    |   6 ++
 .../http-actions/check-reset-rpc-cache.md          |  47 +++++++++
 docs/zh-CN/administrator-guide/config/be_config.md |   6 ++
 .../http-actions/check-reset-rpc-cache.md          |  46 +++++++++
 .../doris/planner/StreamLoadScanNodeTest.java      |  22 +---
 gensrc/proto/internal_service.proto                |  32 ++++++
 gensrc/proto/palo_internal_service.proto           |  47 ---------
 run-be-ut.sh                                       |  22 ++--
 34 files changed, 860 insertions(+), 256 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index b818918..c6eb456 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -31,8 +31,8 @@
 #include "util/brpc_stub_cache.h"
 #include "util/debug/sanitizer_scopes.h"
 #include "util/monotime.h"
-#include "util/time.h"
 #include "util/threadpool.h"
+#include "util/time.h"
 #include "util/uid_util.h"
 
 namespace doris {
@@ -75,7 +75,7 @@ Status NodeChannel::init(RuntimeState* state) {
     _cur_batch.reset(new RowBatch(*_row_desc, _batch_size, _parent->_mem_tracker.get()));
 
     _stub = state->exec_env()->brpc_stub_cache()->get_stub(_node_info->host, _node_info->brpc_port);
-    if (_stub == nullptr) {
+    if (!_stub) {
         LOG(WARNING) << "Get rpc stub failed, host=" << _node_info->host
                      << ", port=" << _node_info->brpc_port;
         _cancelled = true;
@@ -143,6 +143,10 @@ void NodeChannel::_cancel_with_msg(const std::string& msg) {
 Status NodeChannel::open_wait() {
     _open_closure->join();
     if (_open_closure->cntl.Failed()) {
+        if (!ExecEnv::GetInstance()->brpc_stub_cache()->available(_stub, _node_info->host,
+                                                                  _node_info->brpc_port)) {
+            ExecEnv::GetInstance()->brpc_stub_cache()->erase(_open_closure->cntl.remote_side());
+        }
         std::stringstream ss;
         ss << "failed to open tablet writer, error=" << berror(_open_closure->cntl.ErrorCode())
            << ", error_text=" << _open_closure->cntl.ErrorText();
@@ -164,7 +168,8 @@ Status NodeChannel::open_wait() {
     // add batch closure
     _add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create();
     _add_batch_closure->addFailedHandler([this]() {
-        _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), _add_batch_closure->cntl.ErrorText()));
+        _cancel_with_msg(
+                fmt::format("{}, err: {}", channel_info(), _add_batch_closure->cntl.ErrorText()));
     });
 
     _add_batch_closure->addSuccessHandler([this](const PTabletWriterAddBatchResult& result,
@@ -181,7 +186,8 @@ Status NodeChannel::open_wait() {
                 _add_batches_finished = true;
             }
         } else {
-            _cancel_with_msg(fmt::format("{}, add batch req success but status isn't ok, err: {}", channel_info(), status.get_error_msg()));
+            _cancel_with_msg(fmt::format("{}, add batch req success but status isn't ok, err: {}",
+                                         channel_info(), status.get_error_msg()));
         }
 
         if (result.has_execution_time_us()) {
@@ -341,7 +347,7 @@ int NodeChannel::try_send_and_fetch_status(std::unique_ptr<ThreadPoolToken>& thr
     }
     bool is_finished = true;
     if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0 &&
-            _last_patch_processed_finished.compare_exchange_strong(is_finished, false)) {
+        _last_patch_processed_finished.compare_exchange_strong(is_finished, false)) {
         auto s = thread_pool_token->submit_func(std::bind(&NodeChannel::try_send_batch, this));
         if (!s.ok()) {
             _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed");
@@ -370,6 +376,10 @@ void NodeChannel::try_send_batch() {
     if (row_batch->num_rows() > 0) {
         SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
         row_batch->serialize(request.mutable_row_batch());
+        if (request.row_batch().ByteSizeLong() >= double(config::brpc_max_body_size) * 0.95f) {
+            LOG(WARNING) << "send batch too large, this rpc may failed. send size: "
+                         << request.row_batch().ByteSizeLong() << ", " << channel_info();
+        }
     }
 
     _add_batch_closure->reset();
@@ -398,8 +408,8 @@ void NodeChannel::try_send_batch() {
     }
 
     _add_batch_closure->set_in_flight();
-    _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
-                                   &_add_batch_closure->result, _add_batch_closure);
+    _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, &_add_batch_closure->result,
+                                   _add_batch_closure);
 
     _next_packet_seq++;
     _last_patch_processed_finished = true;
@@ -676,7 +686,8 @@ Status OlapTableSink::open(RuntimeState* state) {
             return Status::InternalError(ss.str());
         }
     }
-    int32_t send_batch_parallelism = MIN(_send_batch_parallelism, config::max_send_batch_parallelism_per_job);
+    int32_t send_batch_parallelism =
+            MIN(_send_batch_parallelism, config::max_send_batch_parallelism_per_job);
     _send_batch_thread_pool_token = state->exec_env()->send_batch_thread_pool()->new_token(
             ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism);
     RETURN_IF_ERROR(Thread::create(
@@ -776,7 +787,9 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
                             if (!s.ok()) {
                                 // 'status' will store the last non-ok status of all channels
                                 status = s;
-                                LOG(WARNING) << ch->channel_info() << ", close channel failed, err: " << s.get_error_msg();
+                                LOG(WARNING)
+                                        << ch->channel_info()
+                                        << ", close channel failed, err: " << s.get_error_msg();
                             }
                             ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns,
                                             &mem_exceeded_block_ns, &queue_push_lock_ns,
@@ -825,7 +838,8 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
         LOG(INFO) << ss.str();
     } else {
         for (auto channel : _channels) {
-            channel->for_each_node_channel([&status](NodeChannel* ch) { ch->cancel(status.get_error_msg()); });
+            channel->for_each_node_channel(
+                    [&status](NodeChannel* ch) { ch->cancel(status.get_error_msg()); });
         }
     }
 
@@ -947,7 +961,8 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap*
                 if (str_val->len > desc->type().MAX_STRING_LENGTH) {
                     ss << "the length of input is too long than schema. "
                        << "column_name: " << desc->col_name() << "; "
-                       << "first 128 bytes of input_str: [" << std::string(str_val->ptr, 128) << "] "
+                       << "first 128 bytes of input_str: [" << std::string(str_val->ptr, 128)
+                       << "] "
                        << "schema length: " << desc->type().MAX_STRING_LENGTH << "; "
                        << "actual length: " << str_val->len << "; ";
                     row_valid = false;
@@ -1010,7 +1025,8 @@ void OlapTableSink::_send_batch_process() {
         int running_channels_num = 0;
         for (auto index_channel : _channels) {
             index_channel->for_each_node_channel([&running_channels_num, this](NodeChannel* ch) {
-                running_channels_num += ch->try_send_and_fetch_status(this->_send_batch_thread_pool_token);
+                running_channels_num +=
+                        ch->try_send_and_fetch_status(this->_send_batch_thread_pool_token);
             });
         }
 
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 9b58548..216fdd6 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -137,8 +137,8 @@ public:
 
 private:
     brpc::CallId cid;
-    std::atomic<bool> _packet_in_flight{false};
-    std::atomic<bool> _is_last_rpc{false};
+    std::atomic<bool> _packet_in_flight {false};
+    std::atomic<bool> _is_last_rpc {false};
     std::function<void()> failed_handler;
     std::function<void(const T&, bool)> success_handler;
 };
@@ -227,19 +227,19 @@ private:
     MonotonicStopWatch _timeout_watch;
 
     // user cancel or get some errors
-    std::atomic<bool> _cancelled{false};
+    std::atomic<bool> _cancelled {false};
     SpinLock _cancel_msg_lock;
     std::string _cancel_msg = "";
 
     // send finished means the consumer thread which send the rpc can exit
-    std::atomic<bool> _send_finished{false};
+    std::atomic<bool> _send_finished {false};
 
     // add batches finished means the last rpc has be response, used to check whether this channel can be closed
-    std::atomic<bool> _add_batches_finished{false};
+    std::atomic<bool> _add_batches_finished {false};
 
-    std::atomic<bool> _last_patch_processed_finished{true};
+    std::atomic<bool> _last_patch_processed_finished {true};
 
-    bool _eos_is_produced{false}; // only for restricting producer behaviors
+    bool _eos_is_produced {false}; // only for restricting producer behaviors
 
     std::unique_ptr<RowDescriptor> _row_desc;
     int _batch_size = 0;
@@ -249,9 +249,9 @@ private:
     std::mutex _pending_batches_lock;
     using AddBatchReq = std::pair<std::unique_ptr<RowBatch>, PTabletWriterAddBatchRequest>;
     std::queue<AddBatchReq> _pending_batches;
-    std::atomic<int> _pending_batches_num{0};
+    std::atomic<int> _pending_batches_num {0};
 
-    PBackendService_Stub* _stub = nullptr;
+    std::shared_ptr<PBackendService_Stub> _stub = nullptr;
     RefCountClosure<PTabletWriterOpenResult>* _open_closure = nullptr;
     ReusableClosure<PTabletWriterAddBatchResult>* _add_batch_closure = nullptr;
 
@@ -259,10 +259,10 @@ private:
     std::vector<TTabletCommitInfo> _tablet_commit_infos;
 
     AddBatchCounter _add_batch_counter;
-    std::atomic<int64_t> _serialize_batch_ns{0};
-    std::atomic<int64_t> _mem_exceeded_block_ns{0};
-    std::atomic<int64_t> _queue_push_lock_ns{0};
-    std::atomic<int64_t> _actual_consume_ns{0};
+    std::atomic<int64_t> _serialize_batch_ns {0};
+    std::atomic<int64_t> _mem_exceeded_block_ns {0};
+    std::atomic<int64_t> _queue_push_lock_ns {0};
+    std::atomic<int64_t> _actual_consume_ns {0};
 };
 
 class IndexChannel {
diff --git a/be/src/exprs/runtime_filter_rpc.cpp b/be/src/exprs/runtime_filter_rpc.cpp
index f1e6e20..c20779d 100644
--- a/be/src/exprs/runtime_filter_rpc.cpp
+++ b/be/src/exprs/runtime_filter_rpc.cpp
@@ -39,7 +39,14 @@ struct IRuntimeFilter::rpc_context {
 Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr) {
     DCHECK(is_producer());
     DCHECK(_rpc_context == nullptr);
-    PBackendService_Stub* stub = state->exec_env()->brpc_stub_cache()->get_stub(*addr);
+    std::shared_ptr<PBackendService_Stub> stub(
+            state->exec_env()->brpc_stub_cache()->get_stub(*addr));
+    if (!stub) {
+        std::string msg =
+                fmt::format("Get rpc stub failed, host={}, port={}", addr->hostname, addr->port);
+        LOG(WARNING) << msg;
+        return Status::InternalError(msg);
+    }
     _rpc_context = std::make_shared<IRuntimeFilter::rpc_context>();
     void* data = nullptr;
     int len = 0;
@@ -86,6 +93,8 @@ Status IRuntimeFilter::join_rpc() {
         brpc::Join(_rpc_context->cid);
         if (_rpc_context->cntl.Failed()) {
             LOG(WARNING) << "runtimefilter rpc err:" << _rpc_context->cntl.ErrorText();
+            // reset stub cache
+            ExecEnv::GetInstance()->brpc_stub_cache()->erase(_rpc_context->cntl.remote_side());
         }
     }
     return Status::OK();
diff --git a/be/src/http/CMakeLists.txt b/be/src/http/CMakeLists.txt
index eb42232..19e2211 100644
--- a/be/src/http/CMakeLists.txt
+++ b/be/src/http/CMakeLists.txt
@@ -22,7 +22,6 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/http")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/http")
 
 add_library(Webserver STATIC
-  download_action.cpp
   http_headers.cpp
   http_method.cpp
   http_request.cpp
@@ -30,12 +29,13 @@ add_library(Webserver STATIC
   http_status.cpp
   http_parser.cpp
   web_page_handler.cpp
-  monitor_action.cpp
   default_path_handlers.cpp
   utils.cpp
   ev_http_server.cpp
   http_client.cpp
+  action/download_action.cpp
   action/mini_load.cpp
+  action/monitor_action.cpp
   action/health_action.cpp
   action/tablet_migration_action.cpp
   action/tablets_info_action.cpp
@@ -50,21 +50,6 @@ add_library(Webserver STATIC
   action/meta_action.cpp
   action/compaction_action.cpp
   action/config_action.cpp
-  #  action/multi_start.cpp
-  #  action/multi_show.cpp
-  #  action/multi_commit.cpp
-  #  action/multi_unload.cpp
+  action/check_rpc_channel_action
+  action/reset_rpc_channel_action
 )
-
-# target_link_libraries(Webserver pthread dl Util)
-#ADD_BE_TEST(integer-array-test)
-#ADD_BE_TEST(runtime-profile-test)
-#ADD_BE_TEST(benchmark-test)
-#ADD_BE_TEST(decompress-test)
-#ADD_BE_TEST(metrics-test)
-#ADD_BE_TEST(debug-util-test)
-#ADD_BE_TEST(url-coding-test)
-#ADD_BE_TEST(thrift-util-test)
-#ADD_BE_TEST(bit-util-test)
-#ADD_BE_TEST(rle-test)
-##ADD_BE_TEST(perf-counters-test)
diff --git a/be/src/http/action/check_rpc_channel_action.cpp b/be/src/http/action/check_rpc_channel_action.cpp
new file mode 100644
index 0000000..a26031f
--- /dev/null
+++ b/be/src/http/action/check_rpc_channel_action.cpp
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/check_rpc_channel_action.h"
+
+#include <fmt/core.h>
+
+#include "gen_cpp/internal_service.pb.h"
+#include "http/http_channel.h"
+#include "http/http_request.h"
+#include "runtime/exec_env.h"
+#include "service/brpc.h"
+#include "util/brpc_stub_cache.h"
+#include "util/md5.h"
+
+namespace doris {
+CheckRPCChannelAction::CheckRPCChannelAction(ExecEnv* exec_env) : _exec_env(exec_env) {}
+// Probes ip:port by sending payload_size zero bytes and their md5 through check_rpc_channel.
+void CheckRPCChannelAction::handle(HttpRequest* req) {
+    std::string req_ip = req->param("ip");
+    std::string req_port = req->param("port");
+    std::string req_payload_size = req->param("payload_size");
+    uint64_t port = 0;
+    uint64_t payload_size = 0;
+    try {
+        port = std::stoull(req_port);
+        payload_size = std::stoull(req_payload_size);
+        if (port > 65535) {
+            HttpChannel::send_reply(
+                    req, HttpStatus::INTERNAL_SERVER_ERROR,
+                    fmt::format("invalid argument port, should be between 0-65535, actual is {0}",
+                                req_port));
+            return;
+        }
+        if (payload_size > (10 << 20) /* 10M */ || payload_size == 0) {
+            HttpChannel::send_reply(
+                    req, HttpStatus::INTERNAL_SERVER_ERROR,
+                    fmt::format(
+                            "invalid argument payload_size, should be between 1-10M, actual is {0}",
+                            req_payload_size));
+            return;
+        }
+    } catch (const std::exception& e) {
+        std::string err = fmt::format("invalid argument. port:{0}, payload_size: {1}", req_port,
+                                      req_payload_size);
+        LOG(WARNING) << err << ", error: " << e.what();
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, err);
+        return;
+    }
+    PCheckRPCChannelRequest request;
+    PCheckRPCChannelResponse response;
+    brpc::Controller cntl;
+    std::string* buf = request.mutable_data();
+    buf->resize(payload_size); // fresh empty string, so the payload is zero-filled
+    request.set_size(payload_size);
+    Md5Digest digest;
+    digest.update(static_cast<const void*>(buf->c_str()), payload_size);
+    digest.digest();
+    request.set_md5(digest.hex());
+    std::shared_ptr<PBackendService_Stub> stub(
+            _exec_env->brpc_stub_cache()->get_stub(req_ip, port));
+    if (!stub) {
+        HttpChannel::send_reply(
+                req, HttpStatus::INTERNAL_SERVER_ERROR,
+                fmt::format("cannot find valid connection to {0}:{1}.", req_ip, req_port));
+        return;
+    }
+    stub->check_rpc_channel(&cntl, &request, &response, nullptr); // done == nullptr: synchronous
+    if (cntl.Failed()) {
+        std::string err = fmt::format("open brpc connection to {0}:{1} failed: {2}", req_ip,
+                                      req_port, cntl.ErrorText());
+        LOG(WARNING) << err;
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, err);
+        return;
+    }
+    if (response.status().status_code() == 0) {
+        std::string msg = fmt::format("open brpc connection to {0}:{1} success.", req_ip, req_port);
+        LOG(INFO) << msg;
+        HttpChannel::send_reply(req, HttpStatus::OK, msg);
+    } else {
+        std::string err = fmt::format("open brpc connection to {0}:{1} failed.", req_ip, req_port);
+        LOG(WARNING) << err;
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, err);
+    }
+}
+
+} // namespace doris
diff --git a/be/src/http/monitor_action.h b/be/src/http/action/check_rpc_channel_action.h
similarity index 67%
copy from be/src/http/monitor_action.h
copy to be/src/http/action/check_rpc_channel_action.h
index f51f51c..d8b456c 100644
--- a/be/src/http/monitor_action.h
+++ b/be/src/http/action/check_rpc_channel_action.h
@@ -15,34 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef DORIS_BE_SRC_COMMON_UTIL_MONITOR_ACTION_H
-#define DORIS_BE_SRC_COMMON_UTIL_MONITOR_ACTION_H
-
-#include <map>
-#include <string>
+#pragma once
 
 #include "http/http_handler.h"
 
 namespace doris {
-
-class HttpRequest;
-class HttpChannel;
-class RestMonitorIface;
-
-class MonitorAction : public HttpHandler {
+class ExecEnv;
+class CheckRPCChannelAction : public HttpHandler {
 public:
-    MonitorAction();
+    explicit CheckRPCChannelAction(ExecEnv* exec_env);
 
-    virtual ~MonitorAction() {}
-
-    void register_module(const std::string& name, RestMonitorIface* module);
+    virtual ~CheckRPCChannelAction() {}
 
     void handle(HttpRequest* req) override;
 
 private:
-    std::map<std::string, RestMonitorIface*> _module_by_name;
+    ExecEnv* _exec_env;
 };
-
 } // namespace doris
-
-#endif
diff --git a/be/src/http/download_action.cpp b/be/src/http/action/download_action.cpp
similarity index 99%
rename from be/src/http/download_action.cpp
rename to be/src/http/action/download_action.cpp
index fb89485..a3816a6 100644
--- a/be/src/http/download_action.cpp
+++ b/be/src/http/action/download_action.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "http/download_action.h"
+#include "http/action/download_action.h"
 
 #include <sys/types.h>
 #include <unistd.h>
diff --git a/be/src/http/download_action.h b/be/src/http/action/download_action.h
similarity index 100%
rename from be/src/http/download_action.h
rename to be/src/http/action/download_action.h
diff --git a/be/src/http/monitor_action.cpp b/be/src/http/action/monitor_action.cpp
similarity index 98%
rename from be/src/http/monitor_action.cpp
rename to be/src/http/action/monitor_action.cpp
index 0b6c698..138582b 100644
--- a/be/src/http/monitor_action.cpp
+++ b/be/src/http/action/monitor_action.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "http/monitor_action.h"
+#include "http/action/monitor_action.h"
 
 #include <sstream>
 
diff --git a/be/src/http/monitor_action.h b/be/src/http/action/monitor_action.h
similarity index 100%
copy from be/src/http/monitor_action.h
copy to be/src/http/action/monitor_action.h
diff --git a/be/src/http/action/reset_rpc_channel_action.cpp b/be/src/http/action/reset_rpc_channel_action.cpp
new file mode 100644
index 0000000..38e4a7e
--- /dev/null
+++ b/be/src/http/action/reset_rpc_channel_action.cpp
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/reset_rpc_channel_action.h"
+
+#include <fmt/core.h>
+
+#include "http/http_channel.h"
+#include "http/http_request.h"
+#include "runtime/exec_env.h"
+#include "util/brpc_stub_cache.h"
+#include "util/string_util.h"
+
+namespace doris {
+ResetRPCChannelAction::ResetRPCChannelAction(ExecEnv* exec_env) : _exec_env(exec_env) {}
+
+// endpoints=all clears the whole stub cache; otherwise it is a comma-separated endpoint list.
+void ResetRPCChannelAction::handle(HttpRequest* req) {
+    std::string endpoints = req->param("endpoints");
+    if (iequal(endpoints, "all")) {
+        if (_exec_env->brpc_stub_cache()->size() == 0) {
+            HttpChannel::send_reply(req, HttpStatus::OK, "no cached channel.");
+            return;
+        }
+        // Snapshot the endpoint names before clearing so the reply can list them.
+        std::vector<std::string> cached_endpoints;
+        _exec_env->brpc_stub_cache()->get_all(&cached_endpoints);
+        _exec_env->brpc_stub_cache()->clear();
+        HttpChannel::send_reply(req, HttpStatus::OK,
+                                fmt::format("reseted: {0}", join(cached_endpoints, ",")));
+        return;
+    }
+    std::vector<std::string> targets = split(endpoints, ",");
+    // Validate every endpoint before erasing any, so an unknown one cannot cause a partial reset.
+    for (const std::string& endpoint : targets) {
+        if (!_exec_env->brpc_stub_cache()->exist(endpoint)) {
+            std::string err = fmt::format("{0}: not found.", endpoint);
+            LOG(WARNING) << err;
+            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, err);
+            return;
+        }
+    }
+    std::vector<std::string> reseted;
+    for (const std::string& endpoint : targets) {
+        if (!_exec_env->brpc_stub_cache()->erase(endpoint)) {
+            std::string err = fmt::format("{0}: reset failed.", endpoint);
+            LOG(WARNING) << err;
+            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, err);
+            return;
+        }
+        reseted.push_back(endpoint);
+    }
+    HttpChannel::send_reply(req, HttpStatus::OK, fmt::format("reseted: {0}", join(reseted, ",")));
+}
+
+} // namespace doris
diff --git a/be/src/http/monitor_action.h b/be/src/http/action/reset_rpc_channel_action.h
similarity index 67%
rename from be/src/http/monitor_action.h
rename to be/src/http/action/reset_rpc_channel_action.h
index f51f51c..912a324 100644
--- a/be/src/http/monitor_action.h
+++ b/be/src/http/action/reset_rpc_channel_action.h
@@ -15,34 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef DORIS_BE_SRC_COMMON_UTIL_MONITOR_ACTION_H
-#define DORIS_BE_SRC_COMMON_UTIL_MONITOR_ACTION_H
-
-#include <map>
-#include <string>
+#pragma once
 
 #include "http/http_handler.h"
 
 namespace doris {
-
-class HttpRequest;
-class HttpChannel;
-class RestMonitorIface;
-
-class MonitorAction : public HttpHandler {
+class ExecEnv;
+class ResetRPCChannelAction : public HttpHandler {
 public:
-    MonitorAction();
+    explicit ResetRPCChannelAction(ExecEnv* exec_env);
 
-    virtual ~MonitorAction() {}
-
-    void register_module(const std::string& name, RestMonitorIface* module);
+    virtual ~ResetRPCChannelAction() {}
 
     void handle(HttpRequest* req) override;
 
 private:
-    std::map<std::string, RestMonitorIface*> _module_by_name;
+    ExecEnv* _exec_env;
 };
-
 } // namespace doris
-
-#endif
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index ceda653..d651159 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -105,17 +105,20 @@ Status DataStreamSender::Channel::init(RuntimeState* state) {
     _brpc_request.set_be_number(_be_number);
 
     _brpc_timeout_ms = std::min(3600, state->query_options().query_timeout) * 1000;
-    if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
-        _brpc_stub =
-                state->exec_env()->brpc_stub_cache()->get_stub("127.0.0.1", _brpc_dest_addr.port);
-    } else {
-        _brpc_stub = state->exec_env()->brpc_stub_cache()->get_stub(_brpc_dest_addr);
-    }
 
     // In bucket shuffle join will set fragment_instance_id (-1, -1)
     // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
     // so the empty channel not need call function close_internal()
     _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo != -1);
+    if (_need_close) {
+        _brpc_stub = state->exec_env()->brpc_stub_cache()->get_stub(_brpc_dest_addr);
+        if (!_brpc_stub) {
+            std::string msg = fmt::format("Get rpc stub failed, dest_addr={}:{}",
+                                          _brpc_dest_addr.hostname, _brpc_dest_addr.port);
+            LOG(WARNING) << msg;
+            return Status::InternalError(msg);
+        }
+    }
     return Status::OK();
 }
 
diff --git a/be/src/runtime/data_stream_sender.h b/be/src/runtime/data_stream_sender.h
index 7193a14..71bc41f 100644
--- a/be/src/runtime/data_stream_sender.h
+++ b/be/src/runtime/data_stream_sender.h
@@ -25,16 +25,16 @@
 #include "common/object_pool.h"
 #include "common/status.h"
 #include "exec/data_sink.h"
-#include "gen_cpp/data.pb.h" // for PRowBatch
 #include "gen_cpp/BackendService.h"
-#include "gen_cpp/Types_types.h"
 #include "gen_cpp/PaloInternalService_types.h"
+#include "gen_cpp/Types_types.h"
+#include "gen_cpp/data.pb.h" // for PRowBatch
 #include "gen_cpp/internal_service.pb.h"
-#include "util/runtime_profile.h"
+#include "service/backend_options.h"
+#include "service/brpc.h"
 #include "util/ref_count_closure.h"
+#include "util/runtime_profile.h"
 #include "util/uid_util.h"
-#include "service/brpc.h"
-#include "service/backend_options.h"
 
 namespace doris {
 
@@ -165,18 +165,18 @@ protected:
         bool is_local() { return _is_local; }
 
         inline Status _wait_last_brpc() {
-                if (_closure == nullptr) return Status::OK();
-                auto cntl = &_closure->cntl;
-                brpc::Join(cntl->call_id());
-                if (cntl->Failed()) {
-                    std::stringstream ss;
-                    ss << "failed to send brpc batch, error=" << berror(cntl->ErrorCode())
-                       << ", error_text=" << cntl->ErrorText()
-                       << ", client: " << BackendOptions::get_localhost();
-                    LOG(WARNING) << ss.str();
-                    return Status::ThriftRpcError(ss.str());
-                }
-                return Status::OK();
+            if (_closure == nullptr) return Status::OK();
+            auto cntl = &_closure->cntl;
+            brpc::Join(cntl->call_id());
+            if (cntl->Failed()) {
+                std::stringstream ss;
+                ss << "failed to send brpc batch, error=" << berror(cntl->ErrorCode())
+                   << ", error_text=" << cntl->ErrorText()
+                   << ", client: " << BackendOptions::get_localhost();
+                LOG(WARNING) << ss.str();
+                return Status::ThriftRpcError(ss.str());
+            }
+            return Status::OK();
         }
         // Serialize _batch into _thrift_batch and send via send_batch().
         // Returns send_batch() status.
@@ -206,7 +206,7 @@ protected:
         PUniqueId _finst_id;
         PRowBatch _pb_batch;
         PTransmitDataParams _brpc_request;
-        PBackendService_Stub* _brpc_stub = nullptr;
+        std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr; // survives cache reset
         RefCountClosure<PTransmitDataResult>* _closure = nullptr;
         int32_t _brpc_timeout_ms = 500;
         // whether the dest can be treated as query statistics transfer chain.
diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index c387b07..8d4fcdc 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -46,8 +46,7 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
     RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
 
     // conf has to be deleted finally
-    Defer delete_conf{[conf]() { delete conf; }};
-
+    Defer delete_conf {[conf]() { delete conf; }};
 
     std::string errstr;
     auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
@@ -80,7 +79,7 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
     RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0"));
 
     for (auto& item : ctx->kafka_info->properties) {
-        if (boost::algorithm::starts_with(item.second, "FILE:")) {
+        if (starts_with(item.second, "FILE:")) {
             // file property should has format: FILE:file_id:md5
             std::vector<std::string> parts =
                     strings::Split(item.second, ":", strings::SkipWhitespace());
@@ -154,7 +153,7 @@ Status KafkaDataConsumer::assign_topic_partitions(
               << " assign topic partitions: " << topic << ", " << ss.str();
 
     // delete TopicPartition finally
-    Defer delete_tp{[&topic_partitions]() {
+    Defer delete_tp {[&topic_partitions]() {
         std::for_each(topic_partitions.begin(), topic_partitions.end(),
                       [](RdKafka::TopicPartition* tp1) { delete tp1; });
     }};
@@ -244,7 +243,7 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
 Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids) {
     // create topic conf
     RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
-    Defer delete_conf{[tconf]() { delete tconf; }};
+    Defer delete_conf {[tconf]() { delete tconf; }};
 
     // create topic
     std::string errstr;
@@ -256,7 +255,7 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
         return Status::InternalError(ss.str());
     }
 
-    Defer delete_topic{[topic]() { delete topic; }};
+    Defer delete_topic {[topic]() { delete topic; }};
 
     // get topic metadata
     RdKafka::Metadata* metadata = nullptr;
@@ -269,7 +268,7 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
         return Status::InternalError(ss.str());
     }
 
-    Defer delete_meta{[metadata]() { delete metadata; }};
+    Defer delete_meta {[metadata]() { delete metadata; }};
 
     // get partition ids
     RdKafka::Metadata::TopicMetadataIterator it;
@@ -310,7 +309,7 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
 // corresponding partition.
 // See librdkafka/rdkafkacpp.h##offsetsForTimes()
 Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>& times,
-        std::vector<PIntegerPair>* offsets) {
+                                                std::vector<PIntegerPair>* offsets) {
     // create topic partition
     std::vector<RdKafka::TopicPartition*> topic_partitions;
     for (const auto& entry : times) {
@@ -319,7 +318,7 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>&
         topic_partitions.push_back(tp1);
     }
     // delete TopicPartition finally
-    Defer delete_tp{[&topic_partitions]() {
+    Defer delete_tp {[&topic_partitions]() {
         std::for_each(topic_partitions.begin(), topic_partitions.end(),
                       [](RdKafka::TopicPartition* tp1) { delete tp1; });
     }};
@@ -344,19 +343,21 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>&
 }
 
 // get latest offsets for given partitions
-Status KafkaDataConsumer::get_latest_offsets_for_partitions(const std::vector<int32_t>& partition_ids,
-        std::vector<PIntegerPair>* offsets) {
+Status KafkaDataConsumer::get_latest_offsets_for_partitions(
+        const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets) {
     for (int32_t partition_id : partition_ids) {
         int64_t low = 0;
         int64_t high = 0;
-        RdKafka::ErrorCode err = _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, 5000);
+        RdKafka::ErrorCode err =
+                _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, 5000);
         if (err != RdKafka::ERR_NO_ERROR) {
             std::stringstream ss;
-            ss << "failed to get latest offset for partition: " << partition_id << ", err: " << RdKafka::err2str(err);
+            ss << "failed to get latest offset for partition: " << partition_id
+               << ", err: " << RdKafka::err2str(err);
             LOG(WARNING) << ss.str();
             return Status::InternalError(ss.str());
         }
-        
+        // the high watermark is reported as the partition's latest offset
         PIntegerPair pair;
         pair.set_key(partition_id);
         pair.set_val(high);
@@ -412,7 +413,8 @@ bool KafkaDataConsumer::match(StreamLoadContext* ctx) {
         return false;
     }
     for (auto& item : ctx->kafka_info->properties) {
-        std::unordered_map<std::string, std::string>::const_iterator itr =_custom_properties.find(item.first);
+        std::unordered_map<std::string, std::string>::const_iterator itr =
+                _custom_properties.find(item.first);
         if (itr == _custom_properties.end()) {
             return false;
         }
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 64f7501..5915bc2 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -229,12 +229,13 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
             request_fragment_id->set_hi(targets[i].target_fragment_instance_id.hi);
             request_fragment_id->set_lo(targets[i].target_fragment_instance_id.lo);
 
-            PBackendService_Stub* stub = ExecEnv::GetInstance()->brpc_stub_cache()->get_stub(
-                    targets[i].target_fragment_instance_addr);
+            std::shared_ptr<PBackendService_Stub> stub(
+                    ExecEnv::GetInstance()->brpc_stub_cache()->get_stub(
+                            targets[i].target_fragment_instance_addr)); // may be null
             VLOG_NOTICE << "send filter " << rpc_contexts[i]->request.filter_id()
-                      << " to:" << targets[i].target_fragment_instance_addr.hostname << ":"
-                      << targets[i].target_fragment_instance_addr.port
-                      << rpc_contexts[i]->request.ShortDebugString();
+                        << " to:" << targets[i].target_fragment_instance_addr.hostname << ":"
+                        << targets[i].target_fragment_instance_addr.port
+                        << rpc_contexts[i]->request.ShortDebugString();
             if (stub == nullptr) {
                 rpc_contexts.pop_back();
                 continue;
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 1b7c603..e19427f 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -17,26 +17,28 @@
 
 #include "service/http_service.h"
 
+#include "http/action/check_rpc_channel_action.h"
 #include "http/action/checksum_action.h"
 #include "http/action/compaction_action.h"
+#include "http/action/config_action.h"
+#include "http/action/download_action.h"
 #include "http/action/health_action.h"
 #include "http/action/meta_action.h"
 #include "http/action/metrics_action.h"
 #include "http/action/mini_load.h"
+#include "http/action/monitor_action.h"
 #include "http/action/pprof_actions.h"
 #include "http/action/reload_tablet_action.h"
+#include "http/action/reset_rpc_channel_action.h"
 #include "http/action/restore_tablet_action.h"
 #include "http/action/snapshot_action.h"
 #include "http/action/stream_load.h"
-#include "http/action/tablets_distribution_action.h"
 #include "http/action/tablet_migration_action.h"
+#include "http/action/tablets_distribution_action.h"
 #include "http/action/tablets_info_action.h"
-#include "http/action/config_action.h"
 #include "http/default_path_handlers.h"
-#include "http/download_action.h"
 #include "http/ev_http_server.h"
 #include "http/http_method.h"
-#include "http/monitor_action.h"
 #include "http/web_page_handler.h"
 #include "runtime/exec_env.h"
 #include "runtime/load_path_mgr.h"
@@ -92,12 +94,15 @@ Status HttpService::start() {
     _ev_http_server->register_handler(HttpMethod::GET, "/tablets_json", tablets_info_action);
 
     // Register Tablets Distribution action
-    TabletsDistributionAction* tablets_distribution_action = _pool.add(new TabletsDistributionAction());
-    _ev_http_server->register_handler(HttpMethod::GET, "/api/tablets_distribution", tablets_distribution_action);
+    TabletsDistributionAction* tablets_distribution_action =
+            _pool.add(new TabletsDistributionAction());
+    _ev_http_server->register_handler(HttpMethod::GET, "/api/tablets_distribution",
+                                      tablets_distribution_action);
 
     // Register tablet migration action
     TabletMigrationAction* tablet_migration_action = _pool.add(new TabletMigrationAction());
-    _ev_http_server->register_handler(HttpMethod::GET, "/api/tablet_migration", tablet_migration_action);
+    _ev_http_server->register_handler(HttpMethod::GET, "/api/tablet_migration",
+                                      tablet_migration_action);
 
     // register pprof actions
     PprofActions::setup(_env, _ev_http_server.get(), _pool);
@@ -144,14 +149,22 @@ Status HttpService::start() {
     _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/run_status",
                                       run_status_compaction_action);
 
-    ConfigAction* update_config_action = 
+    ConfigAction* update_config_action =
             _pool.add(new ConfigAction(ConfigActionType::UPDATE_CONFIG));
     _ev_http_server->register_handler(HttpMethod::POST, "/api/update_config", update_config_action);
 
-    ConfigAction* show_config_action = 
-            _pool.add(new ConfigAction(ConfigActionType::SHOW_CONFIG));
+    ConfigAction* show_config_action = _pool.add(new ConfigAction(ConfigActionType::SHOW_CONFIG));
     _ev_http_server->register_handler(HttpMethod::GET, "/api/show_config", show_config_action);
 
+    // Register the RPC channel check and reset actions
+    CheckRPCChannelAction* check_rpc_channel_action = _pool.add(new CheckRPCChannelAction(_env));
+    _ev_http_server->register_handler(HttpMethod::GET,
+                                      "/api/check_rpc_channel/{ip}/{port}/{payload_size}",
+                                      check_rpc_channel_action);
+
+    ResetRPCChannelAction* reset_rpc_channel_action = _pool.add(new ResetRPCChannelAction(_env));
+    _ev_http_server->register_handler(HttpMethod::GET, "/api/reset_rpc_channel/{endpoints}",
+                                      reset_rpc_channel_action);
     _ev_http_server->start();
     return Status::OK();
 }
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 93dc6b1..837a910 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -30,6 +30,9 @@
 #include "runtime/routine_load/routine_load_task_executor.h"
 #include "runtime/runtime_state.h"
 #include "service/brpc.h"
+#include "util/brpc_stub_cache.h"
+#include "util/md5.h"
+#include "util/string_util.h"
 #include "util/thrift_util.h"
 #include "util/uid_util.h"
 
@@ -212,9 +215,9 @@ void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controll
         if (!kafka_request.partition_id_for_latest_offsets().empty()) {
             // get latest offsets for specified partition ids
             std::vector<PIntegerPair> partition_offsets;
-            Status st =
-                    _exec_env->routine_load_task_executor()->get_kafka_latest_offsets_for_partitions(
-                            request->kafka_meta_request(), &partition_offsets);
+            Status st = _exec_env->routine_load_task_executor()
+                                ->get_kafka_latest_offsets_for_partitions(
+                                        request->kafka_meta_request(), &partition_offsets);
             if (st.ok()) {
                 PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
                 for (const auto& entry : partition_offsets) {
@@ -422,6 +425,85 @@ void PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController* cn
     }
 }
 
+// Validates a probe payload: the received data length must equal request->size() and its MD5
+// must match request->md5() (case-insensitively). Sets status_code 1 on any mismatch.
+template <typename T>
+void PInternalServiceImpl<T>::check_rpc_channel(google::protobuf::RpcController* controller,
+                                                const PCheckRPCChannelRequest* request,
+                                                PCheckRPCChannelResponse* response,
+                                                google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    response->mutable_status()->set_status_code(0);
+    if (request->data().size() != request->size()) {
+        std::stringstream ss;
+        ss << "data size not same, expected: " << request->size()
+           << ", actual: " << request->data().size();
+        response->mutable_status()->add_error_msgs(ss.str());
+        response->mutable_status()->set_status_code(1);
+
+    } else {
+        Md5Digest digest;
+        digest.update(static_cast<const void*>(request->data().c_str()), request->data().size());
+        digest.digest();
+        if (!iequal(digest.hex(), request->md5())) {
+            std::stringstream ss;
+            ss << "md5 not same, expected: " << request->md5() << ", actual: " << digest.hex();
+            response->mutable_status()->add_error_msgs(ss.str());
+            response->mutable_status()->set_status_code(1);
+        }
+    }
+}
+
+// Drops cached brpc stubs: all of them when request->all() is set, otherwise only the listed
+// endpoints. Dropped endpoints are reported in response->channels(); status_code is set to 1
+// when any listed endpoint could not be reset.
+template <typename T>
+void PInternalServiceImpl<T>::reset_rpc_channel(google::protobuf::RpcController* controller,
+                                                const PResetRPCChannelRequest* request,
+                                                PResetRPCChannelResponse* response,
+                                                google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    response->mutable_status()->set_status_code(0);
+    if (request->all()) {
+        size_t size = ExecEnv::GetInstance()->brpc_stub_cache()->size();
+        if (size > 0) {
+            std::vector<std::string> endpoints;
+            ExecEnv::GetInstance()->brpc_stub_cache()->get_all(&endpoints);
+            ExecEnv::GetInstance()->brpc_stub_cache()->clear();
+            *response->mutable_channels() = {endpoints.begin(), endpoints.end()};
+        }
+    } else {
+        for (const std::string& endpoint : request->endpoints()) {
+            if (!ExecEnv::GetInstance()->brpc_stub_cache()->exist(endpoint)) {
+                response->mutable_status()->add_error_msgs(endpoint + ": not found.");
+                continue;
+            }
+
+            if (ExecEnv::GetInstance()->brpc_stub_cache()->erase(endpoint)) {
+                response->add_channels(endpoint);
+            } else {
+                response->mutable_status()->add_error_msgs(endpoint + ": reset failed.");
+            }
+        }
+        if (request->endpoints_size() != response->channels_size()) {
+            response->mutable_status()->set_status_code(1);
+        }
+    }
+}
+
+// Liveness probe: echoes request->hello() back when it is present.
+template <typename T>
+void PInternalServiceImpl<T>::hand_shake(google::protobuf::RpcController* cntl_base,
+                                         const PHandShakeRequest* request,
+                                         PHandShakeResponse* response,
+                                         google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    if (request->has_hello()) {
+        response->set_hello(request->hello());
+    }
+    response->mutable_status()->set_status_code(0);
+}
+
 template class PInternalServiceImpl<PBackendService>;
 
 } // namespace doris
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index f68243d..702bec1 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -105,6 +105,17 @@ public:
     void fold_constant_expr(google::protobuf::RpcController* controller,
                             const PConstantExprRequest* request, PConstantExprResult* response,
                             google::protobuf::Closure* done) override;
+    // RPC channel diagnostics: payload integrity check, stub cache reset and handshake echo.
+    void check_rpc_channel(google::protobuf::RpcController* controller,
+                           const PCheckRPCChannelRequest* request,
+                           PCheckRPCChannelResponse* response,
+                           google::protobuf::Closure* done) override;
+    void reset_rpc_channel(google::protobuf::RpcController* controller,
+                           const PResetRPCChannelRequest* request,
+                           PResetRPCChannelResponse* response,
+                           google::protobuf::Closure* done) override;
+    void hand_shake(google::protobuf::RpcController* controller, const PHandShakeRequest* request,
+                    PHandShakeResponse* response, google::protobuf::Closure* done) override;
 
 private:
     Status _exec_plan_fragment(const std::string& s_request, bool compact);
diff --git a/be/src/util/brpc_stub_cache.cpp b/be/src/util/brpc_stub_cache.cpp
index 1e4e610..b62f34a 100644
--- a/be/src/util/brpc_stub_cache.cpp
+++ b/be/src/util/brpc_stub_cache.cpp
@@ -22,17 +22,11 @@ namespace doris {
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_endpoint_stub_count, MetricUnit::NOUNIT);
 
 BrpcStubCache::BrpcStubCache() {
-    _stub_map.init(239);
-    REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() {
-        std::lock_guard<SpinLock> l(_lock);
-        return _stub_map.size();
-    });
+    REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { return _stub_map.size(); });
 }
 
 BrpcStubCache::~BrpcStubCache() {
     DEREGISTER_HOOK_METRIC(brpc_endpoint_stub_count);
-    for (auto& stub : _stub_map) {
-        delete stub.second;
-    }
+    // Stubs are held by shared_ptr in _stub_map and freed once the last owner releases them.
 }
 } // namespace doris
diff --git a/be/src/util/brpc_stub_cache.h b/be/src/util/brpc_stub_cache.h
index 940883d..53ee3b7 100644
--- a/be/src/util/brpc_stub_cache.h
+++ b/be/src/util/brpc_stub_cache.h
@@ -17,28 +17,43 @@
 
 #pragma once
 
+#include <parallel_hashmap/phmap.h>
+
 #include <memory>
 #include <mutex>
 
+#include "common/config.h"
 #include "gen_cpp/Types_types.h" // TNetworkAddress
 #include "gen_cpp/internal_service.pb.h"
 #include "service/brpc.h"
 #include "util/doris_metrics.h"
-#include "util/spinlock.h"
 
+namespace std {
+template <>
+struct hash<butil::EndPoint> {
+    std::size_t operator()(butil::EndPoint const& p) const {
+        return phmap::HashState().combine(0, butil::ip2int(p.ip), p.port);
+    }
+};
+} // namespace std
+using SubMap = phmap::parallel_flat_hash_map<
+        butil::EndPoint, std::shared_ptr<doris::PBackendService_Stub>, std::hash<butil::EndPoint>,
+        std::equal_to<butil::EndPoint>,
+        std::allocator<
+                std::pair<const butil::EndPoint, std::shared_ptr<doris::PBackendService_Stub>>>,
+        8, std::mutex>;
 namespace doris {
 
-// map used
 class BrpcStubCache {
 public:
     BrpcStubCache();
-    virtual ~BrpcStubCache();
+    ~BrpcStubCache();
 
-    virtual PBackendService_Stub* get_stub(const butil::EndPoint& endpoint) {
-        std::lock_guard<SpinLock> l(_lock);
-        auto stub_ptr = _stub_map.seek(endpoint);
-        if (stub_ptr != nullptr) {
-            return *stub_ptr;
+    inline std::shared_ptr<PBackendService_Stub> get_stub(const butil::EndPoint& endpoint) {
+        // Copy the stub out under the submap lock; find() iterators race with concurrent inserts.
+        std::shared_ptr<PBackendService_Stub> stub;
+        if (LIKELY(_stub_map.if_contains(endpoint, [&stub](const auto& v) { stub = v; }))) {
+            return stub;
         }
         // new one stub and insert into map
         brpc::ChannelOptions options;
@@ -46,33 +61,102 @@ public:
         if (channel->Init(endpoint, &options)) {
             return nullptr;
         }
-        auto stub = new PBackendService_Stub(channel.release(),
-                                             google::protobuf::Service::STUB_OWNS_CHANNEL);
-        _stub_map.insert(endpoint, stub);
+        stub = std::make_shared<PBackendService_Stub>(
+                channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL);
+        _stub_map.insert_or_assign(endpoint, stub); // assigns under the lock, unlike operator[]
         return stub;
     }
 
-    virtual PBackendService_Stub* get_stub(const TNetworkAddress& taddr) {
+    inline std::shared_ptr<PBackendService_Stub> get_stub(const TNetworkAddress& taddr) {
         butil::EndPoint endpoint;
         if (str2endpoint(taddr.hostname.c_str(), taddr.port, &endpoint)) {
-            LOG(WARNING) << "unknown endpoint, hostname=" << taddr.hostname;
+            LOG(WARNING) << "unknown endpoint, hostname=" << taddr.hostname
+                         << ", port=" << taddr.port;
             return nullptr;
         }
         return get_stub(endpoint);
     }
 
-    virtual PBackendService_Stub* get_stub(const std::string& host, int port) {
+    inline std::shared_ptr<PBackendService_Stub> get_stub(const std::string& host, int port) {
         butil::EndPoint endpoint;
         if (str2endpoint(host.c_str(), port, &endpoint)) {
-            LOG(WARNING) << "unknown endpoint, hostname=" << host;
+            LOG(WARNING) << "unknown endpoint, hostname=" << host << ", port=" << port;
             return nullptr;
         }
         return get_stub(endpoint);
     }
 
+    inline size_t size() { return _stub_map.size(); }
+
+    inline void clear() { _stub_map.clear(); }
+
+    inline size_t erase(const std::string& host_port) {
+        butil::EndPoint endpoint;
+        if (str2endpoint(host_port.c_str(), &endpoint)) {
+            LOG(WARNING) << "unknown endpoint: " << host_port;
+            return 0;
+        }
+        return erase(endpoint);
+    }
+
+    size_t erase(const std::string& host, int port) {
+        butil::EndPoint endpoint;
+        if (str2endpoint(host.c_str(), port, &endpoint)) {
+            LOG(WARNING) << "unknown endpoint, hostname=" << host << ", port=" << port;
+            return 0;
+        }
+        return erase(endpoint);
+    }
+
+    inline size_t erase(const butil::EndPoint& endpoint) { return _stub_map.erase(endpoint); }
+
+    inline bool exist(const std::string& host_port) {
+        butil::EndPoint endpoint;
+        if (str2endpoint(host_port.c_str(), &endpoint)) {
+            LOG(WARNING) << "unknown endpoint: " << host_port;
+            return false;
+        }
+        return _stub_map.find(endpoint) != _stub_map.end();
+    }
+
+    // NOTE(review): iterates without the submap locks; unsafe if the map changes concurrently.
+    inline void get_all(std::vector<std::string>* endpoints) {
+        for (SubMap::const_iterator it = _stub_map.begin(); it != _stub_map.end(); ++it) {
+            endpoints->emplace_back(endpoint2str(it->first).c_str());
+        }
+    }
+
+    // Probes the stub with a blocking hand_shake RPC (no done closure); logs on failure.
+    inline bool available(std::shared_ptr<PBackendService_Stub> stub,
+                          const butil::EndPoint& endpoint) {
+        if (!stub) {
+            return false;
+        }
+        PHandShakeRequest request;
+        PHandShakeResponse response;
+        brpc::Controller cntl;
+        stub->hand_shake(&cntl, &request, &response, nullptr);
+        if (!cntl.Failed()) {
+            return true;
+        } else {
+            LOG(WARNING) << "open brpc connection to " << endpoint2str(endpoint).c_str()
+                         << " failed: " << cntl.ErrorText();
+            return false;
+        }
+    }
+
+    inline bool available(std::shared_ptr<PBackendService_Stub> stub, const std::string& host,
+                          int port) {
+        butil::EndPoint endpoint;
+        if (str2endpoint(host.c_str(), port, &endpoint)) {
+            LOG(WARNING) << "unknown endpoint, hostname=" << host << ", port=" << port;
+            return false;
+        }
+        return available(stub, endpoint);
+    }
+
 private:
-    SpinLock _lock;
-    butil::FlatMap<butil::EndPoint, PBackendService_Stub*> _stub_map;
+    SubMap _stub_map;
 };
 
 } // namespace doris
diff --git a/be/src/util/string_util.h b/be/src/util/string_util.h
index 5f15ed4..d5c7fe8 100644
--- a/be/src/util/string_util.h
+++ b/be/src/util/string_util.h
@@ -17,10 +17,13 @@
 
 #pragma once
 
-#include <boost/algorithm/string.hpp>
-#include <boost/algorithm/string/case_conv.hpp> // to_lower_copy
+#include <strings.h>
+
+#include <algorithm>
+#include <cctype>
 #include <map>
 #include <set>
+#include <sstream>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
@@ -28,10 +31,69 @@
 
 namespace doris {
 
+inline std::string to_lower(const std::string& input) {
+    std::string output;
+    output.resize(input.size());
+    std::transform(input.begin(), input.end(), output.begin(),
+                   [](unsigned char c) { return std::tolower(c); });
+    return output;
+}
+
+inline std::string to_upper(const std::string& input) {
+    std::string output;
+    output.resize(input.size());
+    std::transform(input.begin(), input.end(), output.begin(),
+                   [](unsigned char c) { return std::toupper(c); });
+    return output;
+}
+
+inline bool iequal(const std::string& lhs, const std::string& rhs) {
+    // Compare byte-wise without allocating lower-cased copies of either string.
+    auto ieq = [](unsigned char a, unsigned char b) { return std::tolower(a) == std::tolower(b); };
+    return lhs.size() == rhs.size() &&
+           std::equal(lhs.begin(), lhs.end(), rhs.begin(), ieq);
+}
+
+inline bool starts_with(const std::string& value, const std::string& beginning) {
+    return value.size() >= beginning.size() && value.compare(0, beginning.size(), beginning) == 0;
+}
+
+inline bool ends_with(std::string const& value, std::string const& ending) {
+    if (ending.size() > value.size()) {
+        return false;
+    }
+    return std::equal(ending.rbegin(), ending.rend(), value.rbegin());
+}
+
+inline std::vector<std::string> split(const std::string& s, const std::string& delim) {
+    std::vector<std::string> out;
+    size_t pos {};
+    // An empty delimiter matches everywhere without advancing; return s as a single token.
+    for (size_t find = 0; !delim.empty() && (find = s.find(delim, pos)) != std::string::npos;
+         pos = find + delim.size()) {
+        out.emplace_back(s.data() + pos, s.data() + find);
+    }
+
+    out.emplace_back(s.data() + pos, s.data() + s.size());
+    return out;
+}
+
+template <typename T>
+inline std::string join(const std::vector<T>& elems, const std::string& delim) {
+    std::stringstream ss;
+    for (size_t i = 0; i < elems.size(); ++i) {
+        if (i != 0) {
+            ss << delim.c_str();
+        }
+        ss << elems[i];
+    }
+    return ss.str();
+}
+
 struct StringCaseHasher {
 public:
     std::size_t operator()(const std::string& value) const {
-        std::string lower_value = boost::algorithm::to_lower_copy(value);
+        std::string lower_value = to_lower(value);
         return std::hash<std::string>()(lower_value);
     }
 };
diff --git a/be/test/util/string_util_test.cpp b/be/test/util/string_util_test.cpp
index 88cb88a..7d26a8e 100644
--- a/be/test/util/string_util_test.cpp
+++ b/be/test/util/string_util_test.cpp
@@ -31,6 +31,117 @@ public:
 
 TEST_F(StringUtilTest, normal) {
     {
+        ASSERT_EQ("abc", to_lower("ABC"));
+        ASSERT_EQ("abc", to_lower("abc"));
+        ASSERT_EQ("123", to_lower("123"));
+        ASSERT_EQ("abc,", to_lower("Abc,"));
+    }
+    {
+        ASSERT_EQ("ABC", to_upper("ABC"));
+        ASSERT_EQ("ABC", to_upper("abc"));
+        ASSERT_EQ("123", to_upper("123"));
+        ASSERT_EQ("ABC,", to_upper("Abc,"));
+    }
+    {
+        ASSERT_FALSE(starts_with("ab", "abc"));
+        ASSERT_FALSE(starts_with("xabc", "abc"));
+        ASSERT_FALSE(ends_with("bc", "abc"));
+        ASSERT_FALSE(iequal("abc", "abcd"));
+    }
+    {
+        ASSERT_TRUE(iequal("ABC", "ABC"));
+        ASSERT_TRUE(iequal("ABC", "abc"));
+        ASSERT_TRUE(iequal("ABC", "ABc"));
+        ASSERT_TRUE(iequal("123", "123"));
+        ASSERT_TRUE(iequal("123A,", "123a,"));
+        ASSERT_FALSE(iequal("12A,", "123a,"));
+        ASSERT_FALSE(iequal("abc", "ABD"));
+    }
+    {
+        ASSERT_TRUE(starts_with("abcd", "ab"));
+        ASSERT_TRUE(starts_with("abcd", "abc"));
+        ASSERT_TRUE(starts_with("abcd", "abcd"));
+        ASSERT_TRUE(starts_with("1234", "123"));
+        ASSERT_TRUE(starts_with("a", "a"));
+        ASSERT_TRUE(starts_with("", ""));
+        ASSERT_TRUE(starts_with("a", ""));
+        ASSERT_FALSE(starts_with("", " "));
+        ASSERT_FALSE(starts_with("1234", "123a"));
+    }
+    {
+        ASSERT_TRUE(ends_with("abcd", "cd"));
+        ASSERT_TRUE(ends_with("abcd", "bcd"));
+        ASSERT_TRUE(ends_with("abcd", "abcd"));
+        ASSERT_TRUE(ends_with("1234", "234"));
+        ASSERT_TRUE(ends_with("a", "a"));
+        ASSERT_TRUE(ends_with("", ""));
+        ASSERT_TRUE(ends_with("a", ""));
+        ASSERT_FALSE(ends_with("", " "));
+    }
+    {
+        std::vector<std::string> tokens1;
+        tokens1.push_back("xx");
+        tokens1.push_back("abc");
+        tokens1.push_back("xx");
+
+        std::vector<std::string> tokens2;
+        tokens2.push_back("");
+        tokens2.push_back("xx");
+        tokens2.push_back("abc");
+        tokens2.push_back("");
+        tokens2.push_back("abc");
+        tokens2.push_back("xx");
+        tokens2.push_back("");
+
+        std::vector<std::string> tokens3;
+        tokens3.push_back("");
+        tokens3.push_back("");
+        tokens3.push_back("");
+
+        std::vector<std::string> empty_tokens;
+
+        ASSERT_EQ(join(tokens1, "-"), "xx-abc-xx");
+        ASSERT_EQ(join(tokens2, "-"), "-xx-abc--abc-xx-");
+        ASSERT_EQ(join(empty_tokens, "-"), "");
+    }
+    {
+        std::string str1("xx-abc--xx-abb");
+        std::string str2("Xx-abc--xX-abb-xx");
+        std::string str3("xx");
+        std::string strempty("");
+        const char* pch1 = "xx-abc--xx-abb";
+        std::vector<std::string> tokens;
+        tokens = split(str2, "Xx");
+        ASSERT_EQ(tokens.size(), 2);
+        ASSERT_EQ(tokens[0], "");
+        ASSERT_EQ(tokens[1], "-abc--xX-abb-xx");
+        tokens = split(pch1, "x");
+        ASSERT_EQ(tokens.size(), 5);
+        ASSERT_EQ(tokens[0], "");
+        ASSERT_EQ(tokens[1], "");
+        ASSERT_EQ(tokens[2], "-abc--");
+        ASSERT_EQ(tokens[3], "");
+        ASSERT_EQ(tokens[4], "-abb");
+
+        tokens = split(str1, "-");
+        ASSERT_EQ(tokens.size(), 5);
+        ASSERT_EQ(tokens[0], "xx");
+        ASSERT_EQ(tokens[1], "abc");
+        ASSERT_EQ(tokens[2], "");
+        ASSERT_EQ(tokens[3], "xx");
+        ASSERT_EQ(tokens[4], "abb");
+
+        tokens = split(str3, ",");
+
+        ASSERT_EQ(tokens.size(), 1);
+        ASSERT_EQ(tokens[0], "xx");
+
+        tokens = split(strempty, "-");
+
+        ASSERT_EQ(tokens.size(), 1);
+        ASSERT_EQ(tokens[0], "");
+    }
+    {
         StringCaseSet test_set;
         test_set.emplace("AbC");
         test_set.emplace("AbCD");
diff --git a/build.sh b/build.sh
index 56a9571..72faec4 100755
--- a/build.sh
+++ b/build.sh
@@ -71,6 +71,26 @@ Usage: $0 <options>
   exit 1
 }
 
+clean_gensrc() {
+    pushd ${DORIS_HOME}/gensrc
+    make clean
+    rm -rf ${DORIS_HOME}/fe/fe-core/target
+    popd
+}
+
+clean_be() {
+    pushd ${DORIS_HOME}
+    rm -rf ${CMAKE_BUILD_DIR:-${DORIS_HOME}/be/build_*} # unset on plain --clean: drop all
+    rm -rf ${DORIS_HOME}/be/output/
+    popd
+}
+
+clean_fe() {
+    pushd ${DORIS_HOME}/fe
+    ${MVN_CMD} clean
+    popd
+}
+
 OPTS=$(getopt \
   -n $0 \
   -o '' \
@@ -138,8 +158,10 @@ if [[ ! -f ${DORIS_THIRDPARTY}/installed/lib/libs2.a ]]; then
 fi
 
 if [ ${CLEAN} -eq 1 -a ${BUILD_BE} -eq 0 -a ${BUILD_FE} -eq 0 -a ${BUILD_SPARK_DPP} -eq 0 ]; then
-    echo "--clean can not be specified without --fe or --be or --spark-dpp"
-    exit 1
+    clean_gensrc
+    clean_be
+    clean_fe
+    exit 0
 fi
 
 if [[ -z ${WITH_MYSQL} ]]; then
@@ -169,15 +191,13 @@ echo "Get params:
 "
 
 # Clean and build generated code
-echo "Build generated code"
-cd ${DORIS_HOME}/gensrc
 if [ ${CLEAN} -eq 1 ]; then
-   make clean
-   rm -rf ${DORIS_HOME}/fe/fe-core/target
+    clean_gensrc
 fi
+echo "Build generated code"
+cd ${DORIS_HOME}/gensrc
 # DO NOT using parallel make(-j) for gensrc
 make
-cd ${DORIS_HOME}
 
 # Clean and build Backend
 if [ ${BUILD_BE} -eq 1 ] ; then
@@ -185,8 +205,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then
     echo "Build Backend: ${CMAKE_BUILD_TYPE}"
     CMAKE_BUILD_DIR=${DORIS_HOME}/be/build_${CMAKE_BUILD_TYPE}
     if [ ${CLEAN} -eq 1 ]; then
-        rm -rf $CMAKE_BUILD_DIR
-        rm -rf ${DORIS_HOME}/be/output/
+        clean_be
     fi
     mkdir -p ${CMAKE_BUILD_DIR}
     cd ${CMAKE_BUILD_DIR}
@@ -221,7 +240,6 @@ if [ ${BUILD_FE} -eq 1 -o ${BUILD_SPARK_DPP} -eq 1 ]; then
     fi
 fi
 
-
 function build_ui() {
     # check NPM env here, not in env.sh.
     # Because UI should be considered a non-essential component at runtime.
@@ -262,7 +280,7 @@ if [ ${FE_MODULES}x != ""x ]; then
     echo "Build Frontend Modules: $FE_MODULES"
     cd ${DORIS_HOME}/fe
     if [ ${CLEAN} -eq 1 ]; then
-        ${MVN_CMD} clean
+        clean_fe
     fi
     ${MVN_CMD} package -pl ${FE_MODULES} -DskipTests
     cd ${DORIS_HOME}
diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index d9c2d53..e585509 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -137,6 +137,7 @@ module.exports = [
             ],
           },
           "cancel-label",
+          "check-reset-rpc-cache",
           "compaction-action",
           "connection-action",
           "fe-get-log-file",
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index 847784b..b393c2f 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -136,6 +136,7 @@ module.exports = [
             ],
           },
           "cancel-label",
+          "check-reset-rpc-cache",
           "compaction-action",
           "connection-action",
           "fe-get-log-file",
diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md
index cfbe04b..44111ef 100644
--- a/docs/en/administrator-guide/config/be_config.md
+++ b/docs/en/administrator-guide/config/be_config.md
@@ -1456,3 +1456,9 @@ Increasing this value can reduce the number of calls to read remote data, but it
 * Default value: 1000000
 
 The default value is currently only an empirical value, and may need to be modified according to actual scenarios. Increasing this value can cache more segments and avoid some IO. Decreasing this value will reduce memory usage.
+
+### `auto_refresh_brpc_channel`
+
+* Type: bool
+* Description: When obtaining a brpc connection, check its availability with a hand_shake RPC and re-establish the connection if it is not available.
+* Default value: false
diff --git a/docs/en/administrator-guide/http-actions/check-reset-rpc-cache.md b/docs/en/administrator-guide/http-actions/check-reset-rpc-cache.md
new file mode 100644
index 0000000..cbe3137
--- /dev/null
+++ b/docs/en/administrator-guide/http-actions/check-reset-rpc-cache.md
@@ -0,0 +1,47 @@
+---
+{
+    "title": "CHECK/RESET Stub Cache",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# CHECK/RESET Stub Cache
+## description
+   
+### Check Stub Cache
+    Check whether the connection cache is available
+
+    Description: Check whether the connection cache is available. The maximum payload size is 10M.
+    METHOD: GET
+    URI: http://be_host:be_http_port/api/check_rpc_channel/{host_to_check}/{remote_brpc_port}/{payload_size}
+   
+### Reset Stub Cache
+    This API resets the brpc connection cache. {endpoints} can be `all`, which clears all caches, or a comma-separated list `host1:port1,host2:port2,...`, which clears only the caches of the specified targets.
+
+    Description: Reset connection cache
+    METHOD: GET
+    URI: http://be_host:be_http_port/api/reset_rpc_channel/{endpoints}
+## example
+
+    curl -X GET "http://host:port/api/check_rpc_channel/host2/8060/1024000"
+    curl -X GET "http://host:port/api/reset_rpc_channel/all"
+
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md
index 5907a3e..221749b 100644
--- a/docs/zh-CN/administrator-guide/config/be_config.md
+++ b/docs/zh-CN/administrator-guide/config/be_config.md
@@ -1477,3 +1477,9 @@ webserver默认工作线程数
 * 默认值: 1000000
 
 默认值目前只是一个经验值,可能需要根据实际场景修改。增大该值可以缓存更多的segment从而避免一些IO。减少该值则会降低内存使用。
+
+### `auto_refresh_brpc_channel`
+
+* 类型: bool
+* 描述: 获取brpc连接时,通过hand_shake rpc 判断连接的可用性,如果不可用则重新建立连接 
+* 默认值: false
diff --git a/docs/zh-CN/administrator-guide/http-actions/check-reset-rpc-cache.md b/docs/zh-CN/administrator-guide/http-actions/check-reset-rpc-cache.md
new file mode 100644
index 0000000..8098bf6
--- /dev/null
+++ b/docs/zh-CN/administrator-guide/http-actions/check-reset-rpc-cache.md
@@ -0,0 +1,46 @@
+---
+{
+    "title": "检查和重置连接缓存",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# 检查和重置连接缓存
+## description
+   
+### 检查连接缓存
+    该功能用于检查brpc的连接缓存。
+
+    说明:检查连接缓存是否可用,负载最大10M
+    METHOD: GET
+    URI: http://be_host:be_http_port/api/check_rpc_channel/{host_to_check}/{remote_brpc_port}/{payload_size}
+   
+### 重置连接缓存
+    该功能用于重置brpc的连接缓存。endpoints 可以是如下形式 `all` 清空全部缓存,  `host1:port1,host2:port2,...`: 清空到指定目标的缓存
+
+    说明:重置连接缓存
+    METHOD: GET
+    URI: http://be_host:be_http_port/api/reset_rpc_channel/{endpoints}
+## example
+
+    curl -X GET "http://host:port/api/check_rpc_channel/host2/8060/1024000"
+    curl -X GET "http://host:port/api/reset_rpc_channel/all"
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
index e5f2fc5..7dce859 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
@@ -331,14 +331,12 @@ public class StreamLoadScanNodeTest {
                 slot.setIsNullable(false);
             }
         }
-
-        new Expectations() {{
-            catalog.getFunction((Function) any, (Function.CompareMode) any);
-            result = new ScalarFunction(new FunctionName(FunctionSet.HLL_HASH), Lists.newArrayList(), Type.BIGINT, false, true);
-        }};
         
         new Expectations() {
             {
+                catalog.getFunction((Function) any, (Function.CompareMode) any);
+                result = new ScalarFunction(new FunctionName(FunctionSet.HLL_HASH), Lists.newArrayList(), Type.BIGINT, false, true);
+
                 dstTable.getColumn("k1");
                 result = columns.stream().filter(c -> c.getName().equals("k1")).findFirst().get();
 
@@ -523,7 +521,6 @@ public class StreamLoadScanNodeTest {
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k3");
         StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
-
         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
         scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
@@ -675,7 +672,6 @@ public class StreamLoadScanNodeTest {
         request.setColumns("k1,k2,v1, v2=k1");
         request.setWhere("k5 = 1");
         StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
-
         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
         scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
@@ -684,7 +680,7 @@ public class StreamLoadScanNodeTest {
     }
 
     @Test(expected = UserException.class)
-    public void testWhereNotBool() throws UserException, UserException {
+    public void testWhereNotBool() throws UserException {
         Analyzer analyzer = new Analyzer(catalog, connectContext);
         DescriptorTable descTbl = analyzer.getDescTbl();
 
@@ -729,7 +725,6 @@ public class StreamLoadScanNodeTest {
         request.setColumns("k1,k2,v1, v2=k1");
         request.setWhere("k1 + v2");
         StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
-
         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
         scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
@@ -763,11 +758,7 @@ public class StreamLoadScanNodeTest {
                 minTimes = 0;
                 dstTable.hasSequenceCol();
                 result = true;
-            }
-        };
 
-        new Expectations() {
-            {
                 dstTable.getColumn("k1");
                 result = columns.stream().filter(c -> c.getName().equals("k1")).findFirst().get();
                 minTimes = 0;
@@ -833,11 +824,7 @@ public class StreamLoadScanNodeTest {
                 minTimes = 0;
                 dstTable.hasSequenceCol();
                 result = true;
-            }
-        };
 
-        new Expectations() {
-            {
                 dstTable.getBaseSchema(anyBoolean); result = columns;
                 dstTable.getFullSchema(); result = columns;
 
@@ -879,3 +866,4 @@ public class StreamLoadScanNodeTest {
         scanNode.toThrift(planNode);
     }
 }
+
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 2e07c46..b88b01d 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -387,6 +387,35 @@ message PConstantExprResult {
     map<string, PExprResultMap> expr_result_map = 2;
 };
 
+message PCheckRPCChannelRequest {  // request of the check_rpc_channel RPC
+    optional bytes data = 1;
+    optional uint32 size = 2;  // presumably the byte length of `data` - TODO confirm
+    optional string md5 = 3;  // presumably the MD5 digest of `data`, used for verification - TODO confirm
+};
+
+message PCheckRPCChannelResponse {
+    required PStatus status = 1;
+};
+
+message PResetRPCChannelRequest {  // request of the reset_rpc_channel RPC
+    required bool all = 1;  // reset every cached channel (the `all` form of the HTTP API)
+    repeated string endpoints = 2;  // presumably used when `all` is false: "host:port" targets to reset
+};
+
+message PResetRPCChannelResponse {
+    required PStatus status = 1;
+    repeated string channels = 2;  // presumably the channels that were reset - TODO confirm
+};
+
+message PHandShakeRequest {  // request of the hand_shake RPC, used to probe brpc connection availability
+    optional string hello = 1;
+}
+
+message PHandShakeResponse {
+    optional PStatus status = 1;
+    optional string hello = 2;
+}
+
 service PBackendService {
     rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
     rpc exec_plan_fragment(PExecPlanFragmentRequest) returns (PExecPlanFragmentResult);
@@ -406,5 +435,8 @@ service PBackendService {
     rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse);
     rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult);
     rpc transmit_block(PTransmitDataParams) returns (PTransmitDataResult);
+    rpc check_rpc_channel(PCheckRPCChannelRequest) returns (PCheckRPCChannelResponse);
+    rpc reset_rpc_channel(PResetRPCChannelRequest) returns (PResetRPCChannelResponse);
+    rpc hand_shake(PHandShakeRequest) returns (PHandShakeResponse);
 };
 
diff --git a/gensrc/proto/palo_internal_service.proto b/gensrc/proto/palo_internal_service.proto
deleted file mode 100644
index 222fdf7..0000000
--- a/gensrc/proto/palo_internal_service.proto
+++ /dev/null
@@ -1,47 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// NOTE(XXX): DEPRECATED, just use to compatiple with old version.
-// Make system can grayscale upgrade
-syntax="proto2";
-
-import "internal_service.proto";
-
-package palo;
-option java_package = "org.apache.doris.proto";
-
-option cc_generic_services = true;
-
-service PInternalService {
-    rpc transmit_data(doris.PTransmitDataParams) returns (doris.PTransmitDataResult);
-    rpc exec_plan_fragment(doris.PExecPlanFragmentRequest) returns (doris.PExecPlanFragmentResult);
-    rpc cancel_plan_fragment(doris.PCancelPlanFragmentRequest) returns (doris.PCancelPlanFragmentResult);
-    rpc fetch_data(doris.PFetchDataRequest) returns (doris.PFetchDataResult);
-    rpc tablet_writer_open(doris.PTabletWriterOpenRequest) returns (doris.PTabletWriterOpenResult);
-    rpc tablet_writer_add_batch(doris.PTabletWriterAddBatchRequest) returns (doris.PTabletWriterAddBatchResult);
-    rpc tablet_writer_cancel(doris.PTabletWriterCancelRequest) returns (doris.PTabletWriterCancelResult);
-    rpc get_info(doris.PProxyRequest) returns (doris.PProxyResult);
-    rpc update_cache(doris.PUpdateCacheRequest) returns (doris.PCacheResponse);
-    rpc fetch_cache(doris.PFetchCacheRequest) returns (doris.PFetchCacheResult);
-    rpc clear_cache(doris.PClearCacheRequest) returns (doris.PCacheResponse);
-
-    rpc merge_filter(doris.PMergeFilterRequest) returns (doris.PMergeFilterResponse);
-    rpc apply_filter(doris.PPublishFilterRequest) returns (doris.PPublishFilterResponse);
-
-    rpc fold_constant_expr(doris.PConstantExprRequest) returns (doris.PConstantExprResult);
-    rpc transmit_block(doris.PTransmitDataParams) returns (doris.PTransmitDataResult);
-};
diff --git a/run-be-ut.sh b/run-be-ut.sh
index 07d97d1..1fce850 100755
--- a/run-be-ut.sh
+++ b/run-be-ut.sh
@@ -117,10 +117,20 @@ if [[ -z ${GLIBC_COMPATIBILITY} ]]; then
     GLIBC_COMPATIBILITY=ON
 fi
 
+# get specified ut file if set
+RUN_FILE=
+if [ $# == 1 ]; then
+    RUN_FILE=$1
+    echo "=== Run test: $RUN_FILE ==="
+else
+    # run all ut
+    echo "=== Running All tests ==="
+fi
+
 cd ${CMAKE_BUILD_DIR}
 ${CMAKE_CMD} -G "${GENERATOR}" ../ -DWITH_MYSQL=OFF -DMAKE_TEST=ON -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} \
     -DGLIBC_COMPATIBILITY=${GLIBC_COMPATIBILITY}
-${BUILD_SYSTEM} -j ${PARALLEL}
+${BUILD_SYSTEM} -j ${PARALLEL} $RUN_FILE
 
 if [ ${RUN} -ne 1 ]; then
     echo "Finished"
@@ -160,16 +170,6 @@ cp -r ${DORIS_HOME}/be/test/plugin/plugin_test ${DORIS_TEST_BINARY_DIR}/plugin/
 # find all executable test files
 test_files=`find ${DORIS_TEST_BINARY_DIR} -type f -perm -111 -name "*test"`
 
-# get specified ut file if set
-RUN_FILE=
-if [ $# == 1 ]; then
-    RUN_FILE=$1
-    echo "=== Run test: $RUN_FILE ==="
-else
-    # run all ut
-    echo "=== Running All tests ==="
-fi
-
 for test in ${test_files[@]}
 do
     file_name=${test##*/}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org