You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:46:24 UTC
[hbase] 96/133: HBASE-18178 [C++] Retrying meta location lookup and
zookeeper connection
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 6af4fd7ec2eba5afabbdaced5e4f14adeaff4451
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Fri Jun 16 11:55:51 2017 -0700
HBASE-18178 [C++] Retrying meta location lookup and zookeeper connection
---
hbase-native-client/connection/client-handler.cc | 9 +-
hbase-native-client/core/BUCK | 12 +++
.../core/async-rpc-retrying-caller.cc | 17 ++--
.../core/async-rpc-retrying-test.cc | 3 +-
.../core/connection-configuration.h | 2 +-
.../core/location-cache-retry-test.cc | 112 +++++++++++++++++++++
hbase-native-client/core/location-cache.cc | 100 ++++++++++++------
hbase-native-client/core/location-cache.h | 8 +-
hbase-native-client/core/meta-utils.cc | 42 +++++---
hbase-native-client/core/meta-utils.h | 24 ++++-
hbase-native-client/core/region-location.h | 19 +---
hbase-native-client/core/response-converter.cc | 2 +-
hbase-native-client/core/simple-client.cc | 18 +++-
hbase-native-client/core/zk-util.cc | 4 +
hbase-native-client/core/zk-util.h | 5 +
hbase-native-client/serde/region-info.h | 4 +-
hbase-native-client/serde/table-name.h | 5 +-
hbase-native-client/test-util/mini-cluster.cc | 4 +
hbase-native-client/test-util/test-util.cc | 4 +
hbase-native-client/test-util/test-util.h | 1 +
hbase-native-client/utils/bytes-util-test.cc | 3 +-
hbase-native-client/utils/bytes-util.cc | 4 +-
22 files changed, 315 insertions(+), 87 deletions(-)
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index 894ecb3..775df68 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -51,7 +51,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) {
int used_bytes = serde_.ParseDelimited(buf.get(), &header);
VLOG(3) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id()
- << " has_exception=" << header.has_exception();
+ << " has_exception=" << header.has_exception() << ", server: " << server_;
// Get the response protobuf from the map
auto search = resp_msgs_->find(header.call_id());
@@ -80,7 +80,8 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) {
}
VLOG(3) << "Read RPCResponse, buf length:" << buf->length()
- << ", header PB length:" << used_bytes << ", cell_block length:" << cell_block_length;
+ << ", header PB length:" << used_bytes << ", cell_block length:" << cell_block_length
+ << ", server: " << server_;
// Make sure that bytes were parsed.
CHECK((used_bytes + cell_block_length) == buf->length());
@@ -113,7 +114,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) {
VLOG(3) << "Exception RPC ResponseHeader, call_id=" << header.call_id()
<< " exception.what=" << remote_exception->what()
- << ", do_not_retry=" << remote_exception->do_not_retry();
+ << ", do_not_retry=" << remote_exception->do_not_retry() << ", server: " << server_;
received->set_exception(folly::exception_wrapper{*remote_exception});
}
ctx->fireRead(std::move(received));
@@ -129,7 +130,7 @@ folly::Future<folly::Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Re
ctx->fireWrite(std::move(header));
});
- VLOG(3) << "Writing RPC Request:" << r->DebugString();
+ VLOG(3) << "Writing RPC Request:" << r->DebugString() << ", server: " << server_;
// Now store the call id to response.
resp_msgs_->insert(r->call_id(), r->resp_msg());
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 47e97f5..464c010 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -137,6 +137,18 @@ cxx_test(
],
run_test_separately=True,)
cxx_test(
+ name="location-cache-retry-test",
+ srcs=[
+ "location-cache-retry-test.cc",
+ ],
+ deps=[
+ ":core",
+ "//if:if",
+ "//serde:serde",
+ "//test-util:test-util",
+ ],
+ run_test_separately=True,)
+cxx_test(
name="cell-test",
srcs=[
"cell-test.cc",
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc
index 0302ad3..aee7d0b 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.cc
+++ b/hbase-native-client/core/async-rpc-retrying-caller.cc
@@ -168,14 +168,17 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc)
ResetController(controller_, call_timeout_ns);
- callable_(controller_, std::make_shared<RegionLocation>(loc), rpc_client)
- .then([this](const RESP& resp) { this->promise_->setValue(std::move(resp)); })
- .onError([&, this](const exception_wrapper& e) {
+ // TODO: RegionLocation should propagate through these method chains as a shared_ptr.
+ // Otherwise, it may get deleted underneath us. We are just copying for now.
+ auto loc_ptr = std::make_shared<RegionLocation>(loc);
+ callable_(controller_, loc_ptr, rpc_client)
+ .then([loc_ptr, this](const RESP& resp) { this->promise_->setValue(std::move(resp)); })
+ .onError([&, loc_ptr, this](const exception_wrapper& e) {
OnError(e,
[&, this]() -> std::string {
- return "Call to " + folly::sformat("{0}:{1}", loc.server_name().host_name(),
- loc.server_name().port()) +
- " for '" + row_ + "' in " + loc.DebugString() + " of " +
+ return "Call to " + folly::sformat("{0}:{1}", loc_ptr->server_name().host_name(),
+ loc_ptr->server_name().port()) +
+ " for '" + row_ + "' in " + loc_ptr->DebugString() + " of " +
table_name_->namespace_() + "::" + table_name_->qualifier() +
" failed, tries = " + std::to_string(tries_) + ", maxAttempts = " +
std::to_string(max_attempts_) + ", timeout = " +
@@ -184,7 +187,7 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc)
" ms";
},
[&, this](const exception_wrapper& error) {
- conn_->region_locator()->UpdateCachedLocation(loc, error);
+ conn_->region_locator()->UpdateCachedLocation(*loc_ptr, error);
});
});
}
diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc
index 0f83914..f887815 100644
--- a/hbase-native-client/core/async-rpc-retrying-test.cc
+++ b/hbase-native-client/core/async-rpc-retrying-test.cc
@@ -147,8 +147,7 @@ class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase {
folly::Promise<std::shared_ptr<RegionLocation>> promise;
/* set random region name, simulating invalid region */
auto result = std::make_shared<RegionLocation>(
- "whatever-region-name", region_location_->region_info(), region_location_->server_name(),
- region_location_->service());
+ "whatever-region-name", region_location_->region_info(), region_location_->server_name());
promise.setValue(result);
return promise.getFuture();
}
diff --git a/hbase-native-client/core/connection-configuration.h b/hbase-native-client/core/connection-configuration.h
index adc8d5b..995798e 100644
--- a/hbase-native-client/core/connection-configuration.h
+++ b/hbase-native-client/core/connection-configuration.h
@@ -143,7 +143,7 @@ class ConnectionConfiguration {
*/
static constexpr const char* kClientRetriesNumber = "hbase.client.retries.number";
- static constexpr const uint32_t kDefaultClientRetriesNumber = 31;
+ static constexpr const uint32_t kDefaultClientRetriesNumber = 35;
/**
* Configure the number of failures after which the client will start logging. A few failures
diff --git a/hbase-native-client/core/location-cache-retry-test.cc b/hbase-native-client/core/location-cache-retry-test.cc
new file mode 100644
index 0000000..988f994
--- /dev/null
+++ b/hbase-native-client/core/location-cache-retry-test.cc
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <gtest/gtest.h>
+
+#include "core/append.h"
+#include "core/cell.h"
+#include "core/client.h"
+#include "core/configuration.h"
+#include "core/delete.h"
+#include "core/get.h"
+#include "core/hbase-configuration-loader.h"
+#include "core/increment.h"
+#include "core/meta-utils.h"
+#include "core/put.h"
+#include "core/result.h"
+#include "core/table.h"
+#include "exceptions/exception.h"
+#include "serde/table-name.h"
+#include "test-util/test-util.h"
+#include "utils/bytes-util.h"
+
+using hbase::Cell;
+using hbase::Configuration;
+using hbase::Get;
+using hbase::MetaUtil;
+using hbase::RetriesExhaustedException;
+using hbase::Put;
+using hbase::Table;
+using hbase::TestUtil;
+
+using std::chrono_literals::operator""s;
+
+class LocationCacheRetryTest : public ::testing::Test {
+ public:
+ static std::unique_ptr<hbase::TestUtil> test_util;
+ static void SetUpTestCase() {
+ google::InstallFailureSignalHandler();
+ test_util = std::make_unique<hbase::TestUtil>();
+ test_util->StartMiniCluster(2);
+ test_util->conf()->SetInt("hbase.client.retries.number", 5);
+ }
+};
+
+std::unique_ptr<hbase::TestUtil> LocationCacheRetryTest::test_util = nullptr;
+
+TEST_F(LocationCacheRetryTest, GetFromMetaTable) {
+ auto tn = folly::to<hbase::pb::TableName>("hbase:meta");
+ auto row = "test1";
+
+ hbase::Client client(*LocationCacheRetryTest::test_util->conf());
+
+ // do an initial get against the meta table itself, before the meta region is moved.
+ auto table = client.Table(tn);
+ hbase::Get get(row);
+ auto result = table->Get(get);
+
+ LocationCacheRetryTest::test_util->MoveRegion(MetaUtil::kMetaRegion, "");
+
+ std::this_thread::sleep_for(3s); // sleep 3 sec
+
+ result = table->Get(get);
+}
+
+TEST_F(LocationCacheRetryTest, PutGet) {
+ LocationCacheRetryTest::test_util->CreateTable("t", "d");
+ LocationCacheRetryTest::test_util->CreateTable("t2", "d");
+
+ auto tn = folly::to<hbase::pb::TableName>("t");
+ auto tn2 = folly::to<hbase::pb::TableName>("t2");
+ auto row = "test1";
+
+ hbase::Client client(*LocationCacheRetryTest::test_util->conf());
+
+ // do a get against the other table, but not the actual table "t".
+ auto table = client.Table(tn);
+ auto table2 = client.Table(tn2);
+ hbase::Get get(row);
+ auto result = table2->Get(get);
+
+ // we should have already cached the location of meta right now. Now
+ // move the meta region to the other server so that we will get a NotServingRegionException
+ // when we do the actual location lookup request. If there is no invalidation
+ // of the meta's own location, then following put/get will result in retries exhausted.
+ LocationCacheRetryTest::test_util->MoveRegion(MetaUtil::kMetaRegion, "");
+
+ std::this_thread::sleep_for(3s); // sleep 3 sec
+
+ table->Put(Put{row}.AddColumn("d", "1", "value1"));
+
+ result = table->Get(get);
+
+ ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
+ EXPECT_EQ("test1", result->Row());
+ EXPECT_EQ("value1", *(result->Value("d", "1")));
+}
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index dfe3e9f..5f68420 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -24,8 +24,12 @@
#include <wangle/concurrent/CPUThreadPoolExecutor.h>
#include <wangle/concurrent/IOThreadPoolExecutor.h>
+#include <map>
+#include <utility>
+
#include "connection/response.h"
#include "connection/rpc-connection.h"
+#include "core/meta-utils.h"
#include "exceptions/exception.h"
#include "if/Client.pb.h"
#include "if/ZooKeeper.pb.h"
@@ -33,8 +37,6 @@
#include "serde/server-name.h"
#include "serde/zk.h"
-#include <utility>
-
using hbase::pb::MetaRegionServer;
using hbase::pb::ServerName;
using hbase::pb::TableName;
@@ -54,27 +56,49 @@ LocationCache::LocationCache(std::shared_ptr<hbase::Configuration> conf,
cached_locations_(),
locations_lock_() {
zk_quorum_ = ZKUtil::ParseZooKeeperQuorum(*conf_);
- zk_ = zookeeper_init(zk_quorum_.c_str(), nullptr, 1000, 0, 0, 0);
+ EnsureZooKeeperConnection();
+}
+
+LocationCache::~LocationCache() { CloseZooKeeperConnection(); }
+
+void LocationCache::CloseZooKeeperConnection() {
+ if (zk_ != nullptr) {
+ zookeeper_close(zk_);
+ zk_ = nullptr;
+ LOG(INFO) << "Closed connection to ZooKeeper.";
+ }
}
-LocationCache::~LocationCache() {
- zookeeper_close(zk_);
- zk_ = nullptr;
- LOG(INFO) << "Closed connection to ZooKeeper.";
+void LocationCache::EnsureZooKeeperConnection() {
+ if (zk_ == nullptr) {
+ LOG(INFO) << "Connecting to ZooKeeper. Quorum:" + zk_quorum_;
+ auto session_timeout = ZKUtil::SessionTimeout(*conf_);
+ zk_ = zookeeper_init(zk_quorum_.c_str(), nullptr, session_timeout, nullptr, nullptr, 0);
+ }
}
folly::Future<ServerName> LocationCache::LocateMeta() {
- std::lock_guard<std::mutex> g(meta_lock_);
+ std::lock_guard<std::recursive_mutex> g(meta_lock_);
if (meta_promise_ == nullptr) {
this->RefreshMetaLocation();
}
- return meta_promise_->getFuture();
+ return meta_promise_->getFuture().onError([&](const folly::exception_wrapper &ew) {
+ auto promise = InvalidateMeta();
+ promise->setException(ew);
+ return ServerName{};
+ });
}
-void LocationCache::InvalidateMeta() {
+std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> LocationCache::InvalidateMeta() {
+ VLOG(2) << "Invalidating meta location";
+ std::lock_guard<std::recursive_mutex> g(meta_lock_);
if (meta_promise_ != nullptr) {
- std::lock_guard<std::mutex> g(meta_lock_);
- meta_promise_ = nullptr;
+ // return the unique_ptr back to the caller.
+ std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> ret = nullptr;
+ std::swap(ret, meta_promise_);
+ return ret;
+ } else {
+ return nullptr;
}
}
@@ -84,18 +108,21 @@ void LocationCache::RefreshMetaLocation() {
cpu_executor_->add([&] { meta_promise_->setWith([&] { return this->ReadMetaLocation(); }); });
}
+// Note: this is a blocking call to zookeeper
ServerName LocationCache::ReadMetaLocation() {
auto buf = folly::IOBuf::create(4096);
ZkDeserializer derser;
+ EnsureZooKeeperConnection();
// This needs to be int rather than size_t as that's what ZK expects.
int len = buf->capacity();
std::string zk_node = ZKUtil::MetaZNode(*conf_);
- // TODO(elliott): handle disconnects/reconntion as needed.
int zk_result = zoo_get(this->zk_, zk_node.c_str(), 0,
reinterpret_cast<char *>(buf->writableData()), &len, nullptr);
if (zk_result != ZOK || len < 9) {
LOG(ERROR) << "Error getting meta location.";
+ // We just close the zk connection, and let the upper levels retry.
+ CloseZooKeeperConnection();
throw std::runtime_error("Error getting meta location. Quorum: " + zk_quorum_);
}
buf->append(len);
@@ -103,6 +130,8 @@ ServerName LocationCache::ReadMetaLocation() {
MetaRegionServer mrs;
if (derser.Parse(buf.get(), &mrs) == false) {
LOG(ERROR) << "Unable to decode";
+ throw std::runtime_error("Error getting meta location (Unable to decode). Quorum: " +
+ zk_quorum_);
}
return mrs.server();
}
@@ -118,10 +147,15 @@ folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(
.then([tn, row, this](std::shared_ptr<RpcConnection> rpc_connection) {
return (*rpc_connection->get_service())(std::move(meta_util_.MetaRequest(tn, row)));
})
- .then([this](std::unique_ptr<Response> resp) {
+ .onError([&](const folly::exception_wrapper &ew) {
+ auto promise = InvalidateMeta();
+ throw ew;
+ return static_cast<std::unique_ptr<Response>>(nullptr);
+ })
+ .then([tn, this](std::unique_ptr<Response> resp) {
// take the protobuf response and make it into
// a region location.
- return meta_util_.CreateLocation(std::move(*resp));
+ return meta_util_.CreateLocation(std::move(*resp), tn);
})
.then([tn, this](std::shared_ptr<RegionLocation> rl) {
// Make sure that the correct location was found.
@@ -134,9 +168,6 @@ folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(
.then([this](std::shared_ptr<RegionLocation> rl) {
auto remote_id =
std::make_shared<ConnectionId>(rl->server_name().host_name(), rl->server_name().port());
- // Now fill out the connection.
- // rl->set_service(cp_->GetConnection(remote_id)->get_service()); TODO: this causes wangle
- // assertion errors
return rl;
})
.then([tn, this](std::shared_ptr<RegionLocation> rl) {
@@ -146,9 +177,20 @@ folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(
});
}
+constexpr const char *MetaUtil::kMetaRegionName;
+
folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateRegion(
const TableName &tn, const std::string &row, const RegionLocateType locate_type,
const int64_t locate_ns) {
+ // We may be asked to locate meta itself
+ if (MetaUtil::IsMeta(tn)) {
+ return LocateMeta().then([this](const ServerName &server_name) {
+ auto rl = std::make_shared<RegionLocation>(MetaUtil::kMetaRegionName,
+ meta_util_.meta_region_info(), server_name);
+ return rl;
+ });
+ }
+
// TODO: implement region locate type and timeout
auto cached_loc = this->GetCachedLocation(tn, row);
if (cached_loc != nullptr) {
@@ -164,34 +206,28 @@ std::shared_ptr<RegionLocation> LocationCache::GetCachedLocation(const hbase::pb
auto t_locs = this->GetTableLocations(tn);
std::shared_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
- if (VLOG_IS_ON(2)) {
- for (const auto &p : *t_locs) {
- VLOG(2) << "t_locs[" << p.first << "] = " << p.second->DebugString();
- }
- }
-
// looking for the "floor" key as a start key
auto possible_region = t_locs->upper_bound(row);
if (t_locs->empty()) {
- VLOG(2) << "Could not find region in cache, table map is empty";
+ VLOG(5) << "Could not find region in cache, table map is empty";
return nullptr;
}
if (possible_region == t_locs->begin()) {
- VLOG(2) << "Could not find region in cache, all keys are greater, row:" << row
+ VLOG(5) << "Could not find region in cache, all keys are greater, row:" << row
<< " ,possible_region:" << possible_region->second->DebugString();
return nullptr;
}
--possible_region;
- VLOG(2) << "Found possible region in cache for row:" << row
+ VLOG(5) << "Found possible region in cache for row:" << row
<< " ,possible_region:" << possible_region->second->DebugString();
// found possible start key, now need to check end key
if (possible_region->second->region_info().end_key() == "" ||
possible_region->second->region_info().end_key() > row) {
- VLOG(1) << "Found region in cache for row:" << row
+ VLOG(2) << "Found region in cache for row:" << row
<< " ,region:" << possible_region->second->DebugString();
return possible_region->second;
} else {
@@ -261,15 +297,23 @@ void LocationCache::ClearCache() {
// must hold unique lock on locations_lock_
void LocationCache::ClearCachedLocations(const hbase::pb::TableName &tn) {
+ VLOG(1) << "ClearCachedLocations, table:" << folly::to<std::string>(tn);
std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
cached_locations_.erase(tn);
+ if (MetaUtil::IsMeta(tn)) {
+ InvalidateMeta();
+ }
}
// must hold unique lock on locations_lock_
void LocationCache::ClearCachedLocation(const hbase::pb::TableName &tn, const std::string &row) {
+ VLOG(1) << "ClearCachedLocation, table:" << folly::to<std::string>(tn) << ", row:" << row;
auto table_locs = this->GetTableLocations(tn);
std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
table_locs->erase(row);
+ if (MetaUtil::IsMeta(tn)) {
+ InvalidateMeta();
+ }
}
void LocationCache::UpdateCachedLocation(const RegionLocation &loc,
diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h
index a3c15cb..a374fb6 100644
--- a/hbase-native-client/core/location-cache.h
+++ b/hbase-native-client/core/location-cache.h
@@ -137,7 +137,7 @@ class LocationCache : public AsyncRegionLocator {
/**
* Remove the cached location of meta.
*/
- void InvalidateMeta();
+ std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> InvalidateMeta();
/**
* Return cached region location corresponding to this row,
@@ -186,6 +186,10 @@ class LocationCache : public AsyncRegionLocator {
const std::string &zk_quorum() { return zk_quorum_; }
private:
+ void CloseZooKeeperConnection();
+ void EnsureZooKeeperConnection();
+
+ private:
void RefreshMetaLocation();
hbase::pb::ServerName ReadMetaLocation();
std::shared_ptr<RegionLocation> CreateLocation(const Response &resp);
@@ -198,7 +202,7 @@ class LocationCache : public AsyncRegionLocator {
std::string zk_quorum_;
std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> meta_promise_;
- std::mutex meta_lock_;
+ std::recursive_mutex meta_lock_;
MetaUtil meta_util_;
std::shared_ptr<ConnectionPool> cp_;
diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc
index 8efecc8..31349a5 100644
--- a/hbase-native-client/core/meta-utils.cc
+++ b/hbase-native-client/core/meta-utils.cc
@@ -21,10 +21,13 @@
#include <folly/Conv.h>
#include <memory>
+#include <utility>
+#include <vector>
#include "connection/request.h"
#include "connection/response.h"
#include "core/response-converter.h"
+#include "exceptions/exception.h"
#include "if/Client.pb.h"
#include "serde/region-info.h"
#include "serde/server-name.h"
@@ -38,10 +41,17 @@ using hbase::pb::ServerName;
namespace hbase {
-static const std::string META_REGION = "1588230740";
-static const std::string CATALOG_FAMILY = "info";
-static const std::string REGION_INFO_COLUMN = "regioninfo";
-static const std::string SERVER_COLUMN = "server";
+MetaUtil::MetaUtil() {
+ meta_region_info_.set_start_key("");
+ meta_region_info_.set_end_key("");
+ meta_region_info_.set_offline(false);
+ meta_region_info_.set_split(false);
+ meta_region_info_.set_replica_id(0);
+ meta_region_info_.set_split(false);
+ meta_region_info_.set_region_id(1);
+ meta_region_info_.mutable_table_name()->set_namespace_(MetaUtil::kSystemNamespace);
+ meta_region_info_.mutable_table_name()->set_qualifier(MetaUtil::kMetaTableQualifier);
+}
std::string MetaUtil::RegionLookupRowkey(const TableName &tn, const std::string &row) const {
return folly::to<std::string>(tn, ",", row, ",", "999999999999999999");
@@ -56,7 +66,7 @@ std::unique_ptr<Request> MetaUtil::MetaRequest(const TableName tn, const std::st
// Set the region this scan goes to
auto region = msg->mutable_region();
- region->set_value(META_REGION);
+ region->set_value(MetaUtil::kMetaRegion);
region->set_type(
RegionSpecifier_RegionSpecifierType::RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME);
@@ -78,30 +88,38 @@ std::unique_ptr<Request> MetaUtil::MetaRequest(const TableName tn, const std::st
// Set the columns that we need
auto info_col = scan->add_column();
- info_col->set_family("info");
- info_col->add_qualifier("server");
- info_col->add_qualifier("regioninfo");
+ info_col->set_family(MetaUtil::kCatalogFamily);
+ info_col->add_qualifier(MetaUtil::kServerColumn);
+ info_col->add_qualifier(MetaUtil::kRegionInfoColumn);
scan->set_start_row(RegionLookupRowkey(tn, row));
return request;
}
-std::shared_ptr<RegionLocation> MetaUtil::CreateLocation(const Response &resp) {
+std::shared_ptr<RegionLocation> MetaUtil::CreateLocation(const Response &resp,
+ const TableName &tn) {
std::vector<std::shared_ptr<Result>> results = ResponseConverter::FromScanResponse(resp);
+ if (results.size() == 0) {
+ throw TableNotFoundException(folly::to<std::string>(tn));
+ }
if (results.size() != 1) {
throw std::runtime_error("Was expecting exactly 1 result in meta scan response, got:" +
std::to_string(results.size()));
}
auto result = *results[0];
- auto region_info_str = result.Value(CATALOG_FAMILY, REGION_INFO_COLUMN);
- auto server_str = result.Value(CATALOG_FAMILY, SERVER_COLUMN);
+ auto region_info_str = result.Value(MetaUtil::kCatalogFamily, MetaUtil::kRegionInfoColumn);
+ auto server_str = result.Value(MetaUtil::kCatalogFamily, MetaUtil::kServerColumn);
CHECK(region_info_str);
CHECK(server_str);
auto row = result.Row();
auto region_info = folly::to<RegionInfo>(*region_info_str);
auto server_name = folly::to<ServerName>(*server_str);
- return std::make_shared<RegionLocation>(row, std::move(region_info), server_name, nullptr);
+ return std::make_shared<RegionLocation>(row, std::move(region_info), server_name);
+}
+
+bool MetaUtil::IsMeta(const hbase::pb::TableName &tn) {
+ return folly::to<std::string>(tn) == MetaUtil::kMetaTableName;
}
} // namespace hbase
diff --git a/hbase-native-client/core/meta-utils.h b/hbase-native-client/core/meta-utils.h
index d67f32d..d178179 100644
--- a/hbase-native-client/core/meta-utils.h
+++ b/hbase-native-client/core/meta-utils.h
@@ -34,6 +34,17 @@ namespace hbase {
*/
class MetaUtil {
public:
+ static constexpr const char *kSystemNamespace = "hbase";
+ static constexpr const char *kMetaTableQualifier = "meta";
+ static constexpr const char *kMetaTableName = "hbase:meta";
+ static constexpr const char *kMetaRegion = "1588230740";
+ static constexpr const char *kMetaRegionName = "hbase:meta,,1";
+ static constexpr const char *kCatalogFamily = "info";
+ static constexpr const char *kRegionInfoColumn = "regioninfo";
+ static constexpr const char *kServerColumn = "server";
+
+ MetaUtil();
+
/**
* Given a table and a row give the row key from which to start a scan to find
* region locations.
@@ -49,6 +60,17 @@ class MetaUtil {
/**
* Return a RegionLocation from the parsed Response
*/
- std::shared_ptr<RegionLocation> CreateLocation(const Response &resp);
+ std::shared_ptr<RegionLocation> CreateLocation(const Response &resp,
+ const hbase::pb::TableName &tn);
+
+ /**
+ * Return whether the table is the meta table.
+ */
+ static bool IsMeta(const hbase::pb::TableName &tn);
+
+ const pb::RegionInfo &meta_region_info() const { return meta_region_info_; }
+
+ private:
+ pb::RegionInfo meta_region_info_;
};
} // namespace hbase
diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h
index d5d9d67..822180b 100644
--- a/hbase-native-client/core/region-location.h
+++ b/hbase-native-client/core/region-location.h
@@ -44,9 +44,8 @@ class RegionLocation {
* this region.
* @param service the connected service to the regionserver.
*/
- RegionLocation(std::string region_name, hbase::pb::RegionInfo ri, hbase::pb::ServerName sn,
- std::shared_ptr<HBaseService> service)
- : region_name_(region_name), ri_(ri), sn_(sn), service_(service) {}
+ RegionLocation(std::string region_name, hbase::pb::RegionInfo ri, hbase::pb::ServerName sn)
+ : region_name_(region_name), ri_(ri), sn_(sn) {}
/**
* Get a reference to the region info
@@ -64,19 +63,6 @@ class RegionLocation {
const std::string &region_name() const { return region_name_; }
/**
- * Get a service. This could be closed or null. It's the caller's
- * responsibility to check.
- */
- std::shared_ptr<HBaseService> service() { return service_; }
-
- /**
- * Set the service.
- * This should be used if the region moved or if the connection is thought to
- * be bad and a new tcp connection needs to be made.
- */
- void set_service(std::shared_ptr<HBaseService> s) { service_ = s; }
-
- /**
* Set the servername if the region has moved.
*/
void set_server_name(hbase::pb::ServerName sn) { sn_ = sn; }
@@ -89,7 +75,6 @@ class RegionLocation {
std::string region_name_;
hbase::pb::RegionInfo ri_;
hbase::pb::ServerName sn_;
- std::shared_ptr<HBaseService> service_;
};
} // namespace hbase
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index 9bc4892..4f9bfb1 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -93,7 +93,7 @@ std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const R
std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(
const std::shared_ptr<ScanResponse> scan_resp, std::shared_ptr<CellScanner> cell_scanner) {
VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString()
- << " cell_scanner:" << (cell_scanner == nullptr);
+ << " cell_scanner:" << (cell_scanner != nullptr);
int num_results =
cell_scanner != nullptr ? scan_resp->cells_per_result_size() : scan_resp->results_size();
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index f79d848..2fd7108 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -29,6 +29,7 @@
#include "connection/rpc-client.h"
#include "core/client.h"
#include "core/get.h"
+#include "core/hbase-configuration-loader.h"
#include "core/put.h"
#include "core/scan.h"
#include "core/table.h"
@@ -39,6 +40,7 @@
using hbase::Client;
using hbase::Configuration;
using hbase::Get;
+using hbase::HBaseConfigurationLoader;
using hbase::Scan;
using hbase::Put;
using hbase::Table;
@@ -49,6 +51,7 @@ using hbase::TimeUtil;
DEFINE_string(table, "test_table", "What table to do the reads or writes");
DEFINE_string(row, "row_", "row prefix");
DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to");
+DEFINE_string(conf, "", "Conf directory to read the config from (optional)");
DEFINE_uint64(num_rows, 10000, "How many rows to write and read");
DEFINE_bool(puts, true, "Whether to perform puts");
DEFINE_bool(gets, true, "Whether to perform gets");
@@ -76,10 +79,17 @@ int main(int argc, char *argv[]) {
FLAGS_logtostderr = 1;
FLAGS_stderrthreshold = 1;
- // Configuration
- auto conf = std::make_shared<Configuration>();
- conf->Set("hbase.zookeeper.quorum", FLAGS_zookeeper);
- conf->SetInt("hbase.client.cpu.thread.pool.size", FLAGS_threads);
+ std::shared_ptr<Configuration> conf = nullptr;
+ if (FLAGS_conf == "") {
+ // Configuration
+ conf = std::make_shared<Configuration>();
+ conf->Set("hbase.zookeeper.quorum", FLAGS_zookeeper);
+ conf->SetInt("hbase.client.cpu.thread.pool.size", FLAGS_threads);
+ } else {
+ setenv("HBASE_CONF", FLAGS_conf.c_str(), 1);
+ hbase::HBaseConfigurationLoader loader;
+ conf = std::make_shared<Configuration>(loader.LoadDefaultResources().value());
+ }
auto row = FLAGS_row;
diff --git a/hbase-native-client/core/zk-util.cc b/hbase-native-client/core/zk-util.cc
index 50ea92a..d29c8c3 100644
--- a/hbase-native-client/core/zk-util.cc
+++ b/hbase-native-client/core/zk-util.cc
@@ -55,4 +55,8 @@ std::string ZKUtil::MetaZNode(const hbase::Configuration& conf) {
return zk_node;
}
+int32_t ZKUtil::SessionTimeout(const hbase::Configuration& conf) {
+ return conf.GetInt(kHBaseZookeeperSessionTimeout_, kDefHBaseZookeeperSessionTimeout_);
+}
+
} // namespace hbase
diff --git a/hbase-native-client/core/zk-util.h b/hbase-native-client/core/zk-util.h
index 8f2d627..403fbe4 100644
--- a/hbase-native-client/core/zk-util.h
+++ b/hbase-native-client/core/zk-util.h
@@ -34,8 +34,13 @@ class ZKUtil {
static constexpr const char* kDefHBaseZnodeParent_ = "/hbase";
static constexpr const char* kHBaseMetaRegionServer_ = "meta-region-server";
+ static constexpr const char* kHBaseZookeeperSessionTimeout_ = "zookeeper.session.timeout";
+ static constexpr const int32_t kDefHBaseZookeeperSessionTimeout_ = 90000;
+
static std::string ParseZooKeeperQuorum(const hbase::Configuration& conf);
static std::string MetaZNode(const hbase::Configuration& conf);
+
+ static int32_t SessionTimeout(const hbase::Configuration& conf);
};
} // namespace hbase
diff --git a/hbase-native-client/serde/region-info.h b/hbase-native-client/serde/region-info.h
index 1f08298..8010042 100644
--- a/hbase-native-client/serde/region-info.h
+++ b/hbase-native-client/serde/region-info.h
@@ -19,13 +19,13 @@
#pragma once
-#include "if/HBase.pb.h"
-
#include <folly/Conv.h>
#include <boost/algorithm/string/predicate.hpp>
#include <string>
+#include "if/HBase.pb.h"
+
namespace hbase {
namespace pb {
template <class String>
diff --git a/hbase-native-client/serde/table-name.h b/hbase-native-client/serde/table-name.h
index b8b7776..3594802 100644
--- a/hbase-native-client/serde/table-name.h
+++ b/hbase-native-client/serde/table-name.h
@@ -18,12 +18,13 @@
*/
#pragma once
+#include <folly/Conv.h>
+#include <folly/String.h>
+
#include <memory>
#include <string>
#include <vector>
-#include <folly/Conv.h>
-#include <folly/String.h>
#include "if/HBase.pb.h"
namespace hbase {
diff --git a/hbase-native-client/test-util/mini-cluster.cc b/hbase-native-client/test-util/mini-cluster.cc
index 688ea8e..56461e1 100644
--- a/hbase-native-client/test-util/mini-cluster.cc
+++ b/hbase-native-client/test-util/mini-cluster.cc
@@ -59,6 +59,7 @@ JNIEnv *MiniCluster::CreateVM(JavaVM **jvm) {
}
fd.close();
}
+
auto options = std::string{"-Djava.class.path="} + clspath;
jvm_options.optionString = const_cast<char *>(options.c_str());
args.options = &jvm_options;
@@ -185,6 +186,9 @@ JNIEnv *MiniCluster::env() {
}
// converts C char* to Java byte[]
jbyteArray MiniCluster::StrToByteChar(const std::string &str) {
+ if (str.size() == 0) {
+ return nullptr;
+ }
char *p = const_cast<char *>(str.c_str());
int n = str.length();
jbyteArray arr = env_->NewByteArray(n);
diff --git a/hbase-native-client/test-util/test-util.cc b/hbase-native-client/test-util/test-util.cc
index 26862d8..b32c635 100644
--- a/hbase-native-client/test-util/test-util.cc
+++ b/hbase-native-client/test-util/test-util.cc
@@ -79,6 +79,10 @@ void TestUtil::CreateTable(const std::string &table, const std::vector<std::stri
mini_->CreateTable(table, families, keys);
}
+void TestUtil::MoveRegion(const std::string &region, const std::string &server) {
+ mini_->MoveRegion(region, server);
+}
+
void TestUtil::StartStandAloneInstance() {
auto p = temp_dir_.path().string();
auto cmd = std::string{"bin/start-local-hbase.sh " + p};
diff --git a/hbase-native-client/test-util/test-util.h b/hbase-native-client/test-util/test-util.h
index e26558b..40e99d1 100644
--- a/hbase-native-client/test-util/test-util.h
+++ b/hbase-native-client/test-util/test-util.h
@@ -68,6 +68,7 @@ class TestUtil {
void StartStandAloneInstance();
void StopStandAloneInstance();
void RunShellCmd(const std::string &);
+ void MoveRegion(const std::string &region, const std::string &server);
private:
std::unique_ptr<MiniCluster> mini_;
diff --git a/hbase-native-client/utils/bytes-util-test.cc b/hbase-native-client/utils/bytes-util-test.cc
index 16af021..4a49593 100644
--- a/hbase-native-client/utils/bytes-util-test.cc
+++ b/hbase-native-client/utils/bytes-util-test.cc
@@ -23,8 +23,7 @@
#include "utils/bytes-util.h"
-using namespace std;
-using namespace hbase;
+using hbase::BytesUtil;
TEST(TestBytesUtil, TestToStringBinary) {
std::string empty{""};
diff --git a/hbase-native-client/utils/bytes-util.cc b/hbase-native-client/utils/bytes-util.cc
index a937782..12037c3 100644
--- a/hbase-native-client/utils/bytes-util.cc
+++ b/hbase-native-client/utils/bytes-util.cc
@@ -21,11 +21,11 @@
#include <bits/stdc++.h>
#include <boost/predef.h>
+#include <glog/logging.h>
+
#include <memory>
#include <string>
-#include <glog/logging.h>
-
namespace hbase {
constexpr char BytesUtil::kHexChars[];