You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:45:45 UTC
[hbase] 57/133: HBASE-15705 Add on meta cache (Mikhail Antonov)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit bb6dc7330fea1a7410a0d020cbdbbf1221d47052
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Thu Jan 19 15:01:11 2017 -0800
HBASE-15705 Add on meta cache (Mikhail Antonov)
---
hbase-native-client/connection/connection-pool.cc | 20 +--
hbase-native-client/connection/rpc-client.cc | 5 +-
hbase-native-client/connection/rpc-client.h | 6 +-
hbase-native-client/core/client.cc | 17 ++-
hbase-native-client/core/client.h | 9 +-
hbase-native-client/core/location-cache-test.cc | 82 ++++++++++++-
hbase-native-client/core/location-cache.cc | 142 +++++++++++++++++++++-
hbase-native-client/core/location-cache.h | 97 ++++++++++++++-
hbase-native-client/core/region-location.h | 4 +
hbase-native-client/core/simple-client.cc | 2 +-
10 files changed, 349 insertions(+), 35 deletions(-)
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index 07518c5..15dd64e 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -41,15 +41,7 @@ ConnectionPool::ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_
ConnectionPool::ConnectionPool(std::shared_ptr<ConnectionFactory> cf)
: cf_(cf), clients_(), connections_(), map_mutex_() {}
-ConnectionPool::~ConnectionPool() {
- SharedMutexWritePriority::WriteHolder holder(map_mutex_);
- for (auto &item : connections_) {
- auto &con = item.second;
- con->Close();
- }
- connections_.clear();
- clients_.clear();
-}
+ConnectionPool::~ConnectionPool() { Close(); }
std::shared_ptr<RpcConnection> ConnectionPool::GetConnection(
std::shared_ptr<ConnectionId> remote_id) {
@@ -116,4 +108,12 @@ void ConnectionPool::Close(std::shared_ptr<ConnectionId> remote_id) {
connections_.erase(found);
}
-void ConnectionPool::Close() {}
+void ConnectionPool::Close() {
+ SharedMutexWritePriority::WriteHolder holder{map_mutex_};
+ for (auto &item : connections_) {
+ auto &con = item.second;
+ con->Close();
+ }
+ connections_.clear();
+ clients_.clear();
+}
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
index 9cfefb8..7621193 100644
--- a/hbase-native-client/connection/rpc-client.cc
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -39,9 +39,8 @@ class RpcChannelImplementation : public AbstractRpcChannel {
};
} // namespace hbase
-RpcClient::RpcClient() {
- io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(sysconf(_SC_NPROCESSORS_ONLN));
-
+RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor)
+ : io_executor_(io_executor) {
cp_ = std::make_shared<ConnectionPool>(io_executor_);
}
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
index 407d588..aeb9b56 100644
--- a/hbase-native-client/connection/rpc-client.h
+++ b/hbase-native-client/connection/rpc-client.h
@@ -26,6 +26,8 @@
#include <google/protobuf/service.h>
+#include <utility>
+
using hbase::security::User;
using hbase::pb::ServerName;
using hbase::Request;
@@ -49,7 +51,7 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> {
friend class RpcChannelImplementation;
public:
- RpcClient();
+ RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor);
virtual ~RpcClient() { Close(); }
@@ -77,6 +79,8 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> {
std::shared_ptr<User> ticket,
int rpc_timeout);
+ std::shared_ptr<ConnectionPool> connection_pool() const { return cp_; }
+
private:
void CallMethod(const MethodDescriptor *method, RpcController *controller, const Message *req_msg,
Message *resp_msg, Closure *done, const std::string &host, uint16_t port,
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index 6eb3d8f..dd568ce 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -33,15 +33,22 @@ Client::Client() {
"hbase-site.xml is absent in the search path or problems in XML parsing";
throw std::runtime_error("Configuration object not present.");
}
- conf_ = std::make_shared<hbase::Configuration>(conf.value());
- auto zk_quorum = conf_->Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_);
- location_cache_ = std::make_shared<hbase::LocationCache>(zk_quorum, cpu_executor_, io_executor_);
+ init(conf.value());
}
-Client::Client(const hbase::Configuration &conf) {
+Client::Client(const hbase::Configuration &conf) { init(conf); }
+
+void Client::init(const hbase::Configuration &conf) {
conf_ = std::make_shared<hbase::Configuration>(conf);
auto zk_quorum = conf_->Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_);
- location_cache_ = std::make_shared<hbase::LocationCache>(zk_quorum, cpu_executor_, io_executor_);
+
+ cpu_executor_ =
+ std::make_shared<wangle::CPUThreadPoolExecutor>(4); // TODO: read num threads from conf
+ io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(sysconf(_SC_NPROCESSORS_ONLN));
+
+ rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_);
+ location_cache_ = std::make_shared<hbase::LocationCache>(zk_quorum, cpu_executor_,
+ rpc_client_->connection_pool());
}
// We can't have the threads continue running after everything is done
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index da71624..730981d 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -67,14 +67,13 @@ class Client {
void Close();
private:
+ void init(const hbase::Configuration &conf);
const std::string kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum";
const std::string kDefHBaseZookeeperQuorum_ = "localhost:2181";
- std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_ =
- std::make_shared<wangle::CPUThreadPoolExecutor>(4);
- std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_ =
- std::make_shared<wangle::IOThreadPoolExecutor>(sysconf(_SC_NPROCESSORS_ONLN));
+ std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
+ std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
std::shared_ptr<hbase::LocationCache> location_cache_;
- std::shared_ptr<hbase::RpcClient> rpc_client_ = std::make_shared<hbase::RpcClient>();
+ std::shared_ptr<hbase::RpcClient> rpc_client_;
std::shared_ptr<hbase::Configuration> conf_;
bool is_closed_ = false;
};
diff --git a/hbase-native-client/core/location-cache-test.cc b/hbase-native-client/core/location-cache-test.cc
index 53db6fc..42e7bb3 100644
--- a/hbase-native-client/core/location-cache-test.cc
+++ b/hbase-native-client/core/location-cache-test.cc
@@ -34,19 +34,24 @@ TEST(LocationCacheTest, TestGetMetaNodeContents) {
auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4);
- LocationCache cache{"localhost:2181", cpu, io};
+ auto cp = std::make_shared<ConnectionPool>(io);
+ LocationCache cache{"localhost:2181", cpu, cp};
auto f = cache.LocateMeta();
auto result = f.get();
ASSERT_FALSE(f.hasException());
ASSERT_TRUE(result.has_port());
ASSERT_TRUE(result.has_host_name());
+ cpu->stop();
+ io->stop();
+ cp->Close();
}
TEST(LocationCacheTest, TestGetRegionLocation) {
TestUtil test_util{};
auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4);
- LocationCache cache{"localhost:2181", cpu, io};
+ auto cp = std::make_shared<ConnectionPool>(io);
+ LocationCache cache{"localhost:2181", cpu, cp};
// If there is no table this should throw an exception
auto tn = folly::to<hbase::pb::TableName>("t");
@@ -57,4 +62,77 @@ TEST(LocationCacheTest, TestGetRegionLocation) {
ASSERT_TRUE(loc != nullptr);
cpu->stop();
io->stop();
+ cp->Close();
+}
+
+TEST(LocationCacheTest, TestCaching) {
+ TestUtil test_util{};
+ auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
+ auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4);
+ auto cp = std::make_shared<ConnectionPool>(io);
+ LocationCache cache{"localhost:2181", cpu, cp};
+
+ auto tn_1 = folly::to<hbase::pb::TableName>("t1");
+ auto tn_2 = folly::to<hbase::pb::TableName>("t2");
+ auto tn_3 = folly::to<hbase::pb::TableName>("t3");
+ auto row_a = "a";
+
+ // test location pulled from meta gets cached
+ ASSERT_ANY_THROW(cache.LocateRegion(tn_1, row_a).get(milliseconds(1000)));
+ ASSERT_ANY_THROW(cache.LocateFromMeta(tn_1, row_a).get(milliseconds(1000)));
+
+ test_util.RunShellCmd("create 't1', 'd'");
+
+ ASSERT_FALSE(cache.IsLocationCached(tn_1, row_a));
+ auto loc = cache.LocateRegion(tn_1, row_a).get(milliseconds(1000));
+ ASSERT_TRUE(cache.IsLocationCached(tn_1, row_a));
+ ASSERT_EQ(loc, cache.GetCachedLocation(tn_1, row_a));
+
+ // test with two regions
+ test_util.RunShellCmd("create 't2', 'd', SPLITS => ['b']");
+
+ ASSERT_FALSE(cache.IsLocationCached(tn_2, "a"));
+ loc = cache.LocateRegion(tn_2, "a").get(milliseconds(1000));
+ ASSERT_TRUE(cache.IsLocationCached(tn_2, "a"));
+ ASSERT_EQ(loc, cache.GetCachedLocation(tn_2, "a"));
+
+ ASSERT_FALSE(cache.IsLocationCached(tn_2, "b"));
+ loc = cache.LocateRegion(tn_2, "b").get(milliseconds(1000));
+ ASSERT_TRUE(cache.IsLocationCached(tn_2, "b"));
+ ASSERT_EQ(loc, cache.GetCachedLocation(tn_2, "b"));
+ ASSERT_TRUE(cache.IsLocationCached(tn_2, "ba"));
+ ASSERT_EQ(loc, cache.GetCachedLocation(tn_2, "ba"));
+
+ // test with three regions
+ test_util.RunShellCmd("create 't3', 'd', SPLITS => ['b', 'c']");
+
+ ASSERT_FALSE(cache.IsLocationCached(tn_3, "c"));
+ ASSERT_FALSE(cache.IsLocationCached(tn_3, "ca"));
+ loc = cache.LocateRegion(tn_3, "ca").get(milliseconds(1000));
+ ASSERT_TRUE(cache.IsLocationCached(tn_3, "c"));
+ ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "c"));
+ ASSERT_TRUE(cache.IsLocationCached(tn_3, "ca"));
+ ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "ca"));
+
+ ASSERT_FALSE(cache.IsLocationCached(tn_3, "b"));
+ loc = cache.LocateRegion(tn_3, "b").get(milliseconds(1000));
+ ASSERT_TRUE(cache.IsLocationCached(tn_3, "b"));
+ ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "b"));
+ ASSERT_TRUE(cache.IsLocationCached(tn_3, "ba"));
+ ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "ba"));
+
+ // clear second region
+ cache.ClearCachedLocation(tn_3, "b");
+ ASSERT_FALSE(cache.IsLocationCached(tn_3, "b"));
+
+ ASSERT_FALSE(cache.IsLocationCached(tn_3, "a"));
+ loc = cache.LocateRegion(tn_3, "a").get(milliseconds(1000));
+ ASSERT_TRUE(cache.IsLocationCached(tn_3, "a"));
+ ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "a"));
+ ASSERT_TRUE(cache.IsLocationCached(tn_3, "abc"));
+ ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "abc"));
+
+ cpu->stop();
+ io->stop();
+ cp->Close();
}
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index 6c2a790..66f3eb7 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -55,14 +55,16 @@ static const char META_ZNODE_NAME[] = "/hbase/meta-region-server";
LocationCache::LocationCache(std::string quorum_spec,
std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
- std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor)
+ std::shared_ptr<ConnectionPool> cp)
: quorum_spec_(quorum_spec),
cpu_executor_(cpu_executor),
meta_promise_(nullptr),
meta_lock_(),
- cp_(io_executor),
+ cp_(cp),
meta_util_(),
- zk_(nullptr) {
+ zk_(nullptr),
+ cached_locations_(),
+ locations_lock_() {
zk_ = zookeeper_init(quorum_spec.c_str(), nullptr, 1000, 0, 0, 0);
}
@@ -121,7 +123,7 @@ Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(const Tabl
.via(cpu_executor_.get())
.then([this](ServerName sn) {
auto remote_id = std::make_shared<ConnectionId>(sn.host_name(), sn.port());
- return this->cp_.GetConnection(remote_id);
+ return this->cp_->GetConnection(remote_id);
})
.then([tn, row, this](std::shared_ptr<RpcConnection> rpc_connection) {
return (*rpc_connection->get_service())(std::move(meta_util_.MetaRequest(tn, row)));
@@ -143,11 +145,27 @@ Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(const Tabl
auto remote_id =
std::make_shared<ConnectionId>(rl->server_name().host_name(), rl->server_name().port());
// Now fill out the connection.
- rl->set_service(cp_.GetConnection(remote_id)->get_service());
+ // rl->set_service(cp_->GetConnection(remote_id)->get_service()); TODO: this causes wangle
+ // assertion errors
+ return rl;
+ })
+ .then([tn, this](shared_ptr<RegionLocation> rl) {
+ // now add fetched location to the cache.
+ this->CacheLocation(tn, rl);
return rl;
});
}
+Future<shared_ptr<RegionLocation>> LocationCache::LocateRegion(const hbase::pb::TableName &tn,
+ const std::string &row) {
+ auto cached_loc = this->GetCachedLocation(tn, row);
+ if (cached_loc != nullptr) {
+ return cached_loc;
+ } else {
+ return this->LocateFromMeta(tn, row);
+ }
+}
+
std::shared_ptr<RegionLocation> LocationCache::CreateLocation(const Response &resp) {
auto resp_msg = static_pointer_cast<ScanResponse>(resp.resp_msg());
auto &results = resp_msg->results().Get(0);
@@ -163,3 +181,117 @@ std::shared_ptr<RegionLocation> LocationCache::CreateLocation(const Response &re
auto server_name = folly::to<ServerName>(cell_one);
return std::make_shared<RegionLocation>(row, std::move(region_info), server_name, nullptr);
}
+
+// must hold shared lock on locations_lock_
+shared_ptr<RegionLocation> LocationCache::GetCachedLocation(const hbase::pb::TableName &tn,
+ const std::string &row) {
+ auto t_locs = this->GetTableLocations(tn);
+ std::shared_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
+
+ if (VLOG_IS_ON(2)) {
+ for (const auto &p : *t_locs) {
+ VLOG(2) << "t_locs[" << p.first << "] = " << p.second->DebugString();
+ }
+ }
+
+ // looking for the "floor" key as a start key
+ auto possible_region = t_locs->upper_bound(row);
+
+ if (t_locs->empty()) {
+ VLOG(2) << "Could not find region in cache, table map is empty";
+ return nullptr;
+ }
+
+ if (possible_region == t_locs->begin()) {
+ VLOG(2) << "Could not find region in cache, all keys are greater, row:" << row
+ << " ,possible_region:" << possible_region->second->DebugString();
+ return nullptr;
+ }
+ --possible_region;
+
+ VLOG(2) << "Found possible region in cache for row:" << row
+ << " ,possible_region:" << possible_region->second->DebugString();
+
+ // found possible start key, now need to check end key
+ if (possible_region->second->region_info().end_key() == "" ||
+ possible_region->second->region_info().end_key() > row) {
+ VLOG(1) << "Found region in cache for row:" << row
+ << " ,region:" << possible_region->second->DebugString();
+ return possible_region->second;
+ } else {
+ return nullptr;
+ }
+}
+
+// must hold unique lock on locations_lock_
+void LocationCache::CacheLocation(const hbase::pb::TableName &tn,
+ const shared_ptr<RegionLocation> loc) {
+ auto t_locs = this->GetTableLocations(tn);
+ std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
+
+ (*t_locs)[loc->region_info().start_key()] = loc;
+ VLOG(1) << "Cached location for region:" << loc->DebugString();
+}
+
+// must hold shared lock on locations_lock_
+bool LocationCache::IsLocationCached(const hbase::pb::TableName &tn, const std::string &row) {
+ return (this->GetCachedLocation(tn, row) != nullptr);
+}
+
+// shared lock needed for cases when this table has been requested before;
+// in the rare case it hasn't, unique lock will be grabbed to add it to cache
+shared_ptr<hbase::PerTableLocationMap> LocationCache::GetTableLocations(
+ const hbase::pb::TableName &tn) {
+ auto found_locs = this->GetCachedTableLocations(tn);
+ if (found_locs == nullptr) {
+ found_locs = this->GetNewTableLocations(tn);
+ }
+ return found_locs;
+}
+
+shared_ptr<hbase::PerTableLocationMap> LocationCache::GetCachedTableLocations(
+ const hbase::pb::TableName &tn) {
+ SharedMutexWritePriority::ReadHolder r_holder{locations_lock_};
+
+ auto table_locs = cached_locations_.find(tn);
+ if (table_locs != cached_locations_.end()) {
+ return table_locs->second;
+ } else {
+ return nullptr;
+ }
+}
+
+shared_ptr<hbase::PerTableLocationMap> LocationCache::GetNewTableLocations(
+ const hbase::pb::TableName &tn) {
+ // double-check locking under upgradable lock
+ SharedMutexWritePriority::UpgradeHolder u_holder{locations_lock_};
+
+ auto table_locs = cached_locations_.find(tn);
+ if (table_locs != cached_locations_.end()) {
+ return table_locs->second;
+ }
+ SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)};
+
+ auto t_locs_p = make_shared<map<std::string, shared_ptr<RegionLocation>>>();
+ cached_locations_.insert(std::make_pair(tn, t_locs_p));
+ return t_locs_p;
+}
+
+// must hold unique lock on locations_lock_
+void LocationCache::ClearCache() {
+ unique_lock<SharedMutexWritePriority> lock(locations_lock_);
+ cached_locations_.clear();
+}
+
+// must hold unique lock on locations_lock_
+void LocationCache::ClearCachedLocations(const hbase::pb::TableName &tn) {
+ unique_lock<SharedMutexWritePriority> lock(locations_lock_);
+ cached_locations_.erase(tn);
+}
+
+// must hold unique lock on locations_lock_
+void LocationCache::ClearCachedLocation(const hbase::pb::TableName &tn, const std::string &row) {
+ auto table_locs = this->GetTableLocations(tn);
+ unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
+ table_locs->erase(row);
+}
diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h
index b290a1f..22a8ad5 100644
--- a/hbase-native-client/core/location-cache.h
+++ b/hbase-native-client/core/location-cache.h
@@ -19,6 +19,7 @@
#pragma once
#include <folly/Executor.h>
+#include <folly/SharedMutex.h>
#include <folly/futures/Future.h>
#include <folly/futures/SharedPromise.h>
#include <wangle/concurrent/CPUThreadPoolExecutor.h>
@@ -27,6 +28,7 @@
#include <memory>
#include <mutex>
+#include <shared_mutex>
#include <string>
#include "connection/connection-pool.h"
@@ -40,8 +42,34 @@ class Request;
class Response;
namespace pb {
class ServerName;
+class TableName;
}
+/** Equals function for TableName (uses namespace and table name) */
+struct TableNameEquals {
+ /** equals */
+ bool operator()(const hbase::pb::TableName &lht, const hbase::pb::TableName &rht) const {
+ return lht.namespace_() == rht.namespace_() && lht.qualifier() == rht.qualifier();
+ }
+};
+
+/** Hash for TableName. */
+struct TableNameHash {
+ /** hash */
+ std::size_t operator()(hbase::pb::TableName const &t) const {
+ std::size_t h = 0;
+ boost::hash_combine(h, t.namespace_());
+ boost::hash_combine(h, t.qualifier());
+ return h;
+ }
+};
+
+// typedefs for location cache
+typedef std::map<std::string, std::shared_ptr<RegionLocation>> PerTableLocationMap;
+typedef std::unordered_map<hbase::pb::TableName, std::shared_ptr<PerTableLocationMap>,
+ TableNameHash, TableNameEquals>
+ RegionLocationMap;
+
/**
* Class that can look up and cache locations.
*/
@@ -56,7 +84,7 @@ class LocationCache {
*/
LocationCache(std::string quorum_spec,
std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
- std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor);
+ std::shared_ptr<ConnectionPool> cp);
/**
* Destructor.
* This will clean up the zookeeper connections.
@@ -71,7 +99,8 @@ class LocationCache {
folly::Future<hbase::pb::ServerName> LocateMeta();
/**
- * Go read meta and find out where a region is located.
+ * Go read meta and find out where a region is located. Most users should
+ * never call this method directly and should use LocateRegion() instead.
*
* @param tn Table name of the table to look up. This object must live until
* after the future is returned
@@ -83,14 +112,72 @@ class LocationCache {
const std::string &row);
/**
+ * The only method clients should use for meta lookups. If corresponding
+ * location is cached, it's returned from the cache, otherwise lookup
+ * in meta table is done, location is cached and then returned.
+ * It's expected that tiny fraction of invocations incurs meta scan.
+ * This method is to look up non-meta regions; use LocateMeta() to get the
+ * location of hbase:meta region.
+ *
+ * @param tn Table name of the table to look up. This object must live until
+ * after the future is returned
+ *
+ * @param row of the table to look up. This object must live until after the
+ * future is returned
+ */
+ folly::Future<std::shared_ptr<RegionLocation>> LocateRegion(const hbase::pb::TableName &tn,
+ const std::string &row);
+
+ /**
* Remove the cached location of meta.
*/
void InvalidateMeta();
+ /**
+ * Return cached region location corresponding to this row,
+ * nullptr if this location isn't cached.
+ */
+ std::shared_ptr<RegionLocation> GetCachedLocation(const hbase::pb::TableName &tn,
+ const std::string &row);
+
+ /**
+ * Add non-meta region location in the cache (location of meta itself
+ * is cached separately).
+ */
+ void CacheLocation(const hbase::pb::TableName &tn, const std::shared_ptr<RegionLocation> loc);
+
+ /**
+ * Check if location corresponding to this row key is cached.
+ */
+ bool IsLocationCached(const hbase::pb::TableName &tn, const std::string &row);
+
+ /**
+ * Return cached location for all region of this table.
+ */
+ std::shared_ptr<PerTableLocationMap> GetTableLocations(const hbase::pb::TableName &tn);
+
+ /**
+ * Completely clear location cache.
+ */
+ void ClearCache();
+
+ /**
+ * Clear all cached locations for one table.
+ */
+ void ClearCachedLocations(const hbase::pb::TableName &tn);
+
+ /**
+ * Clear cached region location.
+ */
+ void ClearCachedLocation(const hbase::pb::TableName &tn, const std::string &row);
+
private:
void RefreshMetaLocation();
hbase::pb::ServerName ReadMetaLocation();
std::shared_ptr<RegionLocation> CreateLocation(const Response &resp);
+ std::shared_ptr<hbase::PerTableLocationMap> GetCachedTableLocations(
+ const hbase::pb::TableName &tn);
+ std::shared_ptr<hbase::PerTableLocationMap> GetNewTableLocations(const hbase::pb::TableName &tn);
/* data */
std::string quorum_spec_;
@@ -98,7 +185,11 @@ class LocationCache {
std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> meta_promise_;
std::mutex meta_lock_;
MetaUtil meta_util_;
- ConnectionPool cp_;
+ std::shared_ptr<ConnectionPool> cp_;
+
+ // cached region locations
+ RegionLocationMap cached_locations_;
+ folly::SharedMutexWritePriority locations_lock_;
// TODO: migrate this to a smart pointer with a deleter.
zhandle_t *zk_;
diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h
index e7b76d3..b0411cb 100644
--- a/hbase-native-client/core/region-location.h
+++ b/hbase-native-client/core/region-location.h
@@ -79,6 +79,10 @@ class RegionLocation {
*/
void set_server_name(hbase::pb::ServerName sn) { sn_ = sn; }
+ const std::string DebugString() {
+ return "region_info:" + ri_.ShortDebugString() + ", server_name:" + sn_.ShortDebugString();
+ }
+
private:
std::string region_name_;
hbase::pb::RegionInfo ri_;
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 90e7cd4..dac0d10 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -94,7 +94,7 @@ int main(int argc, char *argv[]) {
auto row = FLAGS_row;
auto tn = folly::to<TableName>(FLAGS_table);
- auto loc = cache.LocateFromMeta(tn, row).get(milliseconds(5000));
+ auto loc = cache.LocateRegion(tn, row).get(milliseconds(5000));
auto connection = loc->service();
auto num_puts = FLAGS_columns;