You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/01/13 19:29:11 UTC
hbase git commit: HBASE-17463 [C++] RpcClient should close the thread pool
Repository: hbase
Updated Branches:
refs/heads/HBASE-14850 85a96220f -> db30da651
HBASE-17463 [C++] RpcClient should close the thread pool
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/db30da65
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/db30da65
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/db30da65
Branch: refs/heads/HBASE-14850
Commit: db30da651477b2bbd3540763276c7fc9909a9a06
Parents: 85a9622
Author: Enis Soztutar <en...@apache.org>
Authored: Fri Jan 13 11:29:00 2017 -0800
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Jan 13 11:29:00 2017 -0800
----------------------------------------------------------------------
hbase-native-client/connection/connection-pool.cc | 3 +++
hbase-native-client/connection/connection-pool.h | 5 +++++
hbase-native-client/connection/rpc-client.cc | 14 +++++++++-----
hbase-native-client/connection/rpc-client.h | 1 +
hbase-native-client/serde/rpc.cc | 1 +
5 files changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/db30da65/hbase-native-client/connection/connection-pool.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index e022f9e..ee14c9d 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -117,3 +117,6 @@ void ConnectionPool::Close(std::shared_ptr<ConnectionId> remote_id) {
found->second->Close();
connections_.erase(found);
}
+
+void ConnectionPool::Close() {
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db30da65/hbase-native-client/connection/connection-pool.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h
index 96d89ac..5101c68 100644
--- a/hbase-native-client/connection/connection-pool.h
+++ b/hbase-native-client/connection/connection-pool.h
@@ -75,6 +75,11 @@ class ConnectionPool {
*/
void Close(std::shared_ptr<ConnectionId> remote_id);
+ /**
+ * Close the Connection Pool
+ */
+ void Close();
+
private:
std::shared_ptr<RpcConnection> GetCachedConnection(
std::shared_ptr<ConnectionId> remote_id);
http://git-wip-us.apache.org/repos/asf/hbase/blob/db30da65/hbase-native-client/connection/rpc-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
index 66ec231..3f0cfaf 100644
--- a/hbase-native-client/connection/rpc-client.cc
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -43,13 +43,15 @@ class RpcChannelImplementation : public AbstractRpcChannel {
} // namespace hbase
RpcClient::RpcClient() {
- auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(
+ io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(
sysconf(_SC_NPROCESSORS_ONLN));
- cp_ = std::make_shared<ConnectionPool>(io_executor);
+ cp_ = std::make_shared<ConnectionPool>(io_executor_);
}
-void RpcClient::Close() {}
+void RpcClient::Close() {
+ io_executor_->stop();
+}
std::shared_ptr<Response> RpcClient::SyncCall(const std::string& host,
uint16_t port,
@@ -114,6 +116,8 @@ void RpcClient::CallMethod(const MethodDescriptor* method,
std::unique_ptr<Request> req =
std::make_unique<Request>(shared_req, shared_resp, method->name());
- AsyncCall(host, port, std::move(req), ticket)
- .then([done, this](Response resp) { done->Run(); });
+ AsyncCall(host, port, std::move(req), ticket, method->service()->name())
+ .then([done, this](Response resp) {
+ done->Run();
+ });
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db30da65/hbase-native-client/connection/rpc-client.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
index c24db9d..dbf857d 100644
--- a/hbase-native-client/connection/rpc-client.h
+++ b/hbase-native-client/connection/rpc-client.h
@@ -91,6 +91,7 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> {
private:
std::shared_ptr<ConnectionPool> cp_;
+ std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
};
class AbstractRpcChannel : public RpcChannel {
http://git-wip-us.apache.org/repos/asf/hbase/blob/db30da65/hbase-native-client/serde/rpc.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc.cc
index d863d50..e4ade22 100644
--- a/hbase-native-client/serde/rpc.cc
+++ b/hbase-native-client/serde/rpc.cc
@@ -110,6 +110,7 @@ unique_ptr<IOBuf> RpcSerde::Header(const string &user) {
// That may or may not be the correct thing to do.
// It worked for a while with the java client; until it
// didn't.
+ // TODO: send the service name and user from the RpcClient
h.set_service_name(INTERFACE);
return PrependLength(SerializeMessage(h));
}