You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this text copy.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/04/28 01:53:02 UTC
hbase git commit: HBASE-17800 [C++] handle exceptions in client RPC
(Xiaobing Zhou and Enis Soztutar)
Repository: hbase
Updated Branches:
refs/heads/HBASE-14850 8aa8a9251 -> 018f1eab2
HBASE-17800 [C++] handle exceptions in client RPC (Xiaobing Zhou and Enis Soztutar)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/018f1eab
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/018f1eab
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/018f1eab
Branch: refs/heads/HBASE-14850
Commit: 018f1eab2365f00e69fffbb4fe526645022e3020
Parents: 8aa8a92
Author: Enis Soztutar <en...@apache.org>
Authored: Thu Apr 27 18:52:53 2017 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Thu Apr 27 18:52:53 2017 -0700
----------------------------------------------------------------------
hbase-native-client/connection/BUCK | 1 +
.../connection/client-dispatcher.cc | 10 +-
.../connection/client-handler.cc | 39 ++-
hbase-native-client/connection/client-handler.h | 4 +-
.../connection/connection-factory.cc | 3 +-
.../connection/connection-factory.h | 2 +-
.../connection/connection-pool-test.cc | 2 +-
.../connection/connection-pool.cc | 7 +-
hbase-native-client/connection/pipeline.cc | 5 +-
hbase-native-client/connection/response.h | 9 +-
hbase-native-client/core/async-connection.cc | 18 +-
hbase-native-client/core/async-connection.h | 10 +
hbase-native-client/core/async-region-locator.h | 4 +-
.../core/async-rpc-retrying-caller-factory.h | 14 +-
.../core/async-rpc-retrying-caller.cc | 84 ++++---
.../core/async-rpc-retrying-caller.h | 11 +-
.../core/async-rpc-retrying-test.cc | 240 ++++++++++++++++---
hbase-native-client/core/client-test.cc | 1 +
hbase-native-client/core/client.h | 5 +
hbase-native-client/core/location-cache.cc | 3 +-
hbase-native-client/core/location-cache.h | 3 +-
hbase-native-client/exceptions/BUCK | 2 +-
hbase-native-client/exceptions/exception.h | 92 ++++++-
23 files changed, 450 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK
index 19536d5..36111f8 100644
--- a/hbase-native-client/connection/BUCK
+++ b/hbase-native-client/connection/BUCK
@@ -48,6 +48,7 @@ cxx_library(
"//security:security",
"//third-party:folly",
"//third-party:wangle",
+ "//exceptions:exceptions",
],
compiler_flags=['-Weffc++'],
visibility=['//core/...',],)
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/client-dispatcher.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc
index 626fc76..27201d2 100644
--- a/hbase-native-client/connection/client-dispatcher.cc
+++ b/hbase-native-client/connection/client-dispatcher.cc
@@ -17,12 +17,14 @@
*
*/
#include "connection/client-dispatcher.h"
+#include <folly/ExceptionWrapper.h>
#include <utility>
using namespace folly;
using namespace hbase;
using namespace wangle;
+using folly::exception_wrapper;
ClientDispatcher::ClientDispatcher() : requests_(5000), current_call_id_(9) {}
@@ -35,9 +37,11 @@ void ClientDispatcher::read(Context *ctx, std::unique_ptr<Response> in) {
requests_.erase(call_id);
- // TODO(eclark): check if the response
- // is an exception. If it is then set that.
- p.setValue(std::move(in));
+ if (in->exception()) {
+ p.setException(in->exception());
+ } else {
+ p.setValue(std::move(in));
+ }
}
Future<std::unique_ptr<Response>> ClientDispatcher::operator()(std::unique_ptr<Request> arg) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/client-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index af84572..113ebd0 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -19,6 +19,7 @@
#include "connection/client-handler.h"
+#include <folly/ExceptionWrapper.h>
#include <folly/Likely.h>
#include <glog/logging.h>
@@ -36,9 +37,11 @@ using hbase::pb::ResponseHeader;
using hbase::pb::GetResponse;
using google::protobuf::Message;
-ClientHandler::ClientHandler(std::string user_name, std::shared_ptr<Codec> codec)
+ClientHandler::ClientHandler(std::string user_name, std::shared_ptr<Codec> codec,
+ const std::string &server)
: user_name_(user_name),
serde_(codec),
+ server_(server),
once_flag_(std::make_unique<std::once_flag>()),
resp_msgs_(
make_unique<folly::AtomicHashMap<uint32_t, std::shared_ptr<google::protobuf::Message>>>(
@@ -51,7 +54,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
ResponseHeader header;
int used_bytes = serde_.ParseDelimited(buf.get(), &header);
- VLOG(1) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id()
+ VLOG(3) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id()
<< " has_exception=" << header.has_exception();
// Get the response protobuf from the map
@@ -92,9 +95,31 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
}
received->set_resp_msg(resp_msg);
- }
- // TODO: set exception in Response here
+ } else {
+ hbase::pb::ExceptionResponse exceptionResponse = header.exception();
+
+ std::string what;
+ std::string exception_class_name = exceptionResponse.has_exception_class_name()
+ ? exceptionResponse.exception_class_name()
+ : "";
+ std::string stack_trace =
+ exceptionResponse.has_stack_trace() ? exceptionResponse.stack_trace() : "";
+ what.append(exception_class_name).append(stack_trace);
+
+ auto remote_exception = std::make_unique<RemoteException>(what);
+ remote_exception->set_exception_class_name(exception_class_name)
+ ->set_stack_trace(stack_trace)
+ ->set_hostname(exceptionResponse.has_hostname() ? exceptionResponse.hostname() : "")
+ ->set_port(exceptionResponse.has_port() ? exceptionResponse.port() : 0);
+ if (exceptionResponse.has_do_not_retry()) {
+ remote_exception->set_do_not_retry(exceptionResponse.do_not_retry());
+ }
+ VLOG(3) << "Exception RPC ResponseHeader, call_id=" << header.call_id()
+ << " exception.what=" << remote_exception->what()
+ << ", do_not_retry=" << remote_exception->do_not_retry();
+ received->set_exception(::folly::exception_wrapper{*remote_exception});
+ }
ctx->fireRead(std::move(received));
}
}
@@ -103,17 +128,19 @@ Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
// We need to send the header once.
// So use call_once to make sure that only one thread wins this.
std::call_once((*once_flag_), [ctx, this]() {
+ VLOG(3) << "Writing RPC connection Preamble and Header to server: " << server_;
auto pre = serde_.Preamble();
auto header = serde_.Header(user_name_);
pre->appendChain(std::move(header));
ctx->fireWrite(std::move(pre));
});
+ VLOG(3) << "Writing RPC Request with call_id:"
+ << r->call_id(); // TODO: more logging for RPC Header
+
// Now store the call id to response.
resp_msgs_->insert(r->call_id(), r->resp_msg());
- VLOG(1) << "Writing RPC Request with call_id:" << r->call_id();
-
// Send the data down the pipeline.
return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get()));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/client-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h
index afb8e62..4c106e0 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -27,6 +27,7 @@
#include <string>
#include <utility>
+#include "exceptions/exception.h"
#include "serde/codec.h"
#include "serde/rpc.h"
@@ -59,7 +60,7 @@ class ClientHandler
* Create the handler
* @param user_name the user name of the user running this process.
*/
- explicit ClientHandler(std::string user_name, std::shared_ptr<Codec> codec);
+ ClientHandler(std::string user_name, std::shared_ptr<Codec> codec, const std::string &server);
/**
* Get bytes from the wire.
@@ -77,6 +78,7 @@ class ClientHandler
std::unique_ptr<std::once_flag> once_flag_;
std::string user_name_;
RpcSerde serde_;
+ std::string server_; // for logging
// in flight requests
std::unique_ptr<folly::AtomicHashMap<uint32_t, std::shared_ptr<google::protobuf::Message>>>
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/connection-factory.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index 832b00f..afa227d 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -46,9 +46,10 @@ std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> ConnectionFactory::M
return client;
}
+
std::shared_ptr<HBaseService> ConnectionFactory::Connect(
std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client, const std::string &hostname,
- int port) {
+ uint16_t port) {
// Yes this will block however it makes dealing with connection pool soooooo
// much nicer.
// TODO see about using shared promise for this.
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/connection-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h
index 32d0bf7..1e75571 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -61,7 +61,7 @@ class ConnectionFactory {
*/
virtual std::shared_ptr<HBaseService> Connect(
std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client,
- const std::string &hostname, int port);
+ const std::string &hostname, uint16_t port);
private:
nanoseconds connect_timeout_;
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/connection-pool-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc
index 623ce3c..8ecdf29 100644
--- a/hbase-native-client/connection/connection-pool-test.cc
+++ b/hbase-native-client/connection/connection-pool-test.cc
@@ -40,7 +40,7 @@ class MockConnectionFactory : public ConnectionFactory {
MOCK_METHOD0(MakeBootstrap, std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>());
MOCK_METHOD3(Connect, std::shared_ptr<HBaseService>(
std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>,
- const std::string &hostname, int port));
+ const std::string &hostname, uint16_t port));
};
class MockBootstrap : public wangle::ClientBootstrap<SerializePipeline> {};
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/connection-pool.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index 4fe4610..3121294 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -19,10 +19,11 @@
#include "connection/connection-pool.h"
+#include <folly/Conv.h>
+#include <folly/Logging.h>
#include <folly/SocketAddress.h>
#include <wangle/service/Service.h>
-#include <folly/Logging.h>
#include <memory>
#include <utility>
@@ -89,7 +90,6 @@ std::shared_ptr<RpcConnection> ConnectionPool::GetNewConnection(
/* create new connection */
auto clientBootstrap = cf_->MakeBootstrap();
auto dispatcher = cf_->Connect(clientBootstrap, remote_id->host(), remote_id->port());
-
auto connection = std::make_shared<RpcConnection>(remote_id, dispatcher);
connections_.insert(std::make_pair(remote_id, connection));
@@ -101,6 +101,8 @@ std::shared_ptr<RpcConnection> ConnectionPool::GetNewConnection(
void ConnectionPool::Close(std::shared_ptr<ConnectionId> remote_id) {
SharedMutexWritePriority::WriteHolder holder{map_mutex_};
+ DLOG(INFO) << "Closing RPC Connection to host:" << remote_id->host()
+ << ", port:" << folly::to<std::string>(remote_id->port());
auto found = connections_.find(remote_id);
if (found == connections_.end() || found->second == nullptr) {
@@ -108,6 +110,7 @@ void ConnectionPool::Close(std::shared_ptr<ConnectionId> remote_id) {
}
found->second->Close();
connections_.erase(found);
+ // TODO: erase the client as well?
}
void ConnectionPool::Close() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/pipeline.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/pipeline.cc b/hbase-native-client/connection/pipeline.cc
index 00dc05c..edada52 100644
--- a/hbase-native-client/connection/pipeline.cc
+++ b/hbase-native-client/connection/pipeline.cc
@@ -35,11 +35,14 @@ RpcPipelineFactory::RpcPipelineFactory(std::shared_ptr<Codec> codec)
SerializePipeline::Ptr RpcPipelineFactory::newPipeline(
std::shared_ptr<AsyncTransportWrapper> sock) {
+ SocketAddress addr; // for logging
+ sock->getPeerAddress(&addr);
+
auto pipeline = SerializePipeline::create();
pipeline->addBack(AsyncSocketHandler{sock});
pipeline->addBack(EventBaseHandler{});
pipeline->addBack(LengthFieldBasedFrameDecoder{});
- pipeline->addBack(ClientHandler{user_util_.user_name(), codec_});
+ pipeline->addBack(ClientHandler{user_util_.user_name(), codec_, addr.describe()});
pipeline->finalize();
return pipeline;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/response.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/response.h b/hbase-native-client/connection/response.h
index 1d60fed..c5472b0 100644
--- a/hbase-native-client/connection/response.h
+++ b/hbase-native-client/connection/response.h
@@ -22,6 +22,8 @@
#include <memory>
#include <utility>
+#include <folly/ExceptionWrapper.h>
+
#include "serde/cell-scanner.h"
// Forward
@@ -44,7 +46,7 @@ class Response {
* Constructor.
* Initinalizes the call id to 0. 0 should never be a valid call id.
*/
- Response() : call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr) {}
+ Response() : call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr), exception_(nullptr) {}
/** Get the call_id */
uint32_t call_id() { return call_id_; }
@@ -70,9 +72,14 @@ class Response {
const std::unique_ptr<CellScanner>& cell_scanner() const { return cell_scanner_; }
+ folly::exception_wrapper exception() { return exception_; }
+
+ void set_exception(folly::exception_wrapper value) { exception_ = value; }
+
private:
uint32_t call_id_;
std::shared_ptr<google::protobuf::Message> resp_msg_;
std::unique_ptr<CellScanner> cell_scanner_;
+ folly::exception_wrapper exception_;
};
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/async-connection.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-connection.cc b/hbase-native-client/core/async-connection.cc
index b945e38..4642c61 100644
--- a/hbase-native-client/core/async-connection.cc
+++ b/hbase-native-client/core/async-connection.cc
@@ -29,6 +29,13 @@ void AsyncConnectionImpl::Init() {
auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * sysconf(_SC_NPROCESSORS_ONLN));
cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(cpu_threads);
io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(io_threads);
+ /*
+ * We need a retry_executor for a thread pool of size 1 due to a possible bug in wangle/folly.
+ * Otherwise, Assertion 'isInEventBaseThread()' always fails. See the comments
+ * in async-rpc-retrying-caller.cc.
+ */
+ retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+ retry_timer_ = folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
std::shared_ptr<Codec> codec = nullptr;
if (conf_->Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) ==
@@ -41,22 +48,21 @@ void AsyncConnectionImpl::Init() {
std::make_shared<hbase::RpcClient>(io_executor_, codec, connection_conf_->connect_timeout());
location_cache_ =
std::make_shared<hbase::LocationCache>(conf_, cpu_executor_, rpc_client_->connection_pool());
- caller_factory_ = std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this());
+ caller_factory_ =
+ std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
}
// We can't have the threads continue running after everything is done
// that leads to an error.
-AsyncConnectionImpl::~AsyncConnectionImpl() {
- cpu_executor_->stop();
- io_executor_->stop();
- if (rpc_client_.get()) rpc_client_->Close();
-}
+AsyncConnectionImpl::~AsyncConnectionImpl() { Close(); }
void AsyncConnectionImpl::Close() {
if (is_closed_) return;
cpu_executor_->stop();
io_executor_->stop();
+ retry_executor_->stop();
+ retry_timer_.reset();  // drop our ref only; newTimer()'s Destructor deleter calls destroy() exactly once
if (rpc_client_.get()) rpc_client_->Close();
is_closed_ = true;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/async-connection.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-connection.h b/hbase-native-client/core/async-connection.h
index ff11577..7b260a5 100644
--- a/hbase-native-client/core/async-connection.h
+++ b/hbase-native-client/core/async-connection.h
@@ -53,6 +53,9 @@ class AsyncConnection {
virtual std::shared_ptr<RpcClient> rpc_client() = 0;
virtual std::shared_ptr<AsyncRegionLocator> region_locator() = 0;
virtual std::shared_ptr<HBaseRpcController> CreateRpcController() = 0;
+ virtual std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() = 0;
+ virtual std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() = 0;
+ virtual std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() = 0;
virtual void Close() = 0;
};
@@ -81,6 +84,11 @@ class AsyncConnectionImpl : public AsyncConnection,
std::shared_ptr<HBaseRpcController> CreateRpcController() override {
return std::make_shared<HBaseRpcController>();
}
+ std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
+ std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
+ std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
+ return retry_executor_;
+ }
void Close() override;
@@ -98,8 +106,10 @@ class AsyncConnectionImpl : public AsyncConnection,
std::shared_ptr<Configuration> conf_;
std::shared_ptr<ConnectionConfiguration> connection_conf_;
std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
+ std::shared_ptr<folly::HHWheelTimer> retry_timer_;
std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
+ std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
std::shared_ptr<LocationCache> location_cache_;
std::shared_ptr<RpcClient> rpc_client_;
bool is_closed_ = false;
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/async-region-locator.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-region-locator.h b/hbase-native-client/core/async-region-locator.h
index c606dcb..f75cb7e 100644
--- a/hbase-native-client/core/async-region-locator.h
+++ b/hbase-native-client/core/async-region-locator.h
@@ -19,6 +19,7 @@
#pragma once
+#include <folly/ExceptionWrapper.h>
#include <folly/futures/Future.h>
#include <memory>
#include <string>
@@ -57,7 +58,8 @@ class AsyncRegionLocator {
/**
* Update cached region location, possibly using the information from exception.
*/
- virtual void UpdateCachedLocation(const RegionLocation &loc, const std::exception &error) = 0;
+ virtual void UpdateCachedLocation(const RegionLocation &loc,
+ const folly::exception_wrapper &error) = 0;
};
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/async-rpc-retrying-caller-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
index 5bcad6c..5a80a06 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller-factory.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
@@ -41,8 +41,10 @@ template <typename RESP>
class SingleRequestCallerBuilder
: public std::enable_shared_from_this<SingleRequestCallerBuilder<RESP>> {
public:
- explicit SingleRequestCallerBuilder(std::shared_ptr<AsyncConnection> conn)
+ explicit SingleRequestCallerBuilder(std::shared_ptr<AsyncConnection> conn,
+ std::shared_ptr<folly::HHWheelTimer> retry_timer)
: conn_(conn),
+ retry_timer_(retry_timer),
table_name_(nullptr),
rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()),
pause_(conn->connection_conf()->pause()),
@@ -105,7 +107,7 @@ class SingleRequestCallerBuilder
std::shared_ptr<AsyncSingleRequestRpcRetryingCaller<RESP>> Build() {
return std::make_shared<AsyncSingleRequestRpcRetryingCaller<RESP>>(
- conn_, table_name_, row_, locate_type_, callable_, pause_, max_retries_,
+ conn_, retry_timer_, table_name_, row_, locate_type_, callable_, pause_, max_retries_,
operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_);
}
@@ -116,6 +118,7 @@ class SingleRequestCallerBuilder
private:
std::shared_ptr<AsyncConnection> conn_;
+ std::shared_ptr<folly::HHWheelTimer> retry_timer_;
std::shared_ptr<TableName> table_name_;
nanoseconds rpc_timeout_nanos_;
nanoseconds operation_timeout_nanos_;
@@ -130,15 +133,18 @@ class SingleRequestCallerBuilder
class AsyncRpcRetryingCallerFactory {
private:
std::shared_ptr<AsyncConnection> conn_;
+ std::shared_ptr<folly::HHWheelTimer> retry_timer_;
public:
- explicit AsyncRpcRetryingCallerFactory(std::shared_ptr<AsyncConnection> conn) : conn_(conn) {}
+ explicit AsyncRpcRetryingCallerFactory(std::shared_ptr<AsyncConnection> conn,
+ std::shared_ptr<folly::HHWheelTimer> retry_timer)
+ : conn_(conn), retry_timer_(retry_timer) {}
virtual ~AsyncRpcRetryingCallerFactory() = default;
template <typename RESP>
std::shared_ptr<SingleRequestCallerBuilder<RESP>> Single() {
- return std::make_shared<SingleRequestCallerBuilder<RESP>>(conn_);
+ return std::make_shared<SingleRequestCallerBuilder<RESP>>(conn_, retry_timer_);
}
};
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/async-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc
index 965a44b..7e211f7 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.cc
+++ b/hbase-native-client/core/async-rpc-retrying-caller.cc
@@ -19,6 +19,8 @@
#include "core/async-rpc-retrying-caller.h"
+#include <folly/Conv.h>
+#include <folly/ExceptionWrapper.h>
#include <folly/Format.h>
#include <folly/Logging.h>
#include <folly/futures/Unit.h>
@@ -34,15 +36,19 @@
#include "utils/sys-util.h"
#include "utils/time-util.h"
+using folly::exception_wrapper;
+
namespace hbase {
template <typename RESP>
AsyncSingleRequestRpcRetryingCaller<RESP>::AsyncSingleRequestRpcRetryingCaller(
- std::shared_ptr<AsyncConnection> conn, std::shared_ptr<hbase::pb::TableName> table_name,
- const std::string& row, RegionLocateType locate_type, Callable<RESP> callable,
- nanoseconds pause, uint32_t max_retries, nanoseconds operation_timeout_nanos,
- nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
+ std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
+ std::shared_ptr<hbase::pb::TableName> table_name, const std::string& row,
+ RegionLocateType locate_type, Callable<RESP> callable, nanoseconds pause, uint32_t max_retries,
+ nanoseconds operation_timeout_nanos, nanoseconds rpc_timeout_nanos,
+ uint32_t start_log_errors_count)
: conn_(conn),
+ retry_timer_(retry_timer),
table_name_(table_name),
row_(row),
locate_type_(locate_type),
@@ -58,7 +64,6 @@ AsyncSingleRequestRpcRetryingCaller<RESP>::AsyncSingleRequestRpcRetryingCaller(
start_ns_ = TimeUtil::GetNowNanos();
max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>();
- retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_);
}
template <typename RESP>
@@ -87,7 +92,7 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::LocateThenCall() {
conn_->region_locator()
->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns)
.then([this](std::shared_ptr<RegionLocation> loc) { Call(*loc); })
- .onError([this](const std::exception& e) {
+ .onError([this](const exception_wrapper& e) {
OnError(e,
[this]() -> std::string {
return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" +
@@ -96,17 +101,17 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::LocateThenCall() {
TimeUtil::ToMillisStr(operation_timeout_nanos_) + " ms, time elapsed = " +
TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
},
- [](const std::exception& error) {});
+ [](const exception_wrapper& error) {});
});
}
template <typename RESP>
void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
- const std::exception& error, Supplier<std::string> err_msg,
- Consumer<std::exception> update_cached_location) {
- ThrowableWithExtraContext twec(std::make_shared<std::exception>(error), TimeUtil::GetNowNanos());
+ const exception_wrapper& error, Supplier<std::string> err_msg,
+ Consumer<exception_wrapper> update_cached_location) {
+ ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos());
exceptions_->push_back(twec);
- if (SysUtil::InstanceOf<DoNotRetryIOException, std::exception>(error) || tries_ >= max_retries_) {
+ if (!ShouldRetry(error) || tries_ >= max_retries_) {
CompleteExceptionally();
return;
}
@@ -124,8 +129,33 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
}
update_cached_location(error);
tries_++;
- retry_timer_->scheduleTimeoutFn([this]() { LocateThenCall(); },
- milliseconds(TimeUtil::ToMillis(delay_ns)));
+
+ /*
+ * The HHWheelTimer::scheduleTimeout() fails with an assertion from
+ * EventBase::isInEventBaseThread() if we execute the schedule in a random thread, or one of
+ * the IOThreadPool threads (with num threads > 1). I think there is a bug there in using retry
+ * timer from IOThreadPool threads. It only works when executed from a single-thread pool
+ * (retry_executor() is). However, the scheduled "work" which is the LocateThenCall() should
+ * still happen in a thread pool, that is why we are submitting the work to the CPUThreadPool.
+ * IOThreadPool cannot be used without fixing the blocking call that we do at TCP connection
+ * establishment time (see ConnectionFactory::Connect()), otherwise, the IOThreadPool thread
+ * just hangs because it deadlocks itself.
+ */
+ conn_->retry_executor()->add([this, delay_ns]() {  // copy delay_ns: runs after OnError returns
+ retry_timer_->scheduleTimeoutFn(
+ [this]() {
+ conn_->cpu_executor()->add([this]() { LocateThenCall(); });
+ },
+ milliseconds(TimeUtil::ToMillis(delay_ns)));
+ });
+}
+
+template <typename RESP>
+bool AsyncSingleRequestRpcRetryingCaller<RESP>::ShouldRetry(const exception_wrapper& error) {
+ bool do_not_retry = false;  // retry by default unless the server says otherwise
+ error.with_exception(  // only a RemoteException carries the server's do_not_retry flag
+ [&](const RemoteException& remote_ex) { do_not_retry = remote_ex.do_not_retry(); });
+ return !do_not_retry;
}
template <typename RESP>
@@ -143,33 +173,14 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc)
}
std::shared_ptr<RpcClient> rpc_client;
- try {
- // TODO: There is no connection attempt happening here, no need to try-catch.
- rpc_client = conn_->rpc_client();
- } catch (const IOException& e) {
- OnError(e,
- [&, this]() -> std::string {
- return "Get async rpc_client to " +
- folly::sformat("{0}:{1}", loc.server_name().host_name(),
- loc.server_name().port()) +
- " for '" + row_ + "' in " + loc.DebugString() + " of " +
- table_name_->namespace_() + "::" + table_name_->qualifier() +
- " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " +
- std::to_string(max_attempts_) + ", timeout = " +
- TimeUtil::ToMillisStr(this->operation_timeout_nanos_) +
- " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
- },
- [&, this](const std::exception& error) {
- conn_->region_locator()->UpdateCachedLocation(loc, error);
- });
- return;
- }
+
+ rpc_client = conn_->rpc_client();
ResetController(controller_, call_timeout_ns);
callable_(controller_, std::make_shared<RegionLocation>(loc), rpc_client)
.then([this](const RESP& resp) { this->promise_->setValue(std::move(resp)); })
- .onError([&, this](const std::exception& e) {
+ .onError([&, this](const exception_wrapper& e) {
OnError(e,
[&, this]() -> std::string {
return "Call to " + folly::sformat("{0}:{1}", loc.server_name().host_name(),
@@ -182,10 +193,9 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc)
" ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) +
" ms";
},
- [&, this](const std::exception& error) {
+ [&, this](const exception_wrapper& error) {
conn_->region_locator()->UpdateCachedLocation(loc, error);
});
- return;
});
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/async-rpc-retrying-caller.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h
index 6006388..c86ad0b5 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller.h
@@ -18,6 +18,7 @@
*/
#pragma once
+#include <folly/ExceptionWrapper.h>
#include <folly/futures/Future.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/HHWheelTimer.h>
@@ -70,6 +71,7 @@ template <typename RESP>
class AsyncSingleRequestRpcRetryingCaller {
public:
AsyncSingleRequestRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
+ std::shared_ptr<folly::HHWheelTimer> retry_timer,
std::shared_ptr<hbase::pb::TableName> table_name,
const std::string& row, RegionLocateType locate_type,
Callable<RESP> callable, nanoseconds pause,
@@ -84,8 +86,10 @@ class AsyncSingleRequestRpcRetryingCaller {
private:
void LocateThenCall();
- void OnError(const std::exception& error, Supplier<std::string> err_msg,
- Consumer<std::exception> update_cached_location);
+ void OnError(const folly::exception_wrapper& error, Supplier<std::string> err_msg,
+ Consumer<folly::exception_wrapper> update_cached_location);
+
+ bool ShouldRetry(const folly::exception_wrapper& error);
void Call(const RegionLocation& loc);
@@ -97,8 +101,8 @@ class AsyncSingleRequestRpcRetryingCaller {
const int64_t& timeout_ns);
private:
- folly::HHWheelTimer::UniquePtr retry_timer_;
std::shared_ptr<AsyncConnection> conn_;
+ std::shared_ptr<folly::HHWheelTimer> retry_timer_;
std::shared_ptr<hbase::pb::TableName> table_name_;
std::string row_;
RegionLocateType locate_type_;
@@ -114,6 +118,5 @@ class AsyncSingleRequestRpcRetryingCaller {
uint32_t tries_;
std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_;
uint32_t max_attempts_;
- folly::EventBase event_base_;
};
} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/async-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc
index 4956972..ff28e79 100644
--- a/hbase-native-client/core/async-rpc-retrying-test.cc
+++ b/hbase-native-client/core/async-rpc-retrying-test.cc
@@ -20,10 +20,13 @@
#include <folly/Logging.h>
#include <folly/Memory.h>
#include <folly/futures/Future.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/ScopedEventBaseThread.h>
#include <gmock/gmock.h>
#include <google/protobuf/stubs/callback.h>
#include <wangle/concurrent/IOThreadPoolExecutor.h>
+#include <chrono>
#include <functional>
#include <string>
@@ -67,15 +70,33 @@ using hbase::Client;
using ::testing::Return;
using ::testing::_;
using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
-class MockAsyncRegionLocator : public AsyncRegionLocator {
+using namespace hbase;
+
+using folly::exception_wrapper;
+
+class AsyncRpcRetryTest : public ::testing::Test {
public:
- explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+ static std::unique_ptr<hbase::TestUtil> test_util;
+
+ static void SetUpTestCase() {
+ google::InstallFailureSignalHandler();
+ test_util = std::make_unique<hbase::TestUtil>();
+ test_util->StartMiniCluster(2);
+ }
+};
+std::unique_ptr<hbase::TestUtil> AsyncRpcRetryTest::test_util = nullptr;
+
+class AsyncRegionLocatorBase : public AsyncRegionLocator {
+ public:
+ AsyncRegionLocatorBase() {}
+ explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location)
: region_location_(region_location) {}
- ~MockAsyncRegionLocator() = default;
+ virtual ~AsyncRegionLocatorBase() = default;
- folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName&,
- const std::string&,
+ folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &,
+ const std::string &,
const RegionLocateType,
const int64_t) override {
folly::Promise<std::shared_ptr<RegionLocation>> promise;
@@ -83,22 +104,102 @@ class MockAsyncRegionLocator : public AsyncRegionLocator {
return promise.getFuture();
}
- void UpdateCachedLocation(const RegionLocation&, const std::exception&) override {}
+ virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) {
+ region_location_ = region_location;
+ }
+
+ void UpdateCachedLocation(const RegionLocation &, const folly::exception_wrapper &) override {}
- private:
+ protected:
std::shared_ptr<RegionLocation> region_location_;
};
+class MockAsyncRegionLocator : public AsyncRegionLocatorBase {
+ public:
+ MockAsyncRegionLocator() : AsyncRegionLocatorBase() {}
+ explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+ : AsyncRegionLocatorBase(region_location) {}
+ virtual ~MockAsyncRegionLocator() {}
+};
+
+class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase {  // bogus region first
+ private:
+ uint32_t tries_ = 0;
+ uint32_t num_fails_ = 0;  // region_location ctor leaves this 0: the first lookup still fails
+
+ public:
+ explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails)  // call set_region_location() first
+ : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+ explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+ : AsyncRegionLocatorBase(region_location) {}
+ virtual ~MockWrongRegionAsyncRegionLocator() {}
+
+ folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+ const hbase::pb::TableName &tn, const std::string &row,
+ const RegionLocateType locate_type = RegionLocateType::kCurrent,
+ const int64_t locate_ns = 0) override {
+ // Return a wrong region for the first num_fails_ + 1 calls (post-increment '>' check), then
+ // delegate to the base class, which returns the real region location.
+ if (tries_++ > num_fails_) {
+ return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+ }
+ folly::Promise<std::shared_ptr<RegionLocation>> promise;
+ /* use a fixed bogus region name, simulating an invalid region */
+ auto result = std::make_shared<RegionLocation>(
+ "whatever-region-name", region_location_->region_info(), region_location_->server_name(),
+ region_location_->service());
+ promise.setValue(result);
+ return promise.getFuture();
+ }
+};
+
+class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {  // lookup errors first
+ private:
+ uint32_t tries_ = 0;
+ uint32_t num_fails_ = 0;  // region_location ctor leaves this 0: the first lookup still fails
+
+ public:
+ explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
+ : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+ explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+ : AsyncRegionLocatorBase(region_location) {}
+ virtual ~MockFailingAsyncRegionLocator() {}
+ folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+ const hbase::pb::TableName &tn, const std::string &row,
+ const RegionLocateType locate_type = RegionLocateType::kCurrent,
+ const int64_t locate_ns = 0) override {
+ // Fail the first num_fails_ + 1 calls (post-increment '>' check) with a runtime_error, then
+ // delegate to the base class, which returns the real region location.
+ if (tries_++ > num_fails_) {
+ return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+ }
+ folly::Promise<std::shared_ptr<RegionLocation>> promise;
+ promise.setException(std::runtime_error{"Failed to look up region location"});
+ return promise.getFuture();
+ }
+};
+
class MockAsyncConnection : public AsyncConnection,
public std::enable_shared_from_this<MockAsyncConnection> {
public:
MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf,
+ std::shared_ptr<folly::HHWheelTimer> retry_timer,
+ std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
+ std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+ std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor,
std::shared_ptr<RpcClient> rpc_client,
std::shared_ptr<AsyncRegionLocator> region_locator)
- : conn_conf_(conn_conf), rpc_client_(rpc_client), region_locator_(region_locator) {}
+ : retry_timer_(retry_timer),  // same order as the member declarations (avoids -Wreorder)
+ conn_conf_(conn_conf),
+ rpc_client_(rpc_client),
+ region_locator_(region_locator),
+ cpu_executor_(cpu_executor),
+ io_executor_(io_executor),
+ retry_executor_(retry_executor) {}
~MockAsyncConnection() {}
void Init() {
- caller_factory_ = std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this());
+ caller_factory_ =
+ std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
}
std::shared_ptr<Configuration> conf() override { return nullptr; }
@@ -108,6 +209,11 @@ class MockAsyncConnection : public AsyncConnection,
}
std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; }
+ std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
+ std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
+ std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
+ return retry_executor_;
+ }
void Close() override {}
std::shared_ptr<HBaseRpcController> CreateRpcController() override {
@@ -115,17 +221,20 @@ class MockAsyncConnection : public AsyncConnection,
}
private:
+ std::shared_ptr<folly::HHWheelTimer> retry_timer_;
std::shared_ptr<ConnectionConfiguration> conn_conf_;
std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
std::shared_ptr<RpcClient> rpc_client_;
std::shared_ptr<AsyncRegionLocator> region_locator_;
+ std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
+ std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
+ std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
};
template <typename CONN>
class MockRawAsyncTableImpl {
public:
- explicit MockRawAsyncTableImpl(std::shared_ptr<CONN> conn)
- : conn_(conn), promise_(std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>()) {}
+ explicit MockRawAsyncTableImpl(std::shared_ptr<CONN> conn) : conn_(conn) {}
virtual ~MockRawAsyncTableImpl() = default;
/* implement this in real RawAsyncTableImpl. */
@@ -133,11 +242,13 @@ class MockRawAsyncTableImpl {
/* in real RawAsyncTableImpl, this should be private. */
folly::Future<std::shared_ptr<hbase::Result>> GetCall(
std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
- std::shared_ptr<RegionLocation> loc, const hbase::Get& get) {
+ std::shared_ptr<RegionLocation> loc, const hbase::Get &get) {
hbase::RpcCall<hbase::Request, hbase::Response> rpc_call = [](
std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<RegionLocation> loc,
std::shared_ptr<HBaseRpcController> controller,
std::unique_ptr<hbase::Request> preq) -> folly::Future<std::unique_ptr<hbase::Response>> {
+ VLOG(1) << "entering MockRawAsyncTableImpl#GetCall, calling AsyncCall, loc:"
+ << loc->DebugString();
return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(),
std::move(preq), User::defaultUser(), "ClientService");
};
@@ -151,17 +262,25 @@ class MockRawAsyncTableImpl {
template <typename REQ, typename PREQ, typename PRESP, typename RESP>
folly::Future<RESP> Call(std::shared_ptr<hbase::RpcClient> rpc_client,
std::shared_ptr<HBaseRpcController> controller,
- std::shared_ptr<RegionLocation> loc, const REQ& req,
+ std::shared_ptr<RegionLocation> loc, const REQ &req,
ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter,
- const hbase::RpcCall<PREQ, PRESP>& rpc_call,
+ hbase::RpcCall<PREQ, PRESP> rpc_call,
RespConverter<RESP, PRESP> resp_converter) {
+ promise_ = std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>();  // per call
+ auto f = promise_->getFuture();
+ VLOG(1) << "calling rpc_call";
rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name())))
.then([&, this, resp_converter](std::unique_ptr<PRESP> presp) {
+ VLOG(1) << "MockRawAsyncTableImpl#call succeeded: ";
RESP result = resp_converter(*presp);
promise_->setValue(result);
})
- .onError([this](const std::exception& e) { promise_->setException(e); });
- return promise_->getFuture();
+ .onError([this](const exception_wrapper &e) {
+ VLOG(1) << "entering MockRawAsyncTableImpl#call, exception: " << e.what();
+ VLOG(1) << "entering MockRawAsyncTableImpl#call, error typeinfo: " << e.class_name();  // wrapped type
+ promise_->setException(e);
+ });
+ return f;
}
private:
@@ -169,22 +288,19 @@ class MockRawAsyncTableImpl {
std::shared_ptr<folly::Promise<std::shared_ptr<hbase::Result>>> promise_;
};
-TEST(AsyncRpcRetryTest, TestGetBasic) {
- // Using TestUtil to populate test data
- auto test_util = std::make_unique<hbase::TestUtil>();
- test_util->StartMiniCluster(2);
-
- test_util->CreateTable("t", "d");
+void runTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator, std::string tableName,
+ uint32_t operation_timeout_millis = 1200000) {
+ AsyncRpcRetryTest::test_util->CreateTable(tableName, "d");
// Create TableName and Row to be fetched from HBase
- auto tn = folly::to<hbase::pb::TableName>("t");
+ auto tn = folly::to<hbase::pb::TableName>(tableName);
auto row = "test2";
// Get to be performed on above HBase Table
hbase::Get get(row);
// Create a client
- Client client(*(test_util->conf()));
+ Client client(*(AsyncRpcRetryTest::test_util->conf()));
// Get connection to HBase Table
auto table = client.Table(tn);
@@ -196,24 +312,32 @@ TEST(AsyncRpcRetryTest, TestGetBasic) {
/* init region location and rpc channel */
auto region_location = table->GetRegionLocation(row);
- auto io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+ // auto io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(4);
+ auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
+ auto io_executor_ = client.async_connection()->io_executor();
+ auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
auto codec = std::make_shared<hbase::KeyValueCodec>();
auto rpc_client = std::make_shared<RpcClient>(io_executor_, codec);
+ // auto retry_event_base_ = std::make_shared<folly::ScopedEventBaseThread>(true);
+ std::shared_ptr<folly::HHWheelTimer> retry_timer =
+ folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
/* init connection configuration */
auto connection_conf = std::make_shared<ConnectionConfiguration>(
- TimeUtil::SecondsToNanos(20), // connect_timeout
- TimeUtil::SecondsToNanos(1200), // operation_timeout
- TimeUtil::SecondsToNanos(60), // rpc_timeout
- TimeUtil::MillisToNanos(100), // pause
- 31, // max retries
- 9); // start log errors count
+ TimeUtil::SecondsToNanos(20), // connect_timeout
+ TimeUtil::MillisToNanos(operation_timeout_millis), // operation_timeout
+ TimeUtil::SecondsToNanos(60), // rpc_timeout
+ TimeUtil::MillisToNanos(100), // pause
+ 5, // max retries
+ 9); // start log errors count
- /* init region locator */
- auto region_locator = std::make_shared<MockAsyncRegionLocator>(region_location);
+ /* set region locator */
+ region_locator->set_region_location(region_location);
/* init hbase client connection */
- auto conn = std::make_shared<MockAsyncConnection>(connection_conf, rpc_client, region_locator);
+ auto conn = std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_,
+ io_executor_, retry_executor_, rpc_client,
+ region_locator);
conn->Init();
/* init retry caller factory */
@@ -237,7 +361,9 @@ TEST(AsyncRpcRetryTest, TestGetBasic) {
})
->Build();
- auto result = async_caller->Call().get();
+ // Bounded wait: get(timeout) throws on expiry instead of hanging a stuck retry loop forever.
+
+ auto result = async_caller->Call().get(milliseconds(500000));
// Test the values, should be same as in put executed on hbase shell
ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
@@ -245,6 +371,50 @@ TEST(AsyncRpcRetryTest, TestGetBasic) {
EXPECT_EQ("value2", *(result->Value("d", "2")));
EXPECT_EQ("value for extra", *(result->Value("d", "extra")));
+ retry_timer->destroy();
table->Close();
client.Close();
+ retry_executor_->stop();
+}
+
+// Test successful case
+TEST_F(AsyncRpcRetryTest, TestGetBasic) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockAsyncRegionLocator>());
+ runTest(region_locator, "table1");
+}
+
+// Tests the first 4 region lookups returning a wrong region (mock fails num_fails + 1), then success
+TEST_F(AsyncRpcRetryTest, TestHandleException) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
+ runTest(region_locator, "table2");
+}
+
+// Tests the first 6 region lookups returning a wrong region (num_fails + 1); expected to throw
+TEST_F(AsyncRpcRetryTest, TestFailWithException) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockWrongRegionAsyncRegionLocator>(5));
+ EXPECT_ANY_THROW(runTest(region_locator, "table3"));
+}
+
+// Tests the region location lookup failing 4 times (mock fails num_fails + 1), then succeeding
+TEST_F(AsyncRpcRetryTest, TestHandleExceptionFromRegionLocationLookup) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(3));
+ runTest(region_locator, "table4");
+}
+
+// Tests the region location lookup failing 6 times (num_fails + 1); expected to throw
+TEST_F(AsyncRpcRetryTest, TestFailWithExceptionFromRegionLocationLookup) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(5));
+ EXPECT_ANY_THROW(runTest(region_locator, "table5"));
+}
+
+// Tests hitting operation timeout, thus not retrying anymore
+TEST_F(AsyncRpcRetryTest, TestFailWithOperationTimeout) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(3));
+ EXPECT_ANY_THROW(runTest(region_locator, "table6", 200));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/client-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index ff4879a..274168f 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -156,6 +156,7 @@ TEST_F(ClientTest, GetForNonExistentTable) {
// Get to be performed on above HBase Table
hbase::Get get(row);
+ ClientTest::test_util->conf()->SetInt("hbase.client.retries.number", 5);
// Create a client
hbase::Client client(*ClientTest::test_util->conf());
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/client.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index 0e11278..2719470 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -61,6 +61,11 @@ class Client {
*/
void Close();
+ /**
+ * @brief Internal. DO NOT USE.
+ */
+ std::shared_ptr<AsyncConnectionImpl> async_connection() { return async_connection_; }
+
private:
/** Data */
std::shared_ptr<AsyncConnectionImpl> async_connection_;
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/location-cache.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index 07c3d61..e0afcfb 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -284,7 +284,8 @@ void LocationCache::ClearCachedLocation(const hbase::pb::TableName &tn, const st
table_locs->erase(row);
}
-void LocationCache::UpdateCachedLocation(const RegionLocation &loc, const std::exception &error) {
+void LocationCache::UpdateCachedLocation(const RegionLocation &loc,
+ const folly::exception_wrapper &error) {
// TODO: just clears the location for now. We can inspect RegionMovedExceptions, etc later.
ClearCachedLocation(loc.region_info().table_name(), loc.region_info().start_key());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/location-cache.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h
index 5e79213..a3c15cb 100644
--- a/hbase-native-client/core/location-cache.h
+++ b/hbase-native-client/core/location-cache.h
@@ -18,6 +18,7 @@
*/
#pragma once
+#include <folly/ExceptionWrapper.h>
#include <folly/Executor.h>
#include <folly/SharedMutex.h>
#include <folly/futures/Future.h>
@@ -180,7 +181,7 @@ class LocationCache : public AsyncRegionLocator {
* Update cached region location, possibly using the information from exception.
*/
virtual void UpdateCachedLocation(const RegionLocation &loc,
- const std::exception &error) override;
+ const folly::exception_wrapper &error) override;
const std::string &zk_quorum() { return zk_quorum_; }
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/exceptions/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/exceptions/BUCK b/hbase-native-client/exceptions/BUCK
index a23654c..eef4437 100644
--- a/hbase-native-client/exceptions/BUCK
+++ b/hbase-native-client/exceptions/BUCK
@@ -21,4 +21,4 @@ cxx_library(
srcs=[],
deps=["//third-party:folly",],
compiler_flags=['-Weffc++'],
- visibility=['//core/...'],)
\ No newline at end of file
+ visibility=['//core/...','//connection/...'],)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/exceptions/exception.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h
index c0c4142..2943d57 100644
--- a/hbase-native-client/exceptions/exception.h
+++ b/hbase-native-client/exceptions/exception.h
@@ -22,52 +22,55 @@
#include <string>
#include <vector>
#include <folly/io/IOBuf.h>
+#include <folly/ExceptionWrapper.h>
namespace hbase {
class ThrowableWithExtraContext {
public:
- ThrowableWithExtraContext(std::shared_ptr<std::exception> cause,
+ ThrowableWithExtraContext(folly::exception_wrapper cause,
const long& when) :
cause_(cause), when_(when), extras_("") {
}
- ThrowableWithExtraContext(std::shared_ptr<std::exception> cause,
+ ThrowableWithExtraContext(folly::exception_wrapper cause,
const long& when, const std::string& extras) :
cause_(cause), when_(when), extras_(extras) {
}
- std::string ToString() {
+ virtual std::string ToString() {
// TODO:
// return new Date(this.when).toString() + ", " + extras + ", " + t.toString();
- return extras_ + ", " + cause_->what();
+ return extras_ + ", " + cause_.what().toStdString();
}
- std::shared_ptr<std::exception> cause() {
+ virtual folly::exception_wrapper cause() {
return cause_;
}
private:
- std::shared_ptr<std::exception> cause_;
+ folly::exception_wrapper cause_;
long when_;
std::string extras_;
};
class IOException: public std::logic_error {
public:
+ IOException() : logic_error("") {}
+
IOException(
const std::string& what) :
- logic_error(what), cause_(nullptr) {}
+ logic_error(what) {}
IOException(
const std::string& what,
- std::shared_ptr<std::exception> cause) :
+ folly::exception_wrapper cause) :
logic_error(what), cause_(cause) {}
virtual ~IOException() = default;
- std::shared_ptr<std::exception> cause() {
+ virtual folly::exception_wrapper cause() {
return cause_;
}
private:
- const std::shared_ptr<std::exception> cause_;
+ folly::exception_wrapper cause_;
};
class RetriesExhaustedException: public IOException {
@@ -77,7 +80,7 @@ public:
std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions) :
IOException(
GetMessage(num_retries, exceptions),
- exceptions->empty() ? nullptr : (*exceptions)[exceptions->size() - 1].cause()){
+ exceptions->empty() ? folly::exception_wrapper{} : (*exceptions)[exceptions->size() - 1].cause()){
}
virtual ~RetriesExhaustedException() = default;
@@ -99,6 +102,71 @@ private:
class HBaseIOException : public IOException {
};
-class DoNotRetryIOException : public HBaseIOException {
+class RemoteException : public IOException {
+public:
+
+ RemoteException() : port_(0), do_not_retry_(false) {}
+
+ RemoteException(const std::string& what) :
+ IOException(what), port_(0), do_not_retry_(false) {}
+
+ RemoteException(
+ const std::string& what,
+ folly::exception_wrapper cause) :
+ IOException(what, cause), port_(0), do_not_retry_(false) {}
+
+ virtual ~RemoteException() = default;
+
+ std::string exception_class_name() const {
+ return exception_class_name_;
+ }
+
+ RemoteException* set_exception_class_name(const std::string& value) {
+ exception_class_name_ = value;
+ return this;
+ }
+
+ std::string stack_trace() const {
+ return stack_trace_;
+ }
+
+ RemoteException* set_stack_trace(const std::string& value) {
+ stack_trace_ = value;
+ return this;
+ }
+
+ std::string hostname() const {
+ return hostname_;
+ }
+
+ RemoteException* set_hostname(const std::string& value) {
+ hostname_ = value;
+ return this;
+ }
+
+ int port() const {
+ return port_;
+ }
+
+ RemoteException* set_port(int value) {
+ port_ = value;
+ return this;
+ }
+
+ bool do_not_retry() const {
+ return do_not_retry_;
+ }
+
+ RemoteException* set_do_not_retry(bool value) {
+ do_not_retry_ = value;
+ return this;
+ }
+
+private:
+ std::string exception_class_name_;
+ std::string stack_trace_;
+ std::string hostname_;
+ int port_;
+ bool do_not_retry_;
};
} // namespace hbase