You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/09/15 21:20:40 UTC

[23/25] hbase git commit: HBASE-18725 [C++] Install header files as well as library

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/append.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/append.h b/hbase-native-client/core/append.h
deleted file mode 100644
index cf9ac24..0000000
--- a/hbase-native-client/core/append.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include <cstdint>
-#include <map>
-#include <memory>
-#include <string>
-#include <vector>
-#include "core/cell.h"
-#include "core/mutation.h"
-
-namespace hbase {
-
-class Append : public Mutation {
- public:
-  /**
-   * Constructors
-   */
-  explicit Append(const std::string& row) : Mutation(row) {}
-  Append(const Append& cappend) : Mutation(cappend) {}
-  Append& operator=(const Append& cappend) {
-    Mutation::operator=(cappend);
-    return *this;
-  }
-
-  ~Append() = default;
-
-  /**
-   *  @brief Add the specified column and value to this Append operation.
-   *  @param family family name
-   *  @param qualifier column qualifier
-   *  @param value value to append
-   */
-  Append& Add(const std::string& family, const std::string& qualifier, const std::string& value);
-  Append& Add(std::unique_ptr<Cell> cell);
-};
-
-}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
deleted file mode 100644
index dfbf7e7..0000000
--- a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
+++ /dev/null
@@ -1,488 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/async-batch-rpc-retrying-caller.h"
-#include <glog/logging.h>
-#include <limits>
-
-using folly::Future;
-using folly::Promise;
-using folly::Try;
-using hbase::pb::ServerName;
-using hbase::pb::TableName;
-using hbase::security::User;
-using std::chrono::nanoseconds;
-using std::chrono::milliseconds;
-
-namespace hbase {
-
-template <typename REQ, typename RESP>
-AsyncBatchRpcRetryingCaller<REQ, RESP>::AsyncBatchRpcRetryingCaller(
-    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
-    std::shared_ptr<TableName> table_name, const std::vector<REQ> &actions, nanoseconds pause_ns,
-    int32_t max_attempts, nanoseconds operation_timeout_ns, nanoseconds rpc_timeout_ns,
-    int32_t start_log_errors_count)
-    : conn_(conn),
-      retry_timer_(retry_timer),
-      table_name_(table_name),
-      pause_ns_(pause_ns),
-      operation_timeout_ns_(operation_timeout_ns),
-      rpc_timeout_ns_(rpc_timeout_ns),
-      start_log_errors_count_(start_log_errors_count) {
-  CHECK(conn_ != nullptr);
-  CHECK(retry_timer_ != nullptr);
-  location_cache_ = conn_->region_locator();
-  rpc_client_ = conn_->rpc_client();
-  cpu_pool_ = conn_->cpu_executor();
-  CHECK(location_cache_ != nullptr);
-  CHECK(rpc_client_ != nullptr);
-  CHECK(cpu_pool_ != nullptr);
-
-  max_attempts_ = ConnectionUtils::Retries2Attempts(max_attempts);
-  uint32_t index = 0;
-  for (auto row : actions) {
-    actions_.push_back(std::make_shared<Action>(row, index));
-    Promise<RESP> prom{};
-    action2promises_.insert(std::pair<uint64_t, Promise<RESP>>(index, std::move(prom)));
-    action2futures_.push_back(action2promises_[index++].getFuture());
-  }
-}
-
-template <typename REQ, typename RESP>
-AsyncBatchRpcRetryingCaller<REQ, RESP>::~AsyncBatchRpcRetryingCaller() {}
-
-template <typename REQ, typename RESP>
-Future<std::vector<Try<RESP>>> AsyncBatchRpcRetryingCaller<REQ, RESP>::Call() {
-  GroupAndSend(actions_, 1);
-  return collectAll(action2futures_);
-}
-
-template <typename REQ, typename RESP>
-int64_t AsyncBatchRpcRetryingCaller<REQ, RESP>::RemainingTimeNs() {
-  return operation_timeout_ns_.count() - (TimeUtil::GetNowNanos() - start_ns_);
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::LogException(
-    int32_t tries, std::shared_ptr<RegionRequest> region_request,
-    const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
-  if (tries > start_log_errors_count_) {
-    std::string regions;
-    regions += region_request->region_location()->region_name() + ", ";
-    LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":"
-                 << table_name_->qualifier() << " from " << server_name->host_name()
-                 << " failed, tries=" << tries << ":- " << ew.what().toStdString();
-  }
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::LogException(
-    int32_t tries, const std::vector<std::shared_ptr<RegionRequest>> &region_requests,
-    const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
-  if (tries > start_log_errors_count_) {
-    std::string regions;
-    for (const auto region_request : region_requests) {
-      regions += region_request->region_location()->region_name() + ", ";
-    }
-    LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":"
-                 << table_name_->qualifier() << " from " << server_name->host_name()
-                 << " failed, tries=" << tries << ew.what().toStdString();
-  }
-}
-
-template <typename REQ, typename RESP>
-const std::string AsyncBatchRpcRetryingCaller<REQ, RESP>::GetExtraContextForError(
-    std::shared_ptr<ServerName> server_name) {
-  return server_name ? server_name->ShortDebugString() : "";
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddError(const std::shared_ptr<Action> &action,
-                                                      const folly::exception_wrapper &ew,
-                                                      std::shared_ptr<ServerName> server_name) {
-  ThrowableWithExtraContext twec(ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
-  AddAction2Error(action->original_index(), twec);
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddError(
-    const std::vector<std::shared_ptr<Action>> &actions, const folly::exception_wrapper &ew,
-    std::shared_ptr<ServerName> server_name) {
-  for (const auto action : actions) {
-    AddError(action, ew, server_name);
-  }
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailOne(const std::shared_ptr<Action> &action,
-                                                     int32_t tries,
-                                                     const folly::exception_wrapper &ew,
-                                                     int64_t current_time,
-                                                     const std::string extras) {
-  auto action_index = action->original_index();
-  auto itr = action2promises_.find(action_index);
-  if (itr != action2promises_.end()) {
-    if (itr->second.isFulfilled()) {
-      return;
-    }
-  }
-  ThrowableWithExtraContext twec(ew, current_time, extras);
-  AddAction2Error(action_index, twec);
-  action2promises_[action_index].setException(
-      RetriesExhaustedException(tries - 1, action2errors_[action_index]));
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailAll(
-    const std::vector<std::shared_ptr<Action>> &actions, int32_t tries,
-    const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
-  for (const auto action : actions) {
-    FailOne(action, tries, ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
-  }
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailAll(
-    const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
-  for (const auto action : actions) {
-    auto action_index = action->original_index();
-    auto itr = action2promises_.find(action_index);
-    if (itr->second.isFulfilled()) {
-      return;
-    }
-    action2promises_[action_index].setException(
-        RetriesExhaustedException(tries, action2errors_[action_index]));
-  }
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddAction2Error(
-    uint64_t action_index, const ThrowableWithExtraContext &twec) {
-  auto erritr = action2errors_.find(action_index);
-  if (erritr != action2errors_.end()) {
-    erritr->second->push_back(twec);
-  } else {
-    action2errors_[action_index] = std::make_shared<std::vector<ThrowableWithExtraContext>>();
-    action2errors_[action_index]->push_back(twec);
-  }
-  return;
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnError(const ActionsByRegion &actions_by_region,
-                                                     int32_t tries,
-                                                     const folly::exception_wrapper &ew,
-                                                     std::shared_ptr<ServerName> server_name) {
-  std::vector<std::shared_ptr<Action>> copied_actions;
-  std::vector<std::shared_ptr<RegionRequest>> region_requests;
-  for (const auto &action_by_region : actions_by_region) {
-    region_requests.push_back(action_by_region.second);
-    for (const auto &action : action_by_region.second->actions()) {
-      copied_actions.push_back(action);
-    }
-  }
-
-  LogException(tries, region_requests, ew, server_name);
-  if ((tries >= max_attempts_) || !ExceptionUtil::ShouldRetry(ew)) {
-    FailAll(copied_actions, tries, ew, server_name);
-    return;
-  }
-  AddError(copied_actions, ew, server_name);
-  TryResubmit(copied_actions, tries);
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::TryResubmit(
-    const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
-  int64_t delay_ns;
-  if (operation_timeout_ns_.count() > 0) {
-    int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
-    if (max_delay_ns <= 0) {
-      FailAll(actions, tries);
-      return;
-    }
-    delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1));
-  } else {
-    delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1);
-  }
-
-  conn_->retry_executor()->add([=]() {
-    retry_timer_->scheduleTimeoutFn(
-        [=]() { conn_->cpu_executor()->add([=]() { GroupAndSend(actions, tries + 1); }); },
-        milliseconds(TimeUtil::ToMillis(delay_ns)));
-  });
-}
-
-template <typename REQ, typename RESP>
-Future<std::vector<Try<std::shared_ptr<RegionLocation>>>>
-AsyncBatchRpcRetryingCaller<REQ, RESP>::GetRegionLocations(
-    const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns) {
-  auto locs = std::vector<Future<std::shared_ptr<RegionLocation>>>{};
-  for (auto const &action : actions) {
-    locs.push_back(location_cache_->LocateRegion(*table_name_, action->action()->row(),
-                                                 RegionLocateType::kCurrent, locate_timeout_ns));
-  }
-
-  return collectAll(locs);
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::GroupAndSend(
-    const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
-  int64_t locate_timeout_ns;
-  if (operation_timeout_ns_.count() > 0) {
-    locate_timeout_ns = RemainingTimeNs();
-    if (locate_timeout_ns <= 0) {
-      FailAll(actions, tries);
-      return;
-    }
-  } else {
-    locate_timeout_ns = -1L;
-  }
-
-  GetRegionLocations(actions, locate_timeout_ns)
-      .then([=](std::vector<Try<std::shared_ptr<RegionLocation>>> &loc) {
-        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
-        ActionsByServer actions_by_server;
-        std::vector<std::shared_ptr<Action>> locate_failed;
-
-        for (uint64_t i = 0; i < loc.size(); ++i) {
-          auto action = actions[i];
-          if (loc[i].hasValue()) {
-            auto region_loc = loc[i].value();
-            // Add it to actions_by_server;
-            auto search =
-                actions_by_server.find(std::make_shared<ServerName>(region_loc->server_name()));
-            if (search != actions_by_server.end()) {
-              search->second->AddActionsByRegion(region_loc, action);
-            } else {
-              auto server_request = std::make_shared<ServerRequest>(region_loc);
-              server_request->AddActionsByRegion(region_loc, action);
-              auto server_name = std::make_shared<ServerName>(region_loc->server_name());
-              actions_by_server[server_name] = server_request;
-            }
-            VLOG(5) << "rowkey [" << action->action()->row() << "] of table["
-                    << table_name_->ShortDebugString() << "] found in ["
-                    << region_loc->region_name() << "]; RS["
-                    << region_loc->server_name().host_name() << ":"
-                    << region_loc->server_name().port() << "];";
-          } else if (loc[i].hasException()) {
-            folly::exception_wrapper ew = loc[i].exception();
-            VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString()
-                    << "for index:" << i << "; tries: " << tries
-                    << "; max_attempts_: " << max_attempts_;
-            // We might receive runtime error from location-cache.cc too, we are doing FailOne and
-            // continue next one
-            if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) {
-              FailOne(action, tries, ew, TimeUtil::GetNowNanos(), ew.what().toStdString());
-            } else {
-              AddError(action, loc[i].exception(), nullptr);
-              locate_failed.push_back(action);
-            }
-          }
-        }
-        if (!actions_by_server.empty()) {
-          Send(actions_by_server, tries);
-        }
-
-        if (!locate_failed.empty()) {
-          TryResubmit(locate_failed, tries);
-        }
-      })
-      .onError([=](const folly::exception_wrapper &ew) {
-        VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString()
-                << "tries: " << tries << "; max_attempts_: " << max_attempts_;
-        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
-        if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) {
-          FailAll(actions, tries, ew, nullptr);
-        } else {
-          TryResubmit(actions, tries);
-        }
-      });
-  return;
-}
-
-template <typename REQ, typename RESP>
-Future<std::vector<Try<std::unique_ptr<Response>>>>
-AsyncBatchRpcRetryingCaller<REQ, RESP>::GetMultiResponse(const ActionsByServer &actions_by_server) {
-  auto multi_calls = std::vector<Future<std::unique_ptr<hbase::Response>>>{};
-  auto user = User::defaultUser();
-  for (const auto &action_by_server : actions_by_server) {
-    std::unique_ptr<Request> multi_req =
-        RequestConverter::ToMultiRequest(action_by_server.second->actions_by_region());
-    auto host = action_by_server.first->host_name();
-    int port = action_by_server.first->port();
-    multi_calls.push_back(
-        rpc_client_->AsyncCall(host, port, std::move(multi_req), user, "ClientService"));
-  }
-  return collectAll(multi_calls);
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::Send(const ActionsByServer &actions_by_server,
-                                                  int32_t tries) {
-  int64_t remaining_ns;
-  if (operation_timeout_ns_.count() > 0) {
-    remaining_ns = RemainingTimeNs();
-    if (remaining_ns <= 0) {
-      std::vector<std::shared_ptr<Action>> failed_actions;
-      for (const auto &action_by_server : actions_by_server) {
-        for (auto &value : action_by_server.second->actions_by_region()) {
-          for (const auto &failed_action : value.second->actions()) {
-            failed_actions.push_back(failed_action);
-          }
-        }
-      }
-      FailAll(failed_actions, tries);
-      return;
-    }
-  } else {
-    remaining_ns = std::numeric_limits<int64_t>::max();
-  }
-
-  std::vector<std::shared_ptr<Request>> multi_reqv;
-  for (const auto &action_by_server : actions_by_server)
-    multi_reqv.push_back(
-        std::move(RequestConverter::ToMultiRequest(action_by_server.second->actions_by_region())));
-
-  GetMultiResponse(actions_by_server)
-      .then([=](const std::vector<Try<std::unique_ptr<hbase::Response>>> &completed_responses) {
-        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
-        uint64_t num = 0;
-        for (const auto &action_by_server : actions_by_server) {
-          if (completed_responses[num].hasValue()) {
-            auto multi_response =
-                ResponseConverter::GetResults(multi_reqv[num], *completed_responses[num].value(),
-                                              action_by_server.second->actions_by_region());
-            OnComplete(action_by_server.second->actions_by_region(), tries, action_by_server.first,
-                       std::move(multi_response));
-          } else if (completed_responses[num].hasException()) {
-            folly::exception_wrapper ew = completed_responses[num].exception();
-            VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString()
-                    << " from server for action index:" << num;
-            OnError(action_by_server.second->actions_by_region(), tries, ew,
-                    action_by_server.first);
-          }
-          num++;
-        }
-      })
-      .onError([=](const folly::exception_wrapper &ew) {
-        VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString();
-        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
-        for (const auto &action_by_server : actions_by_server) {
-          OnError(action_by_server.second->actions_by_region(), tries, ew, action_by_server.first);
-        }
-      });
-  return;
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnComplete(
-    const ActionsByRegion &actions_by_region, int32_t tries,
-    const std::shared_ptr<ServerName> server_name,
-    const std::unique_ptr<hbase::MultiResponse> multi_response) {
-  std::vector<std::shared_ptr<Action>> failed_actions;
-  const auto region_results = multi_response->RegionResults();
-  for (const auto &action_by_region : actions_by_region) {
-    auto region_result_itr = region_results.find(action_by_region.first);
-    if (region_result_itr != region_results.end()) {
-      for (const auto &action : action_by_region.second->actions()) {
-        OnComplete(action, action_by_region.second, tries, server_name, region_result_itr->second,
-                   failed_actions);
-      }
-    } else if (region_result_itr == region_results.end()) {
-      auto region_exc = multi_response->RegionException(action_by_region.first);
-      if (region_exc == nullptr) {
-        // FailAll actions for this particular region as inconsistent server response. So we raise
-        // this exception to the application
-        std::string err_msg = "Invalid response: Server " + server_name->ShortDebugString() +
-                              " sent us neither results nor exceptions for " +
-                              action_by_region.first;
-        VLOG(1) << err_msg;
-        auto ew = folly::make_exception_wrapper<std::runtime_error>(err_msg);
-        FailAll(action_by_region.second->actions(), tries, ew, server_name);
-      } else {
-        // Eg: org.apache.hadoop.hbase.NotServingRegionException:
-        LogException(tries, action_by_region.second, *region_exc, server_name);
-        if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(*region_exc)) {
-          FailAll(action_by_region.second->actions(), tries, *region_exc, server_name);
-          return;
-        }
-        location_cache_->UpdateCachedLocation(*action_by_region.second->region_location(),
-                                              *region_exc);
-        AddError(action_by_region.second->actions(), *region_exc, server_name);
-        for (const auto &action : action_by_region.second->actions()) {
-          failed_actions.push_back(action);
-        }
-      }
-    }
-  }
-  if (!failed_actions.empty()) {
-    TryResubmit(failed_actions, tries);
-  }
-
-  return;
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnComplete(
-    const std::shared_ptr<Action> &action, const std::shared_ptr<RegionRequest> &region_request,
-    int32_t tries, const std::shared_ptr<ServerName> &server_name,
-    const std::shared_ptr<RegionResult> &region_result,
-    std::vector<std::shared_ptr<Action>> &failed_actions) {
-  std::string err_msg;
-  try {
-    auto result_or_exc = region_result->ResultOrException(action->original_index());
-    auto result = std::get<0>(*result_or_exc);
-    auto exc = std::get<1>(*result_or_exc);
-    if (exc != nullptr) {
-      LogException(tries, region_request, *exc, server_name);
-      if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(*exc)) {
-        FailOne(action, tries, *exc, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
-      } else {
-        failed_actions.push_back(action);
-      }
-    } else if (result != nullptr) {
-      action2promises_[action->original_index()].setValue(std::move(result));
-    } else {
-      std::string err_msg = "Invalid response: Server " + server_name->ShortDebugString() +
-                            " sent us neither results nor exceptions for request @ index " +
-                            std::to_string(action->original_index()) + ", row " +
-                            action->action()->row() + " of " +
-                            region_request->region_location()->region_name();
-      VLOG(1) << err_msg;
-      auto ew = folly::make_exception_wrapper<std::runtime_error>(err_msg);
-      AddError(action, ew, server_name);
-      failed_actions.push_back(action);
-    }
-  } catch (const std::out_of_range &oor) {
-    // This should never occur. Error in logic. Throwing std::runtime_error from here. Will be
-    // retried or failed
-    std::string err_msg = "ResultOrException not present @ index " +
-                          std::to_string(action->original_index()) + ", row " +
-                          action->action()->row() + " of " +
-                          region_request->region_location()->region_name();
-    throw std::runtime_error(err_msg);
-  }
-  return;
-}
-
-template class AsyncBatchRpcRetryingCaller<std::shared_ptr<hbase::Row>,
-                                           std::shared_ptr<hbase::Result>>;
-} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-batch-rpc-retrying-caller.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.h b/hbase-native-client/core/async-batch-rpc-retrying-caller.h
deleted file mode 100644
index 9194b04..0000000
--- a/hbase-native-client/core/async-batch-rpc-retrying-caller.h
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include <folly/ExceptionWrapper.h>
-#include <folly/Format.h>
-#include <folly/Try.h>
-#include <folly/futures/Future.h>
-#include <folly/futures/Promise.h>
-#include <folly/io/IOBuf.h>
-#include <folly/io/async/HHWheelTimer.h>
-#include <wangle/concurrent/CPUThreadPoolExecutor.h>
-
-#include <algorithm>
-#include <chrono>
-#include <functional>
-#include <map>
-#include <memory>
-#include <mutex>
-#include <stdexcept>
-#include <string>
-#include <tuple>
-#include <type_traits>
-#include <unordered_map>
-#include <utility>
-#include <vector>
-
-#include "connection/rpc-client.h"
-#include "core/action.h"
-#include "core/async-connection.h"
-#include "core/location-cache.h"
-#include "core/multi-response.h"
-#include "core/region-location.h"
-#include "core/region-request.h"
-#include "core/region-result.h"
-#include "core/request-converter.h"
-#include "core/response-converter.h"
-#include "core/result.h"
-#include "core/row.h"
-#include "core/server-request.h"
-#include "exceptions/exception.h"
-#include "if/Client.pb.h"
-#include "if/HBase.pb.h"
-#include "security/user.h"
-#include "utils/connection-util.h"
-#include "utils/sys-util.h"
-#include "utils/time-util.h"
-
-namespace hbase {
-/* Equals function for ServerName */
-struct ServerNameEquals {
-  bool operator()(const std::shared_ptr<pb::ServerName> &lhs,
-                  const std::shared_ptr<pb::ServerName> &rhs) const {
-    return (lhs->start_code() == rhs->start_code() && lhs->host_name() == rhs->host_name() &&
-            lhs->port() == rhs->port());
-  }
-};
-
-struct ServerNameHash {
-  /** hash */
-  std::size_t operator()(const std::shared_ptr<pb::ServerName> &sn) const {
-    std::size_t h = 0;
-    boost::hash_combine(h, sn->start_code());
-    boost::hash_combine(h, sn->host_name());
-    boost::hash_combine(h, sn->port());
-    return h;
-  }
-};
-
-template <typename REQ, typename RESP>
-class AsyncBatchRpcRetryingCaller {
- public:
-  using ActionsByServer =
-      std::unordered_map<std::shared_ptr<pb::ServerName>, std::shared_ptr<ServerRequest>,
-                         ServerNameHash, ServerNameEquals>;
-  using ActionsByRegion = ServerRequest::ActionsByRegion;
-
-  AsyncBatchRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
-                              std::shared_ptr<folly::HHWheelTimer> retry_timer,
-                              std::shared_ptr<pb::TableName> table_name,
-                              const std::vector<REQ> &actions, std::chrono::nanoseconds pause_ns,
-                              int32_t max_attempts, std::chrono::nanoseconds operation_timeout_ns,
-                              std::chrono::nanoseconds rpc_timeout_ns,
-                              int32_t start_log_errors_count);
-
-  ~AsyncBatchRpcRetryingCaller();
-
-  folly::Future<std::vector<folly::Try<RESP>>> Call();
-
- private:
-  int64_t RemainingTimeNs();
-
-  void LogException(int32_t tries, std::shared_ptr<RegionRequest> region_request,
-                    const folly::exception_wrapper &ew,
-                    std::shared_ptr<pb::ServerName> server_name);
-
-  void LogException(int32_t tries,
-                    const std::vector<std::shared_ptr<RegionRequest>> &region_requests,
-                    const folly::exception_wrapper &ew,
-                    std::shared_ptr<pb::ServerName> server_name);
-
-  const std::string GetExtraContextForError(std::shared_ptr<pb::ServerName> server_name);
-
-  void AddError(const std::shared_ptr<Action> &action, const folly::exception_wrapper &ew,
-                std::shared_ptr<pb::ServerName> server_name);
-
-  void AddError(const std::vector<std::shared_ptr<Action>> &actions,
-                const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name);
-
-  void FailOne(const std::shared_ptr<Action> &action, int32_t tries,
-               const folly::exception_wrapper &ew, int64_t current_time, const std::string extras);
-
-  void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries,
-               const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name);
-
-  void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries);
-
-  void AddAction2Error(uint64_t action_index, const ThrowableWithExtraContext &twec);
-
-  void OnError(const ActionsByRegion &actions_by_region, int32_t tries,
-               const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name);
-
-  void TryResubmit(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries);
-
-  folly::Future<std::vector<folly::Try<std::shared_ptr<RegionLocation>>>> GetRegionLocations(
-      const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns);
-
-  void GroupAndSend(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries);
-
-  folly::Future<std::vector<folly::Try<std::unique_ptr<Response>>>> GetMultiResponse(
-      const ActionsByServer &actions_by_server);
-
-  void Send(const ActionsByServer &actions_by_server, int32_t tries);
-
-  void OnComplete(const ActionsByRegion &actions_by_region, int32_t tries,
-                  const std::shared_ptr<pb::ServerName> server_name,
-                  const std::unique_ptr<MultiResponse> multi_results);
-
-  void OnComplete(const std::shared_ptr<Action> &action,
-                  const std::shared_ptr<RegionRequest> &region_request, int32_t tries,
-                  const std::shared_ptr<pb::ServerName> &server_name,
-                  const std::shared_ptr<RegionResult> &region_result,
-                  std::vector<std::shared_ptr<Action>> &failed_actions);
-
- private:
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-  std::shared_ptr<hbase::AsyncConnection> conn_;
-  std::shared_ptr<pb::TableName> table_name_;
-  std::vector<std::shared_ptr<Action>> actions_;
-  std::chrono::nanoseconds pause_ns_;
-  int32_t max_attempts_ = 0;
-  std::chrono::nanoseconds operation_timeout_ns_;
-  std::chrono::nanoseconds rpc_timeout_ns_;
-  int32_t start_log_errors_count_ = 0;
-
-  int64_t start_ns_ = TimeUtil::GetNowNanos();
-  int32_t tries_ = 1;
-  std::map<uint64_t, folly::Promise<RESP>> action2promises_;
-  std::vector<folly::Future<RESP>> action2futures_;
-  std::map<uint64_t, std::shared_ptr<std::vector<ThrowableWithExtraContext>>> action2errors_;
-
-  std::shared_ptr<AsyncRegionLocator> location_cache_ = nullptr;
-  std::shared_ptr<RpcClient> rpc_client_ = nullptr;
-  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_pool_ = nullptr;
-
-  std::recursive_mutex multi_mutex_;
-};
-}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-batch-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-test.cc b/hbase-native-client/core/async-batch-rpc-retrying-test.cc
deleted file mode 100644
index 00cf2b8..0000000
--- a/hbase-native-client/core/async-batch-rpc-retrying-test.cc
+++ /dev/null
@@ -1,577 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <folly/Logging.h>
-#include <folly/Memory.h>
-#include <folly/futures/Future.h>
-#include <folly/io/async/EventBase.h>
-#include <folly/io/async/ScopedEventBaseThread.h>
-#include <gtest/gtest.h>
-#include <wangle/concurrent/IOThreadPoolExecutor.h>
-
-#include <chrono>
-#include <functional>
-#include <string>
-
-#include "connection/rpc-client.h"
-#include "core/async-batch-rpc-retrying-caller.h"
-#include "core/async-connection.h"
-#include "core/async-rpc-retrying-caller-factory.h"
-#include "core/client.h"
-#include "core/connection-configuration.h"
-#include "core/keyvalue-codec.h"
-#include "core/region-location.h"
-#include "core/result.h"
-#include "exceptions/exception.h"
-#include "test-util/test-util.h"
-#include "utils/time-util.h"
-
-using hbase::AsyncRpcRetryingCallerFactory;
-using hbase::AsyncConnection;
-using hbase::AsyncRegionLocator;
-using hbase::ConnectionConfiguration;
-using hbase::Configuration;
-using hbase::HBaseRpcController;
-using hbase::RegionLocation;
-using hbase::RegionLocateType;
-using hbase::RpcClient;
-using hbase::RequestConverter;
-using hbase::ResponseConverter;
-using hbase::Put;
-using hbase::TimeUtil;
-using hbase::Client;
-using hbase::security::User;
-
-using std::chrono::nanoseconds;
-using std::chrono::milliseconds;
-
-using namespace hbase;
-
-using folly::exception_wrapper;
-
-class AsyncBatchRpcRetryTest : public ::testing::Test {
- public:
-  static std::unique_ptr<hbase::TestUtil> test_util;
-  static std::string tableName;
-
-  static void SetUpTestCase() {
-    google::InstallFailureSignalHandler();
-    test_util = std::make_unique<hbase::TestUtil>();
-    test_util->StartMiniCluster(2);
-    std::vector<std::string> keys{"test0",   "test100", "test200", "test300", "test400",
-                                  "test500", "test600", "test700", "test800", "test900"};
-    tableName = "split-table1";
-    test_util->CreateTable(tableName, "d", keys);
-  }
-};
-std::unique_ptr<hbase::TestUtil> AsyncBatchRpcRetryTest::test_util = nullptr;
-std::string AsyncBatchRpcRetryTest::tableName;
-
-class AsyncRegionLocatorBase : public AsyncRegionLocator {
- public:
-  AsyncRegionLocatorBase() {}
-  explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location)
-      : region_location_(region_location) {}
-  virtual ~AsyncRegionLocatorBase() = default;
-
-  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &,
-                                                                     const std::string &row,
-                                                                     const RegionLocateType,
-                                                                     const int64_t) override {
-    folly::Promise<std::shared_ptr<RegionLocation>> promise;
-    promise.setValue(region_locations_.at(row));
-    return promise.getFuture();
-  }
-
-  virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) {
-    region_location_ = region_location;
-  }
-
-  virtual void set_region_location(
-      const std::map<std::string, std::shared_ptr<RegionLocation>> &reg_locs) {
-    for (auto reg_loc : reg_locs) {
-      region_locations_[reg_loc.first] = reg_loc.second;
-    }
-  }
-
-  void UpdateCachedLocation(const RegionLocation &rl, const folly::exception_wrapper &ew) override {
-  }
-
- protected:
-  std::shared_ptr<RegionLocation> region_location_;
-  std::map<std::string, std::shared_ptr<RegionLocation>> region_locations_;
-  std::map<std::string, uint32_t> mtries_;
-  std::map<std::string, uint32_t> mnum_fails_;
-
-  void InitRetryMaps(uint32_t num_fails) {
-    if (mtries_.size() == 0 && mnum_fails_.size() == 0) {
-      for (auto reg_loc : region_locations_) {
-        mtries_[reg_loc.first] = 0;
-        mnum_fails_[reg_loc.first] = num_fails;
-      }
-    }
-  }
-};
-
-class MockAsyncRegionLocator : public AsyncRegionLocatorBase {
- public:
-  MockAsyncRegionLocator() : AsyncRegionLocatorBase() {}
-  explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
-      : AsyncRegionLocatorBase(region_location) {}
-  virtual ~MockAsyncRegionLocator() {}
-};
-
-class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase {
- private:
-  uint32_t counter_ = 0;
-  uint32_t num_fails_ = 0;
-  uint32_t tries_ = 0;
-
- public:
-  explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails)
-      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
-  explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
-      : AsyncRegionLocatorBase(region_location) {}
-  virtual ~MockWrongRegionAsyncRegionLocator() {}
-
-  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
-      const hbase::pb::TableName &tn, const std::string &row,
-      const RegionLocateType locate_type = RegionLocateType::kCurrent,
-      const int64_t locate_ns = 0) override {
-    InitRetryMaps(num_fails_);
-    auto &tries = mtries_[row];
-    auto &num_fails = mnum_fails_[row];
-    if (++tries > num_fails) {
-      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
-    }
-
-    folly::Promise<std::shared_ptr<RegionLocation>> promise;
-    /* set random region name, simulating invalid region */
-    auto result = std::make_shared<RegionLocation>("whatever-region-name",
-                                                   region_locations_.at(row)->region_info(),
-                                                   region_locations_.at(row)->server_name());
-    promise.setValue(result);
-    return promise.getFuture();
-  }
-};
-
-class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {
- private:
-  uint32_t tries_ = 0;
-  uint32_t num_fails_ = 0;
-  uint32_t counter_ = 0;
-
- public:
-  explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
-      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
-  explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
-      : AsyncRegionLocatorBase(region_location) {}
-  virtual ~MockFailingAsyncRegionLocator() {}
-  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
-      const hbase::pb::TableName &tn, const std::string &row,
-      const RegionLocateType locate_type = RegionLocateType::kCurrent,
-      const int64_t locate_ns = 0) override {
-    InitRetryMaps(num_fails_);
-    auto &tries = mtries_[row];
-    auto &num_fails = mnum_fails_[row];
-    if (++tries > num_fails) {
-      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
-    }
-
-    folly::Promise<std::shared_ptr<RegionLocation>> promise;
-    promise.setException(std::runtime_error{"Failed to look up region location"});
-    return promise.getFuture();
-  }
-};
-
-class MockAsyncConnection : public AsyncConnection,
-                            public std::enable_shared_from_this<MockAsyncConnection> {
- public:
-  MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf,
-                      std::shared_ptr<folly::HHWheelTimer> retry_timer,
-                      std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
-                      std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
-                      std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor,
-                      std::shared_ptr<RpcClient> rpc_client,
-                      std::shared_ptr<AsyncRegionLocator> region_locator)
-      : conn_conf_(conn_conf),
-        retry_timer_(retry_timer),
-        cpu_executor_(cpu_executor),
-        io_executor_(io_executor),
-        retry_executor_(retry_executor),
-        rpc_client_(rpc_client),
-        region_locator_(region_locator) {}
-  ~MockAsyncConnection() {}
-  void Init() {
-    caller_factory_ =
-        std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
-  }
-
-  std::shared_ptr<Configuration> conf() override { return nullptr; }
-  std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; }
-  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
-    return caller_factory_;
-  }
-  std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
-  std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; }
-  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
-  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
-  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
-    return retry_executor_;
-  }
-
-  void Close() override {
-    retry_timer_->destroy();
-    retry_executor_->stop();
-    io_executor_->stop();
-    cpu_executor_->stop();
-  }
-  std::shared_ptr<HBaseRpcController> CreateRpcController() override {
-    return std::make_shared<HBaseRpcController>();
-  }
-
- private:
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-  std::shared_ptr<ConnectionConfiguration> conn_conf_;
-  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
-  std::shared_ptr<RpcClient> rpc_client_;
-  std::shared_ptr<AsyncRegionLocator> region_locator_;
-  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
-  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
-  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
-};
-
-class MockRawAsyncTableImpl {
- public:
-  explicit MockRawAsyncTableImpl(std::shared_ptr<MockAsyncConnection> conn,
-                                 std::shared_ptr<hbase::pb::TableName> tn)
-      : conn_(conn), tn_(tn) {}
-  virtual ~MockRawAsyncTableImpl() = default;
-
-  /* implement this in real RawAsyncTableImpl. */
-  template <typename REQ, typename RESP>
-  folly::Future<std::vector<folly::Try<RESP>>> Batch(const std::vector<REQ> &rows) {
-    /* init request caller builder */
-    auto builder = conn_->caller_factory()->Batch<REQ, RESP>();
-
-    /* call with retry to get result */
-    auto async_caller =
-        builder->table(tn_)
-            ->actions(std::make_shared<std::vector<REQ>>(rows))
-            ->rpc_timeout(conn_->connection_conf()->read_rpc_timeout())
-            ->operation_timeout(conn_->connection_conf()->operation_timeout())
-            ->pause(conn_->connection_conf()->pause())
-            ->max_attempts(conn_->connection_conf()->max_retries())
-            ->start_log_errors_count(conn_->connection_conf()->start_log_errors_count())
-            ->Build();
-
-    return async_caller->Call().then([async_caller](auto r) { return r; });
-  }
-
- private:
-  std::shared_ptr<MockAsyncConnection> conn_;
-  std::shared_ptr<hbase::pb::TableName> tn_;
-};
-
-std::shared_ptr<MockAsyncConnection> getAsyncConnection(
-    Client &client, uint32_t operation_timeout_millis, uint32_t tries,
-    std::shared_ptr<AsyncRegionLocatorBase> region_locator) {
-  /* init region location and rpc channel */
-  auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
-  auto io_executor_ = client.async_connection()->io_executor();
-  auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
-  auto codec = std::make_shared<hbase::KeyValueCodec>();
-  auto rpc_client = std::make_shared<RpcClient>(io_executor_, cpu_executor_, codec,
-                                                AsyncBatchRpcRetryTest::test_util->conf());
-  std::shared_ptr<folly::HHWheelTimer> retry_timer =
-      folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
-
-  /* init connection configuration */
-  auto connection_conf = std::make_shared<ConnectionConfiguration>(
-      TimeUtil::SecondsToNanos(20),                       // connect_timeout
-      TimeUtil::MillisToNanos(operation_timeout_millis),  // operation_timeout
-      TimeUtil::SecondsToNanos(60),                       // rpc_timeout
-      TimeUtil::MillisToNanos(100),                       // pause
-      tries,                                              // max retries
-      1);                                                 // start log errors count
-
-  return std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_,
-                                               io_executor_, retry_executor_, rpc_client,
-                                               region_locator);
-}
-
-template <typename ACTION>
-std::vector<std::shared_ptr<hbase::Row>> getRows(std::vector<ACTION> actions) {
-  std::vector<std::shared_ptr<hbase::Row>> rows;
-  for (auto action : actions) {
-    std::shared_ptr<hbase::Row> srow = std::make_shared<ACTION>(action);
-    rows.push_back(srow);
-  }
-  return rows;
-}
-
-template <typename REQ, typename RESP>
-std::vector<std::shared_ptr<hbase::Result>> getResults(std::vector<REQ> &actions,
-                                                       std::vector<folly::Try<RESP>> &tresults) {
-  std::vector<std::shared_ptr<hbase::Result>> results{};
-  uint64_t num = 0;
-  for (auto tresult : tresults) {
-    if (tresult.hasValue()) {
-      results.push_back(tresult.value());
-    } else if (tresult.hasException()) {
-      folly::exception_wrapper ew = tresult.exception();
-      LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for "
-                 << actions[num].row();
-      throw ew;
-    }
-    ++num;
-  }
-  return results;
-}
-
-template <typename ACTION>
-std::map<std::string, std::shared_ptr<RegionLocation>> getRegionLocationsAndActions(
-    uint64_t num_rows, std::vector<ACTION> &actions, std::shared_ptr<Table> table) {
-  std::map<std::string, std::shared_ptr<RegionLocation>> region_locations;
-  for (uint64_t i = 0; i < num_rows; ++i) {
-    auto row = "test" + std::to_string(i);
-    ACTION action(row);
-    actions.push_back(action);
-    region_locations[row] = table->GetRegionLocation(row);
-  }
-  return region_locations;
-}
-
-void runMultiGets(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
-                  const std::string &table_name, bool split_regions, uint32_t tries = 3,
-                  uint32_t operation_timeout_millis = 600000, uint64_t num_rows = 1000) {
-  // Create TableName and Row to be fetched from HBase
-  auto tn = folly::to<hbase::pb::TableName>(AsyncBatchRpcRetryTest::tableName);
-
-  // Create a client
-  Client client(*AsyncBatchRpcRetryTest::test_util->conf());
-
-  // Get connection to HBase Table
-  std::shared_ptr<Table> table = client.Table(tn);
-
-  for (uint64_t i = 0; i < num_rows; i++) {
-    table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
-                                                         "value" + std::to_string(i)));
-  }
-  std::vector<hbase::Get> gets;
-  auto region_locations = getRegionLocationsAndActions<hbase::Get>(num_rows, gets, table);
-
-  /* set region locator */
-  region_locator->set_region_location(region_locations);
-
-  /* init hbase client connection */
-  auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator);
-  conn->Init();
-
-  /* init retry caller factory */
-  auto tableImpl =
-      std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
-
-  std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Get>(gets);
-  auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get(
-      milliseconds(operation_timeout_millis));
-  ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
-
-  auto results = getResults<hbase::Get, std::shared_ptr<Result>>(gets, tresults);
-  // Test the values, should be same as in put executed on hbase shell
-  ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty.";
-  uint32_t i = 0;
-  for (; i < num_rows; ++i) {
-    ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row()
-                                        << " must not be empty";
-    EXPECT_EQ("test" + std::to_string(i), results[i]->Row());
-    EXPECT_EQ("value" + std::to_string(i), results[i]->Value("d", std::to_string(i)).value());
-  }
-
-  table->Close();
-  client.Close();
-  conn->Close();
-}
-
-void runMultiPuts(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
-                  const std::string &table_name, bool split_regions, uint32_t tries = 3,
-                  uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 1000) {
-  // Create TableName and Row to be fetched from HBase
-  auto tn = folly::to<hbase::pb::TableName>(AsyncBatchRpcRetryTest::tableName);
-
-  // Create a client
-  Client client(*AsyncBatchRpcRetryTest::test_util->conf());
-
-  // Get connection to HBase Table
-  std::shared_ptr<Table> table = client.Table(tn);
-
-  std::vector<hbase::Put> puts;
-  auto region_locations = getRegionLocationsAndActions<hbase::Put>(num_rows, puts, table);
-
-  /* set region locator */
-  region_locator->set_region_location(region_locations);
-
-  /* init hbase client connection */
-  auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator);
-  conn->Init();
-
-  /* init retry caller factory */
-  auto tableImpl =
-      std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
-
-  std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Put>(puts);
-  auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get(
-      milliseconds(operation_timeout_millis));
-  ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
-
-  auto results = getResults<hbase::Put, std::shared_ptr<Result>>(puts, tresults);
-  // Test the values, should be same as in put executed on hbase shell
-  ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty.";
-
-  table->Close();
-  client.Close();
-  conn->Close();
-}
-
-// Test successful case
-TEST_F(AsyncBatchRpcRetryTest, MultiGets) {
-  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
-      std::make_shared<MockAsyncRegionLocator>());
-  runMultiGets(region_locator, "table1", false);
-}
-
-// Tests the RPC failing 3 times, then succeeding
-TEST_F(AsyncBatchRpcRetryTest, HandleException) {
-  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
-      std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
-  runMultiGets(region_locator, "table2", false, 5);
-}
-
-// Tests the RPC failing 4 times, throwing an exception
-TEST_F(AsyncBatchRpcRetryTest, FailWithException) {
-  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
-      std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
-  EXPECT_ANY_THROW(runMultiGets(region_locator, "table3", false));
-}
-
-// Tests the region location lookup failing 3 times, then succeeding
-TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookup) {
-  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
-      std::make_shared<MockFailingAsyncRegionLocator>(3));
-  runMultiGets(region_locator, "table4", false);
-}
-
-// Tests the region location lookup failing 5 times, throwing an exception
-TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookup) {
-  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
-      std::make_shared<MockFailingAsyncRegionLocator>(4));
-  EXPECT_ANY_THROW(runMultiGets(region_locator, "table5", false, 3));
-}
-
-// Tests hitting operation timeout, thus not retrying anymore
-TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeout) {
-  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
-      std::make_shared<MockFailingAsyncRegionLocator>(6));
-  EXPECT_ANY_THROW(runMultiGets(region_locator, "table6", false, 5, 100, 1000));
-}
-
-//////////////////////
-// Test successful case
-TEST_F(AsyncBatchRpcRetryTest, MultiPuts) {
-  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
-      std::make_shared<MockAsyncRegionLocator>());
-  runMultiPuts(region_locator, "table1", false);
-}
-
-// Tests the RPC failing 3 times, then succeeding
-TEST_F(AsyncBatchRpcRetryTest, PutsHandleException) {
-  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
-      std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
-  runMultiPuts(region_locator, "table2", false, 5);
-}
-
-// Tests the RPC failing 4 times, throwing an exception
-TEST_F(AsyncBatchRpcRetryTest, PutsFailWithException) {
-  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
-      std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
-  EXPECT_ANY_THROW(runMultiPuts(region_locator, "table3", false));
-}
-
-// Tests the region location lookup failing 3 times, then succeeding
-TEST_F(AsyncBatchRpcRetryTest, PutsHandleExceptionFromRegionLocationLookup) {
-  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
-      std::make_shared<MockFailingAsyncRegionLocator>(3));
-  runMultiPuts(region_locator, "table4", false);
-}
-
-// Tests the region location lookup failing 5 times, throwing an exception
-TEST_F(AsyncBatchRpcRetryTest, PutsFailWithExceptionFromRegionLocationLookup) {
-  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
-      std::make_shared<MockFailingAsyncRegionLocator>(4));
-  EXPECT_ANY_THROW(runMultiPuts(region_locator, "table5", false, 3));
-}
-
-// Tests hitting operation timeout, thus not retrying anymore
-TEST_F(AsyncBatchRpcRetryTest, PutsFailWithOperationTimeout) {
-  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
-      std::make_shared<MockFailingAsyncRegionLocator>(6));
-  EXPECT_ANY_THROW(runMultiPuts(region_locator, "table6", false, 5, 100, 1000));
-}
-
- // Test successful case
- TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) {
- std::shared_ptr<AsyncRegionLocatorBase> region_locator(
- std::make_shared<MockAsyncRegionLocator>());
- runMultiGets(region_locator, "table7", true);
- }
-
- // Tests the RPC failing 3 times, then succeeding
- TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) {
- std::shared_ptr<AsyncRegionLocatorBase> region_locator(
- std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
- runMultiGets(region_locator, "table8", true, 5);
- }
-
- // Tests the RPC failing 4 times, throwing an exception
- TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) {
- std::shared_ptr<AsyncRegionLocatorBase> region_locator(
- std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
- EXPECT_ANY_THROW(runMultiGets(region_locator, "table9", true));
- }
-
- // Tests the region location lookup failing 3 times, then succeeding
- TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) {
- std::shared_ptr<AsyncRegionLocatorBase> region_locator(
- std::make_shared<MockFailingAsyncRegionLocator>(3));
- runMultiGets(region_locator, "table10", true);
- }
-
- // Tests the region location lookup failing 5 times, throwing an exception
- TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) {
- std::shared_ptr<AsyncRegionLocatorBase> region_locator(
- std::make_shared<MockFailingAsyncRegionLocator>(4));
- EXPECT_ANY_THROW(runMultiGets(region_locator, "table11", true, 3));
- }
-
- // Tests hitting operation timeout, thus not retrying anymore
- TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) {
- std::shared_ptr<AsyncRegionLocatorBase> region_locator(
- std::make_shared<MockFailingAsyncRegionLocator>(6));
- EXPECT_ANY_THROW(runMultiGets(region_locator, "table12", true, 5, 100, 1000));
- }

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-client-scanner.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-client-scanner.cc b/hbase-native-client/core/async-client-scanner.cc
deleted file mode 100644
index 720ab25..0000000
--- a/hbase-native-client/core/async-client-scanner.cc
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/async-client-scanner.h"
-
-#include <algorithm>
-#include <iterator>
-#include <limits>
-#include <stdexcept>
-
-namespace hbase {
-
-AsyncClientScanner::AsyncClientScanner(
-    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<Scan> scan,
-    std::shared_ptr<pb::TableName> table_name, std::shared_ptr<RawScanResultConsumer> consumer,
-    nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos,
-    nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
-    : conn_(conn),
-      scan_(scan),
-      table_name_(table_name),
-      consumer_(consumer),
-      pause_(pause),
-      max_retries_(max_retries),
-      scan_timeout_nanos_(scan_timeout_nanos),
-      rpc_timeout_nanos_(rpc_timeout_nanos),
-      start_log_errors_count_(start_log_errors_count) {
-  results_cache_ = std::make_shared<ScanResultCache>();
-  max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
-}
-
-void AsyncClientScanner::Start() { OpenScanner(); }
-
-folly::Future<std::shared_ptr<OpenScannerResponse>> AsyncClientScanner::CallOpenScanner(
-    std::shared_ptr<hbase::RpcClient> rpc_client,
-    std::shared_ptr<hbase::HBaseRpcController> controller,
-    std::shared_ptr<hbase::RegionLocation> loc) {
-  open_scanner_tries_++;
-
-  auto preq = RequestConverter::ToScanRequest(*scan_, loc->region_name(), scan_->Caching(), false);
-
-  auto self(shared_from_this());
-  VLOG(5) << "Calling RPC Client to open the scanner for region:" << loc->DebugString();
-  return rpc_client
-      ->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(preq),
-                  security::User::defaultUser(), "ClientService")
-      .then([self, loc, controller, rpc_client](const std::unique_ptr<Response>& presp) {
-        VLOG(5) << "Scan Response:" << presp->DebugString();
-        return std::make_shared<OpenScannerResponse>(rpc_client, presp, loc, controller);
-      });
-}
-
-void AsyncClientScanner::OpenScanner() {
-  auto self(shared_from_this());
-  open_scanner_tries_ = 1;
-
-  auto caller = conn_->caller_factory()
-                    ->Single<std::shared_ptr<OpenScannerResponse>>()
-                    ->table(table_name_)
-                    ->row(scan_->StartRow())
-                    ->locate_type(GetLocateType(*scan_))
-                    ->rpc_timeout(rpc_timeout_nanos_)
-                    ->operation_timeout(scan_timeout_nanos_)
-                    ->pause(pause_)
-                    ->max_retries(max_retries_)
-                    ->start_log_errors_count(start_log_errors_count_)
-                    ->action([&](std::shared_ptr<hbase::HBaseRpcController> controller,
-                                 std::shared_ptr<hbase::RegionLocation> loc,
-                                 std::shared_ptr<hbase::RpcClient> rpc_client)
-                                 -> folly::Future<std::shared_ptr<OpenScannerResponse>> {
-                                   return CallOpenScanner(rpc_client, controller, loc);
-                                 })
-                    ->Build();
-
-  caller->Call()
-      .then([this, self](std::shared_ptr<OpenScannerResponse> resp) {
-        VLOG(3) << "Opened scanner with id:" << resp->scan_resp_->scanner_id()
-                << ", region:" << resp->region_location_->DebugString() << ", starting scan";
-        StartScan(resp);
-      })
-      .onError([this, self](const folly::exception_wrapper& e) {
-        VLOG(3) << "Open scan request received error:" << e.what();
-        consumer_->OnError(e);
-      })
-      .then([caller, self](const auto r) { return r; });
-}
-
-void AsyncClientScanner::StartScan(std::shared_ptr<OpenScannerResponse> resp) {
-  auto self(shared_from_this());
-  auto caller = conn_->caller_factory()
-                    ->Scan()
-                    ->scanner_id(resp->scan_resp_->scanner_id())
-                    ->region_location(resp->region_location_)
-                    ->scanner_lease_timeout(TimeUtil::MillisToNanos(resp->scan_resp_->ttl()))
-                    ->scan(scan_)
-                    ->rpc_client(resp->rpc_client_)
-                    ->consumer(consumer_)
-                    ->results_cache(results_cache_)
-                    ->rpc_timeout(rpc_timeout_nanos_)
-                    ->scan_timeout(scan_timeout_nanos_)
-                    ->pause(pause_)
-                    ->max_retries(max_retries_)
-                    ->start_log_errors_count(start_log_errors_count_)
-                    ->Build();
-
-  caller->Start(resp->controller_, resp->scan_resp_, resp->cell_scanner_)
-      .then([caller, self](const bool has_more) {
-        if (has_more) {
-          // open the next scanner on the next region.
-          self->OpenScanner();
-        } else {
-          self->consumer_->OnComplete();
-        }
-      })
-      .onError([caller, self](const folly::exception_wrapper& e) { self->consumer_->OnError(e); })
-      .then([caller, self](const auto r) { return r; });
-}
-
-RegionLocateType AsyncClientScanner::GetLocateType(const Scan& scan) {
-  // TODO: In C++, there is no Scan::IncludeStartRow() and Scan::IncludeStopRow().
-  // When added, this method should be modified to return other RegionLocateTypes
-  // (see ConnectionUtils.java #getLocateType())
-  // TODO: When reversed scans are implemented, return other RegionLocateTypes
-  return RegionLocateType::kCurrent;
-}
-
-}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-client-scanner.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-client-scanner.h b/hbase-native-client/core/async-client-scanner.h
deleted file mode 100644
index 8663468..0000000
--- a/hbase-native-client/core/async-client-scanner.h
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/Format.h>
-#include <folly/Logging.h>
-#include <folly/futures/Future.h>
-#include <folly/io/async/EventBase.h>
-#include <folly/io/async/HHWheelTimer.h>
-
-#include <algorithm>
-#include <chrono>
-#include <functional>
-#include <memory>
-#include <string>
-#include <type_traits>
-#include <utility>
-#include <vector>
-
-#include "connection/rpc-client.h"
-#include "core/async-connection.h"
-#include "core/async-rpc-retrying-caller-factory.h"
-#include "core/async-rpc-retrying-caller.h"
-#include "core/hbase-rpc-controller.h"
-#include "core/raw-scan-result-consumer.h"
-#include "core/region-location.h"
-#include "core/request-converter.h"
-#include "core/response-converter.h"
-#include "core/result.h"
-#include "core/scan-result-cache.h"
-#include "core/scan.h"
-#include "exceptions/exception.h"
-#include "if/Client.pb.h"
-#include "if/HBase.pb.h"
-#include "utils/connection-util.h"
-#include "utils/sys-util.h"
-#include "utils/time-util.h"
-
-using std::chrono::nanoseconds;
-using std::chrono::milliseconds;
-
-namespace hbase {
-class OpenScannerResponse {
- public:
-  OpenScannerResponse(std::shared_ptr<hbase::RpcClient> rpc_client,
-                      const std::unique_ptr<Response>& resp,
-                      std::shared_ptr<RegionLocation> region_location,
-                      std::shared_ptr<hbase::HBaseRpcController> controller)
-      : rpc_client_(rpc_client), region_location_(region_location), controller_(controller) {
-    scan_resp_ = std::static_pointer_cast<pb::ScanResponse>(resp->resp_msg());
-    cell_scanner_ = resp->cell_scanner();
-  }
-  std::shared_ptr<hbase::RpcClient> rpc_client_;
-  std::shared_ptr<pb::ScanResponse> scan_resp_;
-  std::shared_ptr<RegionLocation> region_location_;
-  std::shared_ptr<hbase::HBaseRpcController> controller_;
-  std::shared_ptr<CellScanner> cell_scanner_;
-};
-
-class AsyncClientScanner : public std::enable_shared_from_this<AsyncClientScanner> {
- public:
-  template <typename... T>
-  static std::shared_ptr<AsyncClientScanner> Create(T&&... all) {
-    return std::shared_ptr<AsyncClientScanner>(new AsyncClientScanner(std::forward<T>(all)...));
-  }
-
-  void Start();
-
- private:
-  // methods
-  AsyncClientScanner(std::shared_ptr<AsyncConnection> conn, std::shared_ptr<Scan> scan,
-                     std::shared_ptr<pb::TableName> table_name,
-                     std::shared_ptr<RawScanResultConsumer> consumer, nanoseconds pause,
-                     uint32_t max_retries, nanoseconds scan_timeout_nanos,
-                     nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count);
-
-  folly::Future<std::shared_ptr<OpenScannerResponse>> CallOpenScanner(
-      std::shared_ptr<hbase::RpcClient> rpc_client,
-      std::shared_ptr<hbase::HBaseRpcController> controller,
-      std::shared_ptr<hbase::RegionLocation> loc);
-
-  void OpenScanner();
-
-  void StartScan(std::shared_ptr<OpenScannerResponse> resp);
-
-  RegionLocateType GetLocateType(const Scan& scan);
-
- private:
-  // data
-  std::shared_ptr<AsyncConnection> conn_;
-  std::shared_ptr<Scan> scan_;
-  std::shared_ptr<pb::TableName> table_name_;
-  std::shared_ptr<ScanResultCache> results_cache_;
-  std::shared_ptr<RawScanResultConsumer> consumer_;
-  nanoseconds pause_;
-  uint32_t max_retries_;
-  nanoseconds scan_timeout_nanos_;
-  nanoseconds rpc_timeout_nanos_;
-  uint32_t start_log_errors_count_;
-  uint32_t max_attempts_;
-  uint32_t open_scanner_tries_ = 0;
-};
-}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-connection.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-connection.cc b/hbase-native-client/core/async-connection.cc
deleted file mode 100644
index 850fb8f..0000000
--- a/hbase-native-client/core/async-connection.cc
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/async-connection.h"
-#include "core/async-rpc-retrying-caller-factory.h"
-
-namespace hbase {
-
-void AsyncConnectionImpl::Init() {
-  connection_conf_ = std::make_shared<hbase::ConnectionConfiguration>(*conf_);
-  // start thread pools
-  auto io_threads = conf_->GetInt(kClientIoThreadPoolSize, sysconf(_SC_NPROCESSORS_ONLN));
-  auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * sysconf(_SC_NPROCESSORS_ONLN));
-  cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(cpu_threads);
-  io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(io_threads);
-  /*
-   * We need a retry_executor for a thread pool of size 1 due to a possible bug in wangle/folly.
-   * Otherwise, Assertion 'isInEventBaseThread()' always fails. See the comments
-   * in async-rpc-retrying-caller.cc.
-   */
-  retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
-  retry_timer_ = folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
-
-  std::shared_ptr<Codec> codec = nullptr;
-  if (conf_->Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) ==
-      std::string(KeyValueCodec::kJavaClassName)) {
-    codec = std::make_shared<hbase::KeyValueCodec>();
-  } else {
-    LOG(WARNING) << "Not using RPC Cell Codec";
-  }
-  rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, cpu_executor_, codec, conf_,
-                                                   connection_conf_->connect_timeout());
-  location_cache_ = std::make_shared<hbase::LocationCache>(conf_, io_executor_, cpu_executor_,
-                                                           rpc_client_->connection_pool());
-  caller_factory_ =
-      std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
-}
-
-// We can't have the threads continue running after everything is done
-// that leads to an error.
-AsyncConnectionImpl::~AsyncConnectionImpl() { Close(); }
-
-void AsyncConnectionImpl::Close() {
-  if (is_closed_) return;
-
-  cpu_executor_->stop();
-  io_executor_->stop();
-  retry_executor_->stop();
-  retry_timer_->destroy();
-  if (rpc_client_.get()) rpc_client_->Close();
-  is_closed_ = true;
-}
-
-}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-connection.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-connection.h b/hbase-native-client/core/async-connection.h
deleted file mode 100644
index 7b260a5..0000000
--- a/hbase-native-client/core/async-connection.h
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include <folly/futures/Future.h>
-#include <folly/io/IOBuf.h>
-#include <wangle/concurrent/CPUThreadPoolExecutor.h>
-#include <wangle/concurrent/IOThreadPoolExecutor.h>
-
-#include <memory>
-#include <string>
-#include <utility>
-
-#include "connection/rpc-client.h"
-#include "core/async-region-locator.h"
-#include "core/configuration.h"
-#include "core/connection-configuration.h"
-#include "core/hbase-configuration-loader.h"
-#include "core/hbase-rpc-controller.h"
-#include "core/keyvalue-codec.h"
-#include "core/location-cache.h"
-#include "if/Cell.pb.h"
-#include "serde/table-name.h"
-
-namespace hbase {
-
-class AsyncRpcRetryingCallerFactory;
-
-class AsyncConnection {
- public:
-  AsyncConnection() {}
-  virtual ~AsyncConnection() {}
-  virtual std::shared_ptr<Configuration> conf() = 0;
-  virtual std::shared_ptr<ConnectionConfiguration> connection_conf() = 0;
-  virtual std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() = 0;
-  virtual std::shared_ptr<RpcClient> rpc_client() = 0;
-  virtual std::shared_ptr<AsyncRegionLocator> region_locator() = 0;
-  virtual std::shared_ptr<HBaseRpcController> CreateRpcController() = 0;
-  virtual std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() = 0;
-  virtual std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() = 0;
-  virtual std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() = 0;
-  virtual void Close() = 0;
-};
-
-class AsyncConnectionImpl : public AsyncConnection,
-                            public std::enable_shared_from_this<AsyncConnectionImpl> {
- public:
-  virtual ~AsyncConnectionImpl();
-
-  // See https://mortoray.com/2013/08/02/safely-using-enable_shared_from_this/
-  template <typename... T>
-  static std::shared_ptr<AsyncConnectionImpl> Create(T&&... all) {
-    auto conn =
-        std::shared_ptr<AsyncConnectionImpl>(new AsyncConnectionImpl(std::forward<T>(all)...));
-    conn->Init();
-    return conn;
-  }
-
-  std::shared_ptr<Configuration> conf() override { return conf_; }
-  std::shared_ptr<ConnectionConfiguration> connection_conf() override { return connection_conf_; }
-  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
-    return caller_factory_;
-  }
-  std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
-  std::shared_ptr<LocationCache> location_cache() { return location_cache_; }
-  std::shared_ptr<AsyncRegionLocator> region_locator() override { return location_cache_; }
-  std::shared_ptr<HBaseRpcController> CreateRpcController() override {
-    return std::make_shared<HBaseRpcController>();
-  }
-  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
-  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
-  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
-    return retry_executor_;
-  }
-
-  void Close() override;
-
- protected:
-  AsyncConnectionImpl() {}
-
- private:
-  /** Parameter name for HBase client IO thread pool size. Defaults to num cpus */
-  static constexpr const char* kClientIoThreadPoolSize = "hbase.client.io.thread.pool.size";
-  /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */
-  static constexpr const char* kClientCpuThreadPoolSize = "hbase.client.cpu.thread.pool.size";
-  /** The RPC codec to encode cells. For now it is KeyValueCodec */
-  static constexpr const char* kRpcCodec = "hbase.client.rpc.codec";
-
-  std::shared_ptr<Configuration> conf_;
-  std::shared_ptr<ConnectionConfiguration> connection_conf_;
-  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
-  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
-  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
-  std::shared_ptr<LocationCache> location_cache_;
-  std::shared_ptr<RpcClient> rpc_client_;
-  bool is_closed_ = false;
-
- private:
-  explicit AsyncConnectionImpl(std::shared_ptr<Configuration> conf) : conf_(conf) {}
-  void Init();
-};
-}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-region-locator.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-region-locator.h b/hbase-native-client/core/async-region-locator.h
deleted file mode 100644
index f75cb7e..0000000
--- a/hbase-native-client/core/async-region-locator.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include <folly/ExceptionWrapper.h>
-#include <folly/futures/Future.h>
-#include <memory>
-#include <string>
-
-#include "core/region-location.h"
-#include "if/Client.pb.h"
-#include "serde/region-info.h"
-#include "serde/server-name.h"
-#include "serde/table-name.h"
-
-namespace hbase {
-
-class AsyncRegionLocator {
- public:
-  AsyncRegionLocator() {}
-  virtual ~AsyncRegionLocator() = default;
-
-  /**
-   * The only method clients should use for meta lookups. If corresponding
-   * location is cached, it's returned from the cache, otherwise lookup
-   * in meta table is done, location is cached and then returned.
-   * It's expected that tiny fraction of invocations incurs meta scan.
-   * This method is to look up non-meta regions; use LocateMeta() to get the
-   * location of hbase:meta region.
-   *
-   * @param tn Table name of the table to look up. This object must live until
-   * after the future is returned
-   *
-   * @param row of the table to look up. This object must live until after the
-   * future is returned
-   */
-  virtual folly::Future<std::shared_ptr<RegionLocation>> LocateRegion(
-      const hbase::pb::TableName &tn, const std::string &row,
-      const RegionLocateType locate_type = RegionLocateType::kCurrent,
-      const int64_t locate_ns = 0) = 0;
-  /**
-   * Update cached region location, possibly using the information from exception.
-   */
-  virtual void UpdateCachedLocation(const RegionLocation &loc,
-                                    const folly::exception_wrapper &error) = 0;
-};
-
-}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-rpc-retrying-caller-factory.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.cc b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc
deleted file mode 100644
index 0ac9cac..0000000
--- a/hbase-native-client/core/async-rpc-retrying-caller-factory.cc
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/async-rpc-retrying-caller-factory.h"
-
-namespace hbase {}  // namespace hbase