You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/04/15 09:48:00 UTC

[doris] branch master updated: [Chore](thrift) add some check on client cache && remove some unused code && catch st… #18683

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 975b373896 [Chore](thrift) add some check on client cache && remove some unused code && catch st… #18683
975b373896 is described below

commit 975b373896363ada0528121f7cb82675c0ee9b6e
Author: Pxl <px...@qq.com>
AuthorDate: Sat Apr 15 17:47:51 2023 +0800

    [Chore](thrift) add some check on client cache && remove some unused code && catch st… #18683
---
 be/src/agent/utils.cpp           |  7 +++---
 be/src/http/utils.cpp            |  1 -
 be/src/olap/tablet.cpp           | 28 +++++++++++----------
 be/src/runtime/client_cache.cpp  | 43 +++-----------------------------
 be/src/runtime/client_cache.h    | 19 ++------------
 be/src/service/backend_service.h | 54 ++++++++++++++++++----------------------
 be/src/util/thrift_client.cpp    |  8 ++++--
 be/src/util/thrift_client.h      |  5 ++--
 8 files changed, 55 insertions(+), 110 deletions(-)

diff --git a/be/src/agent/utils.cpp b/be/src/agent/utils.cpp
index 4543a44e83..dfe4bc04c8 100644
--- a/be/src/agent/utils.cpp
+++ b/be/src/agent/utils.cpp
@@ -32,7 +32,6 @@
 using std::map;
 using std::string;
 using std::stringstream;
-using apache::thrift::TException;
 using apache::thrift::transport::TTransportException;
 
 namespace doris {
@@ -80,7 +79,7 @@ Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMaste
             }
             client->finishTask(*result, request);
         }
-    } catch (TException& e) {
+    } catch (std::exception& e) {
         client.reopen(config::thrift_rpc_timeout_ms);
         LOG(WARNING) << "fail to finish_task. "
                      << "host=" << _master_info.network_address.hostname
@@ -130,7 +129,7 @@ Status MasterServerClient::report(const TReportRequest& request, TMasterResult*
                 return Status::InternalError("Fail to report to master");
             }
         }
-    } catch (TException& e) {
+    } catch (std::exception& e) {
         client.reopen(config::thrift_rpc_timeout_ms);
         LOG(WARNING) << "fail to report to master. "
                      << "host=" << _master_info.network_address.hostname
@@ -181,7 +180,7 @@ Status MasterServerClient::confirm_unused_remote_files(
                         client_status.code(), e.what());
             }
         }
-    } catch (TException& e) {
+    } catch (std::exception& e) {
         client.reopen(config::thrift_rpc_timeout_ms);
         return Status::InternalError(
                 "fail to confirm unused remote files. host={}, port={}, code={}, reason={}",
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index a4a9348119..ec793b9c30 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -174,7 +174,6 @@ void do_dir_response(const std::string& dir_path, HttpRequest* req) {
 
     std::string result_str = result.str();
     HttpChannel::send_reply(req, result_str);
-    return;
 }
 
 } // namespace doris
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 017da4473b..8abe60c80c 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -82,8 +82,6 @@ namespace doris {
 using namespace ErrorCode;
 
 using std::pair;
-using std::nothrow;
-using std::sort;
 using std::string;
 using std::vector;
 using io::FileSystemSPtr;
@@ -483,7 +481,9 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
 }
 
 void Tablet::add_rowsets(const std::vector<RowsetSharedPtr>& to_add) {
-    if (to_add.empty()) return;
+    if (to_add.empty()) {
+        return;
+    }
     std::vector<RowsetMetaSharedPtr> rs_metas;
     rs_metas.reserve(to_add.size());
     for (auto& rs : to_add) {
@@ -495,7 +495,9 @@ void Tablet::add_rowsets(const std::vector<RowsetSharedPtr>& to_add) {
 }
 
 void Tablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool move_to_stale) {
-    if (to_delete.empty()) return;
+    if (to_delete.empty()) {
+        return;
+    }
     std::vector<RowsetMetaSharedPtr> rs_metas;
     rs_metas.reserve(to_delete.size());
     for (auto& rs : to_delete) {
@@ -860,7 +862,7 @@ Status Tablet::_capture_consistent_rowsets_unlocked(const std::vector<Version>&
                 rowsets->push_back(it_expired->second);
                 break;
             }
-        } while (0);
+        } while (false);
 
         if (!is_find) {
             LOG(WARNING) << "fail to find Rowset for version. tablet=" << full_name()
@@ -1452,10 +1454,10 @@ bool Tablet::_contains_rowset(const RowsetId rowset_id) {
 void Tablet::build_tablet_report_info(TTabletInfo* tablet_info,
                                       bool enable_consecutive_missing_check) {
     std::shared_lock rdlock(_meta_lock);
-    tablet_info->tablet_id = _tablet_meta->tablet_id();
-    tablet_info->schema_hash = _tablet_meta->schema_hash();
-    tablet_info->row_count = _tablet_meta->num_rows();
-    tablet_info->data_size = _tablet_meta->tablet_local_size();
+    tablet_info->__set_tablet_id(_tablet_meta->tablet_id());
+    tablet_info->__set_schema_hash(_tablet_meta->schema_hash());
+    tablet_info->__set_row_count(_tablet_meta->num_rows());
+    tablet_info->__set_data_size(_tablet_meta->tablet_local_size());
 
     // Here we need to report to FE if there are any missing versions of tablet.
     // We start from the initial version and traverse backwards until we meet a discontinuous version.
@@ -1511,9 +1513,9 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info,
     }
 
     // the report version is the largest continuous version, same logic as in FE side
-    tablet_info->version = cversion.second;
+    tablet_info->__set_version(cversion.second);
     // Useless but it is a required filed in TTabletInfo
-    tablet_info->version_hash = 0;
+    tablet_info->__set_version_hash(0);
     tablet_info->__set_partition_id(_tablet_meta->partition_id());
     tablet_info->__set_storage_medium(_data_dir->storage_medium());
     tablet_info->__set_version_count(_tablet_meta->version_count());
@@ -1708,7 +1710,7 @@ Status Tablet::create_initial_rowset(const int64_t req_version) {
             LOG(WARNING) << "failed to add rowset for tablet " << full_name();
             break;
         }
-    } while (0);
+    } while (false);
 
     // Unregister index and delete files(index and data) if failed
     if (!res.ok()) {
@@ -2003,7 +2005,7 @@ Status Tablet::_follow_cooldowned_data() {
             auto rs_meta = std::make_shared<RowsetMeta>();
             rs_meta->init_from_pb(*rs_pb_it);
             RowsetSharedPtr rs;
-            RowsetFactory::create_rowset(_schema, _tablet_path, std::move(rs_meta), &rs);
+            RowsetFactory::create_rowset(_schema, _tablet_path, rs_meta, &rs);
             to_add.push_back(std::move(rs));
         }
         // Note: We CANNOT call `modify_rowsets` here because `modify_rowsets` cannot process version graph correctly.
diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp
index 020d46b141..d4edbca6c0 100644
--- a/be/src/runtime/client_cache.cpp
+++ b/be/src/runtime/client_cache.cpp
@@ -123,12 +123,15 @@ Status ClientCacheHelper::_create_client(const TNetworkAddress& hostport,
         *client_key = nullptr;
         return status;
     }
+
+    DCHECK(*client_key != nullptr);
     client_impl->set_send_timeout(timeout_ms);
     client_impl->set_recv_timeout(timeout_ms);
 
     {
         std::lock_guard<std::mutex> lock(_lock);
         // Because the client starts life 'checked out', we don't add it to the cache map
+        DCHECK(_client_map.count(*client_key) == 0);
         _client_map[*client_key] = client_impl.release();
     }
 
@@ -177,33 +180,6 @@ void ClientCacheHelper::release_client(void** client_key) {
     *client_key = nullptr;
 }
 
-void ClientCacheHelper::close_connections(const TNetworkAddress& hostport) {
-    std::vector<ThriftClientImpl*> to_close;
-    {
-        std::lock_guard<std::mutex> lock(_lock);
-        auto cache_entry = _client_cache.find(hostport);
-
-        if (cache_entry == _client_cache.end()) {
-            return;
-        }
-
-        VLOG_RPC << "Invalidating all " << cache_entry->second.size()
-                 << " clients for: " << hostport;
-        for (void* client_key : cache_entry->second) {
-            auto client_map_entry = _client_map.find(client_key);
-            DCHECK(client_map_entry != _client_map.end());
-            ThriftClientImpl* info = client_map_entry->second;
-            _client_map.erase(client_key);
-            to_close.push_back(info);
-        }
-    }
-
-    for (auto* info : to_close) {
-        info->close();
-        delete info;
-    }
-}
-
 std::string ClientCacheHelper::debug_string() {
     std::stringstream out;
     out << "ClientCacheHelper(#hosts=" << _client_cache.size() << " [";
@@ -221,19 +197,6 @@ std::string ClientCacheHelper::debug_string() {
     return out.str();
 }
 
-void ClientCacheHelper::test_shutdown() {
-    std::vector<TNetworkAddress> hostports;
-    {
-        std::lock_guard<std::mutex> lock(_lock);
-        for (const auto& [endpoint, _] : _client_cache) {
-            hostports.push_back(endpoint);
-        }
-    }
-    for (const auto& endpoint : hostports) {
-        close_connections(endpoint);
-    }
-}
-
 void ClientCacheHelper::init_metrics(const std::string& name) {
     // Not strictly needed if init_metrics is called before any cache
     // usage, but ensures that _metrics_enabled is published.
diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h
index a8d1f47b33..c461a45082 100644
--- a/be/src/runtime/client_cache.h
+++ b/be/src/runtime/client_cache.h
@@ -75,14 +75,8 @@ public:
     // Return a client to the cache, without closing it, and set *client_key to nullptr.
     void release_client(void** client_key);
 
-    // Close all connections to a host (e.g., in case of failure) so that on their
-    // next use they will have to be Reopen'ed.
-    void close_connections(const TNetworkAddress& address);
-
     std::string debug_string();
 
-    void test_shutdown();
-
     void init_metrics(const std::string& name);
 
 private:
@@ -161,6 +155,7 @@ public:
                 DCHECK(_client != nullptr);
                 break;
             }
+            DCHECK(_client == nullptr);
             if (num_retries++ < max_retries) {
                 // exponential backoff retry with starting delay of 500ms
                 usleep(500000 * (1 << num_retries));
@@ -194,7 +189,7 @@ class ClientCache {
 public:
     using Client = ThriftClient<T>;
 
-    ClientCache() : _client_cache_helper() {
+    ClientCache() {
         _client_factory =
                 std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this,
                                              std::placeholders::_1, std::placeholders::_2);
@@ -206,19 +201,9 @@ public:
                                              std::placeholders::_1, std::placeholders::_2);
     }
 
-    // Close all clients connected to the supplied address, (e.g., in
-    // case of failure) so that on their next use they will have to be
-    // Reopen'ed.
-    void close_connections(const TNetworkAddress& hostport) {
-        return _client_cache_helper.close_connections(hostport);
-    }
-
     // Helper method which returns a debug string
     std::string debug_string() { return _client_cache_helper.debug_string(); }
 
-    // For testing only: shutdown all clients
-    void test_shutdown() { return _client_cache_helper.test_shutdown(); }
-
     // Adds metrics for this cache.
     // The metrics have an identification by the 'name' argument
     // (which should not end in a period).
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index a4c4edd7f9..64a06e34ec 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -64,44 +64,38 @@ class BackendService : public BackendServiceIf {
 public:
     BackendService(ExecEnv* exec_env);
 
-    virtual ~BackendService() {
-        // _is_stop = true;
-        // _keep_alive_reaper->join();
-    }
+    ~BackendService() override = default;
 
     // NOTE: now we do not support multiple backend in one process
     static Status create_service(ExecEnv* exec_env, int port, ThriftServer** server);
 
     // Agent service
-    virtual void submit_tasks(TAgentResult& return_value,
-                              const std::vector<TAgentTaskRequest>& tasks) override {
+    void submit_tasks(TAgentResult& return_value,
+                      const std::vector<TAgentTaskRequest>& tasks) override {
         _agent_server->submit_tasks(return_value, tasks);
     }
 
-    virtual void make_snapshot(TAgentResult& return_value,
-                               const TSnapshotRequest& snapshot_request) override {
+    void make_snapshot(TAgentResult& return_value,
+                       const TSnapshotRequest& snapshot_request) override {
         _agent_server->make_snapshot(return_value, snapshot_request);
     }
 
-    virtual void release_snapshot(TAgentResult& return_value,
-                                  const std::string& snapshot_path) override {
+    void release_snapshot(TAgentResult& return_value, const std::string& snapshot_path) override {
         _agent_server->release_snapshot(return_value, snapshot_path);
     }
 
-    virtual void publish_cluster_state(TAgentResult& result,
-                                       const TAgentPublishRequest& request) override {
+    void publish_cluster_state(TAgentResult& result, const TAgentPublishRequest& request) override {
         _agent_server->publish_cluster_state(result, request);
     }
 
     // DorisServer service
-    virtual void exec_plan_fragment(TExecPlanFragmentResult& return_val,
-                                    const TExecPlanFragmentParams& params) override;
+    void exec_plan_fragment(TExecPlanFragmentResult& return_val,
+                            const TExecPlanFragmentParams& params) override;
 
-    virtual void cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
-                                      const TCancelPlanFragmentParams& params) override;
+    void cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
+                              const TCancelPlanFragmentParams& params) override;
 
-    virtual void transmit_data(TTransmitDataResult& return_val,
-                               const TTransmitDataParams& params) override;
+    void transmit_data(TTransmitDataResult& return_val, const TTransmitDataParams& params) override;
 
     void submit_export_task(TStatus& t_status, const TExportTaskRequest& request) override;
 
@@ -109,30 +103,30 @@ public:
 
     void erase_export_task(TStatus& t_status, const TUniqueId& task_id) override;
 
-    virtual void get_tablet_stat(TTabletStatResult& result) override;
+    void get_tablet_stat(TTabletStatResult& result) override;
 
-    virtual int64_t get_trash_used_capacity() override;
+    int64_t get_trash_used_capacity() override;
 
-    virtual void get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) override;
+    void get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) override;
 
-    virtual void submit_routine_load_task(TStatus& t_status,
-                                          const std::vector<TRoutineLoadTask>& tasks) override;
+    void submit_routine_load_task(TStatus& t_status,
+                                  const std::vector<TRoutineLoadTask>& tasks) override;
 
     // used for external service, open means start the scan procedure
-    virtual void open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) override;
+    void open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) override;
 
     // used for external service, external use getNext to fetch data batch after batch until eos = true
-    virtual void get_next(TScanBatchResult& result_, const TScanNextBatchParams& params) override;
+    void get_next(TScanBatchResult& result_, const TScanNextBatchParams& params) override;
 
     // used for external service, close some context and release resource related with this context
-    virtual void close_scanner(TScanCloseResult& result_, const TScanCloseParams& params) override;
+    void close_scanner(TScanCloseResult& result_, const TScanCloseParams& params) override;
 
-    virtual void get_stream_load_record(TStreamLoadRecordResult& result,
-                                        const int64_t last_stream_record_time) override;
+    void get_stream_load_record(TStreamLoadRecordResult& result,
+                                const int64_t last_stream_record_time) override;
 
-    virtual void clean_trash() override;
+    void clean_trash() override;
 
-    virtual void check_storage_format(TCheckStorageFormatResult& result) override;
+    void check_storage_format(TCheckStorageFormatResult& result) override;
 
 private:
     Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params);
diff --git a/be/src/util/thrift_client.cpp b/be/src/util/thrift_client.cpp
index bc2699f7d9..82bf291047 100644
--- a/be/src/util/thrift_client.cpp
+++ b/be/src/util/thrift_client.cpp
@@ -76,14 +76,18 @@ Status ThriftClientImpl::open_with_retry(int num_tries, int wait_ms) {
 
 void ThriftClientImpl::close() {
     try {
-        if (_transport.get() != nullptr && _transport->isOpen()) _transport->close();
+        if (_transport != nullptr && _transport->isOpen()) {
+            _transport->close();
+        }
     } catch (const apache::thrift::transport::TTransportException& e) {
         LOG(INFO) << "Error closing connection to: " << ipaddress() << ":" << port()
                   << ", ignoring (" << e.what() << ")";
         // Forcibly close the socket (since the transport may have failed to get that far
         // during close())
         try {
-            if (_socket.get() != nullptr) _socket->close();
+            if (_socket != nullptr) {
+                _socket->close();
+            }
         } catch (const apache::thrift::transport::TTransportException& e) {
             LOG(INFO) << "Error closing socket to: " << ipaddress() << ":" << port()
                       << ", ignoring (" << e.what() << ")";
diff --git a/be/src/util/thrift_client.h b/be/src/util/thrift_client.h
index b248bb7132..0bc7c1b95c 100644
--- a/be/src/util/thrift_client.h
+++ b/be/src/util/thrift_client.h
@@ -41,7 +41,7 @@ class ThriftClientImpl {
 public:
     virtual ~ThriftClientImpl() { close(); }
     const std::string& ipaddress() { return _ipaddress; }
-    int port() { return _port; }
+    int port() const { return _port; }
 
     // Open the connection to the remote server. May be called
     // repeatedly, is idempotent unless there is a failure to connect.
@@ -69,7 +69,6 @@ protected:
               _port(port),
               _socket(new apache::thrift::transport::TSocket(ipaddress, port)) {}
 
-protected:
     std::string _ipaddress;
     int _port;
 
@@ -106,7 +105,7 @@ ThriftClient<InterfaceType>::ThriftClient(const std::string& ipaddress, int port
 template <class InterfaceType>
 ThriftClient<InterfaceType>::ThriftClient(const std::string& ipaddress, int port,
                                           ThriftServer::ServerType server_type)
-        : ThriftClientImpl(ipaddress, port), _iface(new InterfaceType(_protocol)) {
+        : ThriftClientImpl(ipaddress, port) {
     switch (server_type) {
     case ThriftServer::NON_BLOCKING:
         _transport.reset(new apache::thrift::transport::TFramedTransport(_socket));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org