You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/04/15 09:48:00 UTC
[doris] branch master updated: [Chore](thrift) add some check on client cache && remove some unused code && catch st… #18683
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 975b373896 [Chore](thrift) add some check on client cache && remove some unused code && catch st… #18683
975b373896 is described below
commit 975b373896363ada0528121f7cb82675c0ee9b6e
Author: Pxl <px...@qq.com>
AuthorDate: Sat Apr 15 17:47:51 2023 +0800
[Chore](thrift) add some check on client cache && remove some unused code && catch st… #18683
---
be/src/agent/utils.cpp | 7 +++---
be/src/http/utils.cpp | 1 -
be/src/olap/tablet.cpp | 28 +++++++++++----------
be/src/runtime/client_cache.cpp | 43 +++-----------------------------
be/src/runtime/client_cache.h | 19 ++------------
be/src/service/backend_service.h | 54 ++++++++++++++++++----------------------
be/src/util/thrift_client.cpp | 8 ++++--
be/src/util/thrift_client.h | 5 ++--
8 files changed, 55 insertions(+), 110 deletions(-)
diff --git a/be/src/agent/utils.cpp b/be/src/agent/utils.cpp
index 4543a44e83..dfe4bc04c8 100644
--- a/be/src/agent/utils.cpp
+++ b/be/src/agent/utils.cpp
@@ -32,7 +32,6 @@
using std::map;
using std::string;
using std::stringstream;
-using apache::thrift::TException;
using apache::thrift::transport::TTransportException;
namespace doris {
@@ -80,7 +79,7 @@ Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMaste
}
client->finishTask(*result, request);
}
- } catch (TException& e) {
+ } catch (std::exception& e) {
client.reopen(config::thrift_rpc_timeout_ms);
LOG(WARNING) << "fail to finish_task. "
<< "host=" << _master_info.network_address.hostname
@@ -130,7 +129,7 @@ Status MasterServerClient::report(const TReportRequest& request, TMasterResult*
return Status::InternalError("Fail to report to master");
}
}
- } catch (TException& e) {
+ } catch (std::exception& e) {
client.reopen(config::thrift_rpc_timeout_ms);
LOG(WARNING) << "fail to report to master. "
<< "host=" << _master_info.network_address.hostname
@@ -181,7 +180,7 @@ Status MasterServerClient::confirm_unused_remote_files(
client_status.code(), e.what());
}
}
- } catch (TException& e) {
+ } catch (std::exception& e) {
client.reopen(config::thrift_rpc_timeout_ms);
return Status::InternalError(
"fail to confirm unused remote files. host={}, port={}, code={}, reason={}",
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index a4a9348119..ec793b9c30 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -174,7 +174,6 @@ void do_dir_response(const std::string& dir_path, HttpRequest* req) {
std::string result_str = result.str();
HttpChannel::send_reply(req, result_str);
- return;
}
} // namespace doris
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 017da4473b..8abe60c80c 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -82,8 +82,6 @@ namespace doris {
using namespace ErrorCode;
using std::pair;
-using std::nothrow;
-using std::sort;
using std::string;
using std::vector;
using io::FileSystemSPtr;
@@ -483,7 +481,9 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
}
void Tablet::add_rowsets(const std::vector<RowsetSharedPtr>& to_add) {
- if (to_add.empty()) return;
+ if (to_add.empty()) {
+ return;
+ }
std::vector<RowsetMetaSharedPtr> rs_metas;
rs_metas.reserve(to_add.size());
for (auto& rs : to_add) {
@@ -495,7 +495,9 @@ void Tablet::add_rowsets(const std::vector<RowsetSharedPtr>& to_add) {
}
void Tablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool move_to_stale) {
- if (to_delete.empty()) return;
+ if (to_delete.empty()) {
+ return;
+ }
std::vector<RowsetMetaSharedPtr> rs_metas;
rs_metas.reserve(to_delete.size());
for (auto& rs : to_delete) {
@@ -860,7 +862,7 @@ Status Tablet::_capture_consistent_rowsets_unlocked(const std::vector<Version>&
rowsets->push_back(it_expired->second);
break;
}
- } while (0);
+ } while (false);
if (!is_find) {
LOG(WARNING) << "fail to find Rowset for version. tablet=" << full_name()
@@ -1452,10 +1454,10 @@ bool Tablet::_contains_rowset(const RowsetId rowset_id) {
void Tablet::build_tablet_report_info(TTabletInfo* tablet_info,
bool enable_consecutive_missing_check) {
std::shared_lock rdlock(_meta_lock);
- tablet_info->tablet_id = _tablet_meta->tablet_id();
- tablet_info->schema_hash = _tablet_meta->schema_hash();
- tablet_info->row_count = _tablet_meta->num_rows();
- tablet_info->data_size = _tablet_meta->tablet_local_size();
+ tablet_info->__set_tablet_id(_tablet_meta->tablet_id());
+ tablet_info->__set_schema_hash(_tablet_meta->schema_hash());
+ tablet_info->__set_row_count(_tablet_meta->num_rows());
+ tablet_info->__set_data_size(_tablet_meta->tablet_local_size());
// Here we need to report to FE if there are any missing versions of tablet.
// We start from the initial version and traverse backwards until we meet a discontinuous version.
@@ -1511,9 +1513,9 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info,
}
// the report version is the largest continuous version, same logic as in FE side
- tablet_info->version = cversion.second;
+ tablet_info->__set_version(cversion.second);
// Useless but it is a required field in TTabletInfo
- tablet_info->version_hash = 0;
+ tablet_info->__set_version_hash(0);
tablet_info->__set_partition_id(_tablet_meta->partition_id());
tablet_info->__set_storage_medium(_data_dir->storage_medium());
tablet_info->__set_version_count(_tablet_meta->version_count());
@@ -1708,7 +1710,7 @@ Status Tablet::create_initial_rowset(const int64_t req_version) {
LOG(WARNING) << "failed to add rowset for tablet " << full_name();
break;
}
- } while (0);
+ } while (false);
// Unregister index and delete files(index and data) if failed
if (!res.ok()) {
@@ -2003,7 +2005,7 @@ Status Tablet::_follow_cooldowned_data() {
auto rs_meta = std::make_shared<RowsetMeta>();
rs_meta->init_from_pb(*rs_pb_it);
RowsetSharedPtr rs;
- RowsetFactory::create_rowset(_schema, _tablet_path, std::move(rs_meta), &rs);
+ RowsetFactory::create_rowset(_schema, _tablet_path, rs_meta, &rs);
to_add.push_back(std::move(rs));
}
// Note: We CANNOT call `modify_rowsets` here because `modify_rowsets` cannot process version graph correctly.
diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp
index 020d46b141..d4edbca6c0 100644
--- a/be/src/runtime/client_cache.cpp
+++ b/be/src/runtime/client_cache.cpp
@@ -123,12 +123,15 @@ Status ClientCacheHelper::_create_client(const TNetworkAddress& hostport,
*client_key = nullptr;
return status;
}
+
+ DCHECK(*client_key != nullptr);
client_impl->set_send_timeout(timeout_ms);
client_impl->set_recv_timeout(timeout_ms);
{
std::lock_guard<std::mutex> lock(_lock);
// Because the client starts life 'checked out', we don't add it to the cache map
+ DCHECK(_client_map.count(*client_key) == 0);
_client_map[*client_key] = client_impl.release();
}
@@ -177,33 +180,6 @@ void ClientCacheHelper::release_client(void** client_key) {
*client_key = nullptr;
}
-void ClientCacheHelper::close_connections(const TNetworkAddress& hostport) {
- std::vector<ThriftClientImpl*> to_close;
- {
- std::lock_guard<std::mutex> lock(_lock);
- auto cache_entry = _client_cache.find(hostport);
-
- if (cache_entry == _client_cache.end()) {
- return;
- }
-
- VLOG_RPC << "Invalidating all " << cache_entry->second.size()
- << " clients for: " << hostport;
- for (void* client_key : cache_entry->second) {
- auto client_map_entry = _client_map.find(client_key);
- DCHECK(client_map_entry != _client_map.end());
- ThriftClientImpl* info = client_map_entry->second;
- _client_map.erase(client_key);
- to_close.push_back(info);
- }
- }
-
- for (auto* info : to_close) {
- info->close();
- delete info;
- }
-}
-
std::string ClientCacheHelper::debug_string() {
std::stringstream out;
out << "ClientCacheHelper(#hosts=" << _client_cache.size() << " [";
@@ -221,19 +197,6 @@ std::string ClientCacheHelper::debug_string() {
return out.str();
}
-void ClientCacheHelper::test_shutdown() {
- std::vector<TNetworkAddress> hostports;
- {
- std::lock_guard<std::mutex> lock(_lock);
- for (const auto& [endpoint, _] : _client_cache) {
- hostports.push_back(endpoint);
- }
- }
- for (const auto& endpoint : hostports) {
- close_connections(endpoint);
- }
-}
-
void ClientCacheHelper::init_metrics(const std::string& name) {
// Not strictly needed if init_metrics is called before any cache
// usage, but ensures that _metrics_enabled is published.
diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h
index a8d1f47b33..c461a45082 100644
--- a/be/src/runtime/client_cache.h
+++ b/be/src/runtime/client_cache.h
@@ -75,14 +75,8 @@ public:
// Return a client to the cache, without closing it, and set *client_key to nullptr.
void release_client(void** client_key);
- // Close all connections to a host (e.g., in case of failure) so that on their
- // next use they will have to be Reopen'ed.
- void close_connections(const TNetworkAddress& address);
-
std::string debug_string();
- void test_shutdown();
-
void init_metrics(const std::string& name);
private:
@@ -161,6 +155,7 @@ public:
DCHECK(_client != nullptr);
break;
}
+ DCHECK(_client == nullptr);
if (num_retries++ < max_retries) {
// exponential backoff retry with starting delay of 500ms
usleep(500000 * (1 << num_retries));
@@ -194,7 +189,7 @@ class ClientCache {
public:
using Client = ThriftClient<T>;
- ClientCache() : _client_cache_helper() {
+ ClientCache() {
_client_factory =
std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this,
std::placeholders::_1, std::placeholders::_2);
@@ -206,19 +201,9 @@ public:
std::placeholders::_1, std::placeholders::_2);
}
- // Close all clients connected to the supplied address, (e.g., in
- // case of failure) so that on their next use they will have to be
- // Reopen'ed.
- void close_connections(const TNetworkAddress& hostport) {
- return _client_cache_helper.close_connections(hostport);
- }
-
// Helper method which returns a debug string
std::string debug_string() { return _client_cache_helper.debug_string(); }
- // For testing only: shutdown all clients
- void test_shutdown() { return _client_cache_helper.test_shutdown(); }
-
// Adds metrics for this cache.
// The metrics have an identification by the 'name' argument
// (which should not end in a period).
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index a4c4edd7f9..64a06e34ec 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -64,44 +64,38 @@ class BackendService : public BackendServiceIf {
public:
BackendService(ExecEnv* exec_env);
- virtual ~BackendService() {
- // _is_stop = true;
- // _keep_alive_reaper->join();
- }
+ ~BackendService() override = default;
// NOTE: now we do not support multiple backend in one process
static Status create_service(ExecEnv* exec_env, int port, ThriftServer** server);
// Agent service
- virtual void submit_tasks(TAgentResult& return_value,
- const std::vector<TAgentTaskRequest>& tasks) override {
+ void submit_tasks(TAgentResult& return_value,
+ const std::vector<TAgentTaskRequest>& tasks) override {
_agent_server->submit_tasks(return_value, tasks);
}
- virtual void make_snapshot(TAgentResult& return_value,
- const TSnapshotRequest& snapshot_request) override {
+ void make_snapshot(TAgentResult& return_value,
+ const TSnapshotRequest& snapshot_request) override {
_agent_server->make_snapshot(return_value, snapshot_request);
}
- virtual void release_snapshot(TAgentResult& return_value,
- const std::string& snapshot_path) override {
+ void release_snapshot(TAgentResult& return_value, const std::string& snapshot_path) override {
_agent_server->release_snapshot(return_value, snapshot_path);
}
- virtual void publish_cluster_state(TAgentResult& result,
- const TAgentPublishRequest& request) override {
+ void publish_cluster_state(TAgentResult& result, const TAgentPublishRequest& request) override {
_agent_server->publish_cluster_state(result, request);
}
// DorisServer service
- virtual void exec_plan_fragment(TExecPlanFragmentResult& return_val,
- const TExecPlanFragmentParams& params) override;
+ void exec_plan_fragment(TExecPlanFragmentResult& return_val,
+ const TExecPlanFragmentParams& params) override;
- virtual void cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
- const TCancelPlanFragmentParams& params) override;
+ void cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
+ const TCancelPlanFragmentParams& params) override;
- virtual void transmit_data(TTransmitDataResult& return_val,
- const TTransmitDataParams& params) override;
+ void transmit_data(TTransmitDataResult& return_val, const TTransmitDataParams& params) override;
void submit_export_task(TStatus& t_status, const TExportTaskRequest& request) override;
@@ -109,30 +103,30 @@ public:
void erase_export_task(TStatus& t_status, const TUniqueId& task_id) override;
- virtual void get_tablet_stat(TTabletStatResult& result) override;
+ void get_tablet_stat(TTabletStatResult& result) override;
- virtual int64_t get_trash_used_capacity() override;
+ int64_t get_trash_used_capacity() override;
- virtual void get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) override;
+ void get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) override;
- virtual void submit_routine_load_task(TStatus& t_status,
- const std::vector<TRoutineLoadTask>& tasks) override;
+ void submit_routine_load_task(TStatus& t_status,
+ const std::vector<TRoutineLoadTask>& tasks) override;
// used for external service, open means start the scan procedure
- virtual void open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) override;
+ void open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) override;
// used for external service, external use getNext to fetch data batch after batch until eos = true
- virtual void get_next(TScanBatchResult& result_, const TScanNextBatchParams& params) override;
+ void get_next(TScanBatchResult& result_, const TScanNextBatchParams& params) override;
// used for external service, close some context and release resource related with this context
- virtual void close_scanner(TScanCloseResult& result_, const TScanCloseParams& params) override;
+ void close_scanner(TScanCloseResult& result_, const TScanCloseParams& params) override;
- virtual void get_stream_load_record(TStreamLoadRecordResult& result,
- const int64_t last_stream_record_time) override;
+ void get_stream_load_record(TStreamLoadRecordResult& result,
+ const int64_t last_stream_record_time) override;
- virtual void clean_trash() override;
+ void clean_trash() override;
- virtual void check_storage_format(TCheckStorageFormatResult& result) override;
+ void check_storage_format(TCheckStorageFormatResult& result) override;
private:
Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params);
diff --git a/be/src/util/thrift_client.cpp b/be/src/util/thrift_client.cpp
index bc2699f7d9..82bf291047 100644
--- a/be/src/util/thrift_client.cpp
+++ b/be/src/util/thrift_client.cpp
@@ -76,14 +76,18 @@ Status ThriftClientImpl::open_with_retry(int num_tries, int wait_ms) {
void ThriftClientImpl::close() {
try {
- if (_transport.get() != nullptr && _transport->isOpen()) _transport->close();
+ if (_transport != nullptr && _transport->isOpen()) {
+ _transport->close();
+ }
} catch (const apache::thrift::transport::TTransportException& e) {
LOG(INFO) << "Error closing connection to: " << ipaddress() << ":" << port()
<< ", ignoring (" << e.what() << ")";
// Forcibly close the socket (since the transport may have failed to get that far
// during close())
try {
- if (_socket.get() != nullptr) _socket->close();
+ if (_socket != nullptr) {
+ _socket->close();
+ }
} catch (const apache::thrift::transport::TTransportException& e) {
LOG(INFO) << "Error closing socket to: " << ipaddress() << ":" << port()
<< ", ignoring (" << e.what() << ")";
diff --git a/be/src/util/thrift_client.h b/be/src/util/thrift_client.h
index b248bb7132..0bc7c1b95c 100644
--- a/be/src/util/thrift_client.h
+++ b/be/src/util/thrift_client.h
@@ -41,7 +41,7 @@ class ThriftClientImpl {
public:
virtual ~ThriftClientImpl() { close(); }
const std::string& ipaddress() { return _ipaddress; }
- int port() { return _port; }
+ int port() const { return _port; }
// Open the connection to the remote server. May be called
// repeatedly, is idempotent unless there is a failure to connect.
@@ -69,7 +69,6 @@ protected:
_port(port),
_socket(new apache::thrift::transport::TSocket(ipaddress, port)) {}
-protected:
std::string _ipaddress;
int _port;
@@ -106,7 +105,7 @@ ThriftClient<InterfaceType>::ThriftClient(const std::string& ipaddress, int port
template <class InterfaceType>
ThriftClient<InterfaceType>::ThriftClient(const std::string& ipaddress, int port,
ThriftServer::ServerType server_type)
- : ThriftClientImpl(ipaddress, port), _iface(new InterfaceType(_protocol)) {
+ : ThriftClientImpl(ipaddress, port) {
switch (server_type) {
case ThriftServer::NON_BLOCKING:
_transport.reset(new apache::thrift::transport::TFramedTransport(_socket));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org