You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:45:42 UTC
[hbase] 54/133: HBASE-17463 [C++] RpcClient should close the thread
pool
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit a2cc4548fcb44ffb8150249716ce28637a642aab
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Fri Jan 13 11:29:00 2017 -0800
HBASE-17463 [C++] RpcClient should close the thread pool
---
hbase-native-client/connection/connection-pool.cc | 3 +++
hbase-native-client/connection/connection-pool.h | 5 +++++
hbase-native-client/connection/rpc-client.cc | 14 +++++++++-----
hbase-native-client/connection/rpc-client.h | 1 +
hbase-native-client/serde/rpc.cc | 1 +
5 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index e022f9e..ee14c9d 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -117,3 +117,6 @@ void ConnectionPool::Close(std::shared_ptr<ConnectionId> remote_id) {
found->second->Close();
connections_.erase(found);
}
+
+void ConnectionPool::Close() {
+}
diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h
index 96d89ac..5101c68 100644
--- a/hbase-native-client/connection/connection-pool.h
+++ b/hbase-native-client/connection/connection-pool.h
@@ -75,6 +75,11 @@ class ConnectionPool {
*/
void Close(std::shared_ptr<ConnectionId> remote_id);
+ /**
+ * Close the Connection Pool
+ */
+ void Close();
+
private:
std::shared_ptr<RpcConnection> GetCachedConnection(
std::shared_ptr<ConnectionId> remote_id);
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
index 66ec231..3f0cfaf 100644
--- a/hbase-native-client/connection/rpc-client.cc
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -43,13 +43,15 @@ class RpcChannelImplementation : public AbstractRpcChannel {
} // namespace hbase
RpcClient::RpcClient() {
- auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(
+ io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(
sysconf(_SC_NPROCESSORS_ONLN));
- cp_ = std::make_shared<ConnectionPool>(io_executor);
+ cp_ = std::make_shared<ConnectionPool>(io_executor_);
}
-void RpcClient::Close() {}
+void RpcClient::Close() {
+ io_executor_->stop();
+}
std::shared_ptr<Response> RpcClient::SyncCall(const std::string& host,
uint16_t port,
@@ -114,6 +116,8 @@ void RpcClient::CallMethod(const MethodDescriptor* method,
std::unique_ptr<Request> req =
std::make_unique<Request>(shared_req, shared_resp, method->name());
- AsyncCall(host, port, std::move(req), ticket)
- .then([done, this](Response resp) { done->Run(); });
+ AsyncCall(host, port, std::move(req), ticket, method->service()->name())
+ .then([done, this](Response resp) {
+ done->Run();
+ });
}
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
index c24db9d..dbf857d 100644
--- a/hbase-native-client/connection/rpc-client.h
+++ b/hbase-native-client/connection/rpc-client.h
@@ -91,6 +91,7 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> {
private:
std::shared_ptr<ConnectionPool> cp_;
+ std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
};
class AbstractRpcChannel : public RpcChannel {
diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc.cc
index d863d50..e4ade22 100644
--- a/hbase-native-client/serde/rpc.cc
+++ b/hbase-native-client/serde/rpc.cc
@@ -110,6 +110,7 @@ unique_ptr<IOBuf> RpcSerde::Header(const string &user) {
// That may or may not be the correct thing to do.
// It worked for a while with the java client; until it
// didn't.
+ // TODO: send the service name and user from the RpcClient
h.set_service_name(INTERFACE);
return PrependLength(SerializeMessage(h));
}