You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2016/07/11 23:48:05 UTC
[12/50] [abbrv] hbase git commit: HBASE-15766 Show working puts
HBASE-15766 Show working puts
Summary:
Add code showing how a set of puts to a single connection will work.
This still needs retries and a lookup of which region each request is going to.
Test Plan:
./buck-out/gen/core/simple-client -columns 100
../bin/hbase shell
count 't'
100 row(s) in 0.2470 seconds
Differential Revision: https://reviews.facebook.net/D57603
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/79b5085d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/79b5085d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/79b5085d
Branch: refs/heads/HBASE-14850
Commit: 79b5085db5ca6283714ec5be2f5439d4d5d5eb72
Parents: f034294
Author: Elliott Clark <ec...@apache.org>
Authored: Wed May 4 01:54:21 2016 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Mon Jul 11 16:47:26 2016 -0700
----------------------------------------------------------------------
.../connection/client-dispatcher.cc | 9 +--
.../connection/client-dispatcher.h | 11 ++-
.../connection/client-handler.cc | 13 ++--
hbase-native-client/connection/client-handler.h | 4 +-
.../connection/connection-factory.cc | 46 ++++++------
.../connection/connection-factory.h | 11 ++-
.../connection/connection-pool-test.cc | 54 +++++++++++---
.../connection/connection-pool.cc | 33 +++++++--
.../connection/connection-pool.h | 10 ++-
hbase-native-client/core/client.cc | 6 +-
hbase-native-client/core/location-cache.cc | 25 +++----
hbase-native-client/core/location-cache.h | 6 +-
hbase-native-client/core/meta-utils.cc | 6 +-
hbase-native-client/core/meta-utils.h | 4 +-
hbase-native-client/core/region-location.h | 8 ++-
hbase-native-client/core/simple-client.cc | 76 +++++++++++++++++---
.../serde/region-info-deserializer-test.cc | 1 -
hbase-native-client/serde/region-info.h | 6 +-
18 files changed, 231 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/client-dispatcher.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc
index 6e2dc54..655d765 100644
--- a/hbase-native-client/connection/client-dispatcher.cc
+++ b/hbase-native-client/connection/client-dispatcher.cc
@@ -22,10 +22,11 @@ using namespace folly;
using namespace hbase;
using namespace wangle;
-ClientDispatcher::ClientDispatcher() : requests_(), current_call_id_(9) {}
+ClientDispatcher::ClientDispatcher() : requests_(5000), current_call_id_(9) {}
void ClientDispatcher::read(Context *ctx, Response in) {
auto call_id = in.call_id();
+
auto search = requests_.find(call_id);
CHECK(search != requests_.end());
auto p = std::move(search->second);
@@ -38,10 +39,10 @@ void ClientDispatcher::read(Context *ctx, Response in) {
}
Future<Response> ClientDispatcher::operator()(std::unique_ptr<Request> arg) {
- auto call_id = ++current_call_id_;
-
+ auto call_id = current_call_id_++;
arg->set_call_id(call_id);
- auto &p = requests_[call_id];
+ requests_.insert(call_id, Promise<Response>{});
+ auto &p = requests_.find(call_id)->second;
auto f = p.getFuture();
p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) {
LOG(ERROR) << "e = " << call_id;
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/client-dispatcher.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h
index 826fc6a..4435a1b 100644
--- a/hbase-native-client/connection/client-dispatcher.h
+++ b/hbase-native-client/connection/client-dispatcher.h
@@ -19,8 +19,12 @@
#pragma once
+#include <folly/AtomicHashMap.h>
+#include <folly/Logging.h>
#include <wangle/service/ClientDispatcher.h>
+#include <atomic>
+
#include "connection/pipeline.h"
#include "connection/request.h"
#include "connection/response.h"
@@ -31,13 +35,16 @@ class ClientDispatcher
std::unique_ptr<Request>, Response> {
public:
ClientDispatcher();
+ ~ClientDispatcher() {
+ LOG(ERROR) << "Killing ClientDispatcher call_id = " << current_call_id_;
+ }
void read(Context *ctx, Response in) override;
folly::Future<Response> operator()(std::unique_ptr<Request> arg) override;
folly::Future<folly::Unit> close(Context *ctx) override;
folly::Future<folly::Unit> close() override;
private:
- std::unordered_map<uint32_t, folly::Promise<Response>> requests_;
+ folly::AtomicHashMap<uint32_t, folly::Promise<Response>> requests_;
// Start at some number way above what could
// be there for un-initialized call id counters.
//
@@ -46,6 +53,6 @@ private:
//
// uint32_t has a max of 4Billion so 10 more or less is
// not a big deal.
- uint32_t current_call_id_;
+ std::atomic<uint32_t> current_call_id_;
};
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/client-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index 496e4f2..b92ad89 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -37,7 +37,10 @@ using hbase::pb::GetResponse;
using google::protobuf::Message;
ClientHandler::ClientHandler(std::string user_name)
- : user_name_(user_name), need_send_header_(true), serde_(), resp_msgs_() {}
+ : user_name_(user_name), need_send_header_(true), serde_(),
+ resp_msgs_(
+ make_unique<folly::AtomicHashMap<
+ uint32_t, std::shared_ptr<google::protobuf::Message>>>(5000)) {}
void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
if (LIKELY(buf != nullptr)) {
@@ -51,14 +54,14 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
<< " has_exception=" << header.has_exception();
// Get the response protobuf from the map
- auto search = resp_msgs_.find(header.call_id());
+ auto search = resp_msgs_->find(header.call_id());
// It's an error if it's not there.
- CHECK(search != resp_msgs_.end());
+ CHECK(search != resp_msgs_->end());
auto resp_msg = search->second;
CHECK(resp_msg != nullptr);
// Make sure we don't leak the protobuf
- resp_msgs_.erase(search);
+ resp_msgs_->erase(header.call_id());
// set the call_id.
// This will be used to by the dispatcher to match up
@@ -96,7 +99,7 @@ Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
ctx->fireWrite(std::move(pre));
}
- resp_msgs_[r->call_id()] = r->resp_msg();
+ resp_msgs_->insert(r->call_id(), r->resp_msg());
return ctx->fireWrite(
serde_.Request(r->call_id(), r->method(), r->req_msg().get()));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/client-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h
index ce99c9e..be5143c 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -18,6 +18,7 @@
*/
#pragma once
+#include <folly/AtomicHashMap.h>
#include <wangle/channel/Handler.h>
#include <string>
@@ -51,7 +52,8 @@ private:
RpcSerde serde_;
// in flight requests
- std::unordered_map<uint32_t, std::shared_ptr<google::protobuf::Message>>
+ std::unique_ptr<folly::AtomicHashMap<
+ uint32_t, std::shared_ptr<google::protobuf::Message>>>
resp_msgs_;
};
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/connection-factory.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index 9102d60..635d12d 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -19,40 +19,36 @@
#include "connection/connection-factory.h"
-#include <folly/futures/Future.h>
-#include <wangle/bootstrap/ClientBootstrap.h>
-#include <wangle/channel/AsyncSocketHandler.h>
-#include <wangle/channel/EventBaseHandler.h>
-#include <wangle/channel/OutputBufferingHandler.h>
-#include <wangle/service/ClientDispatcher.h>
-#include <wangle/service/CloseOnReleaseFilter.h>
-#include <wangle/service/ExpiringFilter.h>
-
-#include <string>
+#include <wangle/concurrent/GlobalExecutor.h>
#include "connection/client-dispatcher.h"
#include "connection/pipeline.h"
-#include "connection/request.h"
-#include "connection/response.h"
#include "connection/service.h"
using namespace folly;
using namespace hbase;
-using namespace wangle;
-ConnectionFactory::ConnectionFactory() : bootstrap_() {
- bootstrap_.group(std::make_shared<wangle::IOThreadPoolExecutor>(1));
- bootstrap_.pipelineFactory(std::make_shared<RpcPipelineFactory>());
-}
+ConnectionFactory::ConnectionFactory()
+ : io_pool_(std::static_pointer_cast<wangle::IOThreadPoolExecutor>(
+ wangle::getIOExecutor())),
+ pipeline_factory_(std::make_shared<RpcPipelineFactory>()) {}
-std::shared_ptr<HBaseService>
-ConnectionFactory::make_connection(const std::string &host, int port) {
- // Connect to a given server
- // Then when connected create a ClientDispactcher.
- auto pipeline = bootstrap_.connect(SocketAddress(host, port, true)).get();
+std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>
+ConnectionFactory::MakeBootstrap() {
+ auto client = std::make_shared<wangle::ClientBootstrap<SerializePipeline>>();
+ client->group(io_pool_);
+ client->pipelineFactory(pipeline_factory_);
+
+ return client;
+}
+std::shared_ptr<HBaseService> ConnectionFactory::Connect(
+ std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client,
+ const std::string &hostname, int port) {
+ // Yes this will block however it makes dealing with connection pool soooooo
+ // much nicer.
+ // TODO see about using shared promise for this.
+ auto pipeline = client->connect(SocketAddress(hostname, port, true)).get();
auto dispatcher = std::make_shared<ClientDispatcher>();
dispatcher->setPipeline(pipeline);
- auto service = std::make_shared<
- CloseOnReleaseFilter<std::unique_ptr<Request>, Response>>(dispatcher);
- return service;
+ return dispatcher;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/connection-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h
index fc4e161..2284a7c 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -33,10 +33,15 @@ public:
ConnectionFactory();
virtual ~ConnectionFactory() = default;
- virtual std::shared_ptr<HBaseService> make_connection(const std::string &host,
- int port);
+ virtual std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>
+ MakeBootstrap();
+
+ virtual std::shared_ptr<HBaseService>
+ Connect(std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client,
+ const std::string &hostname, int port);
private:
- wangle::ClientBootstrap<SerializePipeline> bootstrap_;
+ std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool_;
+ std::shared_ptr<RpcPipelineFactory> pipeline_factory_;
};
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/connection-pool-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc
index 975bc5e..b1a0ba0 100644
--- a/hbase-native-client/connection/connection-pool-test.cc
+++ b/hbase-native-client/connection/connection-pool-test.cc
@@ -24,6 +24,7 @@
#include "connection/connection-factory.h"
#include "if/HBase.pb.h"
+#include "serde/server-name.h"
using namespace hbase;
@@ -33,11 +34,16 @@ using ::testing::_;
class MockConnectionFactory : public ConnectionFactory {
public:
- MOCK_METHOD2(make_connection,
- std::shared_ptr<HBaseService>(const std::string &hostname,
- int port));
+ MOCK_METHOD0(MakeBootstrap,
+ std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>());
+ MOCK_METHOD3(Connect,
+ std::shared_ptr<HBaseService>(
+ std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>,
+ const std::string &hostname, int port));
};
+class MockBootstrap : public wangle::ClientBootstrap<SerializePipeline> {};
+
class MockServiceBase : public HBaseService {
public:
folly::Future<Response> operator()(std::unique_ptr<Request> req) override {
@@ -54,19 +60,20 @@ public:
};
TEST(TestConnectionPool, TestOnlyCreateOnce) {
- std::string hostname{"hostname"};
+ auto hostname = std::string{"hostname"};
+ auto mock_boot = std::make_shared<MockBootstrap>();
auto mock_service = std::make_shared<MockService>();
+ auto mock_cf = std::make_shared<MockConnectionFactory>();
uint32_t port{999};
- LOG(ERROR) << "About to make a MockConnectionFactory";
- auto mock_cf = std::make_shared<MockConnectionFactory>();
- EXPECT_CALL((*mock_cf), make_connection(_, _))
+ EXPECT_CALL((*mock_cf), Connect(_, _, _))
.Times(1)
.WillRepeatedly(Return(mock_service));
+ EXPECT_CALL((*mock_cf), MakeBootstrap())
+ .Times(1)
+ .WillRepeatedly(Return(mock_boot));
ConnectionPool cp{mock_cf};
- LOG(ERROR) << "Created ConnectionPool";
-
ServerName sn;
sn.set_host_name(hostname);
sn.set_port(port);
@@ -75,3 +82,32 @@ TEST(TestConnectionPool, TestOnlyCreateOnce) {
ASSERT_TRUE(result != nullptr);
result = cp.get(sn);
}
+
+TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) {
+ std::string hostname_one{"hostname"};
+ std::string hostname_two{"hostname_two"};
+ uint32_t port{999};
+
+ auto mock_boot = std::make_shared<MockBootstrap>();
+ auto mock_service = std::make_shared<MockService>();
+ auto mock_cf = std::make_shared<MockConnectionFactory>();
+
+ EXPECT_CALL((*mock_cf), Connect(_, _, _))
+ .Times(2)
+ .WillRepeatedly(Return(mock_service));
+ EXPECT_CALL((*mock_cf), MakeBootstrap())
+ .Times(2)
+ .WillRepeatedly(Return(mock_boot));
+ ConnectionPool cp{mock_cf};
+
+ {
+ auto result_one = cp.get(folly::to<ServerName>(
+ hostname_one + ":" + folly::to<std::string>(port)));
+ auto result_two = cp.get(folly::to<ServerName>(
+ hostname_two + ":" + folly::to<std::string>(port)));
+ }
+ auto result_one = cp.get(
+ folly::to<ServerName>(hostname_one + ":" + folly::to<std::string>(port)));
+ auto result_two = cp.get(
+ folly::to<ServerName>(hostname_two + ":" + folly::to<std::string>(port)));
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/connection-pool.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index eafe60a..6ed5ad9 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -19,6 +19,7 @@
#include "connection/connection-pool.h"
+#include <folly/SocketAddress.h>
#include <wangle/service/Service.h>
using std::mutex;
@@ -26,28 +27,46 @@ using std::unique_ptr;
using std::shared_ptr;
using hbase::pb::ServerName;
using folly::SharedMutexWritePriority;
+using folly::SocketAddress;
namespace hbase {
ConnectionPool::ConnectionPool()
- : cf_(std::make_shared<ConnectionFactory>()), connections_(), map_mutex_() {
-}
+ : cf_(std::make_shared<ConnectionFactory>()), clients_(), connections_(),
+ map_mutex_() {}
ConnectionPool::ConnectionPool(std::shared_ptr<ConnectionFactory> cf)
- : cf_(cf), connections_(), map_mutex_() {}
+ : cf_(cf), clients_(), connections_(), map_mutex_() {}
+
+ConnectionPool::~ConnectionPool() {
+ SharedMutexWritePriority::WriteHolder holder(map_mutex_);
+ for (auto &item : connections_) {
+ auto &con = item.second;
+ con->close();
+ }
+ connections_.clear();
+ clients_.clear();
+}
std::shared_ptr<HBaseService> ConnectionPool::get(const ServerName &sn) {
+ // Create a read lock.
SharedMutexWritePriority::UpgradeHolder holder(map_mutex_);
+
auto found = connections_.find(sn);
if (found == connections_.end() || found->second == nullptr) {
+ // Move the upgradable lock into the write lock if the connection
+ // hasn't been found.
SharedMutexWritePriority::WriteHolder holder(std::move(holder));
- auto new_con = cf_->make_connection(sn.host_name(), sn.port());
- connections_[sn] = new_con;
- return new_con;
+ auto client = cf_->MakeBootstrap();
+ auto dispatcher = cf_->Connect(client, sn.host_name(), sn.port());
+ clients_.insert(std::make_pair(sn, client));
+ connections_.insert(std::make_pair(sn, dispatcher));
+ return dispatcher;
}
return found->second;
}
+
void ConnectionPool::close(const ServerName &sn) {
- SharedMutexWritePriority::WriteHolder holder(map_mutex_);
+ SharedMutexWritePriority::WriteHolder holder{map_mutex_};
auto found = connections_.find(sn);
if (found == connections_.end() || found->second == nullptr) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/connection-pool.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h
index b8330e3..907afdb 100644
--- a/hbase-native-client/connection/connection-pool.h
+++ b/hbase-native-client/connection/connection-pool.h
@@ -37,23 +37,29 @@ struct ServerNameHash {
std::size_t operator()(hbase::pb::ServerName const &s) const {
std::size_t h1 = std::hash<std::string>()(s.host_name());
std::size_t h2 = std::hash<uint32_t>()(s.port());
- return h1 ^ (h2 << 1);
+ return h1 ^ (h2 << 2);
}
};
class ConnectionPool {
public:
ConnectionPool();
+ ~ConnectionPool();
explicit ConnectionPool(std::shared_ptr<ConnectionFactory> cf);
std::shared_ptr<HBaseService> get(const hbase::pb::ServerName &sn);
void close(const hbase::pb::ServerName &sn);
private:
- std::shared_ptr<ConnectionFactory> cf_;
std::unordered_map<hbase::pb::ServerName, std::shared_ptr<HBaseService>,
ServerNameHash, ServerNameEquals>
connections_;
+ std::unordered_map<
+ hbase::pb::ServerName,
+ std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>,
+ ServerNameHash, ServerNameEquals>
+ clients_;
folly::SharedMutexWritePriority map_mutex_;
+ std::shared_ptr<ConnectionFactory> cf_;
};
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/core/client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index 266c239..4b9f844 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -33,8 +33,4 @@ using namespace folly;
using namespace std;
using namespace hbase::pb;
-namespace hbase {
-
-Client::Client(string quorum_spec)
- : location_cache_(quorum_spec, wangle::getCPUExecutor()) {}
-}
+namespace hbase {}
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/core/location-cache.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index 2667f11..e2a6251 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -25,8 +25,8 @@
#include "connection/response.h"
#include "if/Client.pb.h"
#include "if/ZooKeeper.pb.h"
-#include "serde/server-name.h"
#include "serde/region-info.h"
+#include "serde/server-name.h"
#include "serde/zk.h"
using namespace std;
@@ -109,17 +109,22 @@ ServerName LocationCache::ReadMetaLocation() {
Future<std::shared_ptr<RegionLocation>>
LocationCache::LocateFromMeta(const TableName &tn, const string &row) {
- auto exc = wangle::getIOExecutor();
+ auto exec = wangle::getCPUExecutor();
return this->LocateMeta()
- .then([&](ServerName sn) { return this->cp_.get(sn); })
- .via(exc.get()) // Need to handle all rpc's on the IOExecutor.
+ .via(exec.get())
+ .then([ exec = exec, this ](ServerName sn) { return this->cp_.get(sn); })
.then([&](std::shared_ptr<HBaseService> service) {
return (*service)(std::move(meta_util_.MetaRequest(tn, row)));
})
- .then([&](Response resp) {
+ .then([this](Response resp) {
// take the protobuf response and make it into
// a region location.
return this->CreateLocation(std::move(resp));
+ })
+ .then([ exec = exec, this ](std::shared_ptr<RegionLocation> rl) {
+ // Now fill out the connection.
+ rl->set_service(cp_.get(rl->server_name()));
+ return rl;
});
}
@@ -162,16 +167,12 @@ private:
};
std::shared_ptr<RegionLocation>
-LocationCache::CreateLocation(const Response &resp){
+LocationCache::CreateLocation(const Response &resp) {
auto resp_msg = static_pointer_cast<ScanResponse>(resp.response());
auto &results = resp_msg->results().Get(0);
auto &cells = results.cell();
- LOG(ERROR) << "resp_msg = " << resp_msg->DebugString();
auto ri = folly::to<RegionInfo>(cells.Get(0).value());
auto sn = folly::to<ServerName>(cells.Get(1).value());
-
- LOG(ERROR) << "RegionInfo = " << ri.DebugString();
- LOG(ERROR) << "ServerName = " << sn.DebugString();
- auto wrapped = make_shared<RemoveServiceFilter>(cp_.get(sn), sn, this->cp_);
- return std::make_shared<RegionLocation>(std::move(ri), std::move(sn), wrapped);
+ return std::make_shared<RegionLocation>(cells.Get(0).row(), std::move(ri), sn,
+ nullptr);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/core/location-cache.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h
index 99b5e5e..7f76428 100644
--- a/hbase-native-client/core/location-cache.h
+++ b/hbase-native-client/core/location-cache.h
@@ -48,9 +48,10 @@ public:
// Meta Related Methods.
// These are only public until testing is complete
folly::Future<hbase::pb::ServerName> LocateMeta();
- folly::Future<std::shared_ptr<RegionLocation>> LocateFromMeta(const hbase::pb::TableName &tn,
- const std::string &row);
+ folly::Future<std::shared_ptr<RegionLocation>>
+ LocateFromMeta(const hbase::pb::TableName &tn, const std::string &row);
void InvalidateMeta();
+ ConnectionPool cp_;
private:
void RefreshMetaLocation();
@@ -61,7 +62,6 @@ private:
std::shared_ptr<folly::Executor> executor_;
std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> meta_promise_;
std::mutex meta_lock_;
- ConnectionPool cp_;
MetaUtil meta_util_;
// TODO: migrate this to a smart pointer with a deleter.
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/core/meta-utils.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc
index 1325d83..23d2041 100644
--- a/hbase-native-client/core/meta-utils.cc
+++ b/hbase-native-client/core/meta-utils.cc
@@ -37,12 +37,12 @@ using hbase::pb::RegionSpecifier_RegionSpecifierType;
static const std::string META_REGION = "1588230740";
std::string MetaUtil::RegionLookupRowkey(const TableName &tn,
- const std::string &row) const {
+ const std::string &row) const {
return folly::to<std::string>(tn, ",", row, ",", "999999999999999999");
}
-std::unique_ptr<Request>
-MetaUtil::MetaRequest(const TableName tn, const std::string &row) const {
+std::unique_ptr<Request> MetaUtil::MetaRequest(const TableName tn,
+ const std::string &row) const {
auto request = Request::scan();
auto msg = std::static_pointer_cast<ScanRequest>(request->req_msg());
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/core/meta-utils.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/meta-utils.h b/hbase-native-client/core/meta-utils.h
index 5a659f3..dfef065 100644
--- a/hbase-native-client/core/meta-utils.h
+++ b/hbase-native-client/core/meta-utils.h
@@ -29,8 +29,8 @@ namespace hbase {
class MetaUtil {
public:
std::string RegionLookupRowkey(const hbase::pb::TableName &tn,
- const std::string &row) const;
+ const std::string &row) const;
std::unique_ptr<Request> MetaRequest(const hbase::pb::TableName tn,
- const std::string &row) const;
+ const std::string &row) const;
};
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/core/region-location.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h
index 7922c95..7887526 100644
--- a/hbase-native-client/core/region-location.h
+++ b/hbase-native-client/core/region-location.h
@@ -27,15 +27,19 @@ namespace hbase {
class RegionLocation {
public:
- RegionLocation(hbase::pb::RegionInfo ri, hbase::pb::ServerName sn,
+ RegionLocation(std::string region_name, hbase::pb::RegionInfo ri,
+ hbase::pb::ServerName sn,
std::shared_ptr<HBaseService> service)
- : ri_(ri), sn_(sn), service_(service) {}
+ : region_name_(region_name), ri_(ri), sn_(sn), service_(service) {}
const hbase::pb::RegionInfo ®ion_info() { return ri_; }
const hbase::pb::ServerName &server_name() { return sn_; }
+ const std::string ®ion_name() { return region_name_; }
std::shared_ptr<HBaseService> service() { return service_; }
+ void set_service(std::shared_ptr<HBaseService> s) { service_ = s; }
private:
+ std::string region_name_;
hbase::pb::RegionInfo ri_;
hbase::pb::ServerName sn_;
std::shared_ptr<HBaseService> service_;
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/core/simple-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 00e3369..39c82c3 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -19,16 +19,21 @@
#include <folly/Logging.h>
#include <folly/Random.h>
+#include <folly/futures/Future.h>
#include <gflags/gflags.h>
+#include <wangle/concurrent/CPUThreadPoolExecutor.h>
#include <wangle/concurrent/GlobalExecutor.h>
+#include <atomic>
#include <chrono>
#include <iostream>
+#include <thread>
#include "connection/connection-pool.h"
#include "core/client.h"
#include "if/Client.pb.h"
#include "if/ZooKeeper.pb.h"
+#include "serde/server-name.h"
#include "serde/table-name.h"
using namespace folly;
@@ -39,16 +44,41 @@ using hbase::Request;
using hbase::HBaseService;
using hbase::LocationCache;
using hbase::ConnectionPool;
+using hbase::ConnectionFactory;
using hbase::pb::TableName;
using hbase::pb::ServerName;
using hbase::pb::RegionSpecifier_RegionSpecifierType;
-using hbase::pb::GetRequest;
-using hbase::pb::GetResponse;
+using hbase::pb::MutateRequest;
+using hbase::pb::MutationProto_MutationType;
// TODO(eclark): remove the need for this.
DEFINE_string(table, "t", "What region to send a get");
DEFINE_string(row, "test", "What row to get");
DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to");
+DEFINE_uint64(columns, 10000, "How many columns to write");
+DEFINE_int32(threads, 6, "How many cpu threads");
+
+std::unique_ptr<Request> MakeRequest(uint64_t col, std::string region_name) {
+ auto req = Request::mutate();
+ auto msg = std::static_pointer_cast<MutateRequest>(req->req_msg());
+ auto region = msg->mutable_region();
+ auto suf = folly::to<std::string>(col);
+
+ region->set_value(region_name);
+ region->set_type(RegionSpecifier_RegionSpecifierType::
+ RegionSpecifier_RegionSpecifierType_REGION_NAME);
+ auto mutation = msg->mutable_mutation();
+ mutation->set_row(FLAGS_row + suf);
+ mutation->set_mutate_type(
+ MutationProto_MutationType::MutationProto_MutationType_PUT);
+ auto column = mutation->add_column_value();
+ column->set_family("d");
+ auto qual = column->add_qualifier_value();
+ qual->set_qualifier(suf);
+ qual->set_value(".");
+
+ return std::move(req);
+}
int main(int argc, char *argv[]) {
google::SetUsageMessage(
@@ -56,13 +86,41 @@ int main(int argc, char *argv[]) {
google::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
- // Create a connection factory
- ConnectionPool cp;
- auto cpu_ex = wangle::getCPUExecutor();
- LocationCache cache{FLAGS_zookeeper, cpu_ex};
- auto result =
- cache.LocateFromMeta(folly::to<TableName>(FLAGS_table), FLAGS_row)
- .get(milliseconds(5000));
+ // Set up thread pools.
+ auto cpu_pool =
+ std::make_shared<wangle::CPUThreadPoolExecutor>(FLAGS_threads);
+ wangle::setCPUExecutor(cpu_pool);
+ auto io_pool = std::make_shared<wangle::IOThreadPoolExecutor>(5);
+ wangle::setIOExecutor(io_pool);
+
+ // Create the cache.
+ LocationCache cache{FLAGS_zookeeper, cpu_pool};
+
+ auto row = FLAGS_row;
+ auto tn = folly::to<TableName>(FLAGS_table);
+
+ auto loc = cache.LocateFromMeta(tn, row).get(milliseconds(5000));
+ auto connection = loc->service();
+
+ auto num_puts = FLAGS_columns;
+
+ auto results = std::vector<Future<Response>>{};
+ uint64_t col{0};
+ for (; col < num_puts; col++) {
+ results.push_back(folly::makeFuture(col)
+ .via(cpu_pool.get())
+ .then([loc](uint64_t col) {
+ return MakeRequest(col, loc->region_name());
+ })
+ .then([connection](std::unique_ptr<Request> req) {
+ return (*connection)(std::move(req));
+ }));
+ }
+ auto allf = folly::collect(results).get();
+
+ LOG(ERROR) << "Successfully sent " << allf.size() << " requests.";
+
+ io_pool->stop();
return 0;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/serde/region-info-deserializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/region-info-deserializer-test.cc b/hbase-native-client/serde/region-info-deserializer-test.cc
index ce8dedf..5cb8482 100644
--- a/hbase-native-client/serde/region-info-deserializer-test.cc
+++ b/hbase-native-client/serde/region-info-deserializer-test.cc
@@ -44,7 +44,6 @@ TEST(TestRegionInfoDesializer, TestDeserialize) {
ri_out.set_start_key(start_row);
ri_out.set_end_key(stop_row);
-
string header{"PBUF"};
string ser = header + ri_out.SerializeAsString();
http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/serde/region-info.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/region-info.h b/hbase-native-client/serde/region-info.h
index 6af351c..e2ecfc9 100644
--- a/hbase-native-client/serde/region-info.h
+++ b/hbase-native-client/serde/region-info.h
@@ -21,16 +21,16 @@
#include "if/HBase.pb.h"
-#include <folly/Conv.h>
#include <boost/algorithm/string/predicate.hpp>
+#include <folly/Conv.h>
namespace hbase {
namespace pb {
-template <class String> void parseTo(String in, RegionInfo& out) {
+template <class String> void parseTo(String in, RegionInfo &out) {
// TODO(eclark): there has to be something better.
std::string s = folly::to<std::string>(in);
- if (!boost::starts_with(s, "PBUF") ) {
+ if (!boost::starts_with(s, "PBUF")) {
throw std::runtime_error("Region Info field doesn't contain preamble");
}
if (!out.ParseFromArray(s.data() + 4, s.size() - 4)) {