You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:45:42 UTC

[hbase] 54/133: HBASE-17463 [C++] RpcClient should close the thread pool

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit a2cc4548fcb44ffb8150249716ce28637a642aab
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Fri Jan 13 11:29:00 2017 -0800

    HBASE-17463 [C++] RpcClient should close the thread pool
---
 hbase-native-client/connection/connection-pool.cc |  3 +++
 hbase-native-client/connection/connection-pool.h  |  5 +++++
 hbase-native-client/connection/rpc-client.cc      | 14 +++++++++-----
 hbase-native-client/connection/rpc-client.h       |  1 +
 hbase-native-client/serde/rpc.cc                  |  1 +
 5 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index e022f9e..ee14c9d 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -117,3 +117,6 @@ void ConnectionPool::Close(std::shared_ptr<ConnectionId> remote_id) {
   found->second->Close();
   connections_.erase(found);
 }
+
+void ConnectionPool::Close() {
+}
diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h
index 96d89ac..5101c68 100644
--- a/hbase-native-client/connection/connection-pool.h
+++ b/hbase-native-client/connection/connection-pool.h
@@ -75,6 +75,11 @@ class ConnectionPool {
    */
   void Close(std::shared_ptr<ConnectionId> remote_id);
 
+  /**
+   * Close the Connection Pool
+   */
+  void Close();
+
  private:
   std::shared_ptr<RpcConnection> GetCachedConnection(
       std::shared_ptr<ConnectionId> remote_id);
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
index 66ec231..3f0cfaf 100644
--- a/hbase-native-client/connection/rpc-client.cc
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -43,13 +43,15 @@ class RpcChannelImplementation : public AbstractRpcChannel {
 }  // namespace hbase
 
 RpcClient::RpcClient() {
-  auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(
+  io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(
       sysconf(_SC_NPROCESSORS_ONLN));
 
-  cp_ = std::make_shared<ConnectionPool>(io_executor);
+  cp_ = std::make_shared<ConnectionPool>(io_executor_);
 }
 
-void RpcClient::Close() {}
+void RpcClient::Close() {
+  io_executor_->stop();
+}
 
 std::shared_ptr<Response> RpcClient::SyncCall(const std::string& host,
                                               uint16_t port,
@@ -114,6 +116,8 @@ void RpcClient::CallMethod(const MethodDescriptor* method,
   std::unique_ptr<Request> req =
       std::make_unique<Request>(shared_req, shared_resp, method->name());
 
-  AsyncCall(host, port, std::move(req), ticket)
-      .then([done, this](Response resp) { done->Run(); });
+  AsyncCall(host, port, std::move(req), ticket, method->service()->name())
+      .then([done, this](Response resp) {
+	  done->Run();
+  });
 }
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
index c24db9d..dbf857d 100644
--- a/hbase-native-client/connection/rpc-client.h
+++ b/hbase-native-client/connection/rpc-client.h
@@ -91,6 +91,7 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> {
 
  private:
   std::shared_ptr<ConnectionPool> cp_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
 };
 
 class AbstractRpcChannel : public RpcChannel {
diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc.cc
index d863d50..e4ade22 100644
--- a/hbase-native-client/serde/rpc.cc
+++ b/hbase-native-client/serde/rpc.cc
@@ -110,6 +110,7 @@ unique_ptr<IOBuf> RpcSerde::Header(const string &user) {
   // That may or may not be the correct thing to do.
   // It worked for a while with the java client; until it
   // didn't.
+  // TODO: send the service name and user from the RpcClient
   h.set_service_name(INTERFACE);
   return PrependLength(SerializeMessage(h));
 }