You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/04/28 01:53:02 UTC

hbase git commit: HBASE-17800 [C++] handle exceptions in client RPC (Xiaobing Zhou and Enis Soztutar)

Repository: hbase
Updated Branches:
  refs/heads/HBASE-14850 8aa8a9251 -> 018f1eab2


HBASE-17800 [C++] handle exceptions in client RPC (Xiaobing Zhou and Enis Soztutar)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/018f1eab
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/018f1eab
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/018f1eab

Branch: refs/heads/HBASE-14850
Commit: 018f1eab2365f00e69fffbb4fe526645022e3020
Parents: 8aa8a92
Author: Enis Soztutar <en...@apache.org>
Authored: Thu Apr 27 18:52:53 2017 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Thu Apr 27 18:52:53 2017 -0700

----------------------------------------------------------------------
 hbase-native-client/connection/BUCK             |   1 +
 .../connection/client-dispatcher.cc             |  10 +-
 .../connection/client-handler.cc                |  39 ++-
 hbase-native-client/connection/client-handler.h |   4 +-
 .../connection/connection-factory.cc            |   3 +-
 .../connection/connection-factory.h             |   2 +-
 .../connection/connection-pool-test.cc          |   2 +-
 .../connection/connection-pool.cc               |   7 +-
 hbase-native-client/connection/pipeline.cc      |   5 +-
 hbase-native-client/connection/response.h       |   9 +-
 hbase-native-client/core/async-connection.cc    |  18 +-
 hbase-native-client/core/async-connection.h     |  10 +
 hbase-native-client/core/async-region-locator.h |   4 +-
 .../core/async-rpc-retrying-caller-factory.h    |  14 +-
 .../core/async-rpc-retrying-caller.cc           |  84 ++++---
 .../core/async-rpc-retrying-caller.h            |  11 +-
 .../core/async-rpc-retrying-test.cc             | 240 ++++++++++++++++---
 hbase-native-client/core/client-test.cc         |   1 +
 hbase-native-client/core/client.h               |   5 +
 hbase-native-client/core/location-cache.cc      |   3 +-
 hbase-native-client/core/location-cache.h       |   3 +-
 hbase-native-client/exceptions/BUCK             |   2 +-
 hbase-native-client/exceptions/exception.h      |  92 ++++++-
 23 files changed, 450 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK
index 19536d5..36111f8 100644
--- a/hbase-native-client/connection/BUCK
+++ b/hbase-native-client/connection/BUCK
@@ -48,6 +48,7 @@ cxx_library(
         "//security:security",
         "//third-party:folly",
         "//third-party:wangle",
+        "//exceptions:exceptions",
     ],
     compiler_flags=['-Weffc++'],
     visibility=['//core/...',],)

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/client-dispatcher.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc
index 626fc76..27201d2 100644
--- a/hbase-native-client/connection/client-dispatcher.cc
+++ b/hbase-native-client/connection/client-dispatcher.cc
@@ -17,12 +17,14 @@
  *
  */
 #include "connection/client-dispatcher.h"
+#include <folly/ExceptionWrapper.h>
 
 #include <utility>
 
 using namespace folly;
 using namespace hbase;
 using namespace wangle;
+using folly::exception_wrapper;
 
 ClientDispatcher::ClientDispatcher() : requests_(5000), current_call_id_(9) {}
 
@@ -35,9 +37,11 @@ void ClientDispatcher::read(Context *ctx, std::unique_ptr<Response> in) {
 
   requests_.erase(call_id);
 
-  // TODO(eclark): check if the response
-  // is an exception. If it is then set that.
-  p.setValue(std::move(in));
+  if (in->exception()) {
+    p.setException(in->exception());
+  } else {
+    p.setValue(std::move(in));
+  }
 }
 
 Future<std::unique_ptr<Response>> ClientDispatcher::operator()(std::unique_ptr<Request> arg) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/client-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index af84572..113ebd0 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -19,6 +19,7 @@
 
 #include "connection/client-handler.h"
 
+#include <folly/ExceptionWrapper.h>
 #include <folly/Likely.h>
 #include <glog/logging.h>
 
@@ -36,9 +37,11 @@ using hbase::pb::ResponseHeader;
 using hbase::pb::GetResponse;
 using google::protobuf::Message;
 
-ClientHandler::ClientHandler(std::string user_name, std::shared_ptr<Codec> codec)
+ClientHandler::ClientHandler(std::string user_name, std::shared_ptr<Codec> codec,
+                             const std::string &server)
     : user_name_(user_name),
       serde_(codec),
+      server_(server),
       once_flag_(std::make_unique<std::once_flag>()),
       resp_msgs_(
           make_unique<folly::AtomicHashMap<uint32_t, std::shared_ptr<google::protobuf::Message>>>(
@@ -51,7 +54,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
     ResponseHeader header;
 
     int used_bytes = serde_.ParseDelimited(buf.get(), &header);
-    VLOG(1) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id()
+    VLOG(3) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id()
             << " has_exception=" << header.has_exception();
 
     // Get the response protobuf from the map
@@ -92,9 +95,31 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
       }
 
       received->set_resp_msg(resp_msg);
-    }
-    // TODO: set exception in Response here
+    } else {
+      hbase::pb::ExceptionResponse exceptionResponse = header.exception();
+
+      std::string what;
+      std::string exception_class_name = exceptionResponse.has_exception_class_name()
+                                             ? exceptionResponse.exception_class_name()
+                                             : "";
+      std::string stack_trace =
+          exceptionResponse.has_stack_trace() ? exceptionResponse.stack_trace() : "";
+      what.append(exception_class_name).append(stack_trace);
+
+      auto remote_exception = std::make_unique<RemoteException>(what);
+      remote_exception->set_exception_class_name(exception_class_name)
+          ->set_stack_trace(stack_trace)
+          ->set_hostname(exceptionResponse.has_hostname() ? exceptionResponse.hostname() : "")
+          ->set_port(exceptionResponse.has_port() ? exceptionResponse.port() : 0);
+      if (exceptionResponse.has_do_not_retry()) {
+        remote_exception->set_do_not_retry(exceptionResponse.do_not_retry());
+      }
 
+      VLOG(3) << "Exception RPC ResponseHeader, call_id=" << header.call_id()
+              << " exception.what=" << remote_exception->what()
+              << ", do_not_retry=" << remote_exception->do_not_retry();
+      received->set_exception(::folly::exception_wrapper{*remote_exception});
+    }
     ctx->fireRead(std::move(received));
   }
 }
@@ -103,17 +128,19 @@ Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
   // We need to send the header once.
   // So use call_once to make sure that only one thread wins this.
   std::call_once((*once_flag_), [ctx, this]() {
+    VLOG(3) << "Writing RPC connection Preamble and Header to server: " << server_;
     auto pre = serde_.Preamble();
     auto header = serde_.Header(user_name_);
     pre->appendChain(std::move(header));
     ctx->fireWrite(std::move(pre));
   });
 
+  VLOG(3) << "Writing RPC Request with call_id:"
+          << r->call_id();  // TODO: more logging for RPC Header
+
   // Now store the call id to response.
   resp_msgs_->insert(r->call_id(), r->resp_msg());
 
-  VLOG(1) << "Writing RPC Request with call_id:" << r->call_id();
-
   // Send the data down the pipeline.
   return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get()));
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/client-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h
index afb8e62..4c106e0 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -27,6 +27,7 @@
 #include <string>
 #include <utility>
 
+#include "exceptions/exception.h"
 #include "serde/codec.h"
 #include "serde/rpc.h"
 
@@ -59,7 +60,7 @@ class ClientHandler
    * Create the handler
    * @param user_name the user name of the user running this process.
    */
-  explicit ClientHandler(std::string user_name, std::shared_ptr<Codec> codec);
+  ClientHandler(std::string user_name, std::shared_ptr<Codec> codec, const std::string &server);
 
   /**
    * Get bytes from the wire.
@@ -77,6 +78,7 @@ class ClientHandler
   std::unique_ptr<std::once_flag> once_flag_;
   std::string user_name_;
   RpcSerde serde_;
+  std::string server_;  // for logging
 
   // in flight requests
   std::unique_ptr<folly::AtomicHashMap<uint32_t, std::shared_ptr<google::protobuf::Message>>>

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/connection-factory.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index 832b00f..afa227d 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -46,9 +46,10 @@ std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> ConnectionFactory::M
 
   return client;
 }
+
 std::shared_ptr<HBaseService> ConnectionFactory::Connect(
     std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client, const std::string &hostname,
-    int port) {
+    uint16_t port) {
   // Yes this will block however it makes dealing with connection pool soooooo
   // much nicer.
   // TODO see about using shared promise for this.

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/connection-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h
index 32d0bf7..1e75571 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -61,7 +61,7 @@ class ConnectionFactory {
    */
   virtual std::shared_ptr<HBaseService> Connect(
       std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client,
-      const std::string &hostname, int port);
+      const std::string &hostname, uint16_t port);
 
  private:
   nanoseconds connect_timeout_;

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/connection-pool-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc
index 623ce3c..8ecdf29 100644
--- a/hbase-native-client/connection/connection-pool-test.cc
+++ b/hbase-native-client/connection/connection-pool-test.cc
@@ -40,7 +40,7 @@ class MockConnectionFactory : public ConnectionFactory {
   MOCK_METHOD0(MakeBootstrap, std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>());
   MOCK_METHOD3(Connect, std::shared_ptr<HBaseService>(
                             std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>,
-                            const std::string &hostname, int port));
+                            const std::string &hostname, uint16_t port));
 };
 
 class MockBootstrap : public wangle::ClientBootstrap<SerializePipeline> {};

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/connection-pool.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index 4fe4610..3121294 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -19,10 +19,11 @@
 
 #include "connection/connection-pool.h"
 
+#include <folly/Conv.h>
+#include <folly/Logging.h>
 #include <folly/SocketAddress.h>
 #include <wangle/service/Service.h>
 
-#include <folly/Logging.h>
 #include <memory>
 #include <utility>
 
@@ -89,7 +90,6 @@ std::shared_ptr<RpcConnection> ConnectionPool::GetNewConnection(
     /* create new connection */
     auto clientBootstrap = cf_->MakeBootstrap();
     auto dispatcher = cf_->Connect(clientBootstrap, remote_id->host(), remote_id->port());
-
     auto connection = std::make_shared<RpcConnection>(remote_id, dispatcher);
 
     connections_.insert(std::make_pair(remote_id, connection));
@@ -101,6 +101,8 @@ std::shared_ptr<RpcConnection> ConnectionPool::GetNewConnection(
 
 void ConnectionPool::Close(std::shared_ptr<ConnectionId> remote_id) {
   SharedMutexWritePriority::WriteHolder holder{map_mutex_};
+  DLOG(INFO) << "Closing RPC Connection to host:" << remote_id->host()
+             << ", port:" << folly::to<std::string>(remote_id->port());
 
   auto found = connections_.find(remote_id);
   if (found == connections_.end() || found->second == nullptr) {
@@ -108,6 +110,7 @@ void ConnectionPool::Close(std::shared_ptr<ConnectionId> remote_id) {
   }
   found->second->Close();
   connections_.erase(found);
+  // TODO: erase the client as well?
 }
 
 void ConnectionPool::Close() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/pipeline.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/pipeline.cc b/hbase-native-client/connection/pipeline.cc
index 00dc05c..edada52 100644
--- a/hbase-native-client/connection/pipeline.cc
+++ b/hbase-native-client/connection/pipeline.cc
@@ -35,11 +35,14 @@ RpcPipelineFactory::RpcPipelineFactory(std::shared_ptr<Codec> codec)
 
 SerializePipeline::Ptr RpcPipelineFactory::newPipeline(
     std::shared_ptr<AsyncTransportWrapper> sock) {
+  SocketAddress addr;  // for logging
+  sock->getPeerAddress(&addr);
+
   auto pipeline = SerializePipeline::create();
   pipeline->addBack(AsyncSocketHandler{sock});
   pipeline->addBack(EventBaseHandler{});
   pipeline->addBack(LengthFieldBasedFrameDecoder{});
-  pipeline->addBack(ClientHandler{user_util_.user_name(), codec_});
+  pipeline->addBack(ClientHandler{user_util_.user_name(), codec_, addr.describe()});
   pipeline->finalize();
   return pipeline;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/connection/response.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/response.h b/hbase-native-client/connection/response.h
index 1d60fed..c5472b0 100644
--- a/hbase-native-client/connection/response.h
+++ b/hbase-native-client/connection/response.h
@@ -22,6 +22,8 @@
 #include <memory>
 #include <utility>
 
+#include <folly/ExceptionWrapper.h>
+
 #include "serde/cell-scanner.h"
 
 // Forward
@@ -44,7 +46,7 @@ class Response {
    * Constructor.
    * Initinalizes the call id to 0. 0 should never be a valid call id.
    */
-  Response() : call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr) {}
+  Response() : call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr), exception_(nullptr) {}
 
   /** Get the call_id */
   uint32_t call_id() { return call_id_; }
@@ -70,9 +72,14 @@ class Response {
 
   const std::unique_ptr<CellScanner>& cell_scanner() const { return cell_scanner_; }
 
+  folly::exception_wrapper exception() { return exception_; }
+
+  void set_exception(folly::exception_wrapper value) { exception_ = value; }
+
  private:
   uint32_t call_id_;
   std::shared_ptr<google::protobuf::Message> resp_msg_;
   std::unique_ptr<CellScanner> cell_scanner_;
+  folly::exception_wrapper exception_;
 };
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/async-connection.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-connection.cc b/hbase-native-client/core/async-connection.cc
index b945e38..4642c61 100644
--- a/hbase-native-client/core/async-connection.cc
+++ b/hbase-native-client/core/async-connection.cc
@@ -29,6 +29,13 @@ void AsyncConnectionImpl::Init() {
   auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * sysconf(_SC_NPROCESSORS_ONLN));
   cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(cpu_threads);
   io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(io_threads);
+  /*
+   * We need a retry_executor for a thread pool of size 1 due to a possible bug in wangle/folly.
+   * Otherwise, Assertion 'isInEventBaseThread()' always fails. See the comments
+   * in async-rpc-retrying-caller.cc.
+   */
+  retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+  retry_timer_ = folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
 
   std::shared_ptr<Codec> codec = nullptr;
   if (conf_->Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) ==
@@ -41,22 +48,21 @@ void AsyncConnectionImpl::Init() {
       std::make_shared<hbase::RpcClient>(io_executor_, codec, connection_conf_->connect_timeout());
   location_cache_ =
       std::make_shared<hbase::LocationCache>(conf_, cpu_executor_, rpc_client_->connection_pool());
-  caller_factory_ = std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this());
+  caller_factory_ =
+      std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
 }
 
 // We can't have the threads continue running after everything is done
 // that leads to an error.
-AsyncConnectionImpl::~AsyncConnectionImpl() {
-  cpu_executor_->stop();
-  io_executor_->stop();
-  if (rpc_client_.get()) rpc_client_->Close();
-}
+AsyncConnectionImpl::~AsyncConnectionImpl() { Close(); }
 
 void AsyncConnectionImpl::Close() {
   if (is_closed_) return;
 
   cpu_executor_->stop();
   io_executor_->stop();
+  retry_executor_->stop();
+  retry_timer_->destroy();
   if (rpc_client_.get()) rpc_client_->Close();
   is_closed_ = true;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/async-connection.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-connection.h b/hbase-native-client/core/async-connection.h
index ff11577..7b260a5 100644
--- a/hbase-native-client/core/async-connection.h
+++ b/hbase-native-client/core/async-connection.h
@@ -53,6 +53,9 @@ class AsyncConnection {
   virtual std::shared_ptr<RpcClient> rpc_client() = 0;
   virtual std::shared_ptr<AsyncRegionLocator> region_locator() = 0;
   virtual std::shared_ptr<HBaseRpcController> CreateRpcController() = 0;
+  virtual std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() = 0;
+  virtual std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() = 0;
+  virtual std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() = 0;
   virtual void Close() = 0;
 };
 
@@ -81,6 +84,11 @@ class AsyncConnectionImpl : public AsyncConnection,
   std::shared_ptr<HBaseRpcController> CreateRpcController() override {
     return std::make_shared<HBaseRpcController>();
   }
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
+  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
+    return retry_executor_;
+  }
 
   void Close() override;
 
@@ -98,8 +106,10 @@ class AsyncConnectionImpl : public AsyncConnection,
   std::shared_ptr<Configuration> conf_;
   std::shared_ptr<ConnectionConfiguration> connection_conf_;
   std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
   std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
   std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
   std::shared_ptr<LocationCache> location_cache_;
   std::shared_ptr<RpcClient> rpc_client_;
   bool is_closed_ = false;

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/async-region-locator.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-region-locator.h b/hbase-native-client/core/async-region-locator.h
index c606dcb..f75cb7e 100644
--- a/hbase-native-client/core/async-region-locator.h
+++ b/hbase-native-client/core/async-region-locator.h
@@ -19,6 +19,7 @@
 
 #pragma once
 
+#include <folly/ExceptionWrapper.h>
 #include <folly/futures/Future.h>
 #include <memory>
 #include <string>
@@ -57,7 +58,8 @@ class AsyncRegionLocator {
   /**
    * Update cached region location, possibly using the information from exception.
    */
-  virtual void UpdateCachedLocation(const RegionLocation &loc, const std::exception &error) = 0;
+  virtual void UpdateCachedLocation(const RegionLocation &loc,
+                                    const folly::exception_wrapper &error) = 0;
 };
 
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/async-rpc-retrying-caller-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
index 5bcad6c..5a80a06 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller-factory.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
@@ -41,8 +41,10 @@ template <typename RESP>
 class SingleRequestCallerBuilder
     : public std::enable_shared_from_this<SingleRequestCallerBuilder<RESP>> {
  public:
-  explicit SingleRequestCallerBuilder(std::shared_ptr<AsyncConnection> conn)
+  explicit SingleRequestCallerBuilder(std::shared_ptr<AsyncConnection> conn,
+                                      std::shared_ptr<folly::HHWheelTimer> retry_timer)
       : conn_(conn),
+        retry_timer_(retry_timer),
         table_name_(nullptr),
         rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()),
         pause_(conn->connection_conf()->pause()),
@@ -105,7 +107,7 @@ class SingleRequestCallerBuilder
 
   std::shared_ptr<AsyncSingleRequestRpcRetryingCaller<RESP>> Build() {
     return std::make_shared<AsyncSingleRequestRpcRetryingCaller<RESP>>(
-        conn_, table_name_, row_, locate_type_, callable_, pause_, max_retries_,
+        conn_, retry_timer_, table_name_, row_, locate_type_, callable_, pause_, max_retries_,
         operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_);
   }
 
@@ -116,6 +118,7 @@ class SingleRequestCallerBuilder
 
  private:
   std::shared_ptr<AsyncConnection> conn_;
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
   std::shared_ptr<TableName> table_name_;
   nanoseconds rpc_timeout_nanos_;
   nanoseconds operation_timeout_nanos_;
@@ -130,15 +133,18 @@ class SingleRequestCallerBuilder
 class AsyncRpcRetryingCallerFactory {
  private:
   std::shared_ptr<AsyncConnection> conn_;
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
 
  public:
-  explicit AsyncRpcRetryingCallerFactory(std::shared_ptr<AsyncConnection> conn) : conn_(conn) {}
+  explicit AsyncRpcRetryingCallerFactory(std::shared_ptr<AsyncConnection> conn,
+                                         std::shared_ptr<folly::HHWheelTimer> retry_timer)
+      : conn_(conn), retry_timer_(retry_timer) {}
 
   virtual ~AsyncRpcRetryingCallerFactory() = default;
 
   template <typename RESP>
   std::shared_ptr<SingleRequestCallerBuilder<RESP>> Single() {
-    return std::make_shared<SingleRequestCallerBuilder<RESP>>(conn_);
+    return std::make_shared<SingleRequestCallerBuilder<RESP>>(conn_, retry_timer_);
   }
 };
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/async-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc
index 965a44b..7e211f7 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.cc
+++ b/hbase-native-client/core/async-rpc-retrying-caller.cc
@@ -19,6 +19,8 @@
 
 #include "core/async-rpc-retrying-caller.h"
 
+#include <folly/Conv.h>
+#include <folly/ExceptionWrapper.h>
 #include <folly/Format.h>
 #include <folly/Logging.h>
 #include <folly/futures/Unit.h>
@@ -34,15 +36,19 @@
 #include "utils/sys-util.h"
 #include "utils/time-util.h"
 
+using folly::exception_wrapper;
+
 namespace hbase {
 
 template <typename RESP>
 AsyncSingleRequestRpcRetryingCaller<RESP>::AsyncSingleRequestRpcRetryingCaller(
-    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<hbase::pb::TableName> table_name,
-    const std::string& row, RegionLocateType locate_type, Callable<RESP> callable,
-    nanoseconds pause, uint32_t max_retries, nanoseconds operation_timeout_nanos,
-    nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
+    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
+    std::shared_ptr<hbase::pb::TableName> table_name, const std::string& row,
+    RegionLocateType locate_type, Callable<RESP> callable, nanoseconds pause, uint32_t max_retries,
+    nanoseconds operation_timeout_nanos, nanoseconds rpc_timeout_nanos,
+    uint32_t start_log_errors_count)
     : conn_(conn),
+      retry_timer_(retry_timer),
       table_name_(table_name),
       row_(row),
       locate_type_(locate_type),
@@ -58,7 +64,6 @@ AsyncSingleRequestRpcRetryingCaller<RESP>::AsyncSingleRequestRpcRetryingCaller(
   start_ns_ = TimeUtil::GetNowNanos();
   max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
   exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>();
-  retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_);
 }
 
 template <typename RESP>
@@ -87,7 +92,7 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::LocateThenCall() {
   conn_->region_locator()
       ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns)
       .then([this](std::shared_ptr<RegionLocation> loc) { Call(*loc); })
-      .onError([this](const std::exception& e) {
+      .onError([this](const exception_wrapper& e) {
         OnError(e,
                 [this]() -> std::string {
                   return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" +
@@ -96,17 +101,17 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::LocateThenCall() {
                          TimeUtil::ToMillisStr(operation_timeout_nanos_) + " ms, time elapsed = " +
                          TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
                 },
-                [](const std::exception& error) {});
+                [](const exception_wrapper& error) {});
       });
 }
 
 template <typename RESP>
 void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
-    const std::exception& error, Supplier<std::string> err_msg,
-    Consumer<std::exception> update_cached_location) {
-  ThrowableWithExtraContext twec(std::make_shared<std::exception>(error), TimeUtil::GetNowNanos());
+    const exception_wrapper& error, Supplier<std::string> err_msg,
+    Consumer<exception_wrapper> update_cached_location) {
+  ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos());
   exceptions_->push_back(twec);
-  if (SysUtil::InstanceOf<DoNotRetryIOException, std::exception>(error) || tries_ >= max_retries_) {
+  if (!ShouldRetry(error) || tries_ >= max_retries_) {
     CompleteExceptionally();
     return;
   }
@@ -124,8 +129,33 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
   }
   update_cached_location(error);
   tries_++;
-  retry_timer_->scheduleTimeoutFn([this]() { LocateThenCall(); },
-                                  milliseconds(TimeUtil::ToMillis(delay_ns)));
+
+  /*
+   * The HHWheelTimer::scheduleTimeout() fails with an assertion from
+   * EventBase::isInEventBaseThread() if we execute the schedule in a random thread, or one of
+   * the IOThreadPool threads (with num threads > 1). I think there is a bug there in using retry
+   * timer from IOThreadPool threads. It only works when executed from a single-thread pool
+   * (retry_executor() is). However, the scheduled "work" which is the LocateThenCall() should
+   * still happen in a thread pool, that is why we are submitting the work to the CPUThreadPool.
+   * IOThreadPool cannot be used without fixing the blocking call that we do at TCP connection
+   * establishment time (see ConnectionFactory::Connect()), otherwise, the IOThreadPool thread
+   * just hangs because it deadlocks itself.
+   */
+  conn_->retry_executor()->add([this, delay_ns]() {  // by value: task runs after OnError returns
+    retry_timer_->scheduleTimeoutFn(
+        [this]() {
+          conn_->cpu_executor()->add([this]() { LocateThenCall(); });
+        },
+        milliseconds(TimeUtil::ToMillis(delay_ns)));
+  });
+}
+
+template <typename RESP>
+bool AsyncSingleRequestRpcRetryingCaller<RESP>::ShouldRetry(const exception_wrapper& error) {
+  bool do_not_retry = false;  // stays false (=> retry) unless error wraps a RemoteException
+  error.with_exception(
+      [&](const RemoteException& remote_ex) { do_not_retry = remote_ex.do_not_retry(); });
+  return !do_not_retry;
 }
 
 template <typename RESP>
@@ -143,33 +173,14 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc)
   }
 
   std::shared_ptr<RpcClient> rpc_client;
-  try {
-    // TODO: There is no connection attempt happening here, no need to try-catch.
-    rpc_client = conn_->rpc_client();
-  } catch (const IOException& e) {
-    OnError(e,
-            [&, this]() -> std::string {
-              return "Get async rpc_client to " +
-                     folly::sformat("{0}:{1}", loc.server_name().host_name(),
-                                    loc.server_name().port()) +
-                     " for '" + row_ + "' in " + loc.DebugString() + " of " +
-                     table_name_->namespace_() + "::" + table_name_->qualifier() +
-                     " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " +
-                     std::to_string(max_attempts_) + ", timeout = " +
-                     TimeUtil::ToMillisStr(this->operation_timeout_nanos_) +
-                     " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
-            },
-            [&, this](const std::exception& error) {
-              conn_->region_locator()->UpdateCachedLocation(loc, error);
-            });
-    return;
-  }
+
+  rpc_client = conn_->rpc_client();
 
   ResetController(controller_, call_timeout_ns);
 
   callable_(controller_, std::make_shared<RegionLocation>(loc), rpc_client)
       .then([this](const RESP& resp) { this->promise_->setValue(std::move(resp)); })
-      .onError([&, this](const std::exception& e) {
+      .onError([&, this](const exception_wrapper& e) {
         OnError(e,
                 [&, this]() -> std::string {
                   return "Call to " + folly::sformat("{0}:{1}", loc.server_name().host_name(),
@@ -182,10 +193,9 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc)
                          " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) +
                          " ms";
                 },
-                [&, this](const std::exception& error) {
+                [&, this](const exception_wrapper& error) {
                   conn_->region_locator()->UpdateCachedLocation(loc, error);
                 });
-        return;
       });
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/async-rpc-retrying-caller.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h
index 6006388..c86ad0b5 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller.h
@@ -18,6 +18,7 @@
  */
 #pragma once
 
+#include <folly/ExceptionWrapper.h>
 #include <folly/futures/Future.h>
 #include <folly/io/async/EventBase.h>
 #include <folly/io/async/HHWheelTimer.h>
@@ -70,6 +71,7 @@ template <typename RESP>
 class AsyncSingleRequestRpcRetryingCaller {
  public:
   AsyncSingleRequestRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
+                                      std::shared_ptr<folly::HHWheelTimer> retry_timer,
                                       std::shared_ptr<hbase::pb::TableName> table_name,
                                       const std::string& row, RegionLocateType locate_type,
                                       Callable<RESP> callable, nanoseconds pause,
@@ -84,8 +86,10 @@ class AsyncSingleRequestRpcRetryingCaller {
  private:
   void LocateThenCall();
 
-  void OnError(const std::exception& error, Supplier<std::string> err_msg,
-               Consumer<std::exception> update_cached_location);
+  void OnError(const folly::exception_wrapper& error, Supplier<std::string> err_msg,
+               Consumer<folly::exception_wrapper> update_cached_location);
+
+  bool ShouldRetry(const folly::exception_wrapper& error);
 
   void Call(const RegionLocation& loc);
 
@@ -97,8 +101,8 @@ class AsyncSingleRequestRpcRetryingCaller {
                               const int64_t& timeout_ns);
 
  private:
-  folly::HHWheelTimer::UniquePtr retry_timer_;
   std::shared_ptr<AsyncConnection> conn_;
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
   std::shared_ptr<hbase::pb::TableName> table_name_;
   std::string row_;
   RegionLocateType locate_type_;
@@ -114,6 +118,5 @@ class AsyncSingleRequestRpcRetryingCaller {
   uint32_t tries_;
   std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_;
   uint32_t max_attempts_;
-  folly::EventBase event_base_;
 };
 } /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/async-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc
index 4956972..ff28e79 100644
--- a/hbase-native-client/core/async-rpc-retrying-test.cc
+++ b/hbase-native-client/core/async-rpc-retrying-test.cc
@@ -20,10 +20,13 @@
 #include <folly/Logging.h>
 #include <folly/Memory.h>
 #include <folly/futures/Future.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/ScopedEventBaseThread.h>
 #include <gmock/gmock.h>
 #include <google/protobuf/stubs/callback.h>
 #include <wangle/concurrent/IOThreadPoolExecutor.h>
 
+#include <chrono>
 #include <functional>
 #include <string>
 
@@ -67,15 +70,33 @@ using hbase::Client;
 using ::testing::Return;
 using ::testing::_;
 using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
 
-class MockAsyncRegionLocator : public AsyncRegionLocator {
+using namespace hbase;
+
+using folly::exception_wrapper;
+
+class AsyncRpcRetryTest : public ::testing::Test {
  public:
-  explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+  static std::unique_ptr<hbase::TestUtil> test_util;
+
+  static void SetUpTestCase() {
+    google::InstallFailureSignalHandler();
+    test_util = std::make_unique<hbase::TestUtil>();
+    test_util->StartMiniCluster(2);
+  }
+};
+std::unique_ptr<hbase::TestUtil> AsyncRpcRetryTest::test_util = nullptr;
+
+class AsyncRegionLocatorBase : public AsyncRegionLocator {
+ public:
+  AsyncRegionLocatorBase() {}
+  explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location)
       : region_location_(region_location) {}
-  ~MockAsyncRegionLocator() = default;
+  virtual ~AsyncRegionLocatorBase() = default;
 
-  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName&,
-                                                                     const std::string&,
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &,
+                                                                     const std::string &,
                                                                      const RegionLocateType,
                                                                      const int64_t) override {
     folly::Promise<std::shared_ptr<RegionLocation>> promise;
@@ -83,22 +104,102 @@ class MockAsyncRegionLocator : public AsyncRegionLocator {
     return promise.getFuture();
   }
 
-  void UpdateCachedLocation(const RegionLocation&, const std::exception&) override {}
+  virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) {
+    region_location_ = region_location;
+  }
+
+  void UpdateCachedLocation(const RegionLocation &, const folly::exception_wrapper &) override {}
 
- private:
+ protected:
   std::shared_ptr<RegionLocation> region_location_;
 };
 
+class MockAsyncRegionLocator : public AsyncRegionLocatorBase {
+ public:
+  MockAsyncRegionLocator() : AsyncRegionLocatorBase() {}
+  explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  virtual ~MockAsyncRegionLocator() {}
+};
+
+class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase {
+ private:
+  uint32_t tries_ = 0;  // incremented on every LocateRegion call
+  uint32_t num_fails_ = 0;  // threshold compared against tries_ in LocateRegion
+
+ public:
+  explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails)
+      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+  explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  virtual ~MockWrongRegionAsyncRegionLocator() {}
+
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+      const hbase::pb::TableName &tn, const std::string &row,
+      const RegionLocateType locate_type = RegionLocateType::kCurrent,
+      const int64_t locate_ns = 0) override {
+    // The first num_fails_ + 1 calls hand back a bogus location (the post-increment compare
+    // below is strict); every later call delegates to AsyncRegionLocatorBase::LocateRegion.
+    if (tries_++ > num_fails_) {
+      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+    }
+    folly::Promise<std::shared_ptr<RegionLocation>> promise;
+    /* made-up region name on top of the stored location's info, server name and service */
+    auto result = std::make_shared<RegionLocation>(
+        "whatever-region-name", region_location_->region_info(), region_location_->server_name(),
+        region_location_->service());
+    promise.setValue(result);
+    return promise.getFuture();
+  }
+};
+
+class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {
+ private:
+  uint32_t tries_ = 0;  // incremented on every LocateRegion call
+  uint32_t num_fails_ = 0;  // threshold compared against tries_ in LocateRegion
+
+ public:
+  explicit  MockFailingAsyncRegionLocator(uint32_t num_fails)
+      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+  explicit  MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  virtual ~MockFailingAsyncRegionLocator() {}
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+      const hbase::pb::TableName &tn, const std::string &row,
+      const RegionLocateType locate_type = RegionLocateType::kCurrent,
+      const int64_t locate_ns = 0) override {
+    // The first num_fails_ + 1 calls complete with a std::runtime_error (the post-increment
+    // compare below is strict); later calls delegate to AsyncRegionLocatorBase::LocateRegion.
+    if (tries_++ > num_fails_) {
+      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+    }
+    folly::Promise<std::shared_ptr<RegionLocation>> promise;
+    promise.setException(std::runtime_error{"Failed to look up region location"});
+    return promise.getFuture();
+  }
+};
+
 class MockAsyncConnection : public AsyncConnection,
                             public std::enable_shared_from_this<MockAsyncConnection> {
  public:
   MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf,
+                      std::shared_ptr<folly::HHWheelTimer> retry_timer,
+                      std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
+                      std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+                      std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor,
                       std::shared_ptr<RpcClient> rpc_client,
                       std::shared_ptr<AsyncRegionLocator> region_locator)
-      : conn_conf_(conn_conf), rpc_client_(rpc_client), region_locator_(region_locator) {}
+      : conn_conf_(conn_conf),
+        retry_timer_(retry_timer),
+        cpu_executor_(cpu_executor),
+        io_executor_(io_executor),
+        retry_executor_(retry_executor),
+        rpc_client_(rpc_client),
+        region_locator_(region_locator) {}
   ~MockAsyncConnection() {}
   void Init() {
-    caller_factory_ = std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this());
+    caller_factory_ =
+        std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
   }
 
   std::shared_ptr<Configuration> conf() override { return nullptr; }
@@ -108,6 +209,11 @@ class MockAsyncConnection : public AsyncConnection,
   }
   std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
   std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; }
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
+  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
+    return retry_executor_;
+  }
 
   void Close() override {}
   std::shared_ptr<HBaseRpcController> CreateRpcController() override {
@@ -115,17 +221,20 @@ class MockAsyncConnection : public AsyncConnection,
   }
 
  private:
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
   std::shared_ptr<ConnectionConfiguration> conn_conf_;
   std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
   std::shared_ptr<RpcClient> rpc_client_;
   std::shared_ptr<AsyncRegionLocator> region_locator_;
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
 };
 
 template <typename CONN>
 class MockRawAsyncTableImpl {
  public:
-  explicit MockRawAsyncTableImpl(std::shared_ptr<CONN> conn)
-      : conn_(conn), promise_(std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>()) {}
+  explicit MockRawAsyncTableImpl(std::shared_ptr<CONN> conn) : conn_(conn) {}
   virtual ~MockRawAsyncTableImpl() = default;
 
   /* implement this in real RawAsyncTableImpl. */
@@ -133,11 +242,13 @@ class MockRawAsyncTableImpl {
   /* in real RawAsyncTableImpl, this should be private. */
   folly::Future<std::shared_ptr<hbase::Result>> GetCall(
       std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
-      std::shared_ptr<RegionLocation> loc, const hbase::Get& get) {
+      std::shared_ptr<RegionLocation> loc, const hbase::Get &get) {
     hbase::RpcCall<hbase::Request, hbase::Response> rpc_call = [](
         std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<RegionLocation> loc,
         std::shared_ptr<HBaseRpcController> controller,
         std::unique_ptr<hbase::Request> preq) -> folly::Future<std::unique_ptr<hbase::Response>> {
+      VLOG(1) << "entering MockRawAsyncTableImpl#GetCall, calling AsyncCall, loc:"
+              << loc->DebugString();
       return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(),
                                    std::move(preq), User::defaultUser(), "ClientService");
     };
@@ -151,17 +262,25 @@ class MockRawAsyncTableImpl {
   template <typename REQ, typename PREQ, typename PRESP, typename RESP>
   folly::Future<RESP> Call(std::shared_ptr<hbase::RpcClient> rpc_client,
                            std::shared_ptr<HBaseRpcController> controller,
-                           std::shared_ptr<RegionLocation> loc, const REQ& req,
+                           std::shared_ptr<RegionLocation> loc, const REQ &req,
                            ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter,
-                           const hbase::RpcCall<PREQ, PRESP>& rpc_call,
+                           hbase::RpcCall<PREQ, PRESP> rpc_call,
                            RespConverter<RESP, PRESP> resp_converter) {
+    promise_ = std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>();
+    auto f = promise_->getFuture();
+    VLOG(1) << "calling rpc_call";
     rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name())))
         .then([&, this, resp_converter](std::unique_ptr<PRESP> presp) {
+          VLOG(1) << "MockRawAsyncTableImpl#call succeded: ";
           RESP result = resp_converter(*presp);
           promise_->setValue(result);
         })
-        .onError([this](const std::exception& e) { promise_->setException(e); });
-    return promise_->getFuture();
+        .onError([this](const exception_wrapper &e) {
+          VLOG(1) << "entering MockRawAsyncTableImpl#call, exception: " << e.what();
+          VLOG(1) << "entering MockRawAsyncTableImpl#call, error typeinfo: " << typeid(e).name();
+          promise_->setException(e);
+        });
+    return f;
   }
 
  private:
@@ -169,22 +288,19 @@ class MockRawAsyncTableImpl {
   std::shared_ptr<folly::Promise<std::shared_ptr<hbase::Result>>> promise_;
 };
 
-TEST(AsyncRpcRetryTest, TestGetBasic) {
-  // Using TestUtil to populate test data
-  auto test_util = std::make_unique<hbase::TestUtil>();
-  test_util->StartMiniCluster(2);
-
-  test_util->CreateTable("t", "d");
+void runTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator, std::string tableName,
+             uint32_t operation_timeout_millis = 1200000) {
+  AsyncRpcRetryTest::test_util->CreateTable(tableName, "d");
 
   // Create TableName and Row to be fetched from HBase
-  auto tn = folly::to<hbase::pb::TableName>("t");
+  auto tn = folly::to<hbase::pb::TableName>(tableName);
   auto row = "test2";
 
   // Get to be performed on above HBase Table
   hbase::Get get(row);
 
   // Create a client
-  Client client(*(test_util->conf()));
+  Client client(*(AsyncRpcRetryTest::test_util->conf()));
 
   // Get connection to HBase Table
   auto table = client.Table(tn);
@@ -196,24 +312,32 @@ TEST(AsyncRpcRetryTest, TestGetBasic) {
   /* init region location and rpc channel */
   auto region_location = table->GetRegionLocation(row);
 
-  auto io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+  // auto io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(4);
+  auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
+  auto io_executor_ = client.async_connection()->io_executor();
+  auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
   auto codec = std::make_shared<hbase::KeyValueCodec>();
   auto rpc_client = std::make_shared<RpcClient>(io_executor_, codec);
+  // auto retry_event_base_ = std::make_shared<folly::ScopedEventBaseThread>(true);
+  std::shared_ptr<folly::HHWheelTimer> retry_timer =
+      folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
 
   /* init connection configuration */
   auto connection_conf = std::make_shared<ConnectionConfiguration>(
-      TimeUtil::SecondsToNanos(20),    // connect_timeout
-      TimeUtil::SecondsToNanos(1200),  // operation_timeout
-      TimeUtil::SecondsToNanos(60),    // rpc_timeout
-      TimeUtil::MillisToNanos(100),    // pause
-      31,                              // max retries
-      9);                              // start log errors count
+      TimeUtil::SecondsToNanos(20),                       // connect_timeout
+      TimeUtil::MillisToNanos(operation_timeout_millis),  // operation_timeout
+      TimeUtil::SecondsToNanos(60),                       // rpc_timeout
+      TimeUtil::MillisToNanos(100),                       // pause
+      5,                                                  // max retries
+      9);                                                 // start log errors count
 
-  /* init region locator */
-  auto region_locator = std::make_shared<MockAsyncRegionLocator>(region_location);
+  /* set region locator */
+  region_locator->set_region_location(region_location);
 
   /* init hbase client connection */
-  auto conn = std::make_shared<MockAsyncConnection>(connection_conf, rpc_client, region_locator);
+  auto conn = std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_,
+                                                    io_executor_, retry_executor_, rpc_client,
+                                                    region_locator);
   conn->Init();
 
   /* init retry caller factory */
@@ -237,7 +361,9 @@ TEST(AsyncRpcRetryTest, TestGetBasic) {
                        })
           ->Build();
 
-  auto result = async_caller->Call().get();
+  auto promise = std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>();
+
+  auto result = async_caller->Call().get(milliseconds(500000));
 
   // Test the values, should be same as in put executed on hbase shell
   ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
@@ -245,6 +371,50 @@ TEST(AsyncRpcRetryTest, TestGetBasic) {
   EXPECT_EQ("value2", *(result->Value("d", "2")));
   EXPECT_EQ("value for extra", *(result->Value("d", "extra")));
 
+  retry_timer->destroy();
   table->Close();
   client.Close();
+  retry_executor_->stop();
+}
+
+// Test successful case
+TEST_F(AsyncRpcRetryTest, TestGetBasic) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockAsyncRegionLocator>());
+  runTest(region_locator, "table1");
+}
+
+// Tests the RPC failing 3 times, then succeeding
+TEST_F(AsyncRpcRetryTest, TestHandleException) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
+  runTest(region_locator, "table2");
+}
+
+// Tests the RPC failing 5 times, throwing an exception
+TEST_F(AsyncRpcRetryTest, TestFailWithException) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockWrongRegionAsyncRegionLocator>(5));
+  EXPECT_ANY_THROW(runTest(region_locator, "table3"));
+}
+
+// Tests the region location lookup failing 3 times, then succeeding
+TEST_F(AsyncRpcRetryTest, TestHandleExceptionFromRegionLocationLookup) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockFailingAsyncRegionLocator>(3));
+  runTest(region_locator, "table4");
+}
+
+// Tests the region location lookup failing 5 times, throwing an exception
+TEST_F(AsyncRpcRetryTest, TestFailWithExceptionFromRegionLocationLookup) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockFailingAsyncRegionLocator>(5));
+  EXPECT_ANY_THROW(runTest(region_locator, "table5"));
+}
+
+// Tests hitting operation timeout, thus not retrying anymore
+TEST_F(AsyncRpcRetryTest, TestFailWithOperationTimeout) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockFailingAsyncRegionLocator>(3));
+  EXPECT_ANY_THROW(runTest(region_locator, "table6", 200));
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/client-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index ff4879a..274168f 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -156,6 +156,7 @@ TEST_F(ClientTest, GetForNonExistentTable) {
   // Get to be performed on above HBase Table
   hbase::Get get(row);
 
+  ClientTest::test_util->conf()->SetInt("hbase.client.retries.number", 5);
   // Create a client
   hbase::Client client(*ClientTest::test_util->conf());
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/client.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index 0e11278..2719470 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -61,6 +61,11 @@ class Client {
    */
   void Close();
 
+  /**
+   * @brief Internal. DO NOT USE.
+   */
+  std::shared_ptr<AsyncConnectionImpl> async_connection() { return async_connection_; }
+
  private:
   /** Data */
   std::shared_ptr<AsyncConnectionImpl> async_connection_;

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/location-cache.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index 07c3d61..e0afcfb 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -284,7 +284,8 @@ void LocationCache::ClearCachedLocation(const hbase::pb::TableName &tn, const st
   table_locs->erase(row);
 }
 
-void LocationCache::UpdateCachedLocation(const RegionLocation &loc, const std::exception &error) {
+void LocationCache::UpdateCachedLocation(const RegionLocation &loc,
+                                         const folly::exception_wrapper &error) {
   // TODO: just clears the location for now. We can inspect RegionMovedExceptions, etc later.
   ClearCachedLocation(loc.region_info().table_name(), loc.region_info().start_key());
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/core/location-cache.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h
index 5e79213..a3c15cb 100644
--- a/hbase-native-client/core/location-cache.h
+++ b/hbase-native-client/core/location-cache.h
@@ -18,6 +18,7 @@
  */
 #pragma once
 
+#include <folly/ExceptionWrapper.h>
 #include <folly/Executor.h>
 #include <folly/SharedMutex.h>
 #include <folly/futures/Future.h>
@@ -180,7 +181,7 @@ class LocationCache : public AsyncRegionLocator {
    * Update cached region location, possibly using the information from exception.
    */
   virtual void UpdateCachedLocation(const RegionLocation &loc,
-                                    const std::exception &error) override;
+                                    const folly::exception_wrapper &error) override;
 
   const std::string &zk_quorum() { return zk_quorum_; }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/exceptions/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/exceptions/BUCK b/hbase-native-client/exceptions/BUCK
index a23654c..eef4437 100644
--- a/hbase-native-client/exceptions/BUCK
+++ b/hbase-native-client/exceptions/BUCK
@@ -21,4 +21,4 @@ cxx_library(
     srcs=[],
     deps=["//third-party:folly",],
     compiler_flags=['-Weffc++'],
-    visibility=['//core/...'],)
\ No newline at end of file
+    visibility=['//core/...','//connection/...'],)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/018f1eab/hbase-native-client/exceptions/exception.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h
index c0c4142..2943d57 100644
--- a/hbase-native-client/exceptions/exception.h
+++ b/hbase-native-client/exceptions/exception.h
@@ -22,52 +22,55 @@
 #include <string>
 #include <vector>
 #include <folly/io/IOBuf.h>
+#include <folly/ExceptionWrapper.h>
 
 namespace hbase {
 
 class ThrowableWithExtraContext {
 public:
-  ThrowableWithExtraContext(std::shared_ptr<std::exception> cause,
+  ThrowableWithExtraContext(folly::exception_wrapper cause,
       const long& when) :
       cause_(cause), when_(when), extras_("") {
   }
 
-  ThrowableWithExtraContext(std::shared_ptr<std::exception> cause,
+  ThrowableWithExtraContext(folly::exception_wrapper cause,
       const long& when, const std::string& extras) :
       cause_(cause), when_(when), extras_(extras) {
   }
 
-  std::string ToString() {
+  virtual std::string ToString() {
     // TODO:
     // return new Date(this.when).toString() + ", " + extras + ", " + t.toString();
-    return extras_ + ", " + cause_->what();
+    return extras_ + ", " + cause_.what().toStdString();
   }
 
-  std::shared_ptr<std::exception> cause() {
+  virtual folly::exception_wrapper cause() {
     return cause_;
   }
 private:
-  std::shared_ptr<std::exception> cause_;
+  folly::exception_wrapper cause_;
   long when_;
   std::string extras_;
 };
 
 class IOException: public std::logic_error {
 public:
+  IOException() : logic_error("") {}
+
   IOException(
         const std::string& what) :
-        logic_error(what), cause_(nullptr) {}
+        logic_error(what) {}
   IOException(
       const std::string& what,
-      std::shared_ptr<std::exception> cause) :
+	  folly::exception_wrapper cause) :
       logic_error(what), cause_(cause) {}
   virtual ~IOException() = default;
 
-  std::shared_ptr<std::exception> cause() {
+  virtual folly::exception_wrapper cause() {
     return cause_;
   }
 private:
-  const std::shared_ptr<std::exception> cause_;
+  folly::exception_wrapper cause_;
 };
 
 class RetriesExhaustedException: public IOException {
@@ -77,7 +80,7 @@ public:
       std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions) :
         IOException(
             GetMessage(num_retries, exceptions),
-            exceptions->empty() ? nullptr : (*exceptions)[exceptions->size() - 1].cause()){
+            exceptions->empty() ? folly::exception_wrapper{} : (*exceptions)[exceptions->size() - 1].cause()){
   }
   virtual ~RetriesExhaustedException() = default;
 
@@ -99,6 +102,71 @@ private:
 class HBaseIOException : public IOException {
 };
 
-class DoNotRetryIOException : public HBaseIOException {
+class RemoteException : public IOException {
+public:
+
+  RemoteException() : port_(0), do_not_retry_(false) {}
+
+  RemoteException(const std::string& what) :
+      IOException(what), port_(0), do_not_retry_(false) {}
+
+  RemoteException(
+      const std::string& what,
+      folly::exception_wrapper cause) :
+      IOException(what, cause), port_(0), do_not_retry_(false) {}
+
+  virtual ~RemoteException() = default;
+
+  std::string exception_class_name() const {
+    return exception_class_name_;
+  }
+
+  RemoteException* set_exception_class_name(const std::string& value) {
+    exception_class_name_ = value;
+    return this;
+  }
+
+  std::string stack_trace() const {
+    return stack_trace_;
+  }
+
+  RemoteException* set_stack_trace(const std::string& value) {
+    stack_trace_ = value;
+    return this;
+  }
+
+  std::string hostname() const {
+    return hostname_;
+  }
+
+  RemoteException* set_hostname(const std::string& value) {
+    hostname_ = value;
+    return this;
+  }
+
+  int port() const {
+    return port_;
+  }
+
+  RemoteException* set_port(int value) {
+    port_ = value;
+    return this;
+  }
+
+  bool do_not_retry() const {
+    return do_not_retry_;
+  }
+
+  RemoteException* set_do_not_retry(bool value) {
+    do_not_retry_ = value;
+    return this;
+  }
+
+private:
+  std::string exception_class_name_;
+  std::string stack_trace_;
+  std::string hostname_;
+  int port_;
+  bool do_not_retry_;
 };
 }  // namespace hbase