You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:45:54 UTC
[hbase] 66/133: HBASE-17465 [C++] implement request retry mechanism
over RPC (Xiaobing Zhou)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 1eb84c2f9049d82c5fd1ffe970ad37249b5cb22b
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Fri Mar 3 19:15:16 2017 -0800
HBASE-17465 [C++] implement request retry mechanism over RPC (Xiaobing Zhou)
---
hbase-native-client/Makefile | 2 +-
hbase-native-client/bin/start-docker.sh | 2 +-
.../connection/connection-factory.cc | 7 +-
.../connection/connection-factory.h | 3 +-
hbase-native-client/connection/connection-pool.cc | 4 +-
hbase-native-client/connection/rpc-client.cc | 42 +---
hbase-native-client/connection/rpc-client.h | 42 +---
hbase-native-client/core/BUCK | 14 ++
.../core/async-rpc-retrying-caller-factory.cc | 22 ++
.../core/async-rpc-retrying-caller-factory.h | 124 ++++++++++
.../core/async-rpc-retrying-caller.cc | 22 ++
.../core/async-rpc-retrying-caller.h | 266 +++++++++++++++++++++
.../core/async-rpc-retrying-test.cc | 255 ++++++++++++++++++++
hbase-native-client/core/client.cc | 3 +-
hbase-native-client/core/client.h | 2 +-
hbase-native-client/core/filter.h | 2 +-
hbase-native-client/core/hbase-rpc-controller.cc | 22 ++
hbase-native-client/core/hbase-rpc-controller.h | 56 +++++
hbase-native-client/core/location-cache.cc | 1 +
hbase-native-client/core/region-location.h | 10 +-
hbase-native-client/core/response_converter.cc | 1 +
hbase-native-client/core/response_converter.h | 1 +
hbase-native-client/core/table.cc | 4 +
hbase-native-client/core/table.h | 5 +
hbase-native-client/{utils => exceptions}/BUCK | 17 +-
hbase-native-client/exceptions/exception.h | 104 ++++++++
hbase-native-client/utils/BUCK | 7 +-
hbase-native-client/utils/connection-util.cc | 26 ++
hbase-native-client/utils/connection-util.h | 62 +++++
hbase-native-client/utils/sys-util.h | 39 +++
hbase-native-client/utils/time-util.h | 52 ++++
31 files changed, 1113 insertions(+), 106 deletions(-)
diff --git a/hbase-native-client/Makefile b/hbase-native-client/Makefile
index 84ae556..b926220 100644
--- a/hbase-native-client/Makefile
+++ b/hbase-native-client/Makefile
@@ -22,7 +22,7 @@ LD:=g++
DEBUG_PATH = build/debug
RELEASE_PATH = build/release
PROTO_SRC_DIR = build/if
-MODULES = connection core serde test-util utils security
+MODULES = connection core serde test-util utils security exceptions
SRC_DIR = $(MODULES)
DEBUG_BUILD_DIR = $(addprefix $(DEBUG_PATH)/,$(MODULES))
RELEASE_BUILD_DIR = $(addprefix $(RELEASE_PATH)/,$(MODULES))
diff --git a/hbase-native-client/bin/start-docker.sh b/hbase-native-client/bin/start-docker.sh
index 1380cdf..8b017a0 100755
--- a/hbase-native-client/bin/start-docker.sh
+++ b/hbase-native-client/bin/start-docker.sh
@@ -56,7 +56,7 @@ docker build -t hbase_native .
# After the image is built run the thing
docker run -p 16050:16050/tcp \
- -v ${BASE_DIR}/..:/usr/src/hbase \
+ -v ${BASE_DIR}/..:/usr/src/hbase \
-v ~/.m2:/root/.m2 \
-it hbase_native /bin/bash
popd
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index 2f7e75c..832b00f 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -31,11 +31,10 @@ using std::chrono::milliseconds;
using std::chrono::nanoseconds;
ConnectionFactory::ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool,
- std::shared_ptr<Codec> codec,
- nanoseconds connect_timeout)
+ std::shared_ptr<Codec> codec, nanoseconds connect_timeout)
: connect_timeout_(connect_timeout),
- io_pool_(io_pool),
- pipeline_factory_(std::make_shared<RpcPipelineFactory>(codec)) {}
+ io_pool_(io_pool),
+ pipeline_factory_(std::make_shared<RpcPipelineFactory>(codec)) {}
std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> ConnectionFactory::MakeBootstrap() {
auto client = std::make_shared<wangle::ClientBootstrap<SerializePipeline>>();
diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h
index fbcb6ef..32d0bf7 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -44,8 +44,7 @@ class ConnectionFactory {
* There should only be one ConnectionFactory per client.
*/
ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool,
- std::shared_ptr<Codec> codec,
- nanoseconds connect_timeout = nanoseconds(0));
+ std::shared_ptr<Codec> codec, nanoseconds connect_timeout = nanoseconds(0));
/** Default Destructor */
virtual ~ConnectionFactory() = default;
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index b18ee89..4fe4610 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -22,6 +22,7 @@
#include <folly/SocketAddress.h>
#include <wangle/service/Service.h>
+#include <folly/Logging.h>
#include <memory>
#include <utility>
@@ -34,8 +35,7 @@ using folly::SharedMutexWritePriority;
using folly::SocketAddress;
ConnectionPool::ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
- std::shared_ptr<Codec> codec,
- nanoseconds connect_timeout)
+ std::shared_ptr<Codec> codec, nanoseconds connect_timeout)
: cf_(std::make_shared<ConnectionFactory>(io_executor, codec, connect_timeout)),
clients_(),
connections_(),
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
index c61a73e..5fa1138 100644
--- a/hbase-native-client/connection/rpc-client.cc
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -18,27 +18,17 @@
*/
#include "connection/rpc-client.h"
+
+#include <folly/Logging.h>
#include <unistd.h>
#include <wangle/concurrent/IOThreadPoolExecutor.h>
+#include <memory>
+#include <string>
using hbase::RpcClient;
-using hbase::AbstractRpcChannel;
namespace hbase {
-class RpcChannelImplementation : public AbstractRpcChannel {
- public:
- RpcChannelImplementation(std::shared_ptr<RpcClient> rpc_client, const std::string& host,
- uint16_t port, std::shared_ptr<User> ticket, int rpc_timeout)
- : AbstractRpcChannel(rpc_client, host, port, ticket, rpc_timeout) {}
-
- void CallMethod(const MethodDescriptor* method, RpcController* controller, const Message* request,
- Message* response, Closure* done) override {
- rpc_client_->CallMethod(method, controller, request, response, done, host_, port_, ticket_);
- }
-};
-} // namespace hbase
-
RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
std::shared_ptr<Codec> codec, nanoseconds connect_timeout)
: io_executor_(io_executor) {
@@ -80,26 +70,4 @@ folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string&
std::shared_ptr<RpcConnection> RpcClient::GetConnection(std::shared_ptr<ConnectionId> remote_id) {
return cp_->GetConnection(remote_id);
}
-
-std::shared_ptr<RpcChannel> RpcClient::CreateRpcChannel(const std::string& host, uint16_t port,
- std::shared_ptr<User> ticket,
- int rpc_timeout) {
- std::shared_ptr<RpcChannelImplementation> channel = std::make_shared<RpcChannelImplementation>(
- shared_from_this(), host, port, ticket, rpc_timeout);
-
- /* static_pointer_cast is safe since RpcChannelImplementation derives
- * from RpcChannel, otherwise, dynamic_pointer_cast should be used. */
- return std::static_pointer_cast<RpcChannel>(channel);
-}
-
-void RpcClient::CallMethod(const MethodDescriptor* method, RpcController* controller,
- const Message* req_msg, Message* resp_msg, Closure* done,
- const std::string& host, uint16_t port, std::shared_ptr<User> ticket) {
- std::shared_ptr<Message> shared_req(const_cast<Message*>(req_msg));
- std::shared_ptr<Message> shared_resp(resp_msg);
-
- std::unique_ptr<Request> req = std::make_unique<Request>(shared_req, shared_resp, method->name());
-
- AsyncCall(host, port, std::move(req), ticket, method->service()->name())
- .then([done, this](std::unique_ptr<Response> resp) { done->Run(); });
-}
+} // namespace hbase
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
index 5c11ab5..d416ceb 100644
--- a/hbase-native-client/connection/rpc-client.h
+++ b/hbase-native-client/connection/rpc-client.h
@@ -38,24 +38,15 @@ using hbase::ConnectionPool;
using hbase::RpcConnection;
using hbase::security::User;
-using google::protobuf::MethodDescriptor;
-using google::protobuf::RpcChannel;
using google::protobuf::Message;
-using google::protobuf::RpcController;
-using google::protobuf::Closure;
-
using std::chrono::nanoseconds;
-class RpcChannelImplementation;
-
namespace hbase {
-class RpcClient : public std::enable_shared_from_this<RpcClient> {
- friend class RpcChannelImplementation;
-
+class RpcClient {
public:
- RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
- std::shared_ptr<Codec> codec, nanoseconds connect_timeout);
+ RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, std::shared_ptr<Codec> codec,
+ nanoseconds connect_timeout = nanoseconds(0));
virtual ~RpcClient() { Close(); }
@@ -79,40 +70,13 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> {
virtual void Close();
- virtual std::shared_ptr<RpcChannel> CreateRpcChannel(const std::string &host, uint16_t port,
- std::shared_ptr<User> ticket,
- int rpc_timeout);
-
std::shared_ptr<ConnectionPool> connection_pool() const { return cp_; }
private:
- void CallMethod(const MethodDescriptor *method, RpcController *controller, const Message *req_msg,
- Message *resp_msg, Closure *done, const std::string &host, uint16_t port,
- std::shared_ptr<User> ticket);
std::shared_ptr<RpcConnection> GetConnection(std::shared_ptr<ConnectionId> remote_id);
private:
std::shared_ptr<ConnectionPool> cp_;
std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
};
-
-class AbstractRpcChannel : public RpcChannel {
- public:
- AbstractRpcChannel(std::shared_ptr<RpcClient> rpc_client, const std::string &host, uint16_t port,
- std::shared_ptr<User> ticket, int rpc_timeout)
- : rpc_client_(rpc_client),
- host_(host),
- port_(port),
- ticket_(ticket),
- rpc_timeout_(rpc_timeout) {}
-
- virtual ~AbstractRpcChannel() = default;
-
- protected:
- std::shared_ptr<RpcClient> rpc_client_;
- std::string host_;
- uint16_t port_;
- std::shared_ptr<User> ticket_;
- int rpc_timeout_;
-};
} // namespace hbase
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index e541d8f..2f4f6c1 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -40,6 +40,9 @@ cxx_library(
"request_converter.h",
"response_converter.h",
"table.h",
+ "async-rpc-retrying-caller-factory.h",
+ "async-rpc-retrying-caller.h",
+ "hbase-rpc-controller.h",
],
srcs=[
"cell.cc",
@@ -58,6 +61,8 @@ cxx_library(
"table.cc",
],
deps=[
+ "//exceptions:exceptions",
+ "//utils:utils",
"//connection:connection",
"//if:if",
"//serde:serde",
@@ -96,6 +101,15 @@ cxx_test(
deps=[":core",],
run_test_separately=True,)
cxx_test(
+ name="retry-test",
+ srcs=["async-rpc-retrying-test.cc",],
+ deps=[
+ ":core",
+ "//test-util:test-util",
+ "//exceptions:exceptions",
+ ],
+ run_test_separately=True,)
+cxx_test(
name="time_range-test",
srcs=["time_range-test.cc",],
deps=[":core",],
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.cc b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc
new file mode 100644
index 0000000..0ac9cac
--- /dev/null
+++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/async-rpc-retrying-caller-factory.h"
+
+namespace hbase {} // namespace hbase
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
new file mode 100644
index 0000000..3342e29
--- /dev/null
+++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/Logging.h>
+#include <folly/io/IOBuf.h>
+#include <folly/io/async/EventBase.h>
+#include <chrono>
+#include <memory>
+#include <string>
+
+#include "connection/rpc-client.h"
+#include "core/async-rpc-retrying-caller.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+
+using hbase::pb::TableName;
+using std::chrono::nanoseconds;
+
+namespace hbase {
+
+/**
+ * Fluent builder for AsyncSingleRequestRpcRetryingCaller.
+ *
+ * Each setter stores one option and hands back a shared pointer to this
+ * builder so that calls can be chained. That pointer comes from
+ * shared_from_this(), so the builder must already be owned by a
+ * std::shared_ptr when a setter is called.
+ */
+template <typename CONN, typename RESP, typename RPC_CLIENT>
+class SingleRequestCallerBuilder
+    : public std::enable_shared_from_this<SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT>> {
+ public:
+  using GenenericThisType = SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT>;
+  using SharedThisPtr = std::shared_ptr<GenenericThisType>;
+
+  /** Starts with no table, zero timeouts, an empty row and RegionLocateType::kCurrent. */
+  explicit SingleRequestCallerBuilder(std::shared_ptr<CONN> conn)
+      : conn_(conn),
+        table_name_(nullptr),
+        rpc_timeout_nanos_(0),
+        operation_timeout_nanos_(0),
+        locate_type_(RegionLocateType::kCurrent) {}
+
+  virtual ~SingleRequestCallerBuilder() = default;
+
+  /** Sets the table the request is addressed to. */
+  SharedThisPtr table(std::shared_ptr<TableName> table_name) {
+    table_name_ = table_name;
+    return this->shared_from_this();
+  }
+
+  /** Sets the rpc timeout handed to the caller. */
+  SharedThisPtr rpc_timeout(nanoseconds rpc_timeout_nanos) {
+    rpc_timeout_nanos_ = rpc_timeout_nanos;
+    return this->shared_from_this();
+  }
+
+  /** Sets the operation timeout handed to the caller. */
+  SharedThisPtr operation_timeout(nanoseconds operation_timeout_nanos) {
+    operation_timeout_nanos_ = operation_timeout_nanos;
+    return this->shared_from_this();
+  }
+
+  /** Sets the row key of the request. */
+  SharedThisPtr row(const std::string& row) {
+    row_ = row;
+    return this->shared_from_this();
+  }
+
+  /** Sets the region locate type handed to the caller. */
+  SharedThisPtr locate_type(RegionLocateType locate_type) {
+    locate_type_ = locate_type;
+    return this->shared_from_this();
+  }
+
+  /** Sets the callable that the caller will run. */
+  SharedThisPtr action(Callable<RESP, RPC_CLIENT> callable) {
+    callable_ = callable;
+    return this->shared_from_this();
+  }
+
+  /** Builds a caller from the current settings and starts it right away. */
+  folly::Future<RESP> Call() {
+    auto caller = Build();
+    return caller->Call();
+  }
+
+  /**
+   * Creates the retrying caller. Pause, max retries and the start-log-errors
+   * count are read from the connection's configuration; everything else comes
+   * from the values stored in this builder.
+   */
+  std::shared_ptr<AsyncSingleRequestRpcRetryingCaller<CONN, RESP, RPC_CLIENT>> Build() {
+    using Caller = AsyncSingleRequestRpcRetryingCaller<CONN, RESP, RPC_CLIENT>;
+    return std::make_shared<Caller>(
+        conn_, table_name_, row_, locate_type_, callable_, conn_->get_conn_conf()->GetPauseNs(),
+        conn_->get_conn_conf()->GetMaxRetries(), operation_timeout_nanos_, rpc_timeout_nanos_,
+        conn_->get_conn_conf()->GetStartLogErrorsCount());
+  }
+
+ private:
+  std::shared_ptr<CONN> conn_;
+  std::shared_ptr<TableName> table_name_;
+  nanoseconds rpc_timeout_nanos_;
+  nanoseconds operation_timeout_nanos_;
+  std::string row_;
+  RegionLocateType locate_type_;
+  Callable<RESP, RPC_CLIENT> callable_;
+};  // end of SingleRequestCallerBuilder
+
+/**
+ * Hands out request-caller builders that are all bound to one connection.
+ */
+template <typename CONN>
+class AsyncRpcRetryingCallerFactory {
+ public:
+  explicit AsyncRpcRetryingCallerFactory(std::shared_ptr<CONN> conn) : conn_(conn) {}
+
+  virtual ~AsyncRpcRetryingCallerFactory() = default;
+
+  /** Returns a new builder for a single-request caller on this factory's connection. */
+  template <typename RESP, typename RPC_CLIENT = hbase::RpcClient>
+  std::shared_ptr<SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT>> Single() {
+    using Builder = SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT>;
+    return std::make_shared<Builder>(conn_);
+  }
+
+ private:
+  std::shared_ptr<CONN> conn_;
+};
+
+} // namespace hbase
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc
new file mode 100644
index 0000000..743b6bb
--- /dev/null
+++ b/hbase-native-client/core/async-rpc-retrying-caller.cc
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/async-rpc-retrying-caller.h"
+
+namespace hbase {} /* namespace hbase */
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h
new file mode 100644
index 0000000..f7a1523
--- /dev/null
+++ b/hbase-native-client/core/async-rpc-retrying-caller.h
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/Format.h>
+#include <folly/Logging.h>
+#include <folly/futures/Future.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/HHWheelTimer.h>
+#include <algorithm>
+#include <chrono>
+#include <functional>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <utility>
+#include <vector>
+#include "connection/rpc-client.h"
+#include "core/hbase-rpc-controller.h"
+#include "core/region-location.h"
+#include "exceptions/exception.h"
+#include "if/HBase.pb.h"
+#include "utils/connection-util.h"
+#include "utils/sys-util.h"
+#include "utils/time-util.h"
+
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+namespace hbase {
+
+// Nullary function object that produces a T.
+template <typename T>
+using Supplier = std::function<T()>;
+
+// Function object that accepts a T (by value) and returns nothing.
+template <typename T>
+using Consumer = std::function<void(T)>;
+
+// Function object that builds an R from an S plus any number of extra inputs I.
+template <typename R, typename S, typename... I>
+using ReqConverter = std::function<R(const S&, const I&...)>;
+
+// Function object that builds an R from an S.
+template <typename R, typename S>
+using RespConverter = std::function<R(const S&)>;
+
+// Function object that receives a RESP by const reference and returns nothing.
+template <typename RESP>
+using RpcCallback = std::function<void(const RESP&)>;
+
+// Function object that takes an rpc client, a region location, an rpc
+// controller and an owned REQ, and returns a future of an owned RESP.
+template <typename REQ, typename RESP, typename RPC_CLIENT = hbase::RpcClient>
+using RpcCall = std::function<folly::Future<std::unique_ptr<RESP>>(
+ std::shared_ptr<RPC_CLIENT>, std::shared_ptr<RegionLocation>,
+ std::shared_ptr<HBaseRpcController>, std::unique_ptr<REQ>)>;
+
+// Function object that takes an rpc controller, a region location and an rpc
+// client, and returns a future of RESP.
+template <typename RESP, typename RPC_CLIENT = hbase::RpcClient>
+using Callable = std::function<folly::Future<RESP>(std::shared_ptr<HBaseRpcController>,
+ std::shared_ptr<RegionLocation>,
+ std::shared_ptr<RPC_CLIENT>)>;
+
+/**
+ * Issues one request for a single row and retries it on failure.
+ *
+ * Every attempt first asks the connection's locator for the region holding
+ * row_ and then runs callable_ against that region. A failed attempt is
+ * recorded in exceptions_ and a new attempt is scheduled on retry_timer_ after
+ * a pause. The future returned by Call() fails with RetriesExhaustedException
+ * once the error is a DoNotRetryIOException, the try limit is reached, or the
+ * operation timeout has run out.
+ *
+ * NOTE(review): the continuations and timer callbacks below capture a raw
+ * `this`; confirm that whoever creates this object keeps it alive until the
+ * returned future completes.
+ * NOTE(review): nothing in this class runs event_base_, and event_base_ is
+ * declared after retry_timer_ (so it is destroyed first); confirm that the
+ * scheduled retries actually fire and that teardown order is safe.
+ */
+template <typename CONN, typename RESP, typename RPC_CLIENT = hbase::RpcClient>
+class AsyncSingleRequestRpcRetryingCaller {
+ public:
+  /**
+   * @param conn connection providing the locator, rpc client and controller factory
+   * @param table_name table the row belongs to
+   * @param row row key used for the region lookup
+   * @param locate_type locate type passed through to the locator
+   * @param callable the request to run once a region and rpc client are known
+   * @param pause_ns base pause handed to ConnectionUtils::GetPauseTime
+   * @param max_retries retry limit; also converted to max_attempts_
+   * @param operation_timeout_nanos overall budget; a value <= 0 disables the check
+   * @param rpc_timeout_nanos per-call timeout set on the controller
+   * @param start_log_errors_count stored only; not used by the code in this class
+   */
+  AsyncSingleRequestRpcRetryingCaller(std::shared_ptr<CONN> conn,
+                                      std::shared_ptr<hbase::pb::TableName> table_name,
+                                      const std::string& row, RegionLocateType locate_type,
+                                      Callable<RESP, RPC_CLIENT> callable, nanoseconds pause_ns,
+                                      uint32_t max_retries, nanoseconds operation_timeout_nanos,
+                                      nanoseconds rpc_timeout_nanos,
+                                      uint32_t start_log_errors_count)
+      : conn_(conn),
+        table_name_(table_name),
+        row_(row),
+        locate_type_(locate_type),
+        callable_(callable),
+        pause_ns_(pause_ns),
+        max_retries_(max_retries),
+        operation_timeout_nanos_(operation_timeout_nanos),
+        rpc_timeout_nanos_(rpc_timeout_nanos),
+        start_log_errors_count_(start_log_errors_count),
+        promise_(std::make_shared<folly::Promise<RESP>>()),
+        tries_(1) {
+    controller_ = conn_->get_rpc_controller_factory()->NewController();
+    start_ns_ = TimeUtil::GetNowNanos();
+    max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
+    exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>();
+    retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_);
+  }
+
+  virtual ~AsyncSingleRequestRpcRetryingCaller() {}
+
+  /** Starts the first attempt and returns the future that carries the final outcome. */
+  folly::Future<RESP> Call() {
+    auto f = promise_->getFuture();
+    LocateThenCall();
+    return f;
+  }
+
+ private:
+  /** Looks up the region for row_, then either issues the call or handles the lookup error. */
+  void LocateThenCall() {
+    int64_t locate_timeout_ns;
+    if (operation_timeout_nanos_.count() > 0) {
+      locate_timeout_ns = RemainingTimeNs();
+      if (locate_timeout_ns <= 0) {
+        CompleteExceptionally();
+        return;
+      }
+    } else {
+      // No operation timeout configured: pass -1 to the locator.
+      locate_timeout_ns = -1L;
+    }
+
+    conn_->get_locator()
+        ->GetRegionLocation(table_name_, row_, locate_type_, locate_timeout_ns)
+        .then([this](RegionLocation& loc) { Call(loc); })
+        .onError([this](const std::exception& e) {
+          OnError(e,
+                  [this]() -> std::string {
+                    return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" +
+                           table_name_->qualifier() + " failed, tries = " + std::to_string(tries_) +
+                           ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " +
+                           TimeUtil::ToMillisStr(operation_timeout_nanos_) +
+                           " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) +
+                           " ms";
+                  },
+                  [](const std::exception& error) {});
+        });
+  }
+
+  /**
+   * Records a failed attempt and either gives up or schedules the next attempt.
+   *
+   * @param error the failure of the current attempt
+   * @param err_msg builds a description of the failure (currently never invoked)
+   * @param update_cached_location invoked with the error before a retry is scheduled
+   */
+  void OnError(const std::exception& error, Supplier<std::string> err_msg,
+               Consumer<std::exception> update_cached_location) {
+    // NOTE(review): copy-constructing a std::exception from `error` keeps only
+    // the base-class part, so the dynamic type of the original is not stored.
+    ThrowableWithExtraContext twec(std::make_shared<std::exception>(error),
+                                   TimeUtil::GetNowNanos());
+    exceptions_->push_back(twec);
+    // NOTE(review): the limit checked here is max_retries_, while the messages
+    // built by callers report max_attempts_ -- confirm which one is intended.
+    if (SysUtil::InstanceOf<DoNotRetryIOException, std::exception>(error) ||
+        tries_ >= max_retries_) {
+      CompleteExceptionally();
+      return;
+    }
+
+    int64_t delay_ns;
+    if (operation_timeout_nanos_.count() > 0) {
+      // Never sleep past the operation deadline (minus kSleepDeltaNs).
+      int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
+      if (max_delay_ns <= 0) {
+        CompleteExceptionally();
+        return;
+      }
+      delay_ns =
+          std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_ns_.count(), tries_ - 1));
+    } else {
+      delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries_ - 1);
+    }
+    update_cached_location(error);
+    tries_++;
+    retry_timer_->scheduleTimeoutFn([this]() { LocateThenCall(); },
+                                    milliseconds(TimeUtil::ToMillis(delay_ns)));
+  }
+
+  /**
+   * Runs callable_ against the located region.
+   *
+   * @param loc region location for this attempt; only valid for the duration of this call
+   */
+  void Call(const RegionLocation& loc) {
+    int64_t call_timeout_ns;
+    if (operation_timeout_nanos_.count() > 0) {
+      call_timeout_ns = this->RemainingTimeNs();
+      if (call_timeout_ns <= 0) {
+        this->CompleteExceptionally();
+        return;
+      }
+      call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_.count());
+    } else {
+      call_timeout_ns = rpc_timeout_nanos_.count();
+    }
+
+    std::shared_ptr<RPC_CLIENT> rpc_client;
+    try {
+      rpc_client = conn_->GetRpcClient();
+    } catch (const IOException& e) {
+      // OnError uses both lambdas before it returns, so referring to `loc`
+      // by reference is fine on this synchronous path.
+      OnError(e,
+              [&, this]() -> std::string {
+                return "Get async rpc_client to " +
+                       folly::sformat("{0}:{1}", loc.server_name().host_name(),
+                                      loc.server_name().port()) +
+                       " for '" + row_ + "' in " + loc.DebugString() + " of " +
+                       table_name_->namespace_() + "::" + table_name_->qualifier() +
+                       " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " +
+                       std::to_string(max_attempts_) + ", timeout = " +
+                       TimeUtil::ToMillisStr(this->operation_timeout_nanos_) +
+                       " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
+              },
+              [&, this](const std::exception& error) {
+                conn_->get_locator()->UpdateCachedLocation(loc, error);
+              });
+      return;
+    }
+
+    ResetController(controller_, call_timeout_ns);
+
+    // The error continuation may run after this function has returned, when
+    // the `loc` reference is no longer valid. Give that path its own copy
+    // instead of capturing the reference.
+    std::shared_ptr<const RegionLocation> failed_loc = std::make_shared<RegionLocation>(loc);
+
+    callable_(controller_, std::make_shared<RegionLocation>(loc), rpc_client)
+        .then([this](const RESP& resp) { this->promise_->setValue(std::move(resp)); })
+        .onError([this, failed_loc](const std::exception& e) {
+          const RegionLocation& region = *failed_loc;
+          OnError(e,
+                  [&, this]() -> std::string {
+                    return "Call to " + folly::sformat("{0}:{1}", region.server_name().host_name(),
+                                                       region.server_name().port()) +
+                           " for '" + row_ + "' in " + region.DebugString() + " of " +
+                           table_name_->namespace_() + "::" + table_name_->qualifier() +
+                           " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " +
+                           std::to_string(max_attempts_) + ", timeout = " +
+                           TimeUtil::ToMillisStr(this->operation_timeout_nanos_) +
+                           " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) +
+                           " ms";
+                  },
+                  [&, this](const std::exception& error) {
+                    conn_->get_locator()->UpdateCachedLocation(region, error);
+                  });
+        });
+  }
+
+  /** Fails the promise with the collected exceptions; tries_ - 1 is reported as the count. */
+  void CompleteExceptionally() {
+    this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_));
+  }
+
+  /** Operation timeout minus the time elapsed since construction, in nanoseconds. */
+  int64_t RemainingTimeNs() {
+    return operation_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_);
+  }
+
+  /**
+   * Resets the controller and, for a non-negative timeout, sets its call
+   * timeout in milliseconds, capped at INT_MAX.
+   */
+  static void ResetController(std::shared_ptr<HBaseRpcController> controller,
+                              const int64_t& timeout_ns) {
+    controller->Reset();
+    if (timeout_ns >= 0) {
+      // NOTE(review): INT_MAX is declared in <climits>, which this header does
+      // not include directly.
+      controller->set_call_timeout(
+          milliseconds(std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_ns))));
+    }
+  }
+
+ private:
+  folly::HHWheelTimer::UniquePtr retry_timer_;
+  std::shared_ptr<CONN> conn_;
+  std::shared_ptr<hbase::pb::TableName> table_name_;
+  std::string row_;
+  RegionLocateType locate_type_;
+  Callable<RESP, RPC_CLIENT> callable_;
+  nanoseconds pause_ns_;
+  uint32_t max_retries_;
+  nanoseconds operation_timeout_nanos_;
+  nanoseconds rpc_timeout_nanos_;
+  uint32_t start_log_errors_count_;
+  std::shared_ptr<folly::Promise<RESP>> promise_;
+  std::shared_ptr<HBaseRpcController> controller_;
+  uint64_t start_ns_;
+  uint32_t tries_;
+  std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_;
+  uint32_t max_attempts_;
+  folly::EventBase event_base_;
+};
+
+} /* namespace hbase */
diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc
new file mode 100644
index 0000000..a9b0017
--- /dev/null
+++ b/hbase-native-client/core/async-rpc-retrying-test.cc
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Logging.h>
+#include <folly/Memory.h>
+#include <folly/futures/Future.h>
+#include <gmock/gmock.h>
+#include <google/protobuf/stubs/callback.h>
+#include <wangle/concurrent/IOThreadPoolExecutor.h>
+
+#include <functional>
+#include <string>
+
+#include "connection/request.h"
+#include "connection/response.h"
+#include "connection/rpc-client.h"
+#include "core/async-rpc-retrying-caller-factory.h"
+#include "core/async-rpc-retrying-caller.h"
+#include "core/client.h"
+#include "core/hbase-rpc-controller.h"
+#include "core/keyvalue-codec.h"
+#include "core/region-location.h"
+#include "core/request_converter.h"
+#include "core/response_converter.h"
+#include "core/result.h"
+#include "exceptions/exception.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+#include "test-util/test-util.h"
+
+using namespace google::protobuf;
+using namespace hbase;
+using namespace hbase::pb;
+using namespace std::placeholders;
+using namespace testing;
+using ::testing::Return;
+using ::testing::_;
+using std::chrono::nanoseconds;
+
+// gmock stand-in for the RPC controller factory: exposes a single mocked
+// NewController() returning a shared HBaseRpcController.
+class MockRpcControllerFactory {
+ public:
+ MOCK_METHOD0(NewController, std::shared_ptr<HBaseRpcController>());
+};
+
+// gmock stand-in for the connection configuration. Each getter is mocked so
+// the test can return fixed pause / retry / timeout settings.
+class MockAsyncConnectionConfiguration {
+ public:
+ MOCK_METHOD0(GetPauseNs, nanoseconds());
+ MOCK_METHOD0(GetMaxRetries, int32_t());
+ MOCK_METHOD0(GetStartLogErrorsCount, int32_t());
+ MOCK_METHOD0(GetReadRpcTimeoutNs, nanoseconds());
+ MOCK_METHOD0(GetOperationTimeoutNs, nanoseconds());
+};
+
+/**
+ * Test double for a region locator: always answers with a copy of the
+ * RegionLocation handed to the constructor and ignores every lookup argument.
+ */
+class AsyncRegionLocator {
+ public:
+  explicit AsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : region_location_(region_location) {}
+  ~AsyncRegionLocator() = default;
+
+  /** @return an already-fulfilled future holding a copy of the fixed location. */
+  folly::Future<RegionLocation> GetRegionLocation(std::shared_ptr<hbase::pb::TableName>,
+                                                  const std::string&, RegionLocateType, int64_t) {
+    folly::Promise<RegionLocation> fulfilled;
+    fulfilled.setValue(*region_location_);
+    return fulfilled.getFuture();
+  }
+
+  /** No-op: this test double keeps no cache to update. */
+  void UpdateCachedLocation(const RegionLocation&, const std::exception&) {}
+
+ private:
+  std::shared_ptr<RegionLocation> region_location_;
+};
+
+// gmock stand-in for the async connection: hands out the mocked
+// configuration, controller factory, region locator and a real RpcClient.
+class MockAsyncConnection {
+ public:
+ MOCK_METHOD0(get_conn_conf, std::shared_ptr<MockAsyncConnectionConfiguration>());
+ MOCK_METHOD0(get_rpc_controller_factory, std::shared_ptr<MockRpcControllerFactory>());
+ MOCK_METHOD0(get_locator, std::shared_ptr<AsyncRegionLocator>());
+ MOCK_METHOD0(GetRpcClient, std::shared_ptr<hbase::RpcClient>());
+};
+
+/**
+ * Test stand-in for the table implementation that issues the actual RPC.
+ *
+ * NOTE(review): promise_ is created once in the constructor and Call() both
+ * fulfills it and calls getFuture() on it, so an instance presumably supports
+ * a single call only - confirm before reusing an instance.
+ */
+template <typename CONN>
+class MockRawAsyncTableImpl {
+ public:
+  explicit MockRawAsyncTableImpl(std::shared_ptr<CONN> conn)
+      : conn_(conn), promise_(std::make_shared<folly::Promise<hbase::Result>>()) {}
+  virtual ~MockRawAsyncTableImpl() = default;
+
+  /* implement this in real RawAsyncTableImpl. */
+
+  /* in real RawAsyncTableImpl, this should be private. */
+  /**
+   * Sends `get` to the server named by `loc` through `rpc_client`, using the
+   * "ClientService" service and the default user.
+   */
+  folly::Future<hbase::Result> GetCall(std::shared_ptr<hbase::RpcClient> rpc_client,
+                                       std::shared_ptr<HBaseRpcController> controller,
+                                       std::shared_ptr<RegionLocation> loc, const hbase::Get& get) {
+    hbase::RpcCall<hbase::Request, hbase::Response, hbase::RpcClient> rpc_call =
+        [](std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<RegionLocation> loc,
+           std::shared_ptr<HBaseRpcController> controller,
+           std::unique_ptr<hbase::Request> preq)
+        -> folly::Future<std::unique_ptr<hbase::Response>> {
+      const auto& server = loc->server_name();
+      return rpc_client->AsyncCall(server.host_name(), server.port(), std::move(preq),
+                                   User::defaultUser(), "ClientService");
+    };
+
+    return Call<hbase::Get, hbase::Request, hbase::Response, hbase::Result>(
+        rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest, rpc_call,
+        &hbase::ResponseConverter::FromGetResponse);
+  }
+
+  /* in real RawAsyncTableImpl, this should be private. */
+  /**
+   * Converts `req` with `req_converter`, sends it via `rpc_call`, and completes
+   * promise_ with the converted response or with the error raised.
+   *
+   * NOTE(review): `resp_converter` is accepted but never used; the response is
+   * always converted with ResponseConverter::FromGetResponse.
+   */
+  template <typename REQ, typename PREQ, typename PRESP, typename RESP>
+  folly::Future<RESP> Call(
+      std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
+      std::shared_ptr<RegionLocation> loc, const REQ& req,
+      const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string>& req_converter,
+      const hbase::RpcCall<PREQ, PRESP, hbase::RpcClient>& rpc_call,
+      const RespConverter<std::unique_ptr<RESP>, PRESP>& resp_converter) {
+    auto preq = req_converter(req, loc->region_name());
+    rpc_call(rpc_client, loc, controller, std::move(preq))
+        .then([this](std::unique_ptr<PRESP> presp) {
+          auto result = hbase::ResponseConverter::FromGetResponse(*presp);
+          promise_->setValue(std::move(*result));
+        })
+        .onError([this](const std::exception& e) { promise_->setException(e); });
+    return promise_->getFuture();
+  }
+
+ private:
+  std::shared_ptr<CONN> conn_;
+  std::shared_ptr<folly::Promise<hbase::Result>> promise_;
+};
+
+/**
+ * End-to-end check of the retrying caller: populates table 't' through the
+ * HBase shell, wires mocked connection pieces around a real RpcClient, issues
+ * a single Get through the caller built by AsyncRpcRetryingCallerFactory and
+ * verifies the returned cells.
+ */
+TEST(AsyncRpcRetryTest, TestGetBasic) {
+  // Remove already configured env if present.
+  unsetenv("HBASE_CONF");
+
+  // Using TestUtil to populate test data
+  hbase::TestUtil* test_util = new hbase::TestUtil();
+  test_util->RunShellCmd("create 't', 'd'");
+  test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'");
+  test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>("t");
+  auto row = "test2";
+
+  // Get to be performed on above HBase Table
+  hbase::Get get(row);
+
+  // Create Configuration
+  hbase::Configuration conf;
+
+  // Create a client
+  Client client(conf);
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  /* init region location and rpc channel */
+  auto region_location = table->GetRegionLocation(row);
+
+  auto io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+  auto codec = std::make_shared<hbase::KeyValueCodec>();
+  auto rpc_client = std::make_shared<RpcClient>(io_executor_, codec);
+
+  /* init rpc controller */
+  auto controller = std::make_shared<HBaseRpcController>();
+
+  /* init rpc controller factory */
+  auto controller_factory = std::make_shared<MockRpcControllerFactory>();
+  EXPECT_CALL((*controller_factory), NewController()).Times(1).WillRepeatedly(Return(controller));
+
+  /* init connection configuration */
+  auto connection_conf = std::make_shared<MockAsyncConnectionConfiguration>();
+  EXPECT_CALL((*connection_conf), GetPauseNs())
+      .Times(1)
+      .WillRepeatedly(Return(nanoseconds(100000000)));
+  EXPECT_CALL((*connection_conf), GetMaxRetries()).Times(1).WillRepeatedly(Return(31));
+  EXPECT_CALL((*connection_conf), GetStartLogErrorsCount()).Times(1).WillRepeatedly(Return(9));
+  EXPECT_CALL((*connection_conf), GetReadRpcTimeoutNs())
+      .Times(1)
+      .WillRepeatedly(Return(nanoseconds(60000000000)));
+  EXPECT_CALL((*connection_conf), GetOperationTimeoutNs())
+      .Times(1)
+      .WillRepeatedly(Return(nanoseconds(1200000000000)));
+
+  /* init region locator */
+  auto region_locator = std::make_shared<AsyncRegionLocator>(region_location);
+
+  /* init hbase client connection */
+  auto conn = std::make_shared<MockAsyncConnection>();
+  EXPECT_CALL((*conn), get_conn_conf()).Times(AtLeast(1)).WillRepeatedly(Return(connection_conf));
+  EXPECT_CALL((*conn), get_rpc_controller_factory())
+      .Times(AtLeast(1))
+      .WillRepeatedly(Return(controller_factory));
+  EXPECT_CALL((*conn), get_locator()).Times(AtLeast(1)).WillRepeatedly(Return(region_locator));
+  EXPECT_CALL((*conn), GetRpcClient()).Times(AtLeast(1)).WillRepeatedly(Return(rpc_client));
+
+  /* init retry caller factory */
+  auto tableImpl = std::make_shared<MockRawAsyncTableImpl<MockAsyncConnection>>(conn);
+  AsyncRpcRetryingCallerFactory<MockAsyncConnection> caller_factory(conn);
+
+  /* init request caller builder */
+  auto builder = caller_factory.Single<hbase::Result>();
+
+  /* call with retry to get result */
+  try {
+    auto async_caller =
+        builder->table(std::make_shared<TableName>(tn))
+            ->row(row)
+            ->rpc_timeout(conn->get_conn_conf()->GetReadRpcTimeoutNs())
+            ->operation_timeout(conn->get_conn_conf()->GetOperationTimeoutNs())
+            ->action(
+                [=, &get](
+                    std::shared_ptr<hbase::HBaseRpcController> controller,
+                    std::shared_ptr<hbase::RegionLocation> loc,
+                    std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<hbase::Result> {
+                  return tableImpl->GetCall(rpc_client, controller, loc, get);
+                })
+            ->Build();
+
+    hbase::Result result = async_caller->Call().get();
+
+    /*Stopping the connection as we are getting segfault due to some folly issue
+     The connection stays open and we don't want that.
+     So we are stopping the connection.
+     We can remove this once we have fixed the folly part */
+    delete test_util;
+
+    // Test the values, should be same as in put executed on hbase shell
+    ASSERT_TRUE(!result.IsEmpty()) << "Result shouldn't be empty.";
+    EXPECT_EQ("test2", result.Row());
+    EXPECT_EQ("value2", *(result.Value("d", "2")));
+    EXPECT_EQ("value for extra", *(result.Value("d", "extra")));
+  } catch (const std::exception& e) {
+    LOG(ERROR) << e.what();
+    // Rethrow the in-flight exception. `throw e;` would copy-construct a plain
+    // std::exception from the reference and slice off the dynamic type.
+    throw;
+  }
+
+  table->Close();
+  client.Close();
+}
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index 240da72..f0483ef 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -57,7 +57,8 @@ void Client::init(const hbase::Configuration &conf) {
} else {
LOG(WARNING) << "Not using RPC Cell Codec";
}
- rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, codec, conn_conf_->connect_timeout());
+ rpc_client_ =
+ std::make_shared<hbase::RpcClient>(io_executor_, codec, conn_conf_->connect_timeout());
location_cache_ =
std::make_shared<hbase::LocationCache>(conf_, cpu_executor_, rpc_client_->connection_pool());
}
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index a96d6f3..e73ab70 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -89,7 +89,7 @@ class Client {
bool is_closed_ = false;
/** Methods */
- void init(const hbase::Configuration &conf);
+ void init(const hbase::Configuration& conf);
};
} // namespace hbase
diff --git a/hbase-native-client/core/filter.h b/hbase-native-client/core/filter.h
index b5b7133..10accaa 100644
--- a/hbase-native-client/core/filter.h
+++ b/hbase-native-client/core/filter.h
@@ -20,9 +20,9 @@
#pragma once
#include <memory>
+#include <set>
#include <string>
#include <utility>
-#include <set>
#include <vector>
#include "if/Comparator.pb.h"
diff --git a/hbase-native-client/core/hbase-rpc-controller.cc b/hbase-native-client/core/hbase-rpc-controller.cc
new file mode 100644
index 0000000..bc53781
--- /dev/null
+++ b/hbase-native-client/core/hbase-rpc-controller.cc
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/hbase-rpc-controller.h"
+
+namespace hbase {} /* namespace hbase */
diff --git a/hbase-native-client/core/hbase-rpc-controller.h b/hbase-native-client/core/hbase-rpc-controller.h
new file mode 100644
index 0000000..661c810
--- /dev/null
+++ b/hbase-native-client/core/hbase-rpc-controller.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <google/protobuf/service.h>
+#include <chrono>
+#include <string>
+
+using google::protobuf::RpcController;
+using google::protobuf::Closure;
+
+using std::chrono::milliseconds;
+
+namespace hbase {
+
+/**
+ * Placeholder google::protobuf::RpcController used by the retrying caller.
+ *
+ * Every member is currently a stub: the mutators do nothing, Failed() and
+ * IsCanceled() always return false and ErrorText() is always empty.
+ */
+class HBaseRpcController : public RpcController {
+ public:
+  HBaseRpcController() = default;
+  ~HBaseRpcController() override = default;
+
+  /** Intended to set the per-call timeout; not implemented yet. */
+  void set_call_timeout(const milliseconds& call_timeout) {
+    // TODO:
+  }
+
+  /** No-op: there is no state to reset. */
+  void Reset() override {}
+
+  /** @return always false. */
+  bool Failed() const override { return false; }
+
+  /** @return always the empty string. */
+  std::string ErrorText() const override { return std::string(); }
+
+  /** No-op. */
+  void StartCancel() override {}
+
+  /** No-op: `reason` is discarded. */
+  void SetFailed(const std::string& reason) override {}
+
+  /** @return always false. */
+  bool IsCanceled() const override { return false; }
+
+  /** No-op: `callback` is never invoked. */
+  void NotifyOnCancel(Closure* callback) override {}
+};
+
+} /* namespace hbase */
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index da9f64a..17032fe 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -25,6 +25,7 @@
#include <utility>
+#include <folly/Logging.h>
#include "connection/response.h"
#include "connection/rpc-connection.h"
#include "if/Client.pb.h"
diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h
index b0411cb..4087d94 100644
--- a/hbase-native-client/core/region-location.h
+++ b/hbase-native-client/core/region-location.h
@@ -26,6 +26,8 @@
namespace hbase {
+// Selector passed to region-location lookups. Presumably picks the region
+// before, at, or after the requested row - TODO confirm against the locator.
+enum RegionLocateType { kBefore, kCurrent, kAfter };
+
/**
* @brief class to hold where a region is located.
*
@@ -49,17 +51,17 @@ class RegionLocation {
/**
* Get a reference to the regio info
*/
- const hbase::pb::RegionInfo &region_info() { return ri_; }
+ const hbase::pb::RegionInfo &region_info() const { return ri_; }
/**
* Get a reference to the server name
*/
- const hbase::pb::ServerName &server_name() { return sn_; }
+ const hbase::pb::ServerName &server_name() const { return sn_; }
/**
* Get a reference to the region name.
*/
- const std::string &region_name() { return region_name_; }
+ const std::string &region_name() const { return region_name_; }
/**
* Get a service. This could be closed or null. It's the caller's
@@ -79,7 +81,7 @@ class RegionLocation {
*/
void set_server_name(hbase::pb::ServerName sn) { sn_ = sn; }
- const std::string DebugString() {
+ const std::string DebugString() const {
return "region_info:" + ri_.ShortDebugString() + ", server_name:" + sn_.ShortDebugString();
}
diff --git a/hbase-native-client/core/response_converter.cc b/hbase-native-client/core/response_converter.cc
index 19a3554..2497306 100644
--- a/hbase-native-client/core/response_converter.cc
+++ b/hbase-native-client/core/response_converter.cc
@@ -19,6 +19,7 @@
#include "core/response_converter.h"
+#include <string>
#include <vector>
#include "core/cell.h"
diff --git a/hbase-native-client/core/response_converter.h b/hbase-native-client/core/response_converter.h
index 859644b..759b1ce 100644
--- a/hbase-native-client/core/response_converter.h
+++ b/hbase-native-client/core/response_converter.h
@@ -20,6 +20,7 @@
#pragma once
#include <memory>
+#include <vector>
#include "connection/response.h"
#include "core/result.h"
#include "if/Client.pb.h"
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index 4e30d4b..ba4dc29 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -71,4 +71,8 @@ void Table::Close() {
is_closed_ = true;
}
+// Looks up the region holding `row` in this table through the location cache
+// and waits on the returned future (via get()) for the result.
+std::shared_ptr<RegionLocation> Table::GetRegionLocation(const std::string &row) {
+  const auto &table = *table_name_;
+  return location_cache_->LocateRegion(table, row).get();
+}
+
} /* namespace hbase */
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index 0e98cd2..f82382e 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -57,6 +57,11 @@ class Table {
*/
void Close();
+ /**
+ * @brief - Get region location for a row in current table.
+ */
+ std::shared_ptr<RegionLocation> GetRegionLocation(const std::string &row);
+
private:
std::shared_ptr<TableName> table_name_;
std::shared_ptr<hbase::LocationCache> location_cache_;
diff --git a/hbase-native-client/utils/BUCK b/hbase-native-client/exceptions/BUCK
similarity index 71%
copy from hbase-native-client/utils/BUCK
copy to hbase-native-client/exceptions/BUCK
index 796f2f5..a23654c 100644
--- a/hbase-native-client/utils/BUCK
+++ b/hbase-native-client/exceptions/BUCK
@@ -16,14 +16,9 @@
# limitations under the License.
cxx_library(
- name="utils",
- exported_headers=["user-util.h", "version.h"],
- srcs=["user-util.cc",],
- deps=['//third-party:folly',],
- tests=[":user-util-test"],
- visibility=['PUBLIC',],
- compiler_flags=['-Weffc++'],)
-cxx_test(
- name="user-util-test",
- srcs=["user-util-test.cc",],
- deps=[":utils",],)
+ name="exceptions",
+ exported_headers=["exception.h",],
+ srcs=[],
+ deps=["//third-party:folly",],
+ compiler_flags=['-Weffc++'],
+ visibility=['//core/...'],)
\ No newline at end of file
diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h
new file mode 100644
index 0000000..c0c4142
--- /dev/null
+++ b/hbase-native-client/exceptions/exception.h
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <exception>
+#include <string>
+#include <vector>
+#include <folly/io/IOBuf.h>
+
+namespace hbase {
+
+/**
+ * An exception captured together with the time it was recorded and an
+ * optional free-form context string.
+ */
+class ThrowableWithExtraContext {
+ public:
+  /** Records `cause` at time `when` with an empty context string. */
+  ThrowableWithExtraContext(std::shared_ptr<std::exception> cause, const long& when)
+      : ThrowableWithExtraContext(cause, when, "") {}
+
+  /** Records `cause` at time `when` together with the context text `extras`. */
+  ThrowableWithExtraContext(std::shared_ptr<std::exception> cause, const long& when,
+                            const std::string& extras)
+      : cause_(cause), when_(when), extras_(extras) {}
+
+  /**
+   * @return "<extras>, <cause->what()>"; a null cause is rendered as
+   *         "(no cause)" instead of being dereferenced.
+   */
+  std::string ToString() const {
+    // TODO:
+    // return new Date(this.when).toString() + ", " + extras + ", " + t.toString();
+    const std::string cause_text = cause_ ? cause_->what() : "(no cause)";
+    return extras_ + ", " + cause_text;
+  }
+
+  /** @return the recorded exception; may be null. */
+  std::shared_ptr<std::exception> cause() const { return cause_; }
+
+ private:
+  std::shared_ptr<std::exception> cause_;
+  long when_;
+  std::string extras_;
+};
+
+/**
+ * Base class for I/O failures; carries a message and an optional underlying
+ * cause.
+ */
+class IOException : public std::logic_error {
+ public:
+  /** Creates an exception with message `what` and no cause. */
+  IOException(const std::string& what) : logic_error(what), cause_(nullptr) {}
+
+  /** Creates an exception with message `what` wrapping `cause` (may be null). */
+  IOException(const std::string& what, std::shared_ptr<std::exception> cause)
+      : logic_error(what), cause_(cause) {}
+
+  virtual ~IOException() = default;
+
+  /**
+   * @return the wrapped cause, or null when there is none. Const-qualified so
+   *         it can be called on an exception caught by const reference.
+   */
+  std::shared_ptr<std::exception> cause() const { return cause_; }
+
+ private:
+  const std::shared_ptr<std::exception> cause_;
+};
+
+/**
+ * Raised when an operation gives up after its retries. The message lists every
+ * recorded failure; the cause is the most recent one, or null when none was
+ * recorded.
+ */
+class RetriesExhaustedException : public IOException {
+ public:
+  /**
+   * @param num_retries number of retries performed; the message reports
+   *                    `num_retries + 1` attempts
+   * @param exceptions  failures collected across attempts; must not be null
+   */
+  RetriesExhaustedException(const int& num_retries,
+                            std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions)
+      : IOException(GetMessage(num_retries, exceptions),
+                    exceptions->empty() ? nullptr : exceptions->back().cause()) {}
+  virtual ~RetriesExhaustedException() = default;
+
+ private:
+  // Static on purpose: it is invoked from the base-class initializer, i.e.
+  // before any part of this object exists, where calling a non-static member
+  // function is undefined behaviour.
+  static std::string GetMessage(
+      const int& num_retries,
+      std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions) {
+    std::string buffer("Failed after attempts=");
+    buffer.append(std::to_string(num_retries + 1));
+    buffer.append(", exceptions:\n");
+    for (auto& recorded : *exceptions) {
+      buffer.append(recorded.ToString());
+      buffer.append("\n");
+    }
+    return buffer;
+  }
+};
+
+/**
+ * Base class for HBase-specific I/O failures. Inherits IOException's
+ * constructors; without them the class has a deleted default constructor and
+ * cannot be instantiated.
+ */
+class HBaseIOException : public IOException {
+ public:
+  using IOException::IOException;
+};
+
+/**
+ * Marker type for failures that must not be retried. Constructed with the same
+ * arguments as IOException.
+ */
+class DoNotRetryIOException : public HBaseIOException {
+ public:
+  using HBaseIOException::HBaseIOException;
+};
+} // namespace hbase
diff --git a/hbase-native-client/utils/BUCK b/hbase-native-client/utils/BUCK
index 796f2f5..eae929e 100644
--- a/hbase-native-client/utils/BUCK
+++ b/hbase-native-client/utils/BUCK
@@ -17,8 +17,11 @@
cxx_library(
name="utils",
- exported_headers=["user-util.h", "version.h"],
- srcs=["user-util.cc",],
+ exported_headers=[
+ "user-util.h", "version.h", "connection-util.h", "sys-util.h",
+ "time-util.h"
+ ],
+ srcs=["user-util.cc", "connection-util.cc"],
deps=['//third-party:folly',],
tests=[":user-util-test"],
visibility=['PUBLIC',],
diff --git a/hbase-native-client/utils/connection-util.cc b/hbase-native-client/utils/connection-util.cc
new file mode 100644
index 0000000..76689bf
--- /dev/null
+++ b/hbase-native-client/utils/connection-util.cc
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "utils/connection-util.h"
+
+namespace hbase {
+
+// Backoff multipliers indexed by try count; ConnectionUtils::GetPauseTime
+// multiplies the base pause by the selected entry.
+const std::vector<uint32_t> ConnectionUtils::kRetryBackoff = {1, 2, 3, 5, 10, 20, 40,
+ 100, 100, 100, 100, 200, 200};
+} /* namespace hbase */
diff --git a/hbase-native-client/utils/connection-util.h b/hbase-native-client/utils/connection-util.h
new file mode 100644
index 0000000..f52c2f9
--- /dev/null
+++ b/hbase-native-client/utils/connection-util.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <climits>
+#include <cstdlib>
+#include <memory>
+#include <vector>
+#include "utils/time-util.h"
+
+namespace hbase {
+/** Helpers for turning retry settings into attempt counts and pause times. */
+class ConnectionUtils {
+ public:
+  /**
+   * Converts a retry count into a total attempt count.
+   * @return `retries + 1`, saturating at INT_MAX and never less than 1
+   */
+  static int Retries2Attempts(const int& retries) {
+    return std::max(1, retries == INT_MAX ? INT_MAX : retries + 1);
+  }
+
+  /* Add a delta to avoid timeout immediately after a retry sleeping. */
+  static const uint64_t kSleepDeltaNs = 1000000;
+
+  static const std::vector<uint32_t> kRetryBackoff;
+  /**
+   * Calculate pause time. Built on {@link kRetryBackoff}.
+   * @param pause time to pause
+   * @param tries amount of tries; values below 0 use the first backoff entry
+   *              and values past the table use the last one
+   * @return How long to wait after <code>tries</code> retries
+   */
+  static int64_t GetPauseTime(const int64_t& pause, const int32_t& tries) {
+    // Clamp while `tries` is still signed: casting a negative value to size_t
+    // wraps to a huge index and would select the largest backoff, not the
+    // smallest.
+    size_t index = 0;
+    if (tries > 0) {
+      index = std::min(static_cast<size_t>(tries), kRetryBackoff.size() - 1);
+    }
+
+    const int64_t normal_pause = pause * kRetryBackoff[index];
+    // 1% possible jitter
+    const float r = static_cast<float>(std::rand()) / static_cast<float>(RAND_MAX);
+    const int64_t jitter = static_cast<int64_t>(normal_pause * r * 0.01f);
+    return normal_pause + jitter;
+  }
+};
+} /* namespace hbase */
diff --git a/hbase-native-client/utils/sys-util.h b/hbase-native-client/utils/sys-util.h
new file mode 100644
index 0000000..68f00d7
--- /dev/null
+++ b/hbase-native-client/utils/sys-util.h
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <type_traits>
+
+namespace hbase {
+
+/** Small type-inspection helpers. */
+class SysUtil {
+ public:
+  /**
+   * Runtime check of the dynamic type of `object`.
+   * @return true when `object` is a BASE (or derived from it), false otherwise
+   */
+  template <class BASE, typename DERIVED>
+  static constexpr bool InstanceOf(const DERIVED& object) {
+    // dynamic_cast yields a non-null pointer exactly when the cast succeeds;
+    // the previous `!dynamic_cast<...>` reported the opposite.
+    return dynamic_cast<const BASE*>(&object) != nullptr;
+  }
+
+  /**
+   * Compile-time check on static types.
+   * @return true when BASE is a base class of (or the same class as) DERIVED
+   */
+  template <typename BASE, typename DERIVED>
+  static constexpr bool InstanceOf() {
+    return std::is_base_of<BASE, DERIVED>::value;
+  }
+};
+
+} /* namespace hbase */
diff --git a/hbase-native-client/utils/time-util.h b/hbase-native-client/utils/time-util.h
new file mode 100644
index 0000000..bbc3b35
--- /dev/null
+++ b/hbase-native-client/utils/time-util.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <chrono>
+#include <string>
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+namespace hbase {
+/** Conversions between nanosecond counts, milliseconds and the current time. */
+class TimeUtil {
+ public:
+  /** @return `nanos` nanoseconds converted to whole milliseconds (truncated). */
+  static int64_t ToMillis(const int64_t& nanos) {
+    const std::chrono::nanoseconds span(nanos);
+    return std::chrono::duration_cast<std::chrono::milliseconds>(span).count();
+  }
+
+  /** @return `nanos` as whole milliseconds, formatted as a decimal string. */
+  static std::string ToMillisStr(const std::chrono::nanoseconds& nanos) {
+    return std::to_string(ToMillis(nanos.count()));
+  }
+
+  /** @return high_resolution_clock's time since its epoch, in nanoseconds. */
+  static int64_t GetNowNanos() {
+    const auto since_epoch = std::chrono::high_resolution_clock::now().time_since_epoch();
+    return std::chrono::duration_cast<std::chrono::nanoseconds>(since_epoch).count();
+  }
+
+  /** @return whole milliseconds elapsed between `start_ns` and now. */
+  static int64_t ElapsedMillis(const int64_t& start_ns) {
+    return ToMillis(GetNowNanos() - start_ns);
+  }
+
+  /** @return ElapsedMillis(start_ns) formatted as a decimal string. */
+  static std::string ElapsedMillisStr(const int64_t& start_ns) {
+    return std::to_string(ElapsedMillis(start_ns));
+  }
+};
+} /* namespace hbase */