You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/09/15 21:20:26 UTC

[09/25] hbase git commit: HBASE-18725 [C++] Install header files as well as library

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-batch-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-batch-rpc-retrying-test.cc b/hbase-native-client/src/hbase/client/async-batch-rpc-retrying-test.cc
new file mode 100644
index 0000000..b0f4afb
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-batch-rpc-retrying-test.cc
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Logging.h>
+#include <folly/Memory.h>
+#include <folly/futures/Future.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/ScopedEventBaseThread.h>
+#include <gtest/gtest.h>
+#include <wangle/concurrent/IOThreadPoolExecutor.h>
+
+#include <chrono>
+#include <functional>
+#include <string>
+
+#include "hbase/connection/rpc-client.h"
+#include "hbase/client/async-batch-rpc-retrying-caller.h"
+#include "hbase/client/async-connection.h"
+#include "hbase/client/async-rpc-retrying-caller-factory.h"
+#include "hbase/client/client.h"
+#include "hbase/client/connection-configuration.h"
+#include "hbase/client/keyvalue-codec.h"
+#include "hbase/client/region-location.h"
+#include "hbase/client/result.h"
+#include "hbase/exceptions/exception.h"
+#include "hbase/test-util/test-util.h"
+#include "hbase/utils/time-util.h"
+
+using hbase::AsyncRpcRetryingCallerFactory;
+using hbase::AsyncConnection;
+using hbase::AsyncRegionLocator;
+using hbase::ConnectionConfiguration;
+using hbase::Configuration;
+using hbase::HBaseRpcController;
+using hbase::RegionLocation;
+using hbase::RegionLocateType;
+using hbase::RpcClient;
+using hbase::RequestConverter;
+using hbase::ResponseConverter;
+using hbase::Put;
+using hbase::TimeUtil;
+using hbase::Client;
+using hbase::security::User;
+
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+using namespace hbase;
+
+using folly::exception_wrapper;
+
+class AsyncBatchRpcRetryTest : public ::testing::Test {
+ public:
+  static std::unique_ptr<hbase::TestUtil> test_util;
+  static std::string tableName;
+
+  static void SetUpTestCase() {
+    google::InstallFailureSignalHandler();
+    test_util = std::make_unique<hbase::TestUtil>();
+    test_util->StartMiniCluster(2);
+    std::vector<std::string> keys{"test0",   "test100", "test200", "test300", "test400",
+                                  "test500", "test600", "test700", "test800", "test900"};
+    tableName = "split-table1";
+    test_util->CreateTable(tableName, "d", keys);
+  }
+};
+std::unique_ptr<hbase::TestUtil> AsyncBatchRpcRetryTest::test_util = nullptr;
+std::string AsyncBatchRpcRetryTest::tableName;
+
+class AsyncRegionLocatorBase : public AsyncRegionLocator {
+ public:
+  AsyncRegionLocatorBase() {}
+  explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location)
+      : region_location_(region_location) {}
+  virtual ~AsyncRegionLocatorBase() = default;
+
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &,
+                                                                     const std::string &row,
+                                                                     const RegionLocateType,
+                                                                     const int64_t) override {
+    folly::Promise<std::shared_ptr<RegionLocation>> promise;
+    promise.setValue(region_locations_.at(row));
+    return promise.getFuture();
+  }
+
+  virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) {
+    region_location_ = region_location;
+  }
+
+  virtual void set_region_location(
+      const std::map<std::string, std::shared_ptr<RegionLocation>> &reg_locs) {
+    for (auto reg_loc : reg_locs) {
+      region_locations_[reg_loc.first] = reg_loc.second;
+    }
+  }
+
+  void UpdateCachedLocation(const RegionLocation &rl, const folly::exception_wrapper &ew) override {
+  }
+
+ protected:
+  std::shared_ptr<RegionLocation> region_location_;
+  std::map<std::string, std::shared_ptr<RegionLocation>> region_locations_;
+  std::map<std::string, uint32_t> mtries_;
+  std::map<std::string, uint32_t> mnum_fails_;
+
+  void InitRetryMaps(uint32_t num_fails) {
+    if (mtries_.size() == 0 && mnum_fails_.size() == 0) {
+      for (auto reg_loc : region_locations_) {
+        mtries_[reg_loc.first] = 0;
+        mnum_fails_[reg_loc.first] = num_fails;
+      }
+    }
+  }
+};
+
+class MockAsyncRegionLocator : public AsyncRegionLocatorBase {
+ public:
+  MockAsyncRegionLocator() : AsyncRegionLocatorBase() {}
+  explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  virtual ~MockAsyncRegionLocator() {}
+};
+
+// Locator mock: for each row, the first `num_fails` lookups return a location with a bogus
+// region name (but the real region info and server); later lookups return the real location.
+class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase {
+ private:
+  uint32_t num_fails_ = 0;  // bogus answers per row; stays 0 when built from a RegionLocation
+
+ public:
+  explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails)
+      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+  explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  virtual ~MockWrongRegionAsyncRegionLocator() {}
+
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+      const hbase::pb::TableName &tn, const std::string &row,
+      const RegionLocateType locate_type = RegionLocateType::kCurrent,
+      const int64_t locate_ns = 0) override {
+    InitRetryMaps(num_fails_);
+    auto &tries = mtries_[row];
+    auto &num_fails = mnum_fails_[row];
+    if (++tries > num_fails) {
+      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+    }
+
+    folly::Promise<std::shared_ptr<RegionLocation>> promise;
+    /* keep the real region info and server, but use a made-up region name */
+    const auto &actual = region_locations_.at(row);
+    auto result = std::make_shared<RegionLocation>("whatever-region-name", actual->region_info(),
+                                                   actual->server_name());
+    promise.setValue(result);
+    return promise.getFuture();
+  }
+};
+
+// Locator mock: for each row, the first `num_fails` lookups complete with a std::runtime_error;
+// later lookups are answered by AsyncRegionLocatorBase::LocateRegion().
+class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {
+ private:
+  uint32_t num_fails_ = 0;  // failed lookups per row; stays 0 when built from a RegionLocation
+
+ public:
+  explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
+      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+  explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  virtual ~MockFailingAsyncRegionLocator() {}
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+      const hbase::pb::TableName &tn, const std::string &row,
+      const RegionLocateType locate_type = RegionLocateType::kCurrent,
+      const int64_t locate_ns = 0) override {
+    InitRetryMaps(num_fails_);
+    auto &tries = mtries_[row];
+    auto &num_fails = mnum_fails_[row];
+    if (++tries > num_fails) {
+      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+    }
+
+    folly::Promise<std::shared_ptr<RegionLocation>> promise;
+    promise.setException(std::runtime_error{"Failed to look up region location"});
+    return promise.getFuture();
+  }
+};
+
+class MockAsyncConnection : public AsyncConnection,
+                            public std::enable_shared_from_this<MockAsyncConnection> {
+ public:
+  MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf,
+                      std::shared_ptr<folly::HHWheelTimer> retry_timer,
+                      std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
+                      std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+                      std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor,
+                      std::shared_ptr<RpcClient> rpc_client,
+                      std::shared_ptr<AsyncRegionLocator> region_locator)
+      : retry_timer_(retry_timer),  // listed in member declaration order (avoids -Wreorder)
+        conn_conf_(conn_conf),
+        rpc_client_(rpc_client),
+        region_locator_(region_locator),
+        cpu_executor_(cpu_executor),
+        io_executor_(io_executor),
+        retry_executor_(retry_executor) {}
+  ~MockAsyncConnection() {}
+  void Init() {  // not done in the ctor: shared_from_this() needs an existing shared_ptr owner
+    caller_factory_ =
+        std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
+  }
+
+  std::shared_ptr<Configuration> conf() override { return nullptr; }
+  std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; }
+  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
+    return caller_factory_;
+  }
+  std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
+  std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; }
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
+  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
+    return retry_executor_;
+  }
+
+  void Close() override {  // tears down the retry timer, then stops the three executors
+    retry_timer_->destroy();
+    retry_executor_->stop();
+    io_executor_->stop();
+    cpu_executor_->stop();
+  }
+  std::shared_ptr<HBaseRpcController> CreateRpcController() override {
+    return std::make_shared<HBaseRpcController>();
+  }
+
+ private:
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
+  std::shared_ptr<ConnectionConfiguration> conn_conf_;
+  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;  // set by Init(), null before
+  std::shared_ptr<RpcClient> rpc_client_;
+  std::shared_ptr<AsyncRegionLocator> region_locator_;
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
+};
+
+class MockRawAsyncTableImpl {
+ public:
+  explicit MockRawAsyncTableImpl(std::shared_ptr<MockAsyncConnection> conn,
+                                 std::shared_ptr<hbase::pb::TableName> tn)
+      : conn_(conn), tn_(tn) {}
+  virtual ~MockRawAsyncTableImpl() = default;
+
+  /* implement this in real RawAsyncTableImpl. */
+  template <typename REQ, typename RESP>
+  folly::Future<std::vector<folly::Try<RESP>>> Batch(const std::vector<REQ> &rows) {
+    /* init request caller builder */
+    auto builder = conn_->caller_factory()->Batch<REQ, RESP>();
+
+    /* call with retry to get result */
+    auto async_caller =
+        builder->table(tn_)
+            ->actions(std::make_shared<std::vector<REQ>>(rows))
+            ->rpc_timeout(conn_->connection_conf()->read_rpc_timeout())
+            ->operation_timeout(conn_->connection_conf()->operation_timeout())
+            ->pause(conn_->connection_conf()->pause())
+            ->max_attempts(conn_->connection_conf()->max_retries())
+            ->start_log_errors_count(conn_->connection_conf()->start_log_errors_count())
+            ->Build();
+
+    return async_caller->Call().then([async_caller](auto r) { return r; });
+  }
+
+ private:
+  std::shared_ptr<MockAsyncConnection> conn_;
+  std::shared_ptr<hbase::pb::TableName> tn_;
+};
+
+std::shared_ptr<MockAsyncConnection> getAsyncConnection(
+    Client &client, uint32_t operation_timeout_millis, uint32_t tries,
+    std::shared_ptr<AsyncRegionLocatorBase> region_locator) {
+  /* init region location and rpc channel */
+  auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
+  auto io_executor_ = client.async_connection()->io_executor();
+  auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+  auto codec = std::make_shared<hbase::KeyValueCodec>();
+  auto rpc_client = std::make_shared<RpcClient>(io_executor_, cpu_executor_, codec,
+                                                AsyncBatchRpcRetryTest::test_util->conf());
+  std::shared_ptr<folly::HHWheelTimer> retry_timer =
+      folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
+
+  /* init connection configuration */
+  auto connection_conf = std::make_shared<ConnectionConfiguration>(
+      TimeUtil::SecondsToNanos(20),                       // connect_timeout
+      TimeUtil::MillisToNanos(operation_timeout_millis),  // operation_timeout
+      TimeUtil::SecondsToNanos(60),                       // rpc_timeout
+      TimeUtil::MillisToNanos(100),                       // pause
+      tries,                                              // max retries
+      1);                                                 // start log errors count
+
+  return std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_,
+                                               io_executor_, retry_executor_, rpc_client,
+                                               region_locator);
+}
+
+template <typename ACTION>
+std::vector<std::shared_ptr<hbase::Row>> getRows(std::vector<ACTION> actions) {
+  std::vector<std::shared_ptr<hbase::Row>> rows;
+  for (auto action : actions) {
+    std::shared_ptr<hbase::Row> srow = std::make_shared<ACTION>(action);
+    rows.push_back(srow);
+  }
+  return rows;
+}
+
+template <typename REQ, typename RESP>
+std::vector<std::shared_ptr<hbase::Result>> getResults(std::vector<REQ> &actions,
+                                                       std::vector<folly::Try<RESP>> &tresults) {
+  std::vector<std::shared_ptr<hbase::Result>> results{};
+  uint64_t num = 0;
+  for (auto tresult : tresults) {
+    if (tresult.hasValue()) {
+      results.push_back(tresult.value());
+    } else if (tresult.hasException()) {
+      folly::exception_wrapper ew = tresult.exception();
+      LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for "
+                 << actions[num].row();
+      throw ew;
+    }
+    ++num;
+  }
+  return results;
+}
+
+template <typename ACTION>
+std::map<std::string, std::shared_ptr<RegionLocation>> getRegionLocationsAndActions(
+    uint64_t num_rows, std::vector<ACTION> &actions, std::shared_ptr<Table> table) {
+  std::map<std::string, std::shared_ptr<RegionLocation>> region_locations;
+  for (uint64_t i = 0; i < num_rows; ++i) {
+    auto row = "test" + std::to_string(i);
+    ACTION action(row);
+    actions.push_back(action);
+    region_locations[row] = table->GetRegionLocation(row);
+  }
+  return region_locations;
+}
+
+// Writes rows "test0".."test<num_rows-1>", reads them back in one batch through a mock
+// connection that uses `region_locator`, and checks every returned row and value. Exceptions
+// thrown by getResults() for failed actions propagate to the caller. `table_name` and
+// `split_regions` are unused: the table is always AsyncBatchRpcRetryTest::tableName.
+void runMultiGets(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
+                  const std::string &table_name, bool split_regions, uint32_t tries = 3,
+                  uint32_t operation_timeout_millis = 600000, uint64_t num_rows = 1000) {
+  auto tn = folly::to<hbase::pb::TableName>(AsyncBatchRpcRetryTest::tableName);
+  Client client(*AsyncBatchRpcRetryTest::test_util->conf());
+  std::shared_ptr<Table> table = client.Table(tn);
+
+  // Seed the rows that the batch Get reads back below.
+  for (uint64_t i = 0; i < num_rows; i++) {
+    table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
+                                                         "value" + std::to_string(i)));
+  }
+  std::vector<hbase::Get> gets;
+  auto region_locations = getRegionLocationsAndActions<hbase::Get>(num_rows, gets, table);
+
+  /* set region locator */
+  region_locator->set_region_location(region_locations);
+
+  /* init hbase client connection */
+  auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator);
+  conn->Init();
+
+  /* init retry caller factory */
+  auto tableImpl =
+      std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
+
+  std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Get>(gets);
+  auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get(
+      milliseconds(operation_timeout_millis));
+  ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
+
+  auto results = getResults<hbase::Get, std::shared_ptr<Result>>(gets, tresults);
+  // results[i] is indexed for every i < num_rows below, so require one result per Get first.
+  ASSERT_EQ(num_rows, results.size()) << "Expected exactly one result per Get.";
+  for (uint64_t i = 0; i < num_rows; ++i) {
+    ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row()
+                                        << " must not be empty";
+    EXPECT_EQ("test" + std::to_string(i), results[i]->Row());
+    EXPECT_EQ("value" + std::to_string(i), results[i]->Value("d", std::to_string(i)).value());
+  }
+
+  // Not reached when an ASSERT_* above fails or getResults() throws.
+  table->Close();
+  client.Close();
+  conn->Close();
+}
+
+void runMultiPuts(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
+                  const std::string &table_name, bool split_regions, uint32_t tries = 3,
+                  uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 1000) {
+  // Target table is always AsyncBatchRpcRetryTest::tableName; table_name/split_regions are unused
+  auto tn = folly::to<hbase::pb::TableName>(AsyncBatchRpcRetryTest::tableName);
+
+  // Create a client
+  Client client(*AsyncBatchRpcRetryTest::test_util->conf());
+
+  // Get connection to HBase Table
+  std::shared_ptr<Table> table = client.Table(tn);
+
+  std::vector<hbase::Put> puts;
+  auto region_locations = getRegionLocationsAndActions<hbase::Put>(num_rows, puts, table);
+
+  /* set region locator */
+  region_locator->set_region_location(region_locations);
+
+  /* init hbase client connection */
+  auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator);
+  conn->Init();
+
+  /* init retry caller factory */
+  auto tableImpl =
+      std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
+
+  std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Put>(puts);
+  auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get(
+      milliseconds(operation_timeout_millis));
+  ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
+
+  auto results = getResults<hbase::Put, std::shared_ptr<Result>>(puts, tresults);
+  // Only checks that some results came back; the written rows are not read back here.
+  ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty.";
+
+  table->Close();
+  client.Close();
+  conn->Close();
+}
+
+// Test successful case
+TEST_F(AsyncBatchRpcRetryTest, MultiGets) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockAsyncRegionLocator>());
+  runMultiGets(region_locator, "table1", false);
+}
+
+// Tests the RPC failing 3 times, then succeeding
+TEST_F(AsyncBatchRpcRetryTest, HandleException) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
+  runMultiGets(region_locator, "table2", false, 5);
+}
+
+// Tests the RPC failing 4 times, throwing an exception
+TEST_F(AsyncBatchRpcRetryTest, FailWithException) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
+  EXPECT_ANY_THROW(runMultiGets(region_locator, "table3", false));
+}
+
+// Tests the region location lookup failing 3 times, then succeeding
+TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookup) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockFailingAsyncRegionLocator>(3));
+  runMultiGets(region_locator, "table4", false);
+}
+
+// Tests the region location lookup failing 4 times (tries = 3), throwing an exception
+TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookup) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockFailingAsyncRegionLocator>(4));
+  EXPECT_ANY_THROW(runMultiGets(region_locator, "table5", false, 3));
+}
+
+// Tests hitting operation timeout, thus not retrying anymore
+TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeout) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockFailingAsyncRegionLocator>(6));
+  EXPECT_ANY_THROW(runMultiGets(region_locator, "table6", false, 5, 100, 1000));
+}
+
+//////////////////////
+// Test successful case
+TEST_F(AsyncBatchRpcRetryTest, MultiPuts) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockAsyncRegionLocator>());
+  runMultiPuts(region_locator, "table1", false);
+}
+
+// Tests the RPC failing 3 times, then succeeding
+TEST_F(AsyncBatchRpcRetryTest, PutsHandleException) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
+  runMultiPuts(region_locator, "table2", false, 5);
+}
+
+// Tests the RPC failing 4 times, throwing an exception
+TEST_F(AsyncBatchRpcRetryTest, PutsFailWithException) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
+  EXPECT_ANY_THROW(runMultiPuts(region_locator, "table3", false));
+}
+
+// Tests the region location lookup failing 3 times, then succeeding
+TEST_F(AsyncBatchRpcRetryTest, PutsHandleExceptionFromRegionLocationLookup) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockFailingAsyncRegionLocator>(3));
+  runMultiPuts(region_locator, "table4", false);
+}
+
+// Tests the region location lookup failing 4 times (tries = 3), throwing an exception
+TEST_F(AsyncBatchRpcRetryTest, PutsFailWithExceptionFromRegionLocationLookup) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockFailingAsyncRegionLocator>(4));
+  EXPECT_ANY_THROW(runMultiPuts(region_locator, "table5", false, 3));
+}
+
+// Tests hitting operation timeout, thus not retrying anymore
+TEST_F(AsyncBatchRpcRetryTest, PutsFailWithOperationTimeout) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockFailingAsyncRegionLocator>(6));
+  EXPECT_ANY_THROW(runMultiPuts(region_locator, "table6", false, 5, 100, 1000));
+}
+
+ // Test successful case
+ TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockAsyncRegionLocator>());
+ runMultiGets(region_locator, "table7", true);
+ }
+
+ // Tests the RPC failing 3 times, then succeeding
+ TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
+ runMultiGets(region_locator, "table8", true, 5);
+ }
+
+ // Tests the RPC failing 4 times, throwing an exception
+ TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table9", true));
+ }
+
+ // Tests the region location lookup failing 3 times, then succeeding
+ TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(3));
+ runMultiGets(region_locator, "table10", true);
+ }
+
+ // Tests the region location lookup failing 4 times (tries = 3), throwing an exception
+ TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(4));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table11", true, 3));
+ }
+
+ // Tests hitting operation timeout, thus not retrying anymore
+ TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(6));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table12", true, 5, 100, 1000));
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-client-scanner.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-client-scanner.cc b/hbase-native-client/src/hbase/client/async-client-scanner.cc
new file mode 100644
index 0000000..50c01ee
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-client-scanner.cc
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/async-client-scanner.h"
+
+#include <algorithm>
+#include <iterator>
+#include <limits>
+#include <stdexcept>
+
+namespace hbase {
+
+AsyncClientScanner::AsyncClientScanner(
+    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<Scan> scan,
+    std::shared_ptr<pb::TableName> table_name, std::shared_ptr<RawScanResultConsumer> consumer,
+    nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos,
+    nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
+    : conn_(conn),
+      scan_(scan),
+      table_name_(table_name),
+      consumer_(consumer),
+      pause_(pause),
+      max_retries_(max_retries),
+      scan_timeout_nanos_(scan_timeout_nanos),
+      rpc_timeout_nanos_(rpc_timeout_nanos),
+      start_log_errors_count_(start_log_errors_count) {
+  results_cache_ = std::make_shared<ScanResultCache>();
+  max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
+}
+
+void AsyncClientScanner::Start() { OpenScanner(); }
+
+folly::Future<std::shared_ptr<OpenScannerResponse>> AsyncClientScanner::CallOpenScanner(
+    std::shared_ptr<hbase::RpcClient> rpc_client,
+    std::shared_ptr<hbase::HBaseRpcController> controller,
+    std::shared_ptr<hbase::RegionLocation> loc) {
+  open_scanner_tries_++;
+
+  auto preq = RequestConverter::ToScanRequest(*scan_, loc->region_name(), scan_->Caching(), false);
+
+  auto self(shared_from_this());
+  VLOG(5) << "Calling RPC Client to open the scanner for region:" << loc->DebugString();
+  return rpc_client
+      ->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(preq),
+                  security::User::defaultUser(), "ClientService")
+      .then([self, loc, controller, rpc_client](const std::unique_ptr<Response>& presp) {
+        VLOG(5) << "Scan Response:" << presp->DebugString();
+        return std::make_shared<OpenScannerResponse>(rpc_client, presp, loc, controller);
+      });
+}
+
+void AsyncClientScanner::OpenScanner() {
+  auto self(shared_from_this());
+  open_scanner_tries_ = 1;
+  // The action is handed to the caller builder and may run later: capture `self`, not `[&]`.
+  auto caller = conn_->caller_factory()
+                    ->Single<std::shared_ptr<OpenScannerResponse>>()
+                    ->table(table_name_)
+                    ->row(scan_->StartRow())
+                    ->locate_type(GetLocateType(*scan_))
+                    ->rpc_timeout(rpc_timeout_nanos_)
+                    ->operation_timeout(scan_timeout_nanos_)
+                    ->pause(pause_)
+                    ->max_retries(max_retries_)
+                    ->start_log_errors_count(start_log_errors_count_)
+                    ->action([this, self](std::shared_ptr<hbase::HBaseRpcController> controller,
+                                          std::shared_ptr<hbase::RegionLocation> loc,
+                                          std::shared_ptr<hbase::RpcClient> rpc_client)
+                                 -> folly::Future<std::shared_ptr<OpenScannerResponse>> {
+                                   return CallOpenScanner(rpc_client, controller, loc);
+                                 })
+                    ->Build();
+  // On success start scanning the opened region; on failure report the error to the consumer.
+  caller->Call()
+      .then([this, self](std::shared_ptr<OpenScannerResponse> resp) {
+        VLOG(3) << "Opened scanner with id:" << resp->scan_resp_->scanner_id()
+                << ", region:" << resp->region_location_->DebugString() << ", starting scan";
+        StartScan(resp);
+      })
+      .onError([this, self](const folly::exception_wrapper& e) {
+        VLOG(3) << "Open scan request received error:" << e.what();
+        consumer_->OnError(e);
+      })
+      .then([caller, self](const auto r) { return r; });
+}
+
+void AsyncClientScanner::StartScan(std::shared_ptr<OpenScannerResponse> resp) {
+  auto self(shared_from_this());
+  auto caller = conn_->caller_factory()
+                    ->Scan()
+                    ->scanner_id(resp->scan_resp_->scanner_id())
+                    ->region_location(resp->region_location_)
+                    ->scanner_lease_timeout(TimeUtil::MillisToNanos(resp->scan_resp_->ttl()))
+                    ->scan(scan_)
+                    ->rpc_client(resp->rpc_client_)
+                    ->consumer(consumer_)
+                    ->results_cache(results_cache_)
+                    ->rpc_timeout(rpc_timeout_nanos_)
+                    ->scan_timeout(scan_timeout_nanos_)
+                    ->pause(pause_)
+                    ->max_retries(max_retries_)
+                    ->start_log_errors_count(start_log_errors_count_)
+                    ->Build();
+
+  caller->Start(resp->controller_, resp->scan_resp_, resp->cell_scanner_)
+      .then([caller, self](const bool has_more) {
+        if (has_more) {
+          // open the next scanner on the next region.
+          self->OpenScanner();
+        } else {
+          self->consumer_->OnComplete();
+        }
+      })
+      .onError([caller, self](const folly::exception_wrapper& e) { self->consumer_->OnError(e); })
+      .then([caller, self](const auto r) { return r; });
+}
+
+RegionLocateType AsyncClientScanner::GetLocateType(const Scan& scan) {
+  // TODO: In C++, there is no Scan::IncludeStartRow() and Scan::IncludeStopRow().
+  // When added, this method should be modified to return other RegionLocateTypes
+  // (see ConnectionUtils.java #getLocateType())
+  // TODO: When reversed scans are implemented, return other RegionLocateTypes
+  return RegionLocateType::kCurrent;
+}
+
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-connection.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-connection.cc b/hbase-native-client/src/hbase/client/async-connection.cc
new file mode 100644
index 0000000..f1bdebc
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-connection.cc
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/async-connection.h"
+#include "hbase/client/async-rpc-retrying-caller-factory.h"
+
+namespace hbase {
+
+// Builds the shared components of the connection from conf_: the connection configuration,
+// the CPU / IO / retry thread pools, the retry timer, the RPC client, the location cache and
+// the retrying-caller factory. Calls shared_from_this(), so the object has to be owned by a
+// shared_ptr before Init() is invoked.
+void AsyncConnectionImpl::Init() {
+  connection_conf_ = std::make_shared<hbase::ConnectionConfiguration>(*conf_);
+  // start thread pools
+  // sysconf() returns -1 when the processor count cannot be determined; never derive a zero
+  // or negative default pool size from it.
+  auto online_cpus = sysconf(_SC_NPROCESSORS_ONLN);
+  if (online_cpus < 1) online_cpus = 1;
+  auto io_threads = conf_->GetInt(kClientIoThreadPoolSize, online_cpus);
+  auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * online_cpus);
+  cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(cpu_threads);
+  io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(io_threads);
+  /*
+   * We need a retry_executor for a thread pool of size 1 due to a possible bug in wangle/folly.
+   * Otherwise, Assertion 'isInEventBaseThread()' always fails. See the comments
+   * in async-rpc-retrying-caller.cc.
+   */
+  retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+  retry_timer_ = folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
+
+  // Use the KeyValue cell codec only when the configured codec class name matches it;
+  // otherwise the RPC client is created without a codec.
+  std::shared_ptr<Codec> codec = nullptr;
+  if (conf_->Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) ==
+      std::string(KeyValueCodec::kJavaClassName)) {
+    codec = std::make_shared<hbase::KeyValueCodec>();
+  } else {
+    LOG(WARNING) << "Not using RPC Cell Codec";
+  }
+  rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, cpu_executor_, codec, conf_,
+                                                   connection_conf_->connect_timeout());
+  location_cache_ = std::make_shared<hbase::LocationCache>(conf_, io_executor_, cpu_executor_,
+                                                           rpc_client_->connection_pool());
+  caller_factory_ =
+      std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
+}
+
+// Shut everything down on destruction: leaving the pool threads running after the
+// connection is gone leads to an error.
+AsyncConnectionImpl::~AsyncConnectionImpl() {
+  Close();
+}
+
+// Stops the three thread pools, destroys the retry timer and closes the RPC client.
+// Does nothing when the connection has already been closed.
+void AsyncConnectionImpl::Close() {
+  if (is_closed_) return;
+
+  // The destructor calls Close() unconditionally, so Init() may never have run. Guard every
+  // component the same way rpc_client_ is guarded instead of dereferencing a null pointer.
+  if (cpu_executor_) cpu_executor_->stop();
+  if (io_executor_) io_executor_->stop();
+  if (retry_executor_) retry_executor_->stop();
+  if (retry_timer_) retry_timer_->destroy();
+  if (rpc_client_) rpc_client_->Close();
+  is_closed_ = true;
+}
+
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-rpc-retrying-caller-factory.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-rpc-retrying-caller-factory.cc b/hbase-native-client/src/hbase/client/async-rpc-retrying-caller-factory.cc
new file mode 100644
index 0000000..42b4eac
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-rpc-retrying-caller-factory.cc
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/async-rpc-retrying-caller-factory.h"
+
+namespace hbase {}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-rpc-retrying-caller.cc b/hbase-native-client/src/hbase/client/async-rpc-retrying-caller.cc
new file mode 100644
index 0000000..4c39f05
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-rpc-retrying-caller.cc
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/async-rpc-retrying-caller.h"
+
+#include <folly/Conv.h>
+#include <folly/ExceptionWrapper.h>
+#include <folly/Format.h>
+#include <folly/Logging.h>
+#include <folly/Unit.h>
+
+#include "hbase/connection/rpc-client.h"
+#include "hbase/client/async-connection.h"
+#include "hbase/client/hbase-rpc-controller.h"
+#include "hbase/client/region-location.h"
+#include "hbase/client/result.h"
+#include "hbase/exceptions/exception.h"
+#include "hbase/if/HBase.pb.h"
+#include "hbase/utils/connection-util.h"
+#include "hbase/utils/sys-util.h"
+#include "hbase/utils/time-util.h"
+
+using folly::exception_wrapper;
+
+namespace hbase {
+
+// Creates a retrying caller for a single request addressed by table_name and row.
+// All arguments are stored as given; in addition the constructor obtains an RPC controller
+// from the connection, records the start time, derives max_attempts_ from max_retries and
+// allocates the list that collects the exceptions of failed attempts. tries_ starts at 1.
+template <typename RESP>
+AsyncSingleRequestRpcRetryingCaller<RESP>::AsyncSingleRequestRpcRetryingCaller(
+    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
+    std::shared_ptr<hbase::pb::TableName> table_name, const std::string& row,
+    RegionLocateType locate_type, Callable<RESP> callable, std::chrono::nanoseconds pause,
+    uint32_t max_retries, std::chrono::nanoseconds operation_timeout_nanos,
+    std::chrono::nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
+    : conn_(conn),
+      retry_timer_(retry_timer),
+      table_name_(table_name),
+      row_(row),
+      locate_type_(locate_type),
+      callable_(callable),
+      pause_(pause),
+      max_retries_(max_retries),
+      operation_timeout_nanos_(operation_timeout_nanos),
+      rpc_timeout_nanos_(rpc_timeout_nanos),
+      start_log_errors_count_(start_log_errors_count),
+      promise_(std::make_shared<folly::Promise<RESP>>()),
+      tries_(1) {
+  controller_ = conn_->CreateRpcController();
+  // Reference point that RemainingTimeNs() measures the operation timeout against.
+  start_ns_ = TimeUtil::GetNowNanos();
+  max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
+  exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>();
+}
+
+// Nothing to release explicitly in the destructor body.
+template <typename RESP>
+AsyncSingleRequestRpcRetryingCaller<RESP>::~AsyncSingleRequestRpcRetryingCaller() {}
+
+// Starts the locate-then-call cycle and returns the future tied to promise_.
+template <typename RESP>
+folly::Future<RESP> AsyncSingleRequestRpcRetryingCaller<RESP>::Call() {
+  // Take the future first, then kick off the work that will eventually fulfil the promise.
+  auto result_future = promise_->getFuture();
+  LocateThenCall();
+  return result_future;
+}
+
+// Looks up the region for row_ and, on success, issues the request against it. A failed
+// lookup is handed to OnError() with a no-op cached-location updater.
+template <typename RESP>
+void AsyncSingleRequestRpcRetryingCaller<RESP>::LocateThenCall() {
+  // -1 is passed to the locator when no operation timeout is configured; otherwise the
+  // lookup gets whatever is left of the operation timeout.
+  int64_t locate_timeout_ns = -1L;
+  if (operation_timeout_nanos_.count() > 0) {
+    locate_timeout_ns = RemainingTimeNs();
+    if (locate_timeout_ns <= 0) {
+      CompleteExceptionally();
+      return;
+    }
+  }
+
+  conn_->region_locator()
+      ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns)
+      .then([this](std::shared_ptr<RegionLocation> location) { Call(*location); })
+      .onError([this](const exception_wrapper& e) {
+        // Builds the log line only when OnError() decides to log it.
+        auto describe_failure = [this, e]() -> std::string {
+          std::string msg = "Locate '";
+          msg += row_;
+          msg += "' in ";
+          msg += table_name_->namespace_();
+          msg += "::";
+          msg += table_name_->qualifier();
+          msg += " failed with e.what()=";
+          msg += e.what().toStdString();
+          msg += ", tries = ";
+          msg += std::to_string(tries_);
+          msg += ", maxAttempts = ";
+          msg += std::to_string(max_attempts_);
+          msg += ", timeout = ";
+          msg += TimeUtil::ToMillisStr(operation_timeout_nanos_);
+          msg += " ms, time elapsed = ";
+          msg += TimeUtil::ElapsedMillisStr(this->start_ns_);
+          msg += " ms";
+          return msg;
+        };
+        // There is no located region yet, so there is no cache entry to update.
+        auto no_location_update = [](const exception_wrapper&) {};
+        OnError(e, describe_failure, no_location_update);
+      });
+}
+
+// Records a failed attempt and either completes the promise exceptionally or schedules the
+// next attempt.
+//
+// error: the failure of the current attempt; it is appended to exceptions_.
+// err_msg: builds the log line for this failure; only invoked when the line is logged.
+// update_cached_location: invoked with the error right before the retry is scheduled.
+template <typename RESP>
+void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
+    const exception_wrapper& error, Supplier<std::string> err_msg,
+    Consumer<exception_wrapper> update_cached_location) {
+  ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos());
+  exceptions_->push_back(twec);
+  // Give up once max_attempts_ (derived from max_retries via Retries2Attempts(), and the value
+  // reported in the log lines) is reached. Comparing against max_retries_ here would allow one
+  // attempt fewer than configured, e.g. no retry at all for max_retries == 1.
+  if (!ExceptionUtil::ShouldRetry(error) || tries_ >= max_attempts_) {
+    CompleteExceptionally();
+    return;
+  }
+
+  if (tries_ > start_log_errors_count_) {
+    LOG(WARNING) << err_msg();
+  } else {
+    VLOG(1) << err_msg();
+  }
+
+  // Back off according to the pause schedule, but never beyond what is left of the operation
+  // timeout (minus kSleepDeltaNs).
+  int64_t delay_ns;
+  if (operation_timeout_nanos_.count() > 0) {
+    int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
+    if (max_delay_ns <= 0) {
+      CompleteExceptionally();
+      return;
+    }
+    delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1));
+  } else {
+    delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1);
+  }
+  update_cached_location(error);
+  tries_++;
+
+  /*
+   * The HHWheelTimer::scheduleTimeout() fails with an assertion from
+   * EventBase::isInEventBaseThread() if we execute the schedule in a random thread, or one of
+   * the IOThreadPool threads (with num threads > 1). I think there is a bug there in using retry
+   * timer from IOThreadPool threads. It only works when executed from a single-thread pool
+   * (retry_executor() is). However, the scheduled "work" which is the LocateThenCall() should
+   * still happen in a thread pool, that is why we are submitting the work to the CPUThreadPool.
+   * IOThreadPool cannot be used without fixing the blocking call that we do at TCP connection
+   * establishment time (see ConnectionFactory::Connect()), otherwise, the IOThreadPool thread
+   * just hangs because it deadlocks itself.
+   */
+  // The callbacks run after this function has returned, so capture exactly what they need
+  // (this and the delay) instead of relying on default captures.
+  // NOTE(review): they hold a raw `this`; presumably the owner keeps this caller alive until
+  // the promise completes - confirm.
+  conn_->retry_executor()->add([this, delay_ns]() {
+    retry_timer_->scheduleTimeoutFn(
+        [this]() { conn_->cpu_executor()->add([this]() { LocateThenCall(); }); },
+        std::chrono::milliseconds(TimeUtil::ToMillis(delay_ns)));
+  });
+}
+
+// Issues the request against the located region.
+//
+// The call timeout is the RPC timeout, capped by what is left of the operation timeout when
+// one is configured. On success the promise is fulfilled with the response; on failure the
+// error goes to OnError() together with a callback that updates the cached location.
+template <typename RESP>
+void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc) {
+  int64_t call_timeout_ns;
+  if (operation_timeout_nanos_.count() > 0) {
+    call_timeout_ns = this->RemainingTimeNs();
+    if (call_timeout_ns <= 0) {
+      this->CompleteExceptionally();
+      return;
+    }
+    call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_.count());
+  } else {
+    call_timeout_ns = rpc_timeout_nanos_.count();
+  }
+
+  std::shared_ptr<RpcClient> rpc_client = conn_->rpc_client();
+
+  ResetController(controller_, call_timeout_ns);
+
+  // TODO: RegionLocation should propagate through these method chains as a shared_ptr.
+  // Otherwise, it may get deleted underneath us. We are just copying for now.
+  auto loc_ptr = std::make_shared<RegionLocation>(loc);
+  // The continuations run after this function has returned, so everything they use is
+  // captured by value (loc_ptr, e) or through this; no default reference captures.
+  callable_(controller_, loc_ptr, rpc_client)
+      .then([loc_ptr, this](const RESP& resp) { this->promise_->setValue(resp); })
+      .onError([loc_ptr, this](const exception_wrapper& e) {
+        OnError(
+            e,
+            [loc_ptr, this, e]() -> std::string {
+              return "Call to " + folly::sformat("{0}:{1}", loc_ptr->server_name().host_name(),
+                                                 loc_ptr->server_name().port()) +
+                     " for '" + row_ + "' in " + loc_ptr->DebugString() + " of " +
+                     table_name_->namespace_() + "::" + table_name_->qualifier() +
+                     " failed with e.what()=" + e.what().toStdString() + ", tries = " +
+                     std::to_string(tries_) + ", maxAttempts = " + std::to_string(max_attempts_) +
+                     ", timeout = " + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) +
+                     " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
+            },
+            [loc_ptr, this](const exception_wrapper& error) {
+              conn_->region_locator()->UpdateCachedLocation(*loc_ptr, error);
+            });
+      });
+}
+
+// Fails the promise with a RetriesExhaustedException built from tries_ - 1 and the
+// exceptions collected so far.
+template <typename RESP>
+void AsyncSingleRequestRpcRetryingCaller<RESP>::CompleteExceptionally() {
+  const auto reported_tries = tries_ - 1;
+  promise_->setException(RetriesExhaustedException(reported_tries, exceptions_));
+}
+
+// Returns the operation timeout minus the time elapsed since start_ns_, in nanoseconds.
+// The result is zero or negative once the timeout has been used up.
+template <typename RESP>
+int64_t AsyncSingleRequestRpcRetryingCaller<RESP>::RemainingTimeNs() {
+  const auto elapsed_ns = TimeUtil::GetNowNanos() - start_ns_;
+  return operation_timeout_nanos_.count() - elapsed_ns;
+}
+
+// Resets the controller and, for a non-negative timeout_ns, sets its call timeout to that
+// value converted to milliseconds and capped at INT_MAX.
+template <typename RESP>
+void AsyncSingleRequestRpcRetryingCaller<RESP>::ResetController(
+    std::shared_ptr<HBaseRpcController> controller, const int64_t& timeout_ns) {
+  controller->Reset();
+  if (timeout_ns < 0) {
+    // Negative timeout: the call timeout is not set after the reset.
+    return;
+  }
+  const int64_t timeout_ms =
+      std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_ns));
+  controller->set_call_timeout(std::chrono::milliseconds(timeout_ms));
+}
+
+// Explicit instantiations for the linker. Otherwise, you have to #include the .cc file for
+// the templatized class definitions.
+class OpenScannerResponse;
+template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<hbase::Result>>;
+template class AsyncSingleRequestRpcRetryingCaller<folly::Unit>;
+template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<OpenScannerResponse>>;
+template class AsyncSingleRequestRpcRetryingCaller<bool>;
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-rpc-retrying-test.cc b/hbase-native-client/src/hbase/client/async-rpc-retrying-test.cc
new file mode 100644
index 0000000..6782d05
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-rpc-retrying-test.cc
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Logging.h>
+#include <folly/Memory.h>
+#include <folly/futures/Future.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/ScopedEventBaseThread.h>
+#include <gmock/gmock.h>
+#include <google/protobuf/stubs/callback.h>
+#include <wangle/concurrent/IOThreadPoolExecutor.h>
+
+#include <chrono>
+#include <functional>
+#include <string>
+
+#include "hbase/connection/request.h"
+#include "hbase/connection/response.h"
+#include "hbase/connection/rpc-client.h"
+#include "hbase/client/async-connection.h"
+#include "hbase/client/async-rpc-retrying-caller-factory.h"
+#include "hbase/client/async-rpc-retrying-caller.h"
+#include "hbase/client/client.h"
+#include "hbase/client/connection-configuration.h"
+#include "hbase/client/hbase-rpc-controller.h"
+#include "hbase/client/keyvalue-codec.h"
+#include "hbase/client/region-location.h"
+#include "hbase/client/request-converter.h"
+#include "hbase/client/response-converter.h"
+#include "hbase/client/result.h"
+#include "hbase/exceptions/exception.h"
+#include "hbase/if/Client.pb.h"
+#include "hbase/if/HBase.pb.h"
+#include "hbase/test-util/test-util.h"
+#include "hbase/utils/time-util.h"
+
+using hbase::AsyncRpcRetryingCallerFactory;
+using hbase::AsyncConnection;
+using hbase::AsyncRegionLocator;
+using hbase::ConnectionConfiguration;
+using hbase::Configuration;
+using hbase::HBaseRpcController;
+using hbase::RegionLocation;
+using hbase::RegionLocateType;
+using hbase::RpcClient;
+using hbase::RequestConverter;
+using hbase::ResponseConverter;
+using hbase::ReqConverter;
+using hbase::RespConverter;
+using hbase::Put;
+using hbase::TimeUtil;
+using hbase::Client;
+using hbase::security::User;
+
+using ::testing::Return;
+using ::testing::_;
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+using namespace hbase;
+
+using folly::exception_wrapper;
+
+// Fixture for the tests in this file. SetUpTestCase() installs the failure signal handler
+// and brings up a mini cluster through TestUtil::StartMiniCluster(2).
+class AsyncRpcRetryTest : public ::testing::Test {
+ public:
+  static void SetUpTestCase() {
+    google::InstallFailureSignalHandler();
+    test_util = std::make_unique<hbase::TestUtil>();
+    test_util->StartMiniCluster(2);
+  }
+
+  // Shared by every test; null until SetUpTestCase() has run.
+  static std::unique_ptr<hbase::TestUtil> test_util;
+};
+std::unique_ptr<hbase::TestUtil> AsyncRpcRetryTest::test_util;
+
+// Region locator stub that answers every lookup with one preset RegionLocation.
+class AsyncRegionLocatorBase : public AsyncRegionLocator {
+ public:
+  AsyncRegionLocatorBase() {}
+  explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location)
+      : region_location_(region_location) {}
+  virtual ~AsyncRegionLocatorBase() = default;
+
+  // Returns an already fulfilled future holding region_location_; all arguments are ignored.
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &,
+                                                                     const std::string &,
+                                                                     const RegionLocateType,
+                                                                     const int64_t) override {
+    folly::Promise<std::shared_ptr<RegionLocation>> fulfilled;
+    fulfilled.setValue(region_location_);
+    auto located = fulfilled.getFuture();
+    return located;
+  }
+
+  // Replaces the location handed out by LocateRegion().
+  virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) {
+    region_location_ = region_location;
+  }
+
+  // This stub keeps no cache, so there is nothing to update.
+  void UpdateCachedLocation(const RegionLocation &, const folly::exception_wrapper &) override {}
+
+ protected:
+  std::shared_ptr<RegionLocation> region_location_;
+};
+
+// Locator that adds nothing to AsyncRegionLocatorBase: every lookup returns the preset
+// region location.
+class MockAsyncRegionLocator : public AsyncRegionLocatorBase {
+ public:
+  MockAsyncRegionLocator() : AsyncRegionLocatorBase() {}
+
+  explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+
+  virtual ~MockAsyncRegionLocator() = default;
+};
+
+// Locator that first hands out locations carrying a bogus region name and later delegates
+// to AsyncRegionLocatorBase, which returns the preset location.
+class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase {
+ private:
+  uint32_t tries_ = 0;      // number of LocateRegion() calls seen so far
+  uint32_t num_fails_ = 0;  // threshold compared against tries_ in LocateRegion()
+
+ public:
+  explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails)
+      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+  explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  virtual ~MockWrongRegionAsyncRegionLocator() {}
+
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+      const hbase::pb::TableName &tn, const std::string &row,
+      const RegionLocateType locate_type = RegionLocateType::kCurrent,
+      const int64_t locate_ns = 0) override {
+    // tries_ is post-incremented and compared with '<=', so the first num_fails_ + 1 calls
+    // get the wrong region; every later call is answered by the base class.
+    const bool hand_out_wrong_region = tries_++ <= num_fails_;
+    if (!hand_out_wrong_region) {
+      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+    }
+    /* set random region name, simulating invalid region */
+    auto bogus_location = std::make_shared<RegionLocation>(
+        "whatever-region-name", region_location_->region_info(), region_location_->server_name());
+    folly::Promise<std::shared_ptr<RegionLocation>> promise;
+    promise.setValue(bogus_location);
+    return promise.getFuture();
+  }
+};
+
+// Locator whose lookups first fail with a std::runtime_error and later delegate to
+// AsyncRegionLocatorBase, which returns the preset location.
+class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {
+ private:
+  uint32_t tries_ = 0;      // number of LocateRegion() calls seen so far
+  uint32_t num_fails_ = 0;  // threshold compared against tries_ in LocateRegion()
+
+ public:
+  explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
+      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+  explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  virtual ~MockFailingAsyncRegionLocator() {}
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+      const hbase::pb::TableName &tn, const std::string &row,
+      const RegionLocateType locate_type = RegionLocateType::kCurrent,
+      const int64_t locate_ns = 0) override {
+    // tries_ is post-incremented and compared with '<=', so the first num_fails_ + 1 calls
+    // fail; every later call is answered by the base class.
+    const bool fail_this_lookup = tries_++ <= num_fails_;
+    if (!fail_this_lookup) {
+      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+    }
+    folly::Promise<std::shared_ptr<RegionLocation>> failed;
+    failed.setException(std::runtime_error{"Failed to look up region location"});
+    return failed.getFuture();
+  }
+};
+
+// AsyncConnection for tests: every component is injected through the constructor and handed
+// back unchanged by the accessors. Only the retrying-caller factory is created here, in
+// Init(), because it needs shared_from_this().
+class MockAsyncConnection : public AsyncConnection,
+                            public std::enable_shared_from_this<MockAsyncConnection> {
+ public:
+  MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf,
+                      std::shared_ptr<folly::HHWheelTimer> retry_timer,
+                      std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
+                      std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+                      std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor,
+                      std::shared_ptr<RpcClient> rpc_client,
+                      std::shared_ptr<AsyncRegionLocator> region_locator)
+      // Initializers are listed in member declaration order, which is the order in which
+      // they actually run (avoids -Wreorder).
+      : retry_timer_(retry_timer),
+        conn_conf_(conn_conf),
+        rpc_client_(rpc_client),
+        region_locator_(region_locator),
+        cpu_executor_(cpu_executor),
+        io_executor_(io_executor),
+        retry_executor_(retry_executor) {}
+  ~MockAsyncConnection() {}
+
+  // Creates the caller factory. Calls shared_from_this(), so the connection has to be owned
+  // by a shared_ptr before Init() is invoked.
+  void Init() {
+    caller_factory_ =
+        std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
+  }
+
+  // This mock carries no Configuration.
+  std::shared_ptr<Configuration> conf() override { return nullptr; }
+  std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; }
+  // Null until Init() has been called.
+  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
+    return caller_factory_;
+  }
+  std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
+  std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; }
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
+  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
+    return retry_executor_;
+  }
+
+  // The injected components are not shut down by this mock.
+  void Close() override {}
+  std::shared_ptr<HBaseRpcController> CreateRpcController() override {
+    return std::make_shared<HBaseRpcController>();
+  }
+
+ private:
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
+  std::shared_ptr<ConnectionConfiguration> conn_conf_;
+  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
+  std::shared_ptr<RpcClient> rpc_client_;
+  std::shared_ptr<AsyncRegionLocator> region_locator_;
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
+};
+
+// Minimal stand-in for RawAsyncTableImpl: it can issue a Get through an RpcClient and
+// convert the protobuf response into a Result.
+template <typename CONN>
+class MockRawAsyncTableImpl {
+ public:
+  explicit MockRawAsyncTableImpl(std::shared_ptr<CONN> conn) : conn_(conn) {}
+  virtual ~MockRawAsyncTableImpl() = default;
+
+  /* implement this in real RawAsyncTableImpl. */
+
+  /* in real RawAsyncTableImpl, this should be private. */
+  // Sends `get` to the server named by `loc` using the "ClientService" service and returns
+  // the converted Result.
+  folly::Future<std::shared_ptr<hbase::Result>> GetCall(
+      std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
+      std::shared_ptr<RegionLocation> loc, const hbase::Get &get) {
+    hbase::RpcCall<hbase::Request, hbase::Response> rpc_call = [](
+        std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<RegionLocation> loc,
+        std::shared_ptr<HBaseRpcController> controller,
+        std::unique_ptr<hbase::Request> preq) -> folly::Future<std::unique_ptr<hbase::Response>> {
+      VLOG(1) << "entering MockRawAsyncTableImpl#GetCall, calling AsyncCall, loc:"
+              << loc->DebugString();
+      return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(),
+                                   std::move(preq), User::defaultUser(), "ClientService");
+    };
+
+    return Call<hbase::Get, hbase::Request, hbase::Response, std::shared_ptr<hbase::Result>>(
+        rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest, rpc_call,
+        &hbase::ResponseConverter::FromGetResponse);
+  }
+
+  /* in real RawAsyncTableImpl, this should be private. */
+  // Converts `req` with req_converter, sends it with rpc_call and converts the response with
+  // resp_converter. The returned future carries the converted response or the failure.
+  template <typename REQ, typename PREQ, typename PRESP, typename RESP>
+  folly::Future<RESP> Call(std::shared_ptr<hbase::RpcClient> rpc_client,
+                           std::shared_ptr<HBaseRpcController> controller,
+                           std::shared_ptr<RegionLocation> loc, const REQ &req,
+                           ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter,
+                           hbase::RpcCall<PREQ, PRESP> rpc_call,
+                           RespConverter<RESP, PRESP> resp_converter) {
+    // One promise per call, typed on RESP and shared by value with both continuations, so
+    // they do not depend on this object or on a later call replacing a shared member.
+    auto promise = std::make_shared<folly::Promise<RESP>>();
+    auto f = promise->getFuture();
+    VLOG(1) << "calling rpc_call";
+    rpc_call(rpc_client, loc, controller, req_converter(req, loc->region_name()))
+        .then([promise, resp_converter](std::unique_ptr<PRESP> presp) {
+          VLOG(1) << "MockRawAsyncTableImpl#call succeded: ";
+          RESP result = resp_converter(*presp);
+          promise->setValue(result);
+        })
+        .onError([promise](const exception_wrapper &e) {
+          VLOG(1) << "entering MockRawAsyncTableImpl#call, exception: " << e.what();
+          VLOG(1) << "entering MockRawAsyncTableImpl#call, error typeinfo: " << typeid(e).name();
+          promise->setException(e);
+        });
+    return f;
+  }
+
+ private:
+  std::shared_ptr<CONN> conn_;
+};
+
+// Shared body of the tests below.
+//
+// Creates table `tableName` with family "d", writes two cells to row "test2" through a
+// regular Client, then reads the row back through an AsyncSingleRequestRpcRetryingCaller
+// wired to a MockAsyncConnection that uses `region_locator`. The locator is given the real
+// region location of the row before the call is made.
+//
+// region_locator: locator under test; decides how lookups behave.
+// tableName: table to create and read from.
+// operation_timeout_millis: operation timeout put into the ConnectionConfiguration.
+//
+// Throws whatever the blocking get() on the caller's future throws.
+// NOTE(review): in that case the cleanup at the end of the function is skipped.
+void runTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator, std::string tableName,
+             uint32_t operation_timeout_millis = 1200000) {
+  AsyncRpcRetryTest::test_util->CreateTable(tableName, "d");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>(tableName);
+  auto row = "test2";
+
+  // Get to be performed on above HBase Table
+  hbase::Get get(row);
+
+  // Create a client
+  Client client(*(AsyncRpcRetryTest::test_util->conf()));
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+
+  table->Put(Put{"test2"}.AddColumn("d", "2", "value2"));
+  table->Put(Put{"test2"}.AddColumn("d", "extra", "value for extra"));
+
+  /* init region location and rpc channel */
+  auto region_location = table->GetRegionLocation(row);
+
+  // The IO executor is borrowed from the client's connection; the CPU and retry executors
+  // are private to this test.
+  auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
+  auto io_executor_ = client.async_connection()->io_executor();
+  auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+  auto codec = std::make_shared<hbase::KeyValueCodec>();
+  auto rpc_client = std::make_shared<RpcClient>(io_executor_, cpu_executor_, codec,
+                                                AsyncRpcRetryTest::test_util->conf());
+  std::shared_ptr<folly::HHWheelTimer> retry_timer =
+      folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
+
+  /* init connection configuration */
+  auto connection_conf = std::make_shared<ConnectionConfiguration>(
+      TimeUtil::SecondsToNanos(20),                       // connect_timeout
+      TimeUtil::MillisToNanos(operation_timeout_millis),  // operation_timeout
+      TimeUtil::SecondsToNanos(60),                       // rpc_timeout
+      TimeUtil::MillisToNanos(100),                       // pause
+      5,                                                  // max retries
+      9);                                                 // start log errors count
+
+  /* set region locator */
+  region_locator->set_region_location(region_location);
+
+  /* init hbase client connection */
+  auto conn = std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_,
+                                                    io_executor_, retry_executor_, rpc_client,
+                                                    region_locator);
+  conn->Init();
+
+  /* init retry caller factory */
+  auto tableImpl = std::make_shared<MockRawAsyncTableImpl<MockAsyncConnection>>(conn);
+
+  /* init request caller builder */
+  auto builder = conn->caller_factory()->Single<std::shared_ptr<hbase::Result>>();
+
+  /* call with retry to get result */
+
+  // `get` is captured by reference: it lives until the blocking get() below has returned.
+  auto async_caller =
+      builder->table(std::make_shared<hbase::pb::TableName>(tn))
+          ->row(row)
+          ->rpc_timeout(conn->connection_conf()->read_rpc_timeout())
+          ->operation_timeout(conn->connection_conf()->operation_timeout())
+          ->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller,
+                             std::shared_ptr<hbase::RegionLocation> loc,
+                             std::shared_ptr<hbase::RpcClient> rpc_client)
+                       -> folly::Future<std::shared_ptr<hbase::Result>> {
+                         return tableImpl->GetCall(rpc_client, controller, loc, get);
+                       })
+          ->Build();
+
+  auto result = async_caller->Call().get(milliseconds(500000));
+
+  // Test the values, should be same as in put executed on hbase shell
+  ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
+  EXPECT_EQ("test2", result->Row());
+  EXPECT_EQ("value2", *(result->Value("d", "2")));
+  EXPECT_EQ("value for extra", *(result->Value("d", "extra")));
+
+  retry_timer->destroy();
+  table->Close();
+  client.Close();
+  retry_executor_->stop();
+}
+
+// Happy path: the locator always returns the preset region location.
+TEST_F(AsyncRpcRetryTest, TestGetBasic) {
+  auto region_locator = std::make_shared<MockAsyncRegionLocator>();
+  runTest(region_locator, "table1");
+}
+
+// The locator (num_fails = 3) first hands out wrong regions so the RPC fails, then the
+// correct one; runTest is expected to succeed.
+TEST_F(AsyncRpcRetryTest, TestHandleException) {
+  auto region_locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(3);
+  runTest(region_locator, "table2");
+}
+
+// The locator (num_fails = 5) hands out wrong regions for longer than the caller retries;
+// runTest is expected to throw.
+TEST_F(AsyncRpcRetryTest, TestFailWithException) {
+  auto region_locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(5);
+  EXPECT_ANY_THROW(runTest(region_locator, "table3"));
+}
+
+// The region lookup itself (num_fails = 3) fails at first and then succeeds; runTest is
+// expected to succeed.
+TEST_F(AsyncRpcRetryTest, TestHandleExceptionFromRegionLocationLookup) {
+  auto region_locator = std::make_shared<MockFailingAsyncRegionLocator>(3);
+  runTest(region_locator, "table4");
+}
+
+// The region lookup (num_fails = 5) keeps failing for longer than the caller retries;
+// runTest is expected to throw.
+TEST_F(AsyncRpcRetryTest, TestFailWithExceptionFromRegionLocationLookup) {
+  auto region_locator = std::make_shared<MockFailingAsyncRegionLocator>(5);
+  EXPECT_ANY_THROW(runTest(region_locator, "table5"));
+}
+
+// With a 200 ms operation timeout the failing lookups (num_fails = 3) cannot be retried
+// through to success; runTest is expected to throw.
+TEST_F(AsyncRpcRetryTest, TestFailWithOperationTimeout) {
+  auto region_locator = std::make_shared<MockFailingAsyncRegionLocator>(3);
+  EXPECT_ANY_THROW(runTest(region_locator, "table6", 200));
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-scan-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-scan-rpc-retrying-caller.cc b/hbase-native-client/src/hbase/client/async-scan-rpc-retrying-caller.cc
new file mode 100644
index 0000000..2189128
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-scan-rpc-retrying-caller.cc
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/async-scan-rpc-retrying-caller.h"
+
+namespace hbase {
+
+ScanResumerImpl::ScanResumerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller)
+    : caller_(caller), mutex_() {}
+
+// Resumes a scan that was suspended through ScanControllerImpl::Suspend().
+//
+// Three cases, decided under mutex_:
+//  - kInitialized: Resume() arrived before Prepare(); only record kResumed so that Prepare()
+//    later returns false and the scan simply carries on.
+//  - kResumed: already resumed, nothing to do.
+//  - otherwise (suspended): mark kResumed, take a copy of the stored response and continue the
+//    scan with it.
+void ScanResumerImpl::Resume() {
+  // Copy of resp_ taken under the lock so the continuation below can run without holding it.
+  std::shared_ptr<pb::ScanResponse> local_resp;
+
+  {
+    std::unique_lock<std::mutex> mlock{mutex_};
+    if (state_ == ScanResumerState::kInitialized) {
+      // user calls this method before we call prepare, so just set the state to
+      // RESUMED, the implementation will just go on.
+      state_ = ScanResumerState::kResumed;
+      return;
+    }
+    if (state_ == ScanResumerState::kResumed) {
+      // already resumed, give up.
+      return;
+    }
+    state_ = ScanResumerState::kResumed;
+    local_resp = resp_;
+  }
+
+  // Continue the scan outside of the lock.
+  caller_->CompleteOrNext(local_resp);
+}
+
+// Tries to put the resumer into the suspended state, remembering the response and the number
+// of complete rows for a later Resume().
+// Returns false (and stores nothing) when Resume() has already been called, in which case the
+// scan should just continue; returns true when the scan is now suspended.
+bool ScanResumerImpl::Prepare(std::shared_ptr<pb::ScanResponse> resp, int num_complete_rows) {
+  std::lock_guard<std::mutex> guard(mutex_);
+  const bool already_resumed = (state_ == ScanResumerState::kResumed);
+  if (!already_resumed) {
+    state_ = ScanResumerState::kSuspended;
+    resp_ = resp;
+    num_complete_rows_ = num_complete_rows;
+  }
+  return !already_resumed;
+}
+
+ScanControllerImpl::ScanControllerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller)
+    : caller_(caller) {}
+
+// Marks the controller as suspended and returns a freshly created resumer for the scan.
+// PreCheck() aborts if this is called from the wrong thread or in a non-initial state.
+std::shared_ptr<ScanResumer> ScanControllerImpl::Suspend() {
+  PreCheck();
+  state_ = ScanControllerState::kSuspended;
+  return resumer_ = std::make_shared<ScanResumerImpl>(caller_);
+}
+
+// Marks the controller as terminated. PreCheck() aborts if this is called from the wrong
+// thread or when the controller is no longer in the kInitialized state.
+void ScanControllerImpl::Terminate() {
+  PreCheck();
+  state_ = ScanControllerState::kTerminated;
+}
+
+// Switches the controller to kDestroyed and hands back the state it was in just before.
+ScanControllerState ScanControllerImpl::Destroy() {
+  const ScanControllerState previous = state_;
+  state_ = ScanControllerState::kDestroyed;
+  return previous;
+}
+
+// Guard shared by Suspend() and Terminate(). Both CHECKs are fatal on failure:
+//  1. the calling thread must be the one recorded in caller_thread_id_;
+//  2. the controller must still be in the kInitialized state.
+void ScanControllerImpl::PreCheck() {
+  CHECK(std::this_thread::get_id() == caller_thread_id_)
+      << "The current thread is" << std::this_thread::get_id() << ", expected thread is "
+      << caller_thread_id_ << ", you should not call this method outside OnNext or OnHeartbeat";
+
+  // A controller that was already suspended, terminated or destroyed cannot be used again.
+  CHECK(state_ == ScanControllerState::kInitialized) << "Invalid Stopper state "
+                                                     << DebugString(state_);
+}
+
+// Returns the enumerator name of a ScanControllerState, or "UNKNOWN" for any other value.
+std::string ScanControllerImpl::DebugString(ScanControllerState state) {
+  if (state == ScanControllerState::kInitialized) {
+    return "kInitialized";
+  }
+  if (state == ScanControllerState::kSuspended) {
+    return "kSuspended";
+  }
+  if (state == ScanControllerState::kTerminated) {
+    return "kTerminated";
+  }
+  if (state == ScanControllerState::kDestroyed) {
+    return "kDestroyed";
+  }
+  return "UNKNOWN";
+}
+
+// Returns the enumerator name of a ScanResumerState, or "UNKNOWN" for any other value.
+std::string ScanControllerImpl::DebugString(ScanResumerState state) {
+  if (state == ScanResumerState::kInitialized) {
+    return "kInitialized";
+  }
+  if (state == ScanResumerState::kSuspended) {
+    return "kSuspended";
+  }
+  if (state == ScanResumerState::kResumed) {
+    return "kResumed";
+  }
+  return "UNKNOWN";
+}
+
+// Stores the collaborators and retry/timeout settings for scanning one region with an already
+// opened scanner (scanner_id). The try counter starts at 1 and a fresh promise is created; the
+// promise's future is what Start() hands out.
+AsyncScanRpcRetryingCaller::AsyncScanRpcRetryingCaller(
+    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
+    std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<Scan> scan, int64_t scanner_id,
+    std::shared_ptr<ScanResultCache> results_cache, std::shared_ptr<RawScanResultConsumer> consumer,
+    std::shared_ptr<RegionLocation> region_location, nanoseconds scanner_lease_timeout_nanos,
+    nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos,
+    nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
+    : conn_(conn),
+      retry_timer_(retry_timer),
+      rpc_client_(rpc_client),
+      scan_(scan),
+      scanner_id_(scanner_id),
+      results_cache_(results_cache),
+      consumer_(consumer),
+      region_location_(region_location),
+      scanner_lease_timeout_nanos_(scanner_lease_timeout_nanos),
+      pause_(pause),
+      max_retries_(max_retries),
+      scan_timeout_nanos_(scan_timeout_nanos),
+      rpc_timeout_nanos_(rpc_timeout_nanos),
+      start_log_errors_count_(start_log_errors_count),
+      promise_(std::make_shared<folly::Promise<bool>>()),
+      tries_(1) {
+  // Collected failures; reported through RetriesExhaustedException when giving up.
+  exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>();
+  // Attempt bound derived from the configured number of retries.
+  max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
+  // One RPC controller is reused (and reset) for every call issued by this caller.
+  controller_ = conn_->CreateRpcController();
+  // Reference point for the scan timeout bookkeeping.
+  start_ns_ = TimeUtil::GetNowNanos();
+}
+
+// Entry point: feeds the response of the open-scanner call through the regular OnComplete()
+// path and returns the future of this caller's promise. Within this file the promise is
+// fulfilled with true by CompleteWithNextStartRow()/CompleteWhenError(), with false by
+// CompleteNoMoreResults(), and with an exception by CompleteExceptionally().
+folly::Future<bool> AsyncScanRpcRetryingCaller::Start(
+    std::shared_ptr<HBaseRpcController> controller,
+    std::shared_ptr<pb::ScanResponse> open_scan_resp,
+    const std::shared_ptr<CellScanner> cell_scanner) {
+  OnComplete(controller, open_scan_resp, cell_scanner);
+  return promise_->getFuture();
+}
+
+// Nanoseconds left of the scan timeout, measured from start_ns_. The result is zero or
+// negative once the timeout has elapsed.
+int64_t AsyncScanRpcRetryingCaller::RemainingTimeNs() {
+  const auto elapsed_ns = TimeUtil::GetNowNanos() - start_ns_;
+  return scan_timeout_nanos_.count() - elapsed_ns;
+}
+
+// Handles the outcome of one scan RPC (Start() also routes the open-scanner response here).
+//
+//  - A failed controller is forwarded to OnError().
+//  - Otherwise the response is converted to results, passed through the results cache and
+//    delivered to the consumer via OnNext() (or OnHeartbeat() for an empty heartbeat).
+//  - The consumer may terminate or suspend the scan through the ScanController it receives;
+//    that decision is read back with Destroy() right after the callback returns.
+//  - If neither happened (or the resumer was already resumed), CompleteOrNext() decides
+//    whether to finish or to fetch the next batch.
+void AsyncScanRpcRetryingCaller::OnComplete(std::shared_ptr<HBaseRpcController> controller,
+                                            std::shared_ptr<pb::ScanResponse> resp,
+                                            const std::shared_ptr<CellScanner> cell_scanner) {
+  VLOG(5) << "Scan: OnComplete, scanner_id:" << scanner_id_;
+
+  if (controller->Failed()) {
+    OnError(controller->exception());
+    return;
+  }
+
+  bool is_heartbeat = resp->has_heartbeat_message() && resp->heartbeat_message();
+
+  int64_t num_complete_rows_before = results_cache_->num_complete_rows();
+  try {
+    auto raw_results = ResponseConverter::FromScanResponse(resp, cell_scanner);
+
+    auto results = results_cache_->AddAndGet(raw_results, is_heartbeat);
+
+    auto scan_controller = std::make_shared<ScanControllerImpl>(shared_from_this());
+
+    if (results.size() > 0) {
+      // Remember where to restart from should a later call fail.
+      UpdateNextStartRowWhenError(*results[results.size() - 1]);
+      VLOG(5) << "Calling consumer->OnNext()";
+      consumer_->OnNext(results, scan_controller);
+    } else if (is_heartbeat) {
+      consumer_->OnHeartbeat(scan_controller);
+    }
+
+    // The controller is only valid during the callback; Destroy() returns what the consumer
+    // asked for and invalidates it.
+    ScanControllerState state = scan_controller->Destroy();
+    if (state == ScanControllerState::kTerminated) {
+      if (resp->has_more_results_in_region() && resp->more_results_in_region()) {
+        // we have more results in region but user request to stop the scan, so we need to close the
+        // scanner explicitly.
+        CloseScanner();
+      }
+      CompleteNoMoreResults();
+      return;
+    }
+
+    int64_t num_complete_rows = results_cache_->num_complete_rows() - num_complete_rows_before;
+    if (state == ScanControllerState::kSuspended) {
+      // Prepare() returns false when the user already called Resume(); then just continue.
+      if (scan_controller->resumer()->Prepare(resp, num_complete_rows)) {
+        return;
+      }
+    }
+  } catch (const std::runtime_error& e) {
+    // We can not retry here. The server has responded normally and the call sequence has been
+    // increased so a new scan with the same call sequence will cause an
+    // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
+    LOG(WARNING) << "Received exception in reading the scan response:" << e.what();
+    CompleteWhenError(true);
+    return;
+  }
+
+  CompleteOrNext(resp);
+}
+
+// Decides, from the flags of the given response, whether the scan is finished, whether this
+// region is finished, or whether another batch has to be requested from the same scanner.
+void AsyncScanRpcRetryingCaller::CompleteOrNext(std::shared_ptr<pb::ScanResponse> resp) {
+  VLOG(5) << "Scan: CompleteOrNext, scanner_id" << scanner_id_
+          << ", response:" << resp->ShortDebugString();
+
+  // RS tells us there is no more data for the whole scan
+  const bool scan_exhausted = resp->has_more_results() && !resp->more_results();
+  if (scan_exhausted) {
+    CompleteNoMoreResults();
+    return;
+  }
+  // TODO: Implement Scan::limit(), and check the limit here
+
+  const bool region_exhausted =
+      resp->has_more_results_in_region() && !resp->more_results_in_region();
+  if (region_exhausted) {
+    // TODO: check whether Scan is reversed here
+    CompleteWhenNoMoreResultsInRegion();
+  } else {
+    Next();
+  }
+}
+
+// Fails the promise with a RetriesExhaustedException carrying the collected exceptions, after
+// clearing the results cache and, if requested, issuing a close-scanner call.
+void AsyncScanRpcRetryingCaller::CompleteExceptionally(bool close_scanner) {
+  VLOG(5) << "Scan: CompleteExceptionally";
+  results_cache_->Clear();
+  if (close_scanner) {
+    CloseScanner();
+  }
+  RetriesExhaustedException exhausted(tries_ - 1, exceptions_);
+  promise_->setException(exhausted);
+}
+
+// Fulfils the promise with false. Judging by the call sites in this file, false is used when
+// the scan is over, as opposed to true when it should continue from a new start row.
+void AsyncScanRpcRetryingCaller::CompleteNoMoreResults() {
+  // In master code, scanners auto-close if we have exhausted the region. It may not be the case
+  // in branch-1 code. If this is backported, make sure that the scanner is closed.
+  VLOG(5) << "Scan: CompleteNoMoreResults, scanner_id:" << scanner_id_;
+  promise_->setValue(false);
+}
+
+// Called when the current region has no more results: either the whole scan is done, or the
+// scan moves on with the region's end key as the next start row.
+void AsyncScanRpcRetryingCaller::CompleteWhenNoMoreResultsInRegion() {
+  VLOG(5) << "Scan: CompleteWhenNoMoreResultsInRegion, scanner_id:" << scanner_id_;
+  // In master code, scanners auto-close if we have exhausted the region. It may not be the case
+  // in branch-1 code. If this is backported, make sure that the scanner is closed.
+  const bool scan_finished = NoMoreResultsForScan(*scan_, region_location_->region_info());
+  if (scan_finished) {
+    CompleteNoMoreResults();
+    return;
+  }
+  CompleteWithNextStartRow(region_location_->region_info().end_key(), true);
+}
+
+// Moves the scan's start row to `row` and fulfils the promise with true.
+// `inclusive` is currently not used by this implementation (see the TODO below).
+void AsyncScanRpcRetryingCaller::CompleteWithNextStartRow(std::string row, bool inclusive) {
+  VLOG(5) << "Scan: CompleteWithNextStartRow: region scan is complete, move to next region";
+  scan_->SetStartRow(row);
+  // TODO: set inclusive if it is reverse scans
+  promise_->setValue(true);
+}
+
+// Records the restart position used by CompleteWhenError(): the row of the given result, and
+// whether that row itself has to be included again (it does when the result is partial).
+void AsyncScanRpcRetryingCaller::UpdateNextStartRowWhenError(const Result& result) {
+  include_next_start_row_when_error_ = result.Partial();
+  next_start_row_when_error_ = optional<std::string>(result.Row());
+}
+
+// Gives up on the current scanner but not on the scan: clears the results cache, optionally
+// closes the scanner, repositions the scan's start row from the last delivered result (if
+// any) and fulfils the promise with true.
+void AsyncScanRpcRetryingCaller::CompleteWhenError(bool close_scanner) {
+  VLOG(5) << "Scan: CompleteWhenError, scanner_id:" << scanner_id_;
+  results_cache_->Clear();
+  if (close_scanner) {
+    CloseScanner();
+  }
+  if (next_start_row_when_error_) {
+    // TODO: HBASE-17583 adds include start / stop row to the Scan. Once we rebase and implement
+    // those options in Scan , we can start using that here.
+    if (include_next_start_row_when_error_) {
+      // The recorded row must be scanned again, so restart exactly at it.
+      scan_->SetStartRow(*next_start_row_when_error_);
+    } else {
+      // The recorded row was fully delivered; restart at the closest row after it.
+      scan_->SetStartRow(BytesUtil::CreateClosestRowAfter(*next_start_row_when_error_));
+    }
+  }
+  promise_->setValue(true);
+}
+
+// Handles a failed scan RPC.
+//
+// The error is logged (once tries_ exceeds start_log_errors_count_, or at verbose level 5) and
+// appended to exceptions_. The caller then either:
+//  - fails the promise (retry bound reached, scan timeout used up, or non-retriable error),
+//  - completes with "open a new scanner" semantics via CompleteWhenError() (scanner closed or
+//    out-of-order call sequence), or
+//  - schedules another Call() after a backoff delay.
+void AsyncScanRpcRetryingCaller::OnError(const folly::exception_wrapper& error) {
+  VLOG(5) << "Scan: OnError, scanner_id:" << scanner_id_;
+  if (tries_ > start_log_errors_count_ || VLOG_IS_ON(5)) {
+    LOG(WARNING) << "Call to " << region_location_->server_name().ShortDebugString()
+                 << " for scanner id = " << scanner_id_ << " for "
+                 << region_location_->region_info().ShortDebugString()
+                 << " failed, , tries = " << tries_ << ", maxAttempts = " << max_attempts_
+                 << ", timeout = " << TimeUtil::ToMillis(scan_timeout_nanos_).count()
+                 << " ms, time elapsed = " << TimeUtil::ElapsedMillis(start_ns_) << " ms"
+                 << error.what().toStdString();
+  }
+
+  bool scanner_closed = ExceptionUtil::IsScannerClosed(error);
+  ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos());
+  exceptions_->push_back(twec);
+  // NOTE(review): the bound checked here is max_retries_, while the log line above reports
+  // max_attempts_ -- confirm which of the two is intended.
+  if (tries_ >= max_retries_) {
+    // No point in closing a scanner the server already reports as closed.
+    CompleteExceptionally(!scanner_closed);
+    return;
+  }
+
+  int64_t delay_ns;
+  if (scan_timeout_nanos_.count() > 0) {
+    // Never sleep past the end of the scan timeout.
+    int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
+    if (max_delay_ns <= 0) {
+      CompleteExceptionally(!scanner_closed);
+      return;
+    }
+    delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1));
+  } else {
+    delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1);
+  }
+
+  if (scanner_closed) {
+    CompleteWhenError(false);
+    return;
+  }
+
+  if (ExceptionUtil::IsScannerOutOfOrder(error)) {
+    CompleteWhenError(true);
+    return;
+  }
+  if (!ExceptionUtil::ShouldRetry(error)) {
+    CompleteExceptionally(true);
+    return;
+  }
+  tries_++;
+
+  // The lambdas below run after this function has returned, so they must own everything they
+  // use: `self` keeps this object alive and `delay_ns` is copied. Capturing either by
+  // reference would leave the closures with dangling references to this stack frame (or to
+  // the already destroyed outer closure).
+  auto self(shared_from_this());
+  conn_->retry_executor()->add([self, delay_ns]() {
+    self->retry_timer_->scheduleTimeoutFn(
+        [self]() { self->conn_->cpu_executor()->add([self]() { self->Call(); }); },
+        std::chrono::milliseconds(TimeUtil::ToMillis(delay_ns)));
+  });
+}
+
+// Tells whether the scan is complete once the given region has been exhausted.
+//  - A region with an empty end key means there is nothing after it: the scan is done.
+//  - A scan with an empty stop row never stops before that: the scan is not done.
+//  - Otherwise the scan is done when its stop row is less than or equal to the region's end
+//    key. Equality counts because scans always exclude the stop row for now.
+bool AsyncScanRpcRetryingCaller::NoMoreResultsForScan(const Scan& scan,
+                                                      const pb::RegionInfo& info) {
+  if (BytesUtil::IsEmptyStopRow(info.end_key())) {
+    return true;
+  }
+  if (BytesUtil::IsEmptyStopRow(scan.StopRow())) {
+    return false;
+  }
+  const int32_t end_key_vs_stop_row = BytesUtil::CompareTo(info.end_key(), scan.StopRow());
+  // TODO: once Scan supports IncludeStopRow(), the equal case must additionally require
+  // !scan.IncludeStopRow().
+  return end_key_vs_stop_row >= 0;
+}
+
+// Requests the next batch from the same scanner: resets the per-call retry bookkeeping
+// (try counter, collected exceptions, start time), advances the call sequence and issues
+// the RPC.
+void AsyncScanRpcRetryingCaller::Next() {
+  VLOG(5) << "Scan: Next";
+  exceptions_->clear();
+  tries_ = 1;
+  ++next_call_seq_;
+  start_ns_ = TimeUtil::GetNowNanos();
+  Call();
+}
+
+// Issues one scan RPC for the current call sequence and routes its outcome to OnComplete()
+// or OnError(). If a scan timeout is configured and already used up, the promise is failed
+// instead of sending anything.
+void AsyncScanRpcRetryingCaller::Call() {
+  VLOG(5) << "Scan: Call";
+  auto self(shared_from_this());
+  // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
+  // less than the scan timeout. If the server does not respond in time(usually this will not
+  // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
+  // resending the next request and the only way to fix this is to close the scanner and open a
+  // new one.
+  int64_t call_timeout_nanos = 0L;
+  if (scan_timeout_nanos_.count() > 0) {
+    call_timeout_nanos = RemainingTimeNs();
+    if (call_timeout_nanos <= 0) {
+      CompleteExceptionally(true);
+      return;
+    }
+  }
+
+  ResetController(controller_, call_timeout_nanos);
+
+  auto req =
+      RequestConverter::ToScanRequest(scanner_id_, scan_->Caching(), false, next_call_seq_, false);
+
+  // do the RPC call; `self` in the continuations keeps this object alive until they have run
+  const auto& server = region_location_->server_name();
+  rpc_client_
+      ->AsyncCall(server.host_name(), server.port(), std::move(req),
+                  security::User::defaultUser(), "ClientService")
+      .via(conn_->cpu_executor().get())
+      .then([self, this](const std::unique_ptr<Response>& resp) {
+        auto scan_resp = std::static_pointer_cast<pb::ScanResponse>(resp->resp_msg());
+        return OnComplete(controller_, scan_resp, resp->cell_scanner());
+      })
+      .onError([self, this](const folly::exception_wrapper& e) { OnError(e); });
+}
+
+// Sends a close-scanner request for scanner_id_ without waiting for the answer. A failure is
+// only logged, since the scanner is probably closed already.
+void AsyncScanRpcRetryingCaller::CloseScanner() {
+  auto self(shared_from_this());
+  ResetController(controller_, rpc_timeout_nanos_.count());
+
+  VLOG(5) << "Closing scanner with scanner_id:" << folly::to<std::string>(scanner_id_);
+
+  // Do a close scanner RPC. Fire and forget.
+  auto req = RequestConverter::ToScanRequest(scanner_id_, 0, true);
+  const auto& server = region_location_->server_name();
+  rpc_client_
+      ->AsyncCall(server.host_name(), server.port(), std::move(req),
+                  security::User::defaultUser(), "ClientService")
+      .onError([self, this](const folly::exception_wrapper& e) -> std::unique_ptr<Response> {
+        LOG(WARNING) << "Call to " << region_location_->server_name().ShortDebugString()
+                     << " for closing scanner_id = " << folly::to<std::string>(scanner_id_)
+                     << " for " << region_location_->region_info().ShortDebugString()
+                     << " failed, ignore, probably already closed. Exception:"
+                     << e.what().toStdString();
+        return nullptr;
+      });
+}
+
+// Resets the controller for reuse and, for a non-negative timeout, sets its call timeout to
+// the given duration converted to milliseconds and capped at INT_MAX.
+void AsyncScanRpcRetryingCaller::ResetController(std::shared_ptr<HBaseRpcController> controller,
+                                                 const int64_t& timeout_nanos) {
+  controller->Reset();
+  if (timeout_nanos < 0) {
+    // A negative value leaves the freshly reset controller's timeout untouched.
+    return;
+  }
+  const auto capped_millis =
+      std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_nanos));
+  controller->set_call_timeout(milliseconds(capped_millis));
+}
+
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-table-result-scanner.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-table-result-scanner.cc b/hbase-native-client/src/hbase/client/async-table-result-scanner.cc
new file mode 100644
index 0000000..3812b3a
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-table-result-scanner.cc
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/async-table-result-scanner.h"
+
+#include <vector>
+
+namespace hbase {
+// Creates an open scanner with an empty cache. max_cache_size is the cache size at which
+// OnNext() suspends prefetching.
+AsyncTableResultScanner::AsyncTableResultScanner(int64_t max_cache_size)
+    : max_cache_size_(max_cache_size) {
+  cache_size_ = 0;
+  closed_ = false;
+}
+
+// Destroying the scanner closes it: see Close() for what that does.
+AsyncTableResultScanner::~AsyncTableResultScanner() { Close(); }
+
+// Closes the scanner: marks it closed, drops every cached result, wakes up all threads
+// blocked in Next() and, if prefetching was suspended, resumes the scan so that the next
+// OnNext()/OnHeartbeat() callback can observe closed_ and terminate it.
+// Calling Close() more than once is harmless.
+void AsyncTableResultScanner::Close() {
+  std::shared_ptr<ScanResumer> local_resumer = nullptr;
+  {
+    std::unique_lock<std::mutex> mlock(mutex_);
+    closed_ = true;
+    while (!queue_.empty()) {
+      queue_.pop();
+    }
+    cache_size_ = 0;
+    // Take the resumer out under the lock; it is invoked only after the lock is released.
+    local_resumer = resumer_;
+    resumer_ = nullptr;
+  }
+  cond_.notify_all();
+
+  // ScanResumer::Resume() must run without mutex_ held: the resumed scan may call back into
+  // this->OnNext() on the same thread, and OnNext() locks the (non-recursive) mutex_.
+  if (local_resumer != nullptr) {
+    local_resumer->Resume();
+  }
+}
+
+// Blocking read of the next result.
+// Waits until a result is cached, then removes and returns the oldest one. While the cache is
+// empty it returns nullptr if the scanner is closed, or throws the stored error if one was
+// reported through OnError(). When the cache has drained to at most half of max_cache_size_
+// and prefetching is suspended, the scan is resumed.
+std::shared_ptr<Result> AsyncTableResultScanner::Next() {
+  VLOG(5) << "AsyncTableResultScanner: Next()";
+
+  std::shared_ptr<Result> result = nullptr;
+  std::shared_ptr<ScanResumer> local_resumer = nullptr;
+  {
+    std::unique_lock<std::mutex> mlock(mutex_);
+    while (queue_.empty()) {
+      // closed_ is checked before error_, so a closed scanner reports end-of-scan.
+      if (closed_) {
+        return nullptr;
+      }
+      if (error_) {
+        // NOTE(review): this throws the stored error_ object itself (OnError() assigns a
+        // folly::exception_wrapper to it), not the exception it wraps -- confirm callers
+        // catch the matching type.
+        throw error_;
+      }
+      cond_.wait(mlock);
+    }
+    result = queue_.front();
+    queue_.pop();
+
+    cache_size_ -= EstimatedSizeWithSharedPtr(result);
+    if (resumer_ != nullptr && cache_size_ <= max_cache_size_ / 2) {
+      VLOG(1) << std::this_thread::get_id() << " resume scan prefetching";
+      // Hand the resumer over to a local so it can be invoked after the lock is released.
+      local_resumer = resumer_;
+      resumer_ = nullptr;
+    }
+  }
+
+  // Need to call ScanResumer::Resume() outside of the scope of the mutex. The reason is that
+  // folly/wangle event loop might end up running the attached logic(.then()) at the Scan RPC
+  // in the same event thread before returning from the previous call. This seems like the
+  // wrong thing to do(™), but we cannot fix that now. Since the call back can end up calling
+  // this::OnNext(), we should unlock the mutex.
+  if (local_resumer != nullptr) {
+    local_resumer->Resume();
+  }
+  return result;
+}
+
+// Appends the given results to the cache queue and adds their estimated size to cache_size_.
+// This function takes no lock itself; its caller in this file, OnNext(), holds mutex_.
+void AsyncTableResultScanner::AddToCache(const std::vector<std::shared_ptr<Result>> &results) {
+  VLOG(5) << "AsyncTableResultScanner: AddToCache()";
+  // Iterate by reference: copying each shared_ptr just to read it costs an extra atomic
+  // reference-count round trip per element.
+  for (const auto &r : results) {
+    queue_.push(r);
+    cache_size_ += EstimatedSizeWithSharedPtr(r);
+  }
+}
+
+// Estimated footprint of a cached element: what the pointee reports through EstimatedSize()
+// plus the size of the shared_ptr object that refers to it.
+template <typename T>
+inline size_t AsyncTableResultScanner::EstimatedSizeWithSharedPtr(std::shared_ptr<T> t) {
+  const auto pointee_size = t->EstimatedSize();
+  return pointee_size + sizeof(std::shared_ptr<T>);
+}
+
+// Consumer callback for a batch of results.
+// If the scanner is closed the scan is terminated through the controller and the batch is
+// dropped. Otherwise the batch is cached, prefetching is suspended once the cache has reached
+// max_cache_size_, and threads waiting in Next() are woken up.
+void AsyncTableResultScanner::OnNext(const std::vector<std::shared_ptr<Result>> &results,
+                                     std::shared_ptr<ScanController> controller) {
+  VLOG(5) << "AsyncTableResultScanner: OnNext()";
+  {
+    std::lock_guard<std::mutex> guard(mutex_);
+    if (closed_) {
+      controller->Terminate();
+      return;
+    }
+    AddToCache(results);
+
+    const bool cache_full = cache_size_ >= max_cache_size_;
+    if (cache_full) {
+      StopPrefetch(controller);
+    }
+  }
+  // Notify after the lock has been released.
+  cond_.notify_all();
+}
+
+// Suspends the scan through the controller, keeps the returned resumer for Next()/Close() and
+// counts the suspension.
+void AsyncTableResultScanner::StopPrefetch(std::shared_ptr<ScanController> controller) {
+  VLOG(1) << std::this_thread::get_id()
+          << ": stop prefetching when scanning as the cache size "
+          << folly::to<std::string>(cache_size_) << " is greater than the max_cache_size "
+          << folly::to<std::string>(max_cache_size_);
+
+  resumer_ = controller->Suspend();
+  ++num_prefetch_stopped_;
+}
+
+/**
+ * Called when a heartbeat message arrived but not enough cells have accumulated to call
+ * OnNext().
+ *
+ * This is the chance to terminate a slow scan operation: if the scanner has been closed in
+ * the meantime, the scan is terminated through the controller.
+ *
+ * @param controller used to suspend or terminate the scan. The controller is only valid
+ *          within the scope of OnHeartbeat(); do NOT store it and call it later.
+ */
+void AsyncTableResultScanner::OnHeartbeat(std::shared_ptr<ScanController> controller) {
+  std::lock_guard<std::mutex> guard(mutex_);
+  if (!closed_) {
+    return;
+  }
+  controller->Terminate();
+}
+
+/**
+ * Called when the scan hit an unrecoverable error and has been terminated.
+ *
+ * The error is stored so that Next() can throw it, and all threads waiting in Next() are
+ * woken up. OnComplete() is not expected to be called after OnError().
+ */
+void AsyncTableResultScanner::OnError(const folly::exception_wrapper &error) {
+  LOG(WARNING) << "Scanner received error" << error.what();
+  std::lock_guard<std::mutex> guard(mutex_);
+  error_ = error;
+  cond_.notify_all();
+}
+
+/**
+ * Called when the scan operation has completed normally: marks the scanner closed and wakes
+ * up all threads waiting in Next(). Results that are already cached stay available.
+ */
+void AsyncTableResultScanner::OnComplete() {
+  std::lock_guard<std::mutex> guard(mutex_);
+  closed_ = true;
+  cond_.notify_all();
+}
+}  // namespace hbase