You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:45:06 UTC
[hbase] 18/133: HBASE-15731 Add on a connection pool
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 1131bff0b2db4b1384e0677ec7ac0cc687aae8f2
Author: Elliott Clark <ec...@apache.org>
AuthorDate: Thu Apr 28 19:53:42 2016 -0700
HBASE-15731 Add on a connection pool
Summary:
Add on a connection pool protected by read write mutex.
Add on a service filter that will remove a connection from a connection pool when closed
Test Plan: Need to add on tests.
Differential Revision: https://reviews.facebook.net/D57411
---
hbase-native-client/connection/BUCK | 5 +
.../connection/connection-factory.cc | 4 +-
.../connection/connection-factory.h | 5 +-
.../connection/connection-pool-test.cc | 77 +++++++++++++++
hbase-native-client/connection/connection-pool.cc | 89 +++++++++++++++++
hbase-native-client/connection/connection-pool.h | 59 ++++++++++++
hbase-native-client/connection/service.h | 4 +-
hbase-native-client/core/BUCK | 4 -
hbase-native-client/core/client.h | 3 -
hbase-native-client/core/get-request.cc | 19 ----
hbase-native-client/core/get-request.h | 35 -------
hbase-native-client/core/get-result.cc | 19 ----
hbase-native-client/core/get-result.h | 32 -------
hbase-native-client/core/location-cache.cc | 1 -
hbase-native-client/core/simple-client.cc | 105 ++++++++++++---------
15 files changed, 297 insertions(+), 164 deletions(-)
diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK
index d393885..96f2136 100644
--- a/hbase-native-client/connection/BUCK
+++ b/hbase-native-client/connection/BUCK
@@ -22,6 +22,7 @@ cxx_library(name="connection",
"client-dispatcher.h",
"client-handler.h",
"connection-factory.h",
+ "connection-pool.h",
"pipeline.h",
"request.h",
"response.h",
@@ -31,6 +32,7 @@ cxx_library(name="connection",
"client-dispatcher.cc",
"client-handler.cc",
"connection-factory.cc",
+ "connection-pool.cc",
"pipeline.cc",
"request.cc",
],
@@ -42,3 +44,6 @@ cxx_library(name="connection",
"//third-party:wangle",
],
visibility=['//core/...', ], )
+cxx_test(name="connection-pool-test",
+ srcs=["connection-pool-test.cc", ],
+ deps=[":connection", ], )
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index 7073f9d..b546269 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -45,8 +45,8 @@ ConnectionFactory::ConnectionFactory() {
bootstrap_.pipelineFactory(std::make_shared<RpcPipelineFactory>());
}
-std::shared_ptr<Service<std::unique_ptr<Request>, Response>>
-ConnectionFactory::make_connection(std::string host, int port) {
+std::shared_ptr<HBaseService>
+ConnectionFactory::make_connection(const std::string &host, int port) {
// Connect to a given server
// Then when connected create a ClientDispactcher.
auto pipeline = bootstrap_.connect(SocketAddress(host, port, true)).get();
diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h
index 8d1d2f0..5a45316 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -31,8 +31,9 @@ namespace hbase {
class ConnectionFactory {
public:
ConnectionFactory();
- std::shared_ptr<wangle::Service<std::unique_ptr<Request>, Response>>
- make_connection(std::string host, int port);
+
+ virtual std::shared_ptr<HBaseService> make_connection(const std::string &host,
+ int port);
private:
wangle::ClientBootstrap<SerializePipeline> bootstrap_;
diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc
new file mode 100644
index 0000000..975bc5e
--- /dev/null
+++ b/hbase-native-client/connection/connection-pool-test.cc
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "connection/connection-pool.h"
+
+#include <folly/Logging.h>
+#include <gmock/gmock.h>
+
+#include "connection/connection-factory.h"
+#include "if/HBase.pb.h"
+
+using namespace hbase;
+
+using hbase::pb::ServerName;
+using ::testing::Return;
+using ::testing::_;
+
+class MockConnectionFactory : public ConnectionFactory {
+public:
+ MOCK_METHOD2(make_connection,
+ std::shared_ptr<HBaseService>(const std::string &hostname,
+ int port));
+};
+
+class MockServiceBase : public HBaseService {
+public:
+ folly::Future<Response> operator()(std::unique_ptr<Request> req) override {
+ return do_operation(req.get());
+ }
+ virtual folly::Future<Response> do_operation(Request *req) {
+ return folly::makeFuture<Response>(Response{});
+ }
+};
+
+class MockService : public MockServiceBase {
+public:
+ MOCK_METHOD1(do_operation, folly::Future<Response>(Request *));
+};
+
+TEST(TestConnectionPool, TestOnlyCreateOnce) {
+ std::string hostname{"hostname"};
+ auto mock_service = std::make_shared<MockService>();
+ uint32_t port{999};
+
+ LOG(ERROR) << "About to make a MockConnectionFactory";
+ auto mock_cf = std::make_shared<MockConnectionFactory>();
+ EXPECT_CALL((*mock_cf), make_connection(_, _))
+ .Times(1)
+ .WillRepeatedly(Return(mock_service));
+ ConnectionPool cp{mock_cf};
+
+ LOG(ERROR) << "Created ConnectionPool";
+
+ ServerName sn;
+ sn.set_host_name(hostname);
+ sn.set_port(port);
+
+ auto result = cp.get(sn);
+ ASSERT_TRUE(result != nullptr);
+ result = cp.get(sn);
+}
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
new file mode 100644
index 0000000..a967df2
--- /dev/null
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "connection/connection-pool.h"
+
+#include <wangle/service/Service.h>
+
+using std::mutex;
+using std::unique_ptr;
+using std::shared_ptr;
+using hbase::pb::ServerName;
+using wangle::ServiceFilter;
+using folly::SharedMutexWritePriority;
+
+namespace hbase {
+
+class RemoveServiceFilter
+ : public ServiceFilter<unique_ptr<Request>, Response> {
+
+public:
+ RemoveServiceFilter(std::shared_ptr<HBaseService> service, ServerName sn,
+ ConnectionPool *cp)
+ : ServiceFilter<unique_ptr<Request>, Response>(service), sn_(sn),
+ cp_(cp) {}
+
+ folly::Future<folly::Unit> close() override {
+ if (!released.exchange(true)) {
+ return this->service_->close().then(
+ [this]() { this->cp_->close(this->sn_); });
+ } else {
+ return folly::makeFuture();
+ }
+ }
+
+ virtual bool isAvailable() override { return service_->isAvailable(); }
+
+ folly::Future<Response> operator()(unique_ptr<Request> req) override {
+ return (*this->service_)(std::move(req));
+ }
+
+private:
+ std::atomic<bool> released{false};
+ hbase::pb::ServerName sn_;
+ ConnectionPool *cp_;
+};
+
+ConnectionPool::ConnectionPool() : cf_(std::make_shared<ConnectionFactory>()) {}
+ConnectionPool::ConnectionPool(std::shared_ptr<ConnectionFactory> cf)
+ : cf_(cf) {}
+
+std::shared_ptr<HBaseService> ConnectionPool::get(const ServerName &sn) {
+ SharedMutexWritePriority::UpgradeHolder holder(map_mutex_);
+ auto found = connections_.find(sn);
+ if (found == connections_.end() || found->second == nullptr) {
+ SharedMutexWritePriority::WriteHolder holder(std::move(holder));
+ auto new_con = cf_->make_connection(sn.host_name(), sn.port());
+ auto wrapped = std::make_shared<RemoveServiceFilter>(new_con, sn, this);
+ connections_[sn] = wrapped;
+ return new_con;
+ }
+ return found->second;
+}
+void ConnectionPool::close(ServerName sn) {
+ SharedMutexWritePriority::WriteHolder holder(map_mutex_);
+
+ auto found = connections_.find(sn);
+ if (found == connections_.end() || found->second == nullptr) {
+ return;
+ }
+ auto service = found->second;
+ connections_.erase(found);
+}
+}
diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h
new file mode 100644
index 0000000..394cd71
--- /dev/null
+++ b/hbase-native-client/connection/connection-pool.h
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/SharedMutex.h>
+#include <mutex>
+#include <unordered_map>
+
+#include "connection/connection-factory.h"
+#include "connection/service.h"
+#include "if/HBase.pb.h"
+
+namespace hbase {
+struct MyServerNameEquals {
+ bool operator()(const hbase::pb::ServerName &lhs,
+ const hbase::pb::ServerName &rhs) const {
+ return lhs.host_name() == rhs.host_name() && lhs.port() == rhs.port();
+ }
+};
+struct MyServerNameHash {
+ std::size_t operator()(hbase::pb::ServerName const &s) const {
+ std::size_t h1 = std::hash<std::string>()(s.host_name());
+ std::size_t h2 = std::hash<uint32_t>()(s.port());
+ return h1 ^ (h2 << 1);
+ }
+};
+
+class ConnectionPool {
+public:
+ ConnectionPool();
+ explicit ConnectionPool(std::shared_ptr<ConnectionFactory> cf);
+ std::shared_ptr<HBaseService> get(const hbase::pb::ServerName &sn);
+ void close(hbase::pb::ServerName sn);
+
+private:
+ std::shared_ptr<ConnectionFactory> cf_;
+ std::unordered_map<hbase::pb::ServerName, std::shared_ptr<HBaseService>,
+ MyServerNameHash, MyServerNameEquals>
+ connections_;
+ folly::SharedMutexWritePriority map_mutex_;
+};
+
+} // namespace hbase
diff --git a/hbase-native-client/connection/service.h b/hbase-native-client/connection/service.h
index feb14ec..79f087d 100644
--- a/hbase-native-client/connection/service.h
+++ b/hbase-native-client/connection/service.h
@@ -18,9 +18,11 @@
*/
#pragma once
+#include <wangle/service/Service.h>
+
#include "connection/request.h"
#include "connection/response.h"
namespace hbase {
-using HBaseService = wangle::Service<Request, Response>;
+using HBaseService = wangle::Service<std::unique_ptr<Request>, Response>;
} // namespace hbase
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 9db6fda..e555ba4 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -21,8 +21,6 @@ cxx_library(
exported_headers=[
"client.h",
"connection.h",
- "get-request.h",
- "get-result.h",
"hbase_macros.h",
"location-cache.h",
"table-name.h",
@@ -32,8 +30,6 @@ cxx_library(
],
srcs=[
"client.cc",
- "get-request.cc",
- "get-result.cc",
"location-cache.cc",
"meta-utils.cc",
"table-name.cc",
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index b583285..4bed751 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -24,8 +24,6 @@
#include <string>
-#include "core/get-request.h"
-#include "core/get-result.h"
#include "core/location-cache.h"
#include "if/Cell.pb.h"
@@ -33,7 +31,6 @@ namespace hbase {
class Client {
public:
explicit Client(std::string quorum_spec);
- folly::Future<GetResult> get(const GetRequest &get_request);
private:
LocationCache location_cache_;
diff --git a/hbase-native-client/core/get-request.cc b/hbase-native-client/core/get-request.cc
deleted file mode 100644
index e927ccc..0000000
--- a/hbase-native-client/core/get-request.cc
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "core/get-request.h"
diff --git a/hbase-native-client/core/get-request.h b/hbase-native-client/core/get-request.h
deleted file mode 100644
index bb755c5..0000000
--- a/hbase-native-client/core/get-request.h
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <string>
-
-#include "core/table-name.h"
-
-namespace hbase {
-
-class GetRequest {
-public:
- GetRequest(TableName table_name, std::string key);
-
-private:
- TableName table_name_;
- std::string key_;
-};
-} // namespace hbase
diff --git a/hbase-native-client/core/get-result.cc b/hbase-native-client/core/get-result.cc
deleted file mode 100644
index 7eea483..0000000
--- a/hbase-native-client/core/get-result.cc
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "core/get-result.h"
diff --git a/hbase-native-client/core/get-result.h b/hbase-native-client/core/get-result.h
deleted file mode 100644
index a49ad98..0000000
--- a/hbase-native-client/core/get-result.h
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <string>
-
-namespace hbase {
-
-class GetResult {
-public:
- explicit GetResult(std::string key);
-
-private:
- std::string key_;
-};
-} // namespace hbase
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index 5925f4a..c81deba 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -77,7 +77,6 @@ ServerName LocationCache::ReadMetaLocation() {
int zk_result =
zoo_get(this->zk_, META_ZNODE_NAME, 0,
reinterpret_cast<char *>(buf->writableData()), &len, nullptr);
- LOG(ERROR) << "len = " << len;
if (zk_result != ZOK || len < 9) {
LOG(ERROR) << "Error getting meta location.";
throw runtime_error("Error getting meta location");
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 2cb6200..11dcd68 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -25,7 +25,7 @@
#include <chrono>
#include <iostream>
-#include "connection/connection-factory.h"
+#include "connection/connection-pool.h"
#include "core/client.h"
#include "if/Client.pb.h"
#include "if/ZooKeeper.pb.h"
@@ -33,13 +33,20 @@
using namespace folly;
using namespace std;
using namespace std::chrono;
-using namespace hbase;
-using namespace hbase::pb;
-using namespace google::protobuf;
+using hbase::Response;
+using hbase::Request;
+using hbase::HBaseService;
+using hbase::LocationCache;
+using hbase::ConnectionPool;
+using hbase::pb::ServerName;
+using hbase::pb::RegionSpecifier_RegionSpecifierType;
+using hbase::pb::GetRequest;
+using hbase::pb::GetResponse;
// TODO(eclark): remove the need for this.
DEFINE_string(region, "1588230740", "What region to send a get to");
DEFINE_string(row, "test", "What row to get");
+DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to");
int main(int argc, char *argv[]) {
google::SetUsageMessage(
@@ -48,46 +55,52 @@ int main(int argc, char *argv[]) {
google::InitGoogleLogging(argv[0]);
// Create a connection factory
- ConnectionFactory cf;
-
- LocationCache cache{"localhost:2181", wangle::getCPUExecutor()};
-
- auto result = cache.LocateMeta().get();
-
- // Create a connection to the local host
- auto conn = cf.make_connection(result.host_name(), result.port());
-
- // Send the request
- auto r = Request::get();
-
- // This is a get request so make that
- auto req_msg = static_pointer_cast<hbase::pb::GetRequest>(r->req_msg());
-
- // Set what region
- req_msg->mutable_region()->set_value(FLAGS_region);
- // It's always this.
- req_msg->mutable_region()->set_type(
- RegionSpecifier_RegionSpecifierType::
- RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME);
-
- // What row.
- req_msg->mutable_get()->set_row(FLAGS_row);
-
- // Send it.
- auto resp = (*conn)(std::move(r)).get(milliseconds(5000));
-
- auto get_resp = std::static_pointer_cast<GetResponse>(resp.response());
- cout << "GetResponse has_result = " << get_resp->has_result() << '\n';
- if (get_resp->has_result()) {
- auto &r = get_resp->result();
- cout << "Result cell_size = " << r.cell_size() << endl;
- for (auto &cell : r.cell()) {
- cout << "\trow = " << cell.row() << " family = " << cell.family()
- << " qualifier = " << cell.qualifier()
- << " timestamp = " << cell.timestamp() << " value = " << cell.value()
- << endl;
- }
- }
-
- return 0;
+ ConnectionPool cp;
+ auto cpu_ex = wangle::getCPUExecutor();
+ LocationCache cache{FLAGS_zookeeper, cpu_ex};
+ auto result =
+ cache.LocateMeta()
+ .then([&cp = cp](ServerName sn) { return cp.get(sn); })
+ .then([](shared_ptr<HBaseService> con) {
+ // Send the request
+ auto r = Request::get();
+ // This is a get request so make that
+ auto req_msg = static_pointer_cast<GetRequest>(r->req_msg());
+ // Set what region
+ req_msg->mutable_region()->set_value(FLAGS_region);
+ // It's always this.
+ req_msg->mutable_region()->set_type(
+ RegionSpecifier_RegionSpecifierType::
+ RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME);
+
+ // What row.
+ req_msg->mutable_get()->set_row(FLAGS_row);
+
+ return (*con)(std::move(r));
+ })
+ .then([](Response resp) {
+ return static_pointer_cast<GetResponse>(resp.response());
+ })
+ .via(cpu_ex.get())
+ .then([](shared_ptr<GetResponse> get_resp) {
+ cout << "GetResponse has_result = " << get_resp->has_result()
+ << '\n';
+ if (get_resp->has_result()) {
+ auto &r = get_resp->result();
+ cout << "Result cell_size = " << r.cell_size() << endl;
+ for (auto &cell : r.cell()) {
+ cout << "\trow = " << cell.row()
+ << " family = " << cell.family()
+ << " qualifier = " << cell.qualifier()
+ << " timestamp = " << cell.timestamp()
+ << " value = " << cell.value() << endl;
+ }
+ return 0;
+ }
+
+ return 1;
+ })
+ .get(milliseconds(5000));
+
+ return result;
}