You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/09/15 21:20:39 UTC

[22/25] hbase git commit: HBASE-18725 [C++] Install header files as well as library

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-rpc-retrying-caller-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
deleted file mode 100644
index 188f469..0000000
--- a/hbase-native-client/core/async-rpc-retrying-caller-factory.h
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/Logging.h>
-#include <folly/io/async/EventBase.h>
-#include <chrono>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "connection/rpc-client.h"
-#include "core/async-batch-rpc-retrying-caller.h"
-#include "core/async-rpc-retrying-caller.h"
-#include "core/async-scan-rpc-retrying-caller.h"
-#include "core/raw-scan-result-consumer.h"
-#include "core/region-location.h"
-#include "core/row.h"
-#include "core/scan-result-cache.h"
-#include "core/scan.h"
-
-#include "if/Client.pb.h"
-#include "if/HBase.pb.h"
-
-namespace hbase {
-
-class AsyncConnection;
-
-/**
- * Fluent builder for AsyncSingleRequestRpcRetryingCaller<RESP>.
- *
- * The timeouts, pause, retry limit and error-logging threshold start out with the values taken
- * from the connection's configuration; each setter overrides one field and returns the builder so
- * that calls can be chained. The setters return shared_from_this(), so the builder must be owned
- * by a std::shared_ptr before any of them is called.
- *
- * table() must be given before the built caller is used, because the caller dereferences the
- * table name when it locates the region. action() supplies the callable that performs the RPC.
- */
-template <typename RESP>
-class SingleRequestCallerBuilder
-    : public std::enable_shared_from_this<SingleRequestCallerBuilder<RESP>> {
- public:
-  /**
-   * Seeds every tunable from conn->connection_conf(), so conn must not be null. The initializers
-   * are listed in member-declaration order, which is the order in which they actually run.
-   */
-  explicit SingleRequestCallerBuilder(std::shared_ptr<AsyncConnection> conn,
-                                      std::shared_ptr<folly::HHWheelTimer> retry_timer)
-      : conn_(conn),
-        retry_timer_(retry_timer),
-        table_name_(nullptr),
-        rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()),
-        operation_timeout_nanos_(conn->connection_conf()->operation_timeout()),
-        pause_(conn->connection_conf()->pause()),
-        max_retries_(conn->connection_conf()->max_retries()),
-        start_log_errors_count_(conn->connection_conf()->start_log_errors_count()),
-        locate_type_(RegionLocateType::kCurrent) {}
-
-  virtual ~SingleRequestCallerBuilder() = default;
-
-  typedef SingleRequestCallerBuilder<RESP> GenericThisType;
-  typedef std::shared_ptr<GenericThisType> SharedThisPtr;
-
-  // Table the request is addressed to.
-  SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) {
-    table_name_ = table_name;
-    return shared_this();
-  }
-
-  // Overrides the RPC timeout taken from the connection configuration.
-  SharedThisPtr rpc_timeout(std::chrono::nanoseconds rpc_timeout_nanos) {
-    rpc_timeout_nanos_ = rpc_timeout_nanos;
-    return shared_this();
-  }
-
-  // Overrides the operation timeout taken from the connection configuration.
-  SharedThisPtr operation_timeout(std::chrono::nanoseconds operation_timeout_nanos) {
-    operation_timeout_nanos_ = operation_timeout_nanos;
-    return shared_this();
-  }
-
-  // Overrides the pause taken from the connection configuration.
-  SharedThisPtr pause(std::chrono::nanoseconds pause) {
-    pause_ = pause;
-    return shared_this();
-  }
-
-  // Overrides the retry limit taken from the connection configuration.
-  SharedThisPtr max_retries(uint32_t max_retries) {
-    max_retries_ = max_retries;
-    return shared_this();
-  }
-
-  // Overrides the number of tries after which errors are logged at WARNING level.
-  SharedThisPtr start_log_errors_count(uint32_t start_log_errors_count) {
-    start_log_errors_count_ = start_log_errors_count;
-    return shared_this();
-  }
-
-  // Row used to locate the region for the request.
-  SharedThisPtr row(const std::string& row) {
-    row_ = row;
-    return shared_this();
-  }
-
-  // How the region is located relative to the row; defaults to RegionLocateType::kCurrent.
-  SharedThisPtr locate_type(RegionLocateType locate_type) {
-    locate_type_ = locate_type;
-    return shared_this();
-  }
-
-  // Callable that issues the RPC once the region location is known.
-  SharedThisPtr action(Callable<RESP> callable) {
-    callable_ = callable;
-    return shared_this();
-  }
-
-  // Convenience for Build()->Call().
-  folly::Future<RESP> Call() { return Build()->Call(); }
-
-  // Creates a new caller from the values collected so far.
-  std::shared_ptr<AsyncSingleRequestRpcRetryingCaller<RESP>> Build() {
-    return std::make_shared<AsyncSingleRequestRpcRetryingCaller<RESP>>(
-        conn_, retry_timer_, table_name_, row_, locate_type_, callable_, pause_, max_retries_,
-        operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_);
-  }
-
- private:
-  SharedThisPtr shared_this() {
-    return std::enable_shared_from_this<GenericThisType>::shared_from_this();
-  }
-
- private:
-  std::shared_ptr<AsyncConnection> conn_;
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-  std::shared_ptr<pb::TableName> table_name_;
-  std::chrono::nanoseconds rpc_timeout_nanos_;
-  std::chrono::nanoseconds operation_timeout_nanos_;
-  std::chrono::nanoseconds pause_;
-  uint32_t max_retries_;
-  uint32_t start_log_errors_count_;
-  std::string row_;
-  RegionLocateType locate_type_;
-  Callable<RESP> callable_;
-};  // end of SingleRequestCallerBuilder
-
-/**
- * Fluent builder for AsyncBatchRpcRetryingCaller<REQ, RESP>.
- *
- * Unlike the other builders in this file, nothing is read from the connection configuration:
- * every tunable keeps its in-class default (zero / nullptr) until the matching setter is called.
- * The setters return shared_from_this(), so the builder must be owned by a std::shared_ptr.
- *
- * actions() must be called with a non-null vector before Build() or Call(), because Build()
- * dereferences it.
- */
-template <typename REQ, typename RESP>
-class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilder<REQ, RESP>> {
- public:
-  explicit BatchCallerBuilder(std::shared_ptr<AsyncConnection> conn,
-                              std::shared_ptr<folly::HHWheelTimer> retry_timer)
-      : conn_(conn), retry_timer_(retry_timer) {}
-
-  virtual ~BatchCallerBuilder() = default;
-
-  typedef std::shared_ptr<BatchCallerBuilder<REQ, RESP>> SharedThisPtr;
-
-  // Table the batch is addressed to.
-  SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) {
-    table_name_ = table_name;
-    return shared_this();
-  }
-
-  // Requests that make up the batch; required before Build().
-  SharedThisPtr actions(std::shared_ptr<std::vector<REQ>> actions) {
-    actions_ = actions;
-    return shared_this();
-  }
-
-  SharedThisPtr operation_timeout(std::chrono::nanoseconds operation_timeout_nanos) {
-    operation_timeout_nanos_ = operation_timeout_nanos;
-    return shared_this();
-  }
-
-  SharedThisPtr rpc_timeout(std::chrono::nanoseconds rpc_timeout_nanos) {
-    rpc_timeout_nanos_ = rpc_timeout_nanos;
-    return shared_this();
-  }
-
-  SharedThisPtr pause(std::chrono::nanoseconds pause_ns) {
-    pause_ns_ = pause_ns;
-    return shared_this();
-  }
-
-  SharedThisPtr max_attempts(int32_t max_attempts) {
-    max_attempts_ = max_attempts;
-    return shared_this();
-  }
-
-  SharedThisPtr start_log_errors_count(int32_t start_log_errors_count) {
-    start_log_errors_count_ = start_log_errors_count;
-    return shared_this();
-  }
-
-  // Convenience for Build()->Call(); yields one folly::Try<RESP> per entry of the result vector.
-  folly::Future<std::vector<folly::Try<RESP>>> Call() { return Build()->Call(); }
-
-  // Creates a new caller from the values collected so far. Precondition: actions() was set.
-  std::shared_ptr<AsyncBatchRpcRetryingCaller<REQ, RESP>> Build() {
-    return std::make_shared<AsyncBatchRpcRetryingCaller<REQ, RESP>>(
-        conn_, retry_timer_, table_name_, *actions_, pause_ns_, max_attempts_,
-        operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_);
-  }
-
- private:
-  SharedThisPtr shared_this() {
-    return std::enable_shared_from_this<BatchCallerBuilder>::shared_from_this();
-  }
-
- private:
-  std::shared_ptr<AsyncConnection> conn_;
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-  std::shared_ptr<hbase::pb::TableName> table_name_ = nullptr;
-  std::shared_ptr<std::vector<REQ>> actions_ = nullptr;
-  // The durations are zero-initialized so that Build() never reads an indeterminate value when a
-  // setter was skipped (a defaulted std::chrono::duration leaves its count uninitialized).
-  std::chrono::nanoseconds pause_ns_{0};
-  int32_t max_attempts_ = 0;
-  std::chrono::nanoseconds operation_timeout_nanos_{0};
-  std::chrono::nanoseconds rpc_timeout_nanos_{0};
-  int32_t start_log_errors_count_ = 0;
-};
-
-/**
- * Fluent builder for AsyncScanRpcRetryingCaller.
- *
- * RPC timeout, scan timeout, pause, retry limit and error-logging threshold start out with the
- * values from the connection's configuration; the scanner lease timeout starts at zero and the
- * scanner id at -1. Each setter overrides one field and returns shared_from_this(), so the
- * builder must be owned by a std::shared_ptr.
- */
-class ScanCallerBuilder : public std::enable_shared_from_this<ScanCallerBuilder> {
- public:
-  /**
-   * Seeds the tunables from conn->connection_conf(), so conn must not be null. The initializers
-   * are listed in member-declaration order, which is the order in which they actually run.
-   */
-  explicit ScanCallerBuilder(std::shared_ptr<AsyncConnection> conn,
-                             std::shared_ptr<folly::HHWheelTimer> retry_timer)
-      : conn_(conn),
-        retry_timer_(retry_timer),
-        rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()),
-        scan_timeout_nanos_(conn->connection_conf()->scan_timeout()),
-        scanner_lease_timeout_nanos_(0),
-        pause_(conn->connection_conf()->pause()),
-        max_retries_(conn->connection_conf()->max_retries()),
-        start_log_errors_count_(conn->connection_conf()->start_log_errors_count()),
-        scanner_id_(-1) {}
-
-  virtual ~ScanCallerBuilder() = default;
-
-  typedef ScanCallerBuilder GenericThisType;
-  typedef std::shared_ptr<ScanCallerBuilder> SharedThisPtr;
-
-  SharedThisPtr rpc_client(std::shared_ptr<hbase::RpcClient> rpc_client) {
-    rpc_client_ = rpc_client;
-    return shared_this();
-  }
-
-  SharedThisPtr rpc_timeout(nanoseconds rpc_timeout_nanos) {
-    rpc_timeout_nanos_ = rpc_timeout_nanos;
-    return shared_this();
-  }
-
-  SharedThisPtr scan_timeout(nanoseconds scan_timeout_nanos) {
-    scan_timeout_nanos_ = scan_timeout_nanos;
-    return shared_this();
-  }
-
-  SharedThisPtr scanner_lease_timeout(nanoseconds scanner_lease_timeout_nanos) {
-    scanner_lease_timeout_nanos_ = scanner_lease_timeout_nanos;
-    return shared_this();
-  }
-
-  SharedThisPtr pause(nanoseconds pause) {
-    pause_ = pause;
-    return shared_this();
-  }
-
-  SharedThisPtr max_retries(uint32_t max_retries) {
-    max_retries_ = max_retries;
-    return shared_this();
-  }
-
-  SharedThisPtr start_log_errors_count(uint32_t start_log_errors_count) {
-    start_log_errors_count_ = start_log_errors_count;
-    return shared_this();
-  }
-
-  SharedThisPtr region_location(std::shared_ptr<RegionLocation> region_location) {
-    region_location_ = region_location;
-    return shared_this();
-  }
-
-  SharedThisPtr scanner_id(int64_t scanner_id) {
-    scanner_id_ = scanner_id;
-    return shared_this();
-  }
-
-  SharedThisPtr scan(std::shared_ptr<Scan> scan) {
-    scan_ = scan;
-    return shared_this();
-  }
-
-  SharedThisPtr results_cache(std::shared_ptr<ScanResultCache> results_cache) {
-    results_cache_ = results_cache;
-    return shared_this();
-  }
-
-  SharedThisPtr consumer(std::shared_ptr<RawScanResultConsumer> consumer) {
-    consumer_ = consumer;
-    return shared_this();
-  }
-
-  // Creates a new scan caller from the values collected so far.
-  std::shared_ptr<AsyncScanRpcRetryingCaller> Build() {
-    return std::make_shared<AsyncScanRpcRetryingCaller>(
-        conn_, retry_timer_, rpc_client_, scan_, scanner_id_, results_cache_, consumer_,
-        region_location_, scanner_lease_timeout_nanos_, pause_, max_retries_, scan_timeout_nanos_,
-        rpc_timeout_nanos_, start_log_errors_count_);
-  }
-
- private:
-  SharedThisPtr shared_this() {
-    return std::enable_shared_from_this<GenericThisType>::shared_from_this();
-  }
-
- private:
-  std::shared_ptr<AsyncConnection> conn_;
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-  std::shared_ptr<hbase::RpcClient> rpc_client_;
-  std::shared_ptr<Scan> scan_;
-  nanoseconds rpc_timeout_nanos_;
-  nanoseconds scan_timeout_nanos_;
-  // Explicitly zeroed in the constructor; previously indeterminate unless the setter was called.
-  nanoseconds scanner_lease_timeout_nanos_;
-  nanoseconds pause_;
-  uint32_t max_retries_;
-  uint32_t start_log_errors_count_;
-  std::shared_ptr<RegionLocation> region_location_;
-  int64_t scanner_id_;
-  std::shared_ptr<RawScanResultConsumer> consumer_;
-  std::shared_ptr<ScanResultCache> results_cache_;
-};  // end of ScanCallerBuilder
-
-/**
- * Hands out builders for the retrying callers. Every builder receives the connection and retry
- * timer this factory was constructed with.
- */
-class AsyncRpcRetryingCallerFactory {
- public:
-  explicit AsyncRpcRetryingCallerFactory(std::shared_ptr<AsyncConnection> conn,
-                                         std::shared_ptr<folly::HHWheelTimer> retry_timer)
-      : conn_(conn), retry_timer_(retry_timer) {}
-
-  virtual ~AsyncRpcRetryingCallerFactory() = default;
-
-  // New builder for a caller that issues a single request and yields a RESP.
-  template <typename RESP>
-  std::shared_ptr<SingleRequestCallerBuilder<RESP>> Single() {
-    using Builder = SingleRequestCallerBuilder<RESP>;
-    return std::make_shared<Builder>(conn_, retry_timer_);
-  }
-
-  // New builder for a caller that issues a batch of REQ actions.
-  template <typename REQ, typename RESP>
-  std::shared_ptr<BatchCallerBuilder<REQ, RESP>> Batch() {
-    using Builder = BatchCallerBuilder<REQ, RESP>;
-    return std::make_shared<Builder>(conn_, retry_timer_);
-  }
-
-  // New builder for a scan caller.
-  std::shared_ptr<ScanCallerBuilder> Scan() {
-    auto builder = std::make_shared<ScanCallerBuilder>(conn_, retry_timer_);
-    return builder;
-  }
-
- private:
-  std::shared_ptr<AsyncConnection> conn_;
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-};
-
-}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc
deleted file mode 100644
index 8e60991..0000000
--- a/hbase-native-client/core/async-rpc-retrying-caller.cc
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/async-rpc-retrying-caller.h"
-
-#include <folly/Conv.h>
-#include <folly/ExceptionWrapper.h>
-#include <folly/Format.h>
-#include <folly/Logging.h>
-#include <folly/Unit.h>
-
-#include "connection/rpc-client.h"
-#include "core/async-connection.h"
-#include "core/hbase-rpc-controller.h"
-#include "core/region-location.h"
-#include "core/result.h"
-#include "exceptions/exception.h"
-#include "if/HBase.pb.h"
-#include "utils/connection-util.h"
-#include "utils/sys-util.h"
-#include "utils/time-util.h"
-
-using folly::exception_wrapper;
-
-namespace hbase {
-
-/**
- * Stores the request parameters and sets up the per-request state: a fresh promise, an RPC
- * controller obtained from the connection, the start timestamp, the try counter (starting at 1),
- * an empty list of collected exceptions and the attempt limit derived from max_retries.
- * All members are set in the initializer list, in their declaration order.
- */
-template <typename RESP>
-AsyncSingleRequestRpcRetryingCaller<RESP>::AsyncSingleRequestRpcRetryingCaller(
-    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
-    std::shared_ptr<hbase::pb::TableName> table_name, const std::string& row,
-    RegionLocateType locate_type, Callable<RESP> callable, std::chrono::nanoseconds pause,
-    uint32_t max_retries, std::chrono::nanoseconds operation_timeout_nanos,
-    std::chrono::nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
-    : conn_(conn),
-      retry_timer_(retry_timer),
-      table_name_(table_name),
-      row_(row),
-      locate_type_(locate_type),
-      callable_(callable),
-      pause_(pause),
-      max_retries_(max_retries),
-      operation_timeout_nanos_(operation_timeout_nanos),
-      rpc_timeout_nanos_(rpc_timeout_nanos),
-      start_log_errors_count_(start_log_errors_count),
-      promise_(std::make_shared<folly::Promise<RESP>>()),
-      controller_(conn_->CreateRpcController()),
-      start_ns_(TimeUtil::GetNowNanos()),
-      tries_(1),
-      exceptions_(std::make_shared<std::vector<ThrowableWithExtraContext>>()),
-      max_attempts_(ConnectionUtils::Retries2Attempts(max_retries)) {}
-
-// Nothing to release by hand; the members clean up after themselves.
-template <typename RESP>
-AsyncSingleRequestRpcRetryingCaller<RESP>::~AsyncSingleRequestRpcRetryingCaller() = default;
-
-// Starts the request and returns the future tied to this caller's promise. The future is
-// obtained before the first locate-then-call round is kicked off.
-template <typename RESP>
-folly::Future<RESP> AsyncSingleRequestRpcRetryingCaller<RESP>::Call() {
-  auto result_future = promise_->getFuture();
-  LocateThenCall();
-  return result_future;
-}
-
-// Looks up the region for row_ and, on success, issues the call against that location. A failed
-// lookup goes through OnError() with a no-op location-cache update.
-template <typename RESP>
-void AsyncSingleRequestRpcRetryingCaller<RESP>::LocateThenCall() {
-  // -1 is what LocateRegion() receives when no operation timeout is configured.
-  int64_t locate_timeout_ns = -1L;
-  if (operation_timeout_nanos_.count() > 0) {
-    locate_timeout_ns = RemainingTimeNs();
-    if (locate_timeout_ns <= 0) {
-      // The operation timeout has already been used up.
-      CompleteExceptionally();
-      return;
-    }
-  }
-
-  conn_->region_locator()
-      ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns)
-      .then([this](std::shared_ptr<RegionLocation> loc) { Call(*loc); })
-      .onError([this](const exception_wrapper& e) {
-        // Built lazily: OnError() only invokes this when it decides to log.
-        Supplier<std::string> describe_failure = [this, e]() -> std::string {
-          return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" +
-                 table_name_->qualifier() + " failed with e.what()=" +
-                 e.what().toStdString() + ", tries = " + std::to_string(tries_) +
-                 ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " +
-                 TimeUtil::ToMillisStr(operation_timeout_nanos_) + " ms, time elapsed = " +
-                 TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
-        };
-        OnError(e, describe_failure, [](const exception_wrapper& error) {});
-      });
-}
-
-/**
- * Records a failed attempt and either gives up or schedules another LocateThenCall().
- *
- * @param error                   the failure of the current attempt; appended to exceptions_.
- * @param err_msg                 produces the log line; only invoked when a retry is attempted.
- * @param update_cached_location  called with the error before the retry is scheduled.
- *
- * NOTE(review): the give-up test compares tries_ with max_retries_, while the log messages
- * report max_attempts_ as the limit -- confirm which bound is intended.
- */
-template <typename RESP>
-void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
-    const exception_wrapper& error, Supplier<std::string> err_msg,
-    Consumer<exception_wrapper> update_cached_location) {
-  exceptions_->push_back(ThrowableWithExtraContext(error, TimeUtil::GetNowNanos()));
-
-  const bool retryable = ExceptionUtil::ShouldRetry(error);
-  if (!retryable || tries_ >= max_retries_) {
-    CompleteExceptionally();
-    return;
-  }
-
-  if (tries_ > start_log_errors_count_) {
-    LOG(WARNING) << err_msg();
-  } else {
-    VLOG(1) << err_msg();
-  }
-
-  // With an operation timeout in force the pause may not exceed the time that is left (less a
-  // safety margin); if nothing is left, fail right away.
-  const bool has_operation_timeout = operation_timeout_nanos_.count() > 0;
-  int64_t max_delay_ns = 0;
-  if (has_operation_timeout) {
-    max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
-    if (max_delay_ns <= 0) {
-      CompleteExceptionally();
-      return;
-    }
-  }
-  int64_t delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1);
-  if (has_operation_timeout) {
-    delay_ns = std::min(max_delay_ns, delay_ns);
-  }
-
-  update_cached_location(error);
-  tries_++;
-
-  /*
-   * HHWheelTimer::scheduleTimeout() fails with an assertion from
-   * EventBase::isInEventBaseThread() when the scheduling is done from a random thread or from
-   * one of the IOThreadPool threads (with num threads > 1). There may be a bug in using the retry
-   * timer from IOThreadPool threads; it only works when run from a single-thread pool, which
-   * retry_executor() is. The scheduled "work" itself, LocateThenCall(), should still run in a
-   * thread pool, which is why it is submitted to the CPUThreadPool. The IOThreadPool cannot be
-   * used until the blocking call made at TCP connection establishment time (see
-   * ConnectionFactory::Connect()) is fixed; otherwise the IOThreadPool thread deadlocks itself
-   * and hangs.
-   */
-  conn_->retry_executor()->add([this, delay_ns]() {
-    retry_timer_->scheduleTimeoutFn(
-        [this]() { conn_->cpu_executor()->add([this]() { LocateThenCall(); }); },
-        std::chrono::milliseconds(TimeUtil::ToMillis(delay_ns)));
-  });
-}
-
-// Issues the user-supplied callable against the given region location. Success fulfils the
-// promise; failure goes through OnError(), which also gets a hook that reports the error for
-// this location to the region locator's cache.
-template <typename RESP>
-void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc) {
-  // Per-call timeout: the RPC timeout, capped by what is left of the operation timeout.
-  int64_t call_timeout_ns = rpc_timeout_nanos_.count();
-  if (operation_timeout_nanos_.count() > 0) {
-    const int64_t remaining_ns = RemainingTimeNs();
-    if (remaining_ns <= 0) {
-      CompleteExceptionally();
-      return;
-    }
-    call_timeout_ns = std::min(remaining_ns, rpc_timeout_nanos_.count());
-  }
-
-  std::shared_ptr<RpcClient> rpc_client = conn_->rpc_client();
-
-  ResetController(controller_, call_timeout_ns);
-
-  // TODO: RegionLocation should propagate through these method chains as a shared_ptr.
-  // Otherwise, it may get deleted underneath us. We are just copying for now.
-  auto loc_ptr = std::make_shared<RegionLocation>(loc);
-  callable_(controller_, loc_ptr, rpc_client)
-      .then([loc_ptr, this](const RESP& resp) { promise_->setValue(resp); })
-      .onError([loc_ptr, this](const exception_wrapper& e) {
-        Supplier<std::string> describe_failure = [this, loc_ptr, e]() -> std::string {
-          return "Call to " + folly::sformat("{0}:{1}", loc_ptr->server_name().host_name(),
-                                             loc_ptr->server_name().port()) +
-                 " for '" + row_ + "' in " + loc_ptr->DebugString() + " of " +
-                 table_name_->namespace_() + "::" + table_name_->qualifier() +
-                 " failed with e.what()=" + e.what().toStdString() + ", tries = " +
-                 std::to_string(tries_) + ", maxAttempts = " + std::to_string(max_attempts_) +
-                 ", timeout = " + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) +
-                 " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
-        };
-        Consumer<exception_wrapper> refresh_location =
-            [this, loc_ptr](const exception_wrapper& error) {
-              conn_->region_locator()->UpdateCachedLocation(*loc_ptr, error);
-            };
-        OnError(e, describe_failure, refresh_location);
-      });
-}
-
-// Fails the promise with a RetriesExhaustedException carrying the number of retries made so far
-// (tries_ - 1) and every exception collected along the way.
-template <typename RESP>
-void AsyncSingleRequestRpcRetryingCaller<RESP>::CompleteExceptionally() {
-  promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_));
-}
-
-// Nanoseconds left of the operation timeout, measured from start_ns_. Zero or negative once the
-// timeout has elapsed.
-template <typename RESP>
-int64_t AsyncSingleRequestRpcRetryingCaller<RESP>::RemainingTimeNs() {
-  const auto elapsed_ns = TimeUtil::GetNowNanos() - start_ns_;
-  return operation_timeout_nanos_.count() - elapsed_ns;
-}
-
-// Resets the controller for a new attempt. A non-negative timeout_ns is converted to
-// milliseconds, capped at INT_MAX, and installed as the call timeout; a negative value leaves
-// the freshly reset controller without one.
-template <typename RESP>
-void AsyncSingleRequestRpcRetryingCaller<RESP>::ResetController(
-    std::shared_ptr<HBaseRpcController> controller, const int64_t& timeout_ns) {
-  controller->Reset();
-  if (timeout_ns < 0) {
-    return;
-  }
-  const int64_t capped_ms =
-      std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_ns));
-  controller->set_call_timeout(std::chrono::milliseconds(capped_ms));
-}
-
-// Explicit instantiations for the linker. Without them, users of the templatized class would
-// have to #include this .cc file to get the member function definitions.
-// OpenScannerResponse is only forward-declared here.
-class OpenScannerResponse;
-template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<hbase::Result>>;
-template class AsyncSingleRequestRpcRetryingCaller<folly::Unit>;
-template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<OpenScannerResponse>>;
-template class AsyncSingleRequestRpcRetryingCaller<bool>;
-} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-rpc-retrying-caller.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h
deleted file mode 100644
index c7e28d0..0000000
--- a/hbase-native-client/core/async-rpc-retrying-caller.h
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/ExceptionWrapper.h>
-#include <folly/futures/Future.h>
-#include <folly/io/async/EventBase.h>
-#include <folly/io/async/HHWheelTimer.h>
-
-#include <algorithm>
-#include <chrono>
-#include <functional>
-#include <memory>
-#include <string>
-#include <type_traits>
-#include <utility>
-#include <vector>
-#include "core/async-connection.h"
-#include "core/hbase-rpc-controller.h"
-#include "core/region-location.h"
-#include "exceptions/exception.h"
-#include "if/HBase.pb.h"
-
-namespace hbase {
-
-template <typename T>  // Nullary function that returns a T.
-using Supplier = std::function<T()>;
-
-template <typename T>  // Function that takes a T and returns nothing.
-using Consumer = std::function<void(T)>;
-
-template <typename R, typename S, typename... I>  // Builds an R from an S plus extra inputs I.
-using ReqConverter = std::function<R(const S&, const I&...)>;
-
-template <typename R, typename S>  // Builds an R from an S.
-using RespConverter = std::function<R(const S&)>;
-
-template <typename RESP>  // Function that receives a RESP by const reference.
-using RpcCallback = std::function<void(const RESP&)>;
-
-template <typename REQ, typename RESP>  // Takes ownership of a REQ; future yields an owned RESP.
-using RpcCall = std::function<folly::Future<std::unique_ptr<RESP>>(
-    std::shared_ptr<RpcClient>, std::shared_ptr<RegionLocation>,
-    std::shared_ptr<HBaseRpcController>, std::unique_ptr<REQ>)>;
-
-template <typename RESP>  // Invoked by the retrying caller with controller, location, RPC client.
-using Callable =
-    std::function<folly::Future<RESP>(std::shared_ptr<HBaseRpcController>,
-                                      std::shared_ptr<RegionLocation>, std::shared_ptr<RpcClient>)>;
-
-template <typename RESP>  // RESP: value type delivered through the future returned by Call().
-class AsyncSingleRequestRpcRetryingCaller {
- public:
-  AsyncSingleRequestRpcRetryingCaller(
-      std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
-      std::shared_ptr<hbase::pb::TableName> table_name, const std::string& row,
-      RegionLocateType locate_type, Callable<RESP> callable, std::chrono::nanoseconds pause,
-      uint32_t max_retries, std::chrono::nanoseconds operation_timeout_nanos,
-      std::chrono::nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count);
-
-  virtual ~AsyncSingleRequestRpcRetryingCaller();
-
-  folly::Future<RESP> Call();  // Starts the request; the future is tied to promise_.
-
- private:
-  void LocateThenCall();  // Locates the region for row_, then calls Call(loc) or OnError().
-
-  void OnError(const folly::exception_wrapper& error, Supplier<std::string> err_msg,
-               Consumer<folly::exception_wrapper> update_cached_location);  // Retry or give up.
-
-  void Call(const RegionLocation& loc);  // Invokes callable_ against a copy of loc.
-
-  void CompleteExceptionally();  // Fails promise_ with RetriesExhaustedException.
-
-  int64_t RemainingTimeNs();  // Operation timeout minus the time elapsed since start_ns_.
-
-  static void ResetController(std::shared_ptr<HBaseRpcController> controller,
-                              const int64_t& timeout_ns);  // Negative timeout_ns: none is set.
-
- private:
-  std::shared_ptr<AsyncConnection> conn_;
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;  // Used to delay retries.
-  std::shared_ptr<hbase::pb::TableName> table_name_;  // Dereferenced when locating the region.
-  std::string row_;
-  RegionLocateType locate_type_;
-  Callable<RESP> callable_;  // Performs the actual RPC.
-  std::chrono::nanoseconds pause_;  // Base value handed to ConnectionUtils::GetPauseTime().
-  uint32_t max_retries_;  // Limit compared against tries_ when deciding whether to retry.
-  std::chrono::nanoseconds operation_timeout_nanos_;  // A count <= 0 disables the timeout checks.
-  std::chrono::nanoseconds rpc_timeout_nanos_;
-  uint32_t start_log_errors_count_;  // Above this many tries, errors are logged at WARNING.
-  std::shared_ptr<folly::Promise<RESP>> promise_;
-  std::shared_ptr<HBaseRpcController> controller_;  // Reset before every call attempt.
-  uint64_t start_ns_;  // TimeUtil::GetNowNanos() taken at construction.
-  uint32_t tries_;  // Starts at 1; incremented before each retry is scheduled.
-  std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_;  // One per failed try.
-  uint32_t max_attempts_;  // ConnectionUtils::Retries2Attempts(max_retries); used in log text.
-};
-} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc
deleted file mode 100644
index 2eb82a9..0000000
--- a/hbase-native-client/core/async-rpc-retrying-test.cc
+++ /dev/null
@@ -1,420 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <folly/Logging.h>
-#include <folly/Memory.h>
-#include <folly/futures/Future.h>
-#include <folly/io/async/EventBase.h>
-#include <folly/io/async/ScopedEventBaseThread.h>
-#include <gmock/gmock.h>
-#include <google/protobuf/stubs/callback.h>
-#include <wangle/concurrent/IOThreadPoolExecutor.h>
-
-#include <chrono>
-#include <functional>
-#include <string>
-
-#include "connection/request.h"
-#include "connection/response.h"
-#include "connection/rpc-client.h"
-#include "core/async-connection.h"
-#include "core/async-rpc-retrying-caller-factory.h"
-#include "core/async-rpc-retrying-caller.h"
-#include "core/client.h"
-#include "core/connection-configuration.h"
-#include "core/hbase-rpc-controller.h"
-#include "core/keyvalue-codec.h"
-#include "core/region-location.h"
-#include "core/request-converter.h"
-#include "core/response-converter.h"
-#include "core/result.h"
-#include "exceptions/exception.h"
-#include "if/Client.pb.h"
-#include "if/HBase.pb.h"
-#include "test-util/test-util.h"
-#include "utils/time-util.h"
-
-using hbase::AsyncRpcRetryingCallerFactory;
-using hbase::AsyncConnection;
-using hbase::AsyncRegionLocator;
-using hbase::ConnectionConfiguration;
-using hbase::Configuration;
-using hbase::HBaseRpcController;
-using hbase::RegionLocation;
-using hbase::RegionLocateType;
-using hbase::RpcClient;
-using hbase::RequestConverter;
-using hbase::ResponseConverter;
-using hbase::ReqConverter;
-using hbase::RespConverter;
-using hbase::Put;
-using hbase::TimeUtil;
-using hbase::Client;
-using hbase::security::User;
-
-using ::testing::Return;
-using ::testing::_;
-using std::chrono::nanoseconds;
-using std::chrono::milliseconds;
-
-using namespace hbase;
-
-using folly::exception_wrapper;
-
-// Test fixture shared by every case in this file. A single mini cluster is started once for
-// the whole test case and reached through the static test_util handle.
-class AsyncRpcRetryTest : public ::testing::Test {
- public:
-  // Handle to the mini cluster helper; empty until SetUpTestCase() has run.
-  static std::unique_ptr<hbase::TestUtil> test_util;
-
-  // Installs the failure signal handler, creates the TestUtil and calls StartMiniCluster(2).
-  static void SetUpTestCase() {
-    google::InstallFailureSignalHandler();
-    test_util.reset(new hbase::TestUtil());
-    test_util->StartMiniCluster(2);
-  }
-};
-std::unique_ptr<hbase::TestUtil> AsyncRpcRetryTest::test_util;
-
-// Minimal AsyncRegionLocator for the tests: every lookup is answered with one stored
-// RegionLocation and cache-update notifications are ignored.
-class AsyncRegionLocatorBase : public AsyncRegionLocator {
- public:
-  AsyncRegionLocatorBase() = default;
-  explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location)
-      : region_location_(region_location) {}
-  virtual ~AsyncRegionLocatorBase() = default;
-
-  // Returns an already fulfilled future holding the stored location. All arguments are ignored.
-  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &,
-                                                                     const std::string &,
-                                                                     const RegionLocateType,
-                                                                     const int64_t) override {
-    folly::Promise<std::shared_ptr<RegionLocation>> fulfilled;
-    fulfilled.setValue(region_location_);
-    return fulfilled.getFuture();
-  }
-
-  // Replaces the location that subsequent LocateRegion() calls will return.
-  virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) {
-    region_location_ = region_location;
-  }
-
-  // Intentionally a no-op: this locator keeps no cache to update.
-  void UpdateCachedLocation(const RegionLocation &, const folly::exception_wrapper &) override {}
-
- protected:
-  // Location served to callers; may be null until set_region_location() is called.
-  std::shared_ptr<RegionLocation> region_location_;
-};
-
-// Locator that always answers with the configured region location, i.e. exactly the behavior
-// inherited from AsyncRegionLocatorBase. Used by the success-path test.
-class MockAsyncRegionLocator : public AsyncRegionLocatorBase {
- public:
-  MockAsyncRegionLocator() = default;
-  explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
-      : AsyncRegionLocatorBase(region_location) {}
-  ~MockAsyncRegionLocator() override = default;
-};
-
-// Locator that simulates a stale location: the first lookups return a location whose region
-// name is bogus, later lookups return the configured (correct) location.
-class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase {
- private:
-  uint32_t tries_ = 0;      // number of LocateRegion() calls made so far
-  uint32_t num_fails_ = 0;  // threshold the call counter is compared against
-
- public:
-  explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails)
-      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
-  explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
-      : AsyncRegionLocatorBase(region_location) {}
-  virtual ~MockWrongRegionAsyncRegionLocator() {}
-
-  // Returns a wrong location while the call counter (read before it is incremented) is
-  // <= num_fails_, which means the first num_fails_ + 1 calls; after that it delegates to the
-  // base class, which returns the correct region location.
-  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
-      const hbase::pb::TableName &tn, const std::string &row,
-      const RegionLocateType locate_type = RegionLocateType::kCurrent,
-      const int64_t locate_ns = 0) override {
-    const uint32_t calls_before = tries_++;
-    if (calls_before > num_fails_) {
-      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
-    }
-    /* set random region name, simulating invalid region */
-    folly::Promise<std::shared_ptr<RegionLocation>> fulfilled;
-    fulfilled.setValue(std::make_shared<RegionLocation>(
-        "whatever-region-name", region_location_->region_info(), region_location_->server_name()));
-    return fulfilled.getFuture();
-  }
-};
-
-// Locator that simulates a failing region lookup: the first lookups complete with an
-// exception, later lookups return the configured location.
-class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {
- private:
-  uint32_t tries_ = 0;      // number of LocateRegion() calls made so far
-  uint32_t num_fails_ = 0;  // threshold the call counter is compared against
-
- public:
-  explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
-      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
-  explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
-      : AsyncRegionLocatorBase(region_location) {}
-  virtual ~MockFailingAsyncRegionLocator() {}
-
-  // Returns a future holding a std::runtime_error while the call counter (read before it is
-  // incremented) is <= num_fails_, which means the first num_fails_ + 1 calls; after that it
-  // delegates to the base class, which returns the correct region location.
-  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
-      const hbase::pb::TableName &tn, const std::string &row,
-      const RegionLocateType locate_type = RegionLocateType::kCurrent,
-      const int64_t locate_ns = 0) override {
-    const uint32_t calls_before = tries_++;
-    if (calls_before > num_fails_) {
-      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
-    }
-    folly::Promise<std::shared_ptr<RegionLocation>> failed;
-    failed.setException(std::runtime_error{"Failed to look up region location"});
-    return failed.getFuture();
-  }
-};
-
-// AsyncConnection test double that is assembled from collaborators supplied by the test
-// (configuration, timer, executors, rpc client, region locator) and simply hands them back.
-class MockAsyncConnection : public AsyncConnection,
-                            public std::enable_shared_from_this<MockAsyncConnection> {
- public:
-  // Initializers are listed in member declaration order, which is the order they run in.
-  MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf,
-                      std::shared_ptr<folly::HHWheelTimer> retry_timer,
-                      std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
-                      std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
-                      std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor,
-                      std::shared_ptr<RpcClient> rpc_client,
-                      std::shared_ptr<AsyncRegionLocator> region_locator)
-      : retry_timer_(retry_timer),
-        conn_conf_(conn_conf),
-        rpc_client_(rpc_client),
-        region_locator_(region_locator),
-        cpu_executor_(cpu_executor),
-        io_executor_(io_executor),
-        retry_executor_(retry_executor) {}
-  ~MockAsyncConnection() {}
-
-  // Creates the caller factory. Uses shared_from_this(), so the object has to be owned by a
-  // std::shared_ptr before this is called.
-  void Init() {
-    auto self = shared_from_this();
-    caller_factory_ = std::make_shared<AsyncRpcRetryingCallerFactory>(self, retry_timer_);
-  }
-
-  // This mock carries no Configuration.
-  std::shared_ptr<Configuration> conf() override { return nullptr; }
-  std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; }
-  // Empty until Init() has been called.
-  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
-    return caller_factory_;
-  }
-  std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
-  std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; }
-  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
-  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
-  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
-    return retry_executor_;
-  }
-
-  // Nothing to release: all collaborators are owned by the test.
-  void Close() override {}
-  // Every call returns a brand new controller.
-  std::shared_ptr<HBaseRpcController> CreateRpcController() override {
-    return std::make_shared<HBaseRpcController>();
-  }
-
- private:
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-  std::shared_ptr<ConnectionConfiguration> conn_conf_;
-  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
-  std::shared_ptr<RpcClient> rpc_client_;
-  std::shared_ptr<AsyncRegionLocator> region_locator_;
-  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
-  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
-  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
-};
-
-// Stand-in for the raw async table implementation. It issues a single Get RPC against a given
-// region location and exposes the outcome as a future, so the retrying caller under test has
-// an action to drive.
-//
-// CONN is the connection type; conn_ is stored but not read by the code visible here.
-template <typename CONN>
-class MockRawAsyncTableImpl {
- public:
-  explicit MockRawAsyncTableImpl(std::shared_ptr<CONN> conn) : conn_(conn) {}
-  virtual ~MockRawAsyncTableImpl() = default;
-
-  /* implement this in real RawAsyncTableImpl. */
-
-  /* in real RawAsyncTableImpl, this should be private. */
-  // Sends `get` to the server named by `loc` through `rpc_client`.
-  //
-  // Builds an rpc_call lambda that forwards the converted request to RpcClient::AsyncCall
-  // (host and port taken from loc->server_name(), service "ClientService"), then delegates to
-  // Call() with the Get request/response converters.
-  // Returns a future for the converted hbase::Result.
-  folly::Future<std::shared_ptr<hbase::Result>> GetCall(
-      std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
-      std::shared_ptr<RegionLocation> loc, const hbase::Get &get) {
-    hbase::RpcCall<hbase::Request, hbase::Response> rpc_call = [](
-        std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<RegionLocation> loc,
-        std::shared_ptr<HBaseRpcController> controller,
-        std::unique_ptr<hbase::Request> preq) -> folly::Future<std::unique_ptr<hbase::Response>> {
-      VLOG(1) << "entering MockRawAsyncTableImpl#GetCall, calling AsyncCall, loc:"
-              << loc->DebugString();
-      return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(),
-                                   std::move(preq), User::defaultUser(), "ClientService");
-    };
-
-    return Call<hbase::Get, hbase::Request, hbase::Response, std::shared_ptr<hbase::Result>>(
-        rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest, rpc_call,
-        &hbase::ResponseConverter::FromGetResponse);
-  }
-
-  /* in real RawAsyncTableImpl, this should be private. */
-  // Generic request/response plumbing: converts `req` with `req_converter` (passing the region
-  // name from `loc`), invokes `rpc_call`, and converts the protobuf response with
-  // `resp_converter`.
-  //
-  // The result is delivered through promise_, which is replaced on every invocation.
-  // NOTE(review): promise_ is declared for std::shared_ptr<hbase::Result> regardless of RESP,
-  // so this template appears usable only with that RESP; confirm before reusing it.
-  // NOTE(review): the continuations capture `this` (and, in the first one, everything else by
-  // reference), so this object presumably has to outlive the RPC; verify against callers.
-  template <typename REQ, typename PREQ, typename PRESP, typename RESP>
-  folly::Future<RESP> Call(std::shared_ptr<hbase::RpcClient> rpc_client,
-                           std::shared_ptr<HBaseRpcController> controller,
-                           std::shared_ptr<RegionLocation> loc, const REQ &req,
-                           ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter,
-                           hbase::RpcCall<PREQ, PRESP> rpc_call,
-                           RespConverter<RESP, PRESP> resp_converter) {
-    promise_ = std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>();
-    // Take the future before starting the RPC so it can be returned at the end.
-    auto f = promise_->getFuture();
-    VLOG(1) << "calling rpc_call";
-    rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name())))
-        .then([&, this, resp_converter](std::unique_ptr<PRESP> presp) {
-          VLOG(1) << "MockRawAsyncTableImpl#call succeded: ";
-          RESP result = resp_converter(*presp);
-          promise_->setValue(result);
-        })
-        .onError([this](const exception_wrapper &e) {
-          VLOG(1) << "entering MockRawAsyncTableImpl#call, exception: " << e.what();
-          VLOG(1) << "entering MockRawAsyncTableImpl#call, error typeinfo: " << typeid(e).name();
-          promise_->setException(e);
-        });
-    return f;
-  }
-
- private:
-  std::shared_ptr<CONN> conn_;
-  // Promise backing the future returned by the most recent Call().
-  std::shared_ptr<folly::Promise<std::shared_ptr<hbase::Result>>> promise_;
-};
-
-// Shared body of all tests in this file.
-//
-// Creates `tableName` with column family "d", writes two cells to row "test2" through a real
-// Client, then reads the row back through a single-request retrying caller that is wired to a
-// MockAsyncConnection using the supplied `region_locator`. The locator decides how many
-// lookups misbehave before the real location is returned.
-//
-// @param region_locator            locator under test; its location is set to the real one
-// @param tableName                 table to create and query
-// @param operation_timeout_millis  operation timeout handed to ConnectionConfiguration
-//
-// NOTE(review): the tests that expect failure wrap this function in EXPECT_ANY_THROW, so the
-// blocking get() below presumably throws when the caller gives up; in that case the cleanup at
-// the end of the function is not reached.
-void runTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator, std::string tableName,
-             uint32_t operation_timeout_millis = 1200000) {
-  AsyncRpcRetryTest::test_util->CreateTable(tableName, "d");
-
-  // Create TableName and Row to be fetched from HBase
-  auto tn = folly::to<hbase::pb::TableName>(tableName);
-  auto row = "test2";
-
-  // Get to be performed on above HBase Table
-  hbase::Get get(row);
-
-  // Create a client
-  Client client(*(AsyncRpcRetryTest::test_util->conf()));
-
-  // Get connection to HBase Table
-  auto table = client.Table(tn);
-
-  // Seed the row that the retrying caller will read back.
-  table->Put(Put{"test2"}.AddColumn("d", "2", "value2"));
-  table->Put(Put{"test2"}.AddColumn("d", "extra", "value for extra"));
-
-  /* init region location and rpc channel */
-  auto region_location = table->GetRegionLocation(row);
-
-  // auto io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(4);
-  auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
-  // The IO executor is borrowed from the real client's connection.
-  auto io_executor_ = client.async_connection()->io_executor();
-  auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
-  auto codec = std::make_shared<hbase::KeyValueCodec>();
-  auto rpc_client = std::make_shared<RpcClient>(io_executor_, cpu_executor_, codec,
-                                                AsyncRpcRetryTest::test_util->conf());
-  // auto retry_event_base_ = std::make_shared<folly::ScopedEventBaseThread>(true);
-  // The retry timer is created on the retry executor's event base.
-  std::shared_ptr<folly::HHWheelTimer> retry_timer =
-      folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
-
-  /* init connection configuration */
-  auto connection_conf = std::make_shared<ConnectionConfiguration>(
-      TimeUtil::SecondsToNanos(20),                       // connect_timeout
-      TimeUtil::MillisToNanos(operation_timeout_millis),  // operation_timeout
-      TimeUtil::SecondsToNanos(60),                       // rpc_timeout
-      TimeUtil::MillisToNanos(100),                       // pause
-      5,                                                  // max retries
-      9);                                                 // start log errors count
-
-  /* set region locator */
-  region_locator->set_region_location(region_location);
-
-  /* init hbase client connection */
-  auto conn = std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_,
-                                                    io_executor_, retry_executor_, rpc_client,
-                                                    region_locator);
-  conn->Init();
-
-  /* init retry caller factory */
-  auto tableImpl = std::make_shared<MockRawAsyncTableImpl<MockAsyncConnection>>(conn);
-
-  /* init request caller builder */
-  auto builder = conn->caller_factory()->Single<std::shared_ptr<hbase::Result>>();
-
-  /* call with retry to get result */
-
-  // `get` is captured by reference; it lives on this stack frame, which outlasts the blocking
-  // get() call below.
-  auto async_caller =
-      builder->table(std::make_shared<hbase::pb::TableName>(tn))
-          ->row(row)
-          ->rpc_timeout(conn->connection_conf()->read_rpc_timeout())
-          ->operation_timeout(conn->connection_conf()->operation_timeout())
-          ->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller,
-                             std::shared_ptr<hbase::RegionLocation> loc,
-                             std::shared_ptr<hbase::RpcClient> rpc_client)
-                       -> folly::Future<std::shared_ptr<hbase::Result>> {
-                         return tableImpl->GetCall(rpc_client, controller, loc, get);
-                       })
-          ->Build();
-
-  // NOTE(review): `promise` is never used after this line.
-  auto promise = std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>();
-
-  // Block until the caller finishes, waiting at most 500000 ms.
-  auto result = async_caller->Call().get(milliseconds(500000));
-
-  // Test the values, should be same as in put executed on hbase shell
-  ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
-  EXPECT_EQ("test2", result->Row());
-  EXPECT_EQ("value2", *(result->Value("d", "2")));
-  EXPECT_EQ("value for extra", *(result->Value("d", "extra")));
-
-  // Tear down what this function created.
-  retry_timer->destroy();
-  table->Close();
-  client.Close();
-  retry_executor_->stop();
-}
-
-// Success path: the locator returns the real region location on every lookup.
-TEST_F(AsyncRpcRetryTest, TestGetBasic) {
-  auto region_locator = std::make_shared<MockAsyncRegionLocator>();
-  runTest(region_locator, "table1");
-}
-
-// The locator (num_fails = 3) first hands out a wrong region, so the RPC fails and is retried;
-// the Get is expected to succeed once the real location is returned.
-TEST_F(AsyncRpcRetryTest, TestHandleException) {
-  auto region_locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(3);
-  runTest(region_locator, "table2");
-}
-
-// The locator (num_fails = 5) keeps handing out a wrong region for longer than the caller is
-// willing to retry, so the operation is expected to end with an exception.
-TEST_F(AsyncRpcRetryTest, TestFailWithException) {
-  auto region_locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(5);
-  EXPECT_ANY_THROW(runTest(region_locator, "table3"));
-}
-
-// The region lookup itself fails at first (num_fails = 3); the Get is expected to succeed once
-// the lookup starts returning the real location.
-TEST_F(AsyncRpcRetryTest, TestHandleExceptionFromRegionLocationLookup) {
-  auto region_locator = std::make_shared<MockFailingAsyncRegionLocator>(3);
-  runTest(region_locator, "table4");
-}
-
-// The region lookup keeps failing (num_fails = 5) for longer than the caller is willing to
-// retry, so the operation is expected to end with an exception.
-TEST_F(AsyncRpcRetryTest, TestFailWithExceptionFromRegionLocationLookup) {
-  auto region_locator = std::make_shared<MockFailingAsyncRegionLocator>(5);
-  EXPECT_ANY_THROW(runTest(region_locator, "table5"));
-}
-
-// With a 200 ms operation timeout the failing lookups (num_fails = 3) are expected to use up
-// the time budget, so the caller stops retrying and the operation ends with an exception.
-TEST_F(AsyncRpcRetryTest, TestFailWithOperationTimeout) {
-  auto region_locator = std::make_shared<MockFailingAsyncRegionLocator>(3);
-  EXPECT_ANY_THROW(runTest(region_locator, "table6", 200));
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-scan-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-scan-rpc-retrying-caller.cc b/hbase-native-client/core/async-scan-rpc-retrying-caller.cc
deleted file mode 100644
index a1e8362..0000000
--- a/hbase-native-client/core/async-scan-rpc-retrying-caller.cc
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/async-scan-rpc-retrying-caller.h"
-
-namespace hbase {
-
-// Binds the resumer to the scan caller whose CompleteOrNext() is invoked from Resume().
-// mutex_ is default-constructed.
-ScanResumerImpl::ScanResumerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller)
-    : caller_(caller) {}
-
-// Resumes the scan on behalf of the consumer.
-//
-// Depending on the current state:
-//  - kInitialized: Prepare() has not run yet. Only record kResumed, so that Prepare() will
-//    decline to suspend and the scan simply goes on.
-//  - kResumed: already resumed, nothing to do.
-//  - otherwise: the scan is suspended. Mark it resumed and continue it with the response that
-//    Prepare() saved.
-void ScanResumerImpl::Resume() {
-  // If resume is called before prepare, we return at the first if condition without loading
-  // resp_. If resume is called after suspend, it is safe to reference the copied response
-  // after the locked block, as no one will change it anymore.
-  std::shared_ptr<pb::ScanResponse> local_resp;
-
-  {
-    std::unique_lock<std::mutex> mlock{mutex_};
-    if (state_ == ScanResumerState::kInitialized) {
-      // user calls this method before we call prepare, so just set the state to
-      // RESUMED, the implementation will just go on.
-      state_ = ScanResumerState::kResumed;
-      return;
-    }
-    if (state_ == ScanResumerState::kResumed) {
-      // already resumed, give up.
-      return;
-    }
-    state_ = ScanResumerState::kResumed;
-    local_resp = resp_;
-  }
-
-  // Called after the lock has been released.
-  caller_->CompleteOrNext(local_resp);
-}
-
-// Called by the scan caller when the consumer asked to suspend.
-//
-// Stores the response and the number of complete rows so that Resume() can continue later.
-// Returns true when the scan is now suspended, false when Resume() was already called and the
-// scan should simply continue.
-bool ScanResumerImpl::Prepare(std::shared_ptr<pb::ScanResponse> resp, int num_complete_rows) {
-  std::lock_guard<std::mutex> guard(mutex_);
-  const bool resumed_early = (state_ == ScanResumerState::kResumed);
-  if (!resumed_early) {
-    state_ = ScanResumerState::kSuspended;
-    resp_ = resp;
-    num_complete_rows_ = num_complete_rows;
-  }
-  // When the user resumed before we actually suspended, just continue.
-  return !resumed_early;
-}
-
-// Creates a controller for one consumer callback; `caller` is handed on to the resumer that
-// Suspend() creates.
-ScanControllerImpl::ScanControllerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller)
-    : caller_(caller) {}
-
-// Marks the scan as suspended and returns the resumer the consumer must call to continue it.
-// PreCheck() aborts if this is called from the wrong thread or in the wrong state.
-std::shared_ptr<ScanResumer> ScanControllerImpl::Suspend() {
-  PreCheck();
-  state_ = ScanControllerState::kSuspended;
-  auto new_resumer = std::make_shared<ScanResumerImpl>(caller_);
-  resumer_ = new_resumer;
-  return new_resumer;
-}
-
-// Marks the scan as terminated by the consumer. PreCheck() aborts if this is called from the
-// wrong thread or when the controller is no longer in its initial state.
-void ScanControllerImpl::Terminate() {
-  PreCheck();
-  state_ = ScanControllerState::kTerminated;
-}
-
-// Invalidates the controller: switches it to kDestroyed and reports the state it was in just
-// before, so the caller can see whether the consumer suspended or terminated the scan.
-ScanControllerState ScanControllerImpl::Destroy() {
-  const ScanControllerState previous = state_;
-  state_ = ScanControllerState::kDestroyed;
-  return previous;
-}
-
-// Guards Suspend() and Terminate(): aborts (via CHECK) unless the call is made on the thread
-// recorded in caller_thread_id_ and the controller is still in the kInitialized state.
-void ScanControllerImpl::PreCheck() {
-  CHECK(std::this_thread::get_id() == caller_thread_id_)
-      << "The current thread is " << std::this_thread::get_id() << ", expected thread is "
-      << caller_thread_id_ << ", you should not call this method outside OnNext or OnHeartbeat";
-
-  CHECK(state_ == ScanControllerState::kInitialized) << "Invalid Stopper state "
-                                                     << DebugString(state_);
-}
-
-// Returns the enumerator name of a ScanControllerState for log messages, or "UNKNOWN" for a
-// value that is not one of the named states.
-std::string ScanControllerImpl::DebugString(ScanControllerState state) {
-  if (state == ScanControllerState::kInitialized) {
-    return "kInitialized";
-  }
-  if (state == ScanControllerState::kSuspended) {
-    return "kSuspended";
-  }
-  if (state == ScanControllerState::kTerminated) {
-    return "kTerminated";
-  }
-  if (state == ScanControllerState::kDestroyed) {
-    return "kDestroyed";
-  }
-  return "UNKNOWN";
-}
-
-// Returns the enumerator name of a ScanResumerState for log messages, or "UNKNOWN" for a
-// value that is not one of the named states.
-std::string ScanControllerImpl::DebugString(ScanResumerState state) {
-  if (state == ScanResumerState::kInitialized) {
-    return "kInitialized";
-  }
-  if (state == ScanResumerState::kSuspended) {
-    return "kSuspended";
-  }
-  if (state == ScanResumerState::kResumed) {
-    return "kResumed";
-  }
-  return "UNKNOWN";
-}
-
-// Builds a caller that drives the scan of one region on an already opened scanner.
-//
-// All arguments are stored as given. In addition the constructor creates the promise that
-// reports the outcome of the region scan, starts the retry counter at 1, obtains an RPC
-// controller from the connection, records the start time and derives max_attempts_ from
-// max_retries via ConnectionUtils::Retries2Attempts.
-AsyncScanRpcRetryingCaller::AsyncScanRpcRetryingCaller(
-    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
-    std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<Scan> scan, int64_t scanner_id,
-    std::shared_ptr<ScanResultCache> results_cache, std::shared_ptr<RawScanResultConsumer> consumer,
-    std::shared_ptr<RegionLocation> region_location, nanoseconds scanner_lease_timeout_nanos,
-    nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos,
-    nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
-    : conn_(conn),
-      retry_timer_(retry_timer),
-      rpc_client_(rpc_client),
-      scan_(scan),
-      scanner_id_(scanner_id),
-      results_cache_(results_cache),
-      consumer_(consumer),
-      region_location_(region_location),
-      scanner_lease_timeout_nanos_(scanner_lease_timeout_nanos),
-      pause_(pause),
-      max_retries_(max_retries),
-      scan_timeout_nanos_(scan_timeout_nanos),
-      rpc_timeout_nanos_(rpc_timeout_nanos),
-      start_log_errors_count_(start_log_errors_count),
-      promise_(std::make_shared<folly::Promise<bool>>()),
-      tries_(1) {
-  controller_ = conn_->CreateRpcController();
-  // Reference point for RemainingTimeNs(); reset again by Next().
-  start_ns_ = TimeUtil::GetNowNanos();
-  max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
-  // Errors collected across retries; reported when the retries are exhausted.
-  exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>();
-}
-
-// Entry point: processes the response of the open-scanner call as if it were a regular scan
-// response and returns the future that reports how the scan of this region ended. Elsewhere in
-// this file the promise is fulfilled with false when the whole scan has no more results and
-// with true when the caller should continue (next region or a new scanner).
-folly::Future<bool> AsyncScanRpcRetryingCaller::Start(
-    std::shared_ptr<HBaseRpcController> controller,
-    std::shared_ptr<pb::ScanResponse> open_scan_resp,
-    const std::shared_ptr<CellScanner> cell_scanner) {
-  OnComplete(controller, open_scan_resp, cell_scanner);
-  return promise_->getFuture();
-}
-
-// Nanoseconds left of the scan timeout, measured from start_ns_. The result is zero or
-// negative once the timeout has elapsed.
-int64_t AsyncScanRpcRetryingCaller::RemainingTimeNs() {
-  const auto elapsed_ns = TimeUtil::GetNowNanos() - start_ns_;
-  return scan_timeout_nanos_.count() - elapsed_ns;
-}
-
-// Handles the outcome of one scan RPC (including the open-scanner response passed in by
-// Start()).
-//
-// On RPC failure the error is forwarded to OnError(). Otherwise the response is converted to
-// results, passed through the results cache and delivered to the consumer (OnNext for
-// results, OnHeartbeat for an empty heartbeat). What happens next depends on what the consumer
-// did with the ScanController during the callback:
-//  - terminated: finish the scan, closing the scanner first if the region still has results;
-//  - suspended:  stop here until the resumer is invoked (unless it already was);
-//  - otherwise:  continue with CompleteOrNext().
-//
-// @param controller    controller of the finished RPC, consulted for failure
-// @param resp          scan response received from the region server
-// @param cell_scanner  cell data accompanying the response
-void AsyncScanRpcRetryingCaller::OnComplete(std::shared_ptr<HBaseRpcController> controller,
-                                            std::shared_ptr<pb::ScanResponse> resp,
-                                            const std::shared_ptr<CellScanner> cell_scanner) {
-  VLOG(5) << "Scan: OnComplete, scanner_id:" << scanner_id_;
-
-  if (controller->Failed()) {
-    OnError(controller->exception());
-    return;
-  }
-
-  bool is_heartbeat = resp->has_heartbeat_message() && resp->heartbeat_message();
-
-  int64_t num_complete_rows_before = results_cache_->num_complete_rows();
-  try {
-    auto raw_results = ResponseConverter::FromScanResponse(resp, cell_scanner);
-
-    auto results = results_cache_->AddAndGet(raw_results, is_heartbeat);
-
-    auto scan_controller = std::make_shared<ScanControllerImpl>(shared_from_this());
-
-    if (results.size() > 0) {
-      // Remember where to restart from should a later call fail.
-      UpdateNextStartRowWhenError(*results[results.size() - 1]);
-      VLOG(5) << "Calling consumer->OnNext()";
-      consumer_->OnNext(results, scan_controller);
-    } else if (is_heartbeat) {
-      consumer_->OnHeartbeat(scan_controller);
-    }
-
-    // The controller is only valid during the callback; fetch the consumer's decision.
-    ScanControllerState state = scan_controller->Destroy();
-    if (state == ScanControllerState::kTerminated) {
-      if (resp->has_more_results_in_region() && resp->more_results_in_region()) {
-        // we have more results in region but user request to stop the scan, so we need to close the
-        // scanner explicitly.
-        CloseScanner();
-      }
-      CompleteNoMoreResults();
-      return;
-    }
-
-    int64_t num_complete_rows = results_cache_->num_complete_rows() - num_complete_rows_before;
-    if (state == ScanControllerState::kSuspended) {
-      // Prepare() returns false when the consumer already resumed; then simply carry on.
-      if (scan_controller->resumer()->Prepare(resp, num_complete_rows)) {
-        return;
-      }
-    }
-  } catch (const std::runtime_error& e) {
-    // We can not retry here. The server has responded normally and the call sequence has been
-    // increased so a new scan with the same call sequence will cause an
-    // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
-    LOG(WARNING) << "Received exception in reading the scan response:" << e.what();
-    CompleteWhenError(true);
-    return;
-  }
-
-  CompleteOrNext(resp);
-}
-
-// Decides, from the flags in `resp`, whether the scan is finished, this region is finished,
-// or another batch has to be requested from the same scanner.
-void AsyncScanRpcRetryingCaller::CompleteOrNext(std::shared_ptr<pb::ScanResponse> resp) {
-  VLOG(5) << "Scan: CompleteOrNext, scanner_id" << scanner_id_
-          << ", response:" << resp->ShortDebugString();
-
-  // RS tells us there is no more data for the whole scan
-  const bool scan_exhausted = resp->has_more_results() && !resp->more_results();
-  if (scan_exhausted) {
-    CompleteNoMoreResults();
-    return;
-  }
-  // TODO: Implement Scan::limit(), and check the limit here
-
-  const bool region_exhausted =
-      resp->has_more_results_in_region() && !resp->more_results_in_region();
-  if (region_exhausted) {
-    // TODO: check whether Scan is reversed here
-    CompleteWhenNoMoreResultsInRegion();
-  } else {
-    Next();
-  }
-}
-
-// Fails the scan: clears the cached results, optionally closes the scanner, and completes the
-// promise with a RetriesExhaustedException that carries the collected errors.
-void AsyncScanRpcRetryingCaller::CompleteExceptionally(bool close_scanner) {
-  VLOG(5) << "Scan: CompleteExceptionally";
-  results_cache_->Clear();
-  if (close_scanner) {
-    CloseScanner();
-  }
-  RetriesExhaustedException exhausted(tries_ - 1, exceptions_);
-  promise_->setException(exhausted);
-}
-
-// Completes the promise with false, signalling that the whole scan has no more results.
-void AsyncScanRpcRetryingCaller::CompleteNoMoreResults() {
-  // In master code, scanners auto-close if we have exhausted the region. It may not be the case
-  // in branch-1 code. If this is backported, make sure that the scanner is closed.
-  VLOG(5) << "Scan: CompleteNoMoreResults, scanner_id:" << scanner_id_;
-  promise_->setValue(false);
-}
-
-// Called when the current region is exhausted: either the scan is done, or it continues at
-// the end key of this region.
-void AsyncScanRpcRetryingCaller::CompleteWhenNoMoreResultsInRegion() {
-  VLOG(5) << "Scan: CompleteWhenNoMoreResultsInRegion, scanner_id:" << scanner_id_;
-  // In master code, scanners auto-close if we have exhausted the region. It may not be the case
-  // in branch-1 code. If this is backported, make sure that the scanner is closed.
-  if (!NoMoreResultsForScan(*scan_, region_location_->region_info())) {
-    CompleteWithNextStartRow(region_location_->region_info().end_key(), true);
-    return;
-  }
-  CompleteNoMoreResults();
-}
-
-// Moves the scan's start row to `row` and completes the promise with true so the scan goes on
-// from there. `inclusive` is currently not used (see the TODO below).
-void AsyncScanRpcRetryingCaller::CompleteWithNextStartRow(std::string row, bool inclusive) {
-  VLOG(5) << "Scan: CompleteWithNextStartRow: region scan is complete, move to next region";
-  scan_->SetStartRow(row);
-  // TODO: set inclusive if it is reverse scans
-  promise_->setValue(true);
-}
-
-// Remembers where the scan has to restart if a later call fails: the row of `result`, which is
-// included again only when that result is partial.
-void AsyncScanRpcRetryingCaller::UpdateNextStartRowWhenError(const Result& result) {
-  include_next_start_row_when_error_ = result.Partial();
-  next_start_row_when_error_ = optional<std::string>(result.Row());
-}
-
-// Gives up on the current scanner without failing the scan: clears cached results, optionally
-// closes the scanner, repositions the scan's start row after the last delivered row (if any)
-// and completes the promise with true.
-void AsyncScanRpcRetryingCaller::CompleteWhenError(bool close_scanner) {
-  VLOG(5) << "Scan: CompleteWhenError, scanner_id:" << scanner_id_;
-  results_cache_->Clear();
-  if (close_scanner) {
-    CloseScanner();
-  }
-  if (next_start_row_when_error_) {
-    // TODO: HBASE-17583 adds include start / stop row to the Scan. Once we rebase and implement
-    // those options in Scan , we can start using that here.
-    if (include_next_start_row_when_error_) {
-      scan_->SetStartRow(*next_start_row_when_error_);
-    } else {
-      scan_->SetStartRow(BytesUtil::CreateClosestRowAfter(*next_start_row_when_error_));
-    }
-  }
-  promise_->setValue(true);
-}
-
-// Handles a failed scan RPC.
-//
-// The error is recorded, then one of the following happens:
-//  - retries or the scan timeout are exhausted: the scan fails (CompleteExceptionally);
-//  - the scanner is closed or out of order on the server: the promise completes normally
-//    through CompleteWhenError() with the start row repositioned;
-//  - the error is not retriable: the scan fails;
-//  - otherwise: the call is repeated after a back-off delay.
-//
-// @param error the failure reported for the last call
-void AsyncScanRpcRetryingCaller::OnError(const folly::exception_wrapper& error) {
-  VLOG(5) << "Scan: OnError, scanner_id:" << scanner_id_;
-  if (tries_ > start_log_errors_count_ || VLOG_IS_ON(5)) {
-    LOG(WARNING) << "Call to " << region_location_->server_name().ShortDebugString()
-                 << " for scanner id = " << scanner_id_ << " for "
-                 << region_location_->region_info().ShortDebugString()
-                 << " failed, , tries = " << tries_ << ", maxAttempts = " << max_attempts_
-                 << ", timeout = " << TimeUtil::ToMillis(scan_timeout_nanos_).count()
-                 << " ms, time elapsed = " << TimeUtil::ElapsedMillis(start_ns_) << " ms"
-                 << error.what().toStdString();
-  }
-
-  bool scanner_closed = ExceptionUtil::IsScannerClosed(error);
-  ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos());
-  exceptions_->push_back(twec);
-  // NOTE(review): the limit checked here is max_retries_, while the log line above reports
-  // max_attempts_; confirm which one is intended.
-  if (tries_ >= max_retries_) {
-    CompleteExceptionally(!scanner_closed);
-    return;
-  }
-
-  // Back-off before the next attempt, capped by what is left of the scan timeout.
-  int64_t delay_ns;
-  if (scan_timeout_nanos_.count() > 0) {
-    int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
-    if (max_delay_ns <= 0) {
-      CompleteExceptionally(!scanner_closed);
-      return;
-    }
-    delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1));
-  } else {
-    delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1);
-  }
-
-  if (scanner_closed) {
-    CompleteWhenError(false);
-    return;
-  }
-
-  if (ExceptionUtil::IsScannerOutOfOrder(error)) {
-    CompleteWhenError(true);
-    return;
-  }
-  if (!ExceptionUtil::ShouldRetry(error)) {
-    CompleteExceptionally(true);
-    return;
-  }
-  tries_++;
-
-  // The callbacks below run later on other executors, after this function has returned. They
-  // therefore capture `self` and `delay_ns` by value: reference captures would dangle, and the
-  // shared_ptr copy keeps this caller alive until the retry has been issued.
-  auto self(shared_from_this());
-  conn_->retry_executor()->add([self, delay_ns]() {
-    self->retry_timer_->scheduleTimeoutFn(
-        [self]() { self->conn_->cpu_executor()->add([self]() { self->Call(); }); },
-        std::chrono::milliseconds(TimeUtil::ToMillis(delay_ns)));
-  });
-}
-
-bool AsyncScanRpcRetryingCaller::NoMoreResultsForScan(const Scan& scan,
-                                                      const pb::RegionInfo& info) {
-  if (BytesUtil::IsEmptyStopRow(info.end_key())) {
-    return true;
-  }
-  if (BytesUtil::IsEmptyStopRow(scan.StopRow())) {
-    return false;
-  }
-  int32_t c = BytesUtil::CompareTo(info.end_key(), scan.StopRow());
-  // 1. if our stop row is less than the endKey of the region
-  // 2. if our stop row is equal to the endKey of the region and we do not include the stop row
-  // for scan.
-  return c > 0 ||
-         (c == 0 /* && !scan.IncludeStopRow()*/);  // TODO: Scans always exclude StopRow for now.
-}
-
-void AsyncScanRpcRetryingCaller::Next() {
-  VLOG(5) << "Scan: Next";
-  next_call_seq_++;
-  tries_ = 1;
-  exceptions_->clear();
-  start_ns_ = TimeUtil::GetNowNanos();
-  Call();
-}
-
-void AsyncScanRpcRetryingCaller::Call() {
-  VLOG(5) << "Scan: Call";
-  auto self(shared_from_this());
-  // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
-  // less than the scan timeout. If the server does not respond in time(usually this will not
-  // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
-  // resending the next request and the only way to fix this is to close the scanner and open a
-  // new one.
-  int64_t call_timeout_nanos;
-  if (scan_timeout_nanos_.count() > 0) {
-    int64_t remaining_nanos = scan_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_);
-    if (remaining_nanos <= 0) {
-      CompleteExceptionally(true);
-      return;
-    }
-    call_timeout_nanos = remaining_nanos;
-  } else {
-    call_timeout_nanos = 0L;
-  }
-
-  ResetController(controller_, call_timeout_nanos);
-
-  auto req =
-      RequestConverter::ToScanRequest(scanner_id_, scan_->Caching(), false, next_call_seq_, false);
-
-  // do the RPC call
-  rpc_client_
-      ->AsyncCall(region_location_->server_name().host_name(),
-                  region_location_->server_name().port(), std::move(req),
-                  security::User::defaultUser(), "ClientService")
-      .via(conn_->cpu_executor().get())
-      .then([self, this](const std::unique_ptr<Response>& resp) {
-        auto scan_resp = std::static_pointer_cast<pb::ScanResponse>(resp->resp_msg());
-        return OnComplete(controller_, scan_resp, resp->cell_scanner());
-      })
-      .onError([self, this](const folly::exception_wrapper& e) { OnError(e); });
-}
-
-void AsyncScanRpcRetryingCaller::CloseScanner() {
-  auto self(shared_from_this());
-  ResetController(controller_, rpc_timeout_nanos_.count());
-
-  VLOG(5) << "Closing scanner with scanner_id:" << folly::to<std::string>(scanner_id_);
-
-  // Do a close scanner RPC. Fire and forget.
-  auto req = RequestConverter::ToScanRequest(scanner_id_, 0, true);
-  rpc_client_
-      ->AsyncCall(region_location_->server_name().host_name(),
-                  region_location_->server_name().port(), std::move(req),
-                  security::User::defaultUser(), "ClientService")
-      .onError([self, this](const folly::exception_wrapper& e) -> std::unique_ptr<Response> {
-        LOG(WARNING) << "Call to " + region_location_->server_name().ShortDebugString() +
-                            " for closing scanner_id = " + folly::to<std::string>(scanner_id_) +
-                            " for " + region_location_->region_info().ShortDebugString() +
-                            " failed, ignore, probably already closed. Exception:" +
-                            e.what().toStdString();
-        return nullptr;
-      });
-}
-
-void AsyncScanRpcRetryingCaller::ResetController(std::shared_ptr<HBaseRpcController> controller,
-                                                 const int64_t& timeout_nanos) {
-  controller->Reset();
-  if (timeout_nanos >= 0) {
-    controller->set_call_timeout(
-        milliseconds(std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_nanos))));
-  }
-}
-
-}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-scan-rpc-retrying-caller.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-scan-rpc-retrying-caller.h b/hbase-native-client/core/async-scan-rpc-retrying-caller.h
deleted file mode 100644
index 9555e80..0000000
--- a/hbase-native-client/core/async-scan-rpc-retrying-caller.h
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/Conv.h>
-#include <folly/ExceptionWrapper.h>
-#include <folly/Format.h>
-#include <folly/Logging.h>
-#include <folly/futures/Future.h>
-#include <folly/io/async/EventBase.h>
-#include <folly/io/async/HHWheelTimer.h>
-
-#include <algorithm>
-#include <chrono>
-#include <functional>
-#include <memory>
-#include <string>
-#include <type_traits>
-#include <utility>
-#include <vector>
-
-#include "connection/rpc-client.h"
-#include "core/async-connection.h"
-#include "core/hbase-rpc-controller.h"
-#include "core/raw-scan-result-consumer.h"
-#include "core/region-location.h"
-#include "core/request-converter.h"
-#include "core/response-converter.h"
-#include "core/result.h"
-#include "core/scan-result-cache.h"
-#include "core/scan.h"
-#include "exceptions/exception.h"
-#include "if/Client.pb.h"
-#include "if/HBase.pb.h"
-#include "utils/bytes-util.h"
-#include "utils/connection-util.h"
-#include "utils/optional.h"
-#include "utils/sys-util.h"
-#include "utils/time-util.h"
-
-using std::chrono::nanoseconds;
-using std::chrono::milliseconds;
-
-namespace hbase {
-
-class AsyncScanRpcRetryingCaller;
-
-// The resume method is allowed to be called in another thread so here we also use the
-// ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
-// from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
-// and when user calls resume method, we will change the state to RESUMED. But the resume method
-// could be called in other thread, and in fact, user could just do this:
-// controller.suspend().resume()
-// This is strange but valid. This means the scan could be resumed before we call the prepare
-// method to do the actual suspend work. So in the resume method, we will check if the state is
-// INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
-// method, if the state is RESUMED already, we will just return an let the scan go on.
-// Notice that, the public methods of this class is supposed to be called by upper layer only, and
-// package private methods can only be called within the implementation of
-// AsyncScanSingleRegionRpcRetryingCaller.
-// TODO: Unlike the Java counter part, we do not do scan lease renewals in a background thread.
-// Since there is also no async scan API exposed to the users, only ScanResultConsumer is the
-// AsyncTableResultScanner which will only pause the scanner if the result cache is maxed. The
-// application is expected to consume the scan results before the scanner lease timeout.
-class ScanResumerImpl : public ScanResumer {
- public:
-  explicit ScanResumerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller);
-
-  virtual ~ScanResumerImpl() = default;
-
-  /**
-   * Resume the scan. You are free to call it multiple time but only the first call will take
-   * effect.
-   */
-  void Resume() override;
-
-  // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
-  // for more details.
-  bool Prepare(std::shared_ptr<pb::ScanResponse> resp, int num_complete_rows);
-
- private:
-  // INITIALIZED -> SUSPENDED -> RESUMED
-  // INITIALIZED -> RESUMED
-  ScanResumerState state_ = ScanResumerState::kInitialized;
-  std::mutex mutex_;
-  std::shared_ptr<pb::ScanResponse> resp_ = nullptr;
-  int64_t num_complete_rows_ = 0;
-  std::shared_ptr<AsyncScanRpcRetryingCaller> caller_;
-};
-
-class ScanControllerImpl : public ScanController {
- public:
-  virtual ~ScanControllerImpl() = default;
-
-  explicit ScanControllerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller);
-
-  /**
-   * Suspend the scan.
-   * <p>
-   * This means we will stop fetching data in background, i.e., will not call onNext any more
-   * before you resume the scan.
-   * @return A resumer used to resume the scan later.
-   */
-  std::shared_ptr<ScanResumer> Suspend();
-
-  /**
-   * Terminate the scan.
-   * <p>
-   * This is useful when you have got enough results and want to stop the scan in onNext method,
-   * or you want to stop the scan in onHeartbeat method because it has spent too many time.
-   */
-  void Terminate();
-
-  // return the current state, and set the state to DESTROYED.
-  ScanControllerState Destroy();
-
-  std::shared_ptr<ScanResumerImpl> resumer() { return resumer_; }
-
- private:
-  void PreCheck();
-
-  std::string DebugString(ScanControllerState state);
-
-  std::string DebugString(ScanResumerState state);
-
- private:
-  // Make sure the methods are only called in this thread.
-  std::thread::id caller_thread_id_ = std::this_thread::get_id();
-  // INITIALIZED -> SUSPENDED -> DESTROYED
-  // INITIALIZED -> TERMINATED -> DESTROYED
-  // INITIALIZED -> DESTROYED
-  // If the state is incorrect we will throw IllegalStateException.
-  ScanControllerState state_ = ScanControllerState::kInitialized;
-  std::shared_ptr<ScanResumerImpl> resumer_ = nullptr;
-  std::shared_ptr<AsyncScanRpcRetryingCaller> caller_;
-};
-
-class AsyncScanRpcRetryingCaller : public std::enable_shared_from_this<AsyncScanRpcRetryingCaller> {
- public:
-  AsyncScanRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
-                             std::shared_ptr<folly::HHWheelTimer> retry_timer,
-                             std::shared_ptr<hbase::RpcClient> rpc_client,
-                             std::shared_ptr<Scan> scan, int64_t scanner_id,
-                             std::shared_ptr<ScanResultCache> results_cache,
-                             std::shared_ptr<RawScanResultConsumer> consumer,
-                             std::shared_ptr<RegionLocation> region_location,
-                             nanoseconds scanner_lease_timeout_nanos, nanoseconds pause,
-                             uint32_t max_retries, nanoseconds scan_timeout_nanos,
-                             nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count);
-
-  folly::Future<bool> Start(std::shared_ptr<HBaseRpcController> controller,
-                            std::shared_ptr<pb::ScanResponse> open_scan_resp,
-                            const std::shared_ptr<CellScanner> cell_scanner);
-
- private:
-  int64_t RemainingTimeNs();
-  void OnComplete(std::shared_ptr<HBaseRpcController> controller,
-                  std::shared_ptr<pb::ScanResponse> resp,
-                  const std::shared_ptr<CellScanner> cell_scanner);
-
-  void CompleteOrNext(std::shared_ptr<pb::ScanResponse> resp);
-
-  void CompleteExceptionally(bool close_scanner);
-
-  void CompleteNoMoreResults();
-
-  void CompleteWhenNoMoreResultsInRegion();
-
-  void CompleteWithNextStartRow(std::string row, bool inclusive);
-
-  void UpdateNextStartRowWhenError(const Result& result);
-
-  void CompleteWhenError(bool close_scanner);
-
-  void OnError(const folly::exception_wrapper& e);
-
-  bool NoMoreResultsForScan(const Scan& scan, const pb::RegionInfo& info);
-
-  void Next();
-
-  void Call();
-
-  void CloseScanner();
-
-  void ResetController(std::shared_ptr<HBaseRpcController> controller,
-                       const int64_t& timeout_nanos);
-
- private:
-  std::shared_ptr<AsyncConnection> conn_;
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-  std::shared_ptr<hbase::RpcClient> rpc_client_;
-  std::shared_ptr<Scan> scan_;
-  int64_t scanner_id_;
-  std::shared_ptr<ScanResultCache> results_cache_;
-  std::shared_ptr<RawScanResultConsumer> consumer_;
-  std::shared_ptr<RegionLocation> region_location_;
-  nanoseconds scanner_lease_timeout_nanos_;
-  nanoseconds pause_;
-  uint32_t max_retries_;
-  nanoseconds scan_timeout_nanos_;
-  nanoseconds rpc_timeout_nanos_;
-  uint32_t start_log_errors_count_;
-  std::shared_ptr<folly::Promise<bool>> promise_;
-  std::shared_ptr<HBaseRpcController> controller_;
-  optional<std::string> next_start_row_when_error_ = optional<std::string>();
-  bool include_next_start_row_when_error_ = true;
-  uint64_t start_ns_;
-  uint32_t tries_;
-  std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_;
-  uint32_t max_attempts_;
-  int64_t next_call_seq_ = -1L;
-
-  friend class ScanResumerImpl;
-  friend class ScanControllerImpl;
-};
-
-}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-table-result-scanner.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-table-result-scanner.cc b/hbase-native-client/core/async-table-result-scanner.cc
deleted file mode 100644
index b1935ae..0000000
--- a/hbase-native-client/core/async-table-result-scanner.cc
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/async-table-result-scanner.h"
-
-#include <vector>
-
-namespace hbase {
-AsyncTableResultScanner::AsyncTableResultScanner(int64_t max_cache_size)
-    : max_cache_size_(max_cache_size) {
-  closed_ = false;
-  cache_size_ = 0;
-}
-
-AsyncTableResultScanner::~AsyncTableResultScanner() { Close(); }
-
-void AsyncTableResultScanner::Close() {
-  std::unique_lock<std::mutex> mlock(mutex_);
-  closed_ = true;
-  while (!queue_.empty()) {
-    queue_.pop();
-  }
-  cache_size_ = 0;
-  if (resumer_ != nullptr) {
-    resumer_->Resume();
-  }
-  cond_.notify_all();
-}
-
-std::shared_ptr<Result> AsyncTableResultScanner::Next() {
-  VLOG(5) << "AsyncTableResultScanner: Next()";
-
-  std::shared_ptr<Result> result = nullptr;
-  std::shared_ptr<ScanResumer> local_resumer = nullptr;
-  {
-    std::unique_lock<std::mutex> mlock(mutex_);
-    while (queue_.empty()) {
-      if (closed_) {
-        return nullptr;
-      }
-      if (error_) {
-        throw error_;
-      }
-      cond_.wait(mlock);
-    }
-    result = queue_.front();
-    queue_.pop();
-
-    cache_size_ -= EstimatedSizeWithSharedPtr(result);
-    if (resumer_ != nullptr && cache_size_ <= max_cache_size_ / 2) {
-      VLOG(1) << std::this_thread::get_id() << " resume scan prefetching";
-      local_resumer = resumer_;
-      resumer_ = nullptr;
-    }
-  }
-
-  // Need to call ScanResumer::Resume() outside of the scope of the mutex. The reason is that
-  // folly/wangle event loop might end up running the attached logic(.then()) at the Scan RPC
-  // in the same event thread before returning from the previous call. This seems like the
-  // wrong thing to do(™), but we cannot fix that now. Since the call back can end up calling
-  // this::OnNext(), we should unlock the mutex.
-  if (local_resumer != nullptr) {
-    local_resumer->Resume();
-  }
-  return result;
-}
-
-void AsyncTableResultScanner::AddToCache(const std::vector<std::shared_ptr<Result>> &results) {
-  VLOG(5) << "AsyncTableResultScanner: AddToCache()";
-  for (const auto r : results) {
-    queue_.push(r);
-    cache_size_ += EstimatedSizeWithSharedPtr(r);
-  }
-}
-
-template <typename T>
-inline size_t AsyncTableResultScanner::EstimatedSizeWithSharedPtr(std::shared_ptr<T> t) {
-  return t->EstimatedSize() + sizeof(std::shared_ptr<T>);
-}
-
-void AsyncTableResultScanner::OnNext(const std::vector<std::shared_ptr<Result>> &results,
-                                     std::shared_ptr<ScanController> controller) {
-  VLOG(5) << "AsyncTableResultScanner: OnNext()";
-  {
-    std::unique_lock<std::mutex> mlock(mutex_);
-    if (closed_) {
-      controller->Terminate();
-      return;
-    }
-    AddToCache(results);
-
-    if (cache_size_ >= max_cache_size_) {
-      StopPrefetch(controller);
-    }
-  }
-  cond_.notify_all();
-}
-
-void AsyncTableResultScanner::StopPrefetch(std::shared_ptr<ScanController> controller) {
-  VLOG(1) << std::this_thread::get_id()
-          << ": stop prefetching when scanning as the cache size " +
-                 folly::to<std::string>(cache_size_) + " is greater than the max_cache_size " +
-                 folly::to<std::string>(max_cache_size_);
-
-  resumer_ = controller->Suspend();
-  num_prefetch_stopped_++;
-}
-
-/**
- * Indicate that there is an heartbeat message but we have not cumulated enough cells to call
- * onNext.
- * <p>
- * This method give you a chance to terminate a slow scan operation.
- * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
- *          instance is only valid within the scope of onHeartbeat method. You can only call its
- *          method in onHeartbeat, do NOT store it and call it later outside onHeartbeat.
- */
-void AsyncTableResultScanner::OnHeartbeat(std::shared_ptr<ScanController> controller) {
-  std::unique_lock<std::mutex> mlock(mutex_);
-  if (closed_) {
-    controller->Terminate();
-  }
-}
-
-/**
- * Indicate that we hit an unrecoverable error and the scan operation is terminated.
- * <p>
- * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
- */
-void AsyncTableResultScanner::OnError(const folly::exception_wrapper &error) {
-  LOG(WARNING) << "Scanner received error" << error.what();
-  std::unique_lock<std::mutex> mlock(mutex_);
-  error_ = error;
-  cond_.notify_all();
-}
-
-/**
- * Indicate that the scan operation is completed normally.
- */
-void AsyncTableResultScanner::OnComplete() {
-  std::unique_lock<std::mutex> mlock(mutex_);
-  closed_ = true;
-  cond_.notify_all();
-}
-}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-table-result-scanner.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-table-result-scanner.h b/hbase-native-client/core/async-table-result-scanner.h
deleted file mode 100644
index dcdf871..0000000
--- a/hbase-native-client/core/async-table-result-scanner.h
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/Conv.h>
-#include <folly/ExceptionWrapper.h>
-#include <folly/Logging.h>
-#include <chrono>
-#include <condition_variable>
-#include <memory>
-#include <mutex>
-#include <queue>
-#include <string>
-#include <vector>
-
-#include "core/raw-scan-result-consumer.h"
-#include "core/result-scanner.h"
-#include "core/result.h"
-#include "if/Client.pb.h"
-#include "if/HBase.pb.h"
-
-namespace hbase {
-
-class AsyncTableResultScanner : public ResultScanner, public RawScanResultConsumer {
- public:
-  explicit AsyncTableResultScanner(int64_t max_cache_size);
-
-  virtual ~AsyncTableResultScanner();
-
-  void Close() override;
-
-  std::shared_ptr<Result> Next() override;
-
-  void OnNext(const std::vector<std::shared_ptr<Result>> &results,
-              std::shared_ptr<ScanController> controller) override;
-
-  /**
-   * Indicate that there is an heartbeat message but we have not cumulated enough cells to call
-   * onNext.
-   * <p>
-   * This method give you a chance to terminate a slow scan operation.
-   * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
-   *          instance is only valid within the scope of onHeartbeat method. You can only call its
-   *          method in onHeartbeat, do NOT store it and call it later outside onHeartbeat.
-   */
-  void OnHeartbeat(std::shared_ptr<ScanController> controller) override;
-
-  /**
-   * Indicate that we hit an unrecoverable error and the scan operation is terminated.
-   * <p>
-   * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
-   */
-  void OnError(const folly::exception_wrapper &error) override;
-
-  /**
-   * Indicate that the scan operation is completed normally.
-   */
-  void OnComplete() override;
-
-  // For testing
-  uint32_t num_prefetch_stopped() { return num_prefetch_stopped_; }
-
- private:
-  void AddToCache(const std::vector<std::shared_ptr<Result>> &results);
-
-  template <typename T>
-  inline size_t EstimatedSizeWithSharedPtr(std::shared_ptr<T> t);
-
-  void StopPrefetch(std::shared_ptr<ScanController> controller);
-
- private:
-  std::queue<std::shared_ptr<Result>> queue_;
-  std::mutex mutex_;
-  std::condition_variable cond_;
-  folly::exception_wrapper error_;
-  int64_t cache_size_;
-  int64_t max_cache_size_;
-  bool closed_;
-  std::shared_ptr<ScanResumer> resumer_ = nullptr;
-  uint32_t num_prefetch_stopped_ = 0;
-};
-}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/cell-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/cell-test.cc b/hbase-native-client/core/cell-test.cc
deleted file mode 100644
index 4611473..0000000
--- a/hbase-native-client/core/cell-test.cc
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/cell.h"
-
-#include <glog/logging.h>
-#include <gtest/gtest.h>
-#include <memory>
-
-using hbase::Cell;
-using hbase::CellType;
-
-TEST(CellTest, Constructor) {
-  std::string row = "row-value";
-  std::string family = "family-value";
-  std::string column = "column-value";
-  std::string value = "value-value";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-  CellType cell_type = CellType::PUT;
-
-  Cell cell{row, family, column, timestamp, value, cell_type};
-
-  EXPECT_EQ(row, cell.Row());
-  EXPECT_EQ(family, cell.Family());
-  EXPECT_EQ(column, cell.Qualifier());
-  EXPECT_EQ(value, cell.Value());
-  EXPECT_EQ(timestamp, cell.Timestamp());
-  EXPECT_EQ(cell_type, cell.Type());
-}
-
-TEST(CellTest, CopyConstructor) {
-  std::string row = "row-value";
-  std::string family = "family-value";
-  std::string column = "column-value";
-  std::string value = "value-value";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-  CellType cell_type = CellType::PUT;
-
-  auto cell = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type);
-  Cell cell2{*cell};
-  cell = nullptr;
-
-  EXPECT_EQ(row, cell2.Row());
-  EXPECT_EQ(family, cell2.Family());
-  EXPECT_EQ(column, cell2.Qualifier());
-  EXPECT_EQ(value, cell2.Value());
-  EXPECT_EQ(timestamp, cell2.Timestamp());
-  EXPECT_EQ(cell_type, cell2.Type());
-}
-
-TEST(CellTest, CopyAssignment) {
-  std::string row = "row-value";
-  std::string family = "family-value";
-  std::string column = "column-value";
-  std::string value = "value-value";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-  CellType cell_type = CellType::PUT;
-
-  auto cell = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type);
-  Cell cell2 = *cell;
-  cell = nullptr;
-
-  EXPECT_EQ(row, cell2.Row());
-  EXPECT_EQ(family, cell2.Family());
-  EXPECT_EQ(column, cell2.Qualifier());
-  EXPECT_EQ(value, cell2.Value());
-  EXPECT_EQ(timestamp, cell2.Timestamp());
-  EXPECT_EQ(cell_type, cell2.Type());
-}
-
-TEST(CellTest, CellRowTest) {
-  std::string row = "only-row";
-  std::string family = "D";
-  std::string column = "";
-  std::string value = "";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-  CellType cell_type = CellType::PUT;
-  Cell cell{row, family, column, timestamp, value, cell_type};
-
-  EXPECT_EQ(row, cell.Row());
-  EXPECT_EQ(family, cell.Family());
-  EXPECT_EQ(column, cell.Qualifier());
-  EXPECT_EQ(value, cell.Value());
-  EXPECT_EQ(timestamp, cell.Timestamp());
-  EXPECT_EQ(cell_type, cell.Type());
-}
-
-TEST(CellTest, CellRowFamilyTest) {
-  std::string row = "only-row";
-  std::string family = "only-family";
-  std::string column = "";
-  std::string value = "";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-  CellType cell_type = CellType::PUT;
-  Cell cell{row, family, column, timestamp, value, cell_type};
-
-  EXPECT_EQ(row, cell.Row());
-  EXPECT_EQ(family, cell.Family());
-  EXPECT_EQ(column, cell.Qualifier());
-  EXPECT_EQ(value, cell.Value());
-  EXPECT_EQ(timestamp, cell.Timestamp());
-  EXPECT_EQ(cell_type, cell.Type());
-}
-
-TEST(CellTest, CellRowFamilyValueTest) {
-  std::string row = "only-row";
-  std::string family = "only-family";
-  std::string column = "";
-  std::string value = "only-value";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-  CellType cell_type = CellType::PUT;
-
-  Cell cell{row, family, column, timestamp, value, cell_type};
-
-  EXPECT_EQ(row, cell.Row());
-  EXPECT_EQ(family, cell.Family());
-  EXPECT_EQ(column, cell.Qualifier());
-  EXPECT_EQ(value, cell.Value());
-  EXPECT_EQ(timestamp, cell.Timestamp());
-  EXPECT_EQ(cell_type, cell.Type());
-}
-
-TEST(CellTest, CellRowFamilyColumnValueTest) {
-  std::string row = "only-row";
-  std::string family = "only-family";
-  std::string column = "only-column";
-  std::string value = "only-value";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-  CellType cell_type = CellType::PUT;
-  Cell cell{row, family, column, timestamp, value, cell_type};
-
-  EXPECT_EQ(row, cell.Row());
-  EXPECT_EQ(family, cell.Family());
-  EXPECT_EQ(column, cell.Qualifier());
-  EXPECT_EQ(value, cell.Value());
-  EXPECT_EQ(timestamp, cell.Timestamp());
-  EXPECT_EQ(cell_type, cell.Type());
-}
-
-TEST(CellTest, CellDebugString) {
-  CellType cell_type = CellType::PUT;
-  std::string row = "row";
-  std::string family = "family";
-  std::string column = "column";
-  std::string value = "value";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-
-  Cell cell{row, family, column, timestamp, value, cell_type};
-  LOG(INFO) << cell.DebugString();
-  EXPECT_EQ("row/family:column/LATEST_TIMESTAMP/PUT/vlen=5/seqid=0", cell.DebugString());
-
-  Cell cell2{row, "", column, 42, value, CellType::DELETE};
-  LOG(INFO) << cell2.DebugString();
-  EXPECT_EQ("row/column/42/DELETE/vlen=5/seqid=0", cell2.DebugString());
-}
-
-TEST(CellTest, CellEstimatedSize) {
-  CellType cell_type = CellType::PUT;
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-
-  Cell empty{"a", "a", "", timestamp, "", cell_type};
-  Cell cell1{"aa", "a", "", timestamp, "", cell_type};
-  Cell cell2{"a", "aa", "", timestamp, "", cell_type};
-  Cell cell3{"a", "a", "a", timestamp, "", cell_type};
-  Cell cell4{"a", "a", "", timestamp, "a", cell_type};
-  Cell cell5{"a", "a", "", timestamp, "a", CellType::DELETE};
-  Cell cell6{"aaaaaa", "a", "", timestamp, "a", cell_type};
-
-  LOG(INFO) << empty.EstimatedSize();
-  LOG(INFO) << cell1.EstimatedSize();
-
-  EXPECT_TRUE(empty.EstimatedSize() > sizeof(Cell));
-  EXPECT_TRUE(cell1.EstimatedSize() > empty.EstimatedSize());
-  EXPECT_EQ(cell1.EstimatedSize(), cell2.EstimatedSize());
-  EXPECT_EQ(cell2.EstimatedSize(), cell3.EstimatedSize());
-  EXPECT_EQ(cell3.EstimatedSize(), cell4.EstimatedSize());
-  EXPECT_EQ(cell4.EstimatedSize(), cell5.EstimatedSize());
-  EXPECT_TRUE(cell6.EstimatedSize() > cell1.EstimatedSize());
-}