You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/09/15 21:20:40 UTC
[23/25] hbase git commit: HBASE-18725 [C++] Install header files as
well as library
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/append.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/append.h b/hbase-native-client/core/append.h
deleted file mode 100644
index cf9ac24..0000000
--- a/hbase-native-client/core/append.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include <cstdint>
-#include <map>
-#include <memory>
-#include <string>
-#include <vector>
-#include "core/cell.h"
-#include "core/mutation.h"
-
-namespace hbase {
-
-class Append : public Mutation {
- public:
- /**
- * Constructors
- */
- explicit Append(const std::string& row) : Mutation(row) {}
- Append(const Append& cappend) : Mutation(cappend) {}
- Append& operator=(const Append& cappend) {
- Mutation::operator=(cappend);
- return *this;
- }
-
- ~Append() = default;
-
- /**
- * @brief Add the specified column and value to this Append operation.
- * @param family family name
- * @param qualifier column qualifier
- * @param value value to append
- */
- Append& Add(const std::string& family, const std::string& qualifier, const std::string& value);
- Append& Add(std::unique_ptr<Cell> cell);
-};
-
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
deleted file mode 100644
index dfbf7e7..0000000
--- a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
+++ /dev/null
@@ -1,488 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/async-batch-rpc-retrying-caller.h"
-#include <glog/logging.h>
-#include <limits>
-
-using folly::Future;
-using folly::Promise;
-using folly::Try;
-using hbase::pb::ServerName;
-using hbase::pb::TableName;
-using hbase::security::User;
-using std::chrono::nanoseconds;
-using std::chrono::milliseconds;
-
-namespace hbase {
-
-template <typename REQ, typename RESP>
-AsyncBatchRpcRetryingCaller<REQ, RESP>::AsyncBatchRpcRetryingCaller(
- std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
- std::shared_ptr<TableName> table_name, const std::vector<REQ> &actions, nanoseconds pause_ns,
- int32_t max_attempts, nanoseconds operation_timeout_ns, nanoseconds rpc_timeout_ns,
- int32_t start_log_errors_count)
- : conn_(conn),
- retry_timer_(retry_timer),
- table_name_(table_name),
- pause_ns_(pause_ns),
- operation_timeout_ns_(operation_timeout_ns),
- rpc_timeout_ns_(rpc_timeout_ns),
- start_log_errors_count_(start_log_errors_count) {
- CHECK(conn_ != nullptr);
- CHECK(retry_timer_ != nullptr);
- location_cache_ = conn_->region_locator();
- rpc_client_ = conn_->rpc_client();
- cpu_pool_ = conn_->cpu_executor();
- CHECK(location_cache_ != nullptr);
- CHECK(rpc_client_ != nullptr);
- CHECK(cpu_pool_ != nullptr);
-
- max_attempts_ = ConnectionUtils::Retries2Attempts(max_attempts);
- uint32_t index = 0;
- for (auto row : actions) {
- actions_.push_back(std::make_shared<Action>(row, index));
- Promise<RESP> prom{};
- action2promises_.insert(std::pair<uint64_t, Promise<RESP>>(index, std::move(prom)));
- action2futures_.push_back(action2promises_[index++].getFuture());
- }
-}
-
-template <typename REQ, typename RESP>
-AsyncBatchRpcRetryingCaller<REQ, RESP>::~AsyncBatchRpcRetryingCaller() {}
-
-template <typename REQ, typename RESP>
-Future<std::vector<Try<RESP>>> AsyncBatchRpcRetryingCaller<REQ, RESP>::Call() {
- GroupAndSend(actions_, 1);
- return collectAll(action2futures_);
-}
-
-template <typename REQ, typename RESP>
-int64_t AsyncBatchRpcRetryingCaller<REQ, RESP>::RemainingTimeNs() {
- return operation_timeout_ns_.count() - (TimeUtil::GetNowNanos() - start_ns_);
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::LogException(
- int32_t tries, std::shared_ptr<RegionRequest> region_request,
- const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
- if (tries > start_log_errors_count_) {
- std::string regions;
- regions += region_request->region_location()->region_name() + ", ";
- LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":"
- << table_name_->qualifier() << " from " << server_name->host_name()
- << " failed, tries=" << tries << ":- " << ew.what().toStdString();
- }
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::LogException(
- int32_t tries, const std::vector<std::shared_ptr<RegionRequest>> &region_requests,
- const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
- if (tries > start_log_errors_count_) {
- std::string regions;
- for (const auto region_request : region_requests) {
- regions += region_request->region_location()->region_name() + ", ";
- }
- LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":"
- << table_name_->qualifier() << " from " << server_name->host_name()
- << " failed, tries=" << tries << ew.what().toStdString();
- }
-}
-
-template <typename REQ, typename RESP>
-const std::string AsyncBatchRpcRetryingCaller<REQ, RESP>::GetExtraContextForError(
- std::shared_ptr<ServerName> server_name) {
- return server_name ? server_name->ShortDebugString() : "";
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddError(const std::shared_ptr<Action> &action,
- const folly::exception_wrapper &ew,
- std::shared_ptr<ServerName> server_name) {
- ThrowableWithExtraContext twec(ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
- AddAction2Error(action->original_index(), twec);
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddError(
- const std::vector<std::shared_ptr<Action>> &actions, const folly::exception_wrapper &ew,
- std::shared_ptr<ServerName> server_name) {
- for (const auto action : actions) {
- AddError(action, ew, server_name);
- }
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailOne(const std::shared_ptr<Action> &action,
- int32_t tries,
- const folly::exception_wrapper &ew,
- int64_t current_time,
- const std::string extras) {
- auto action_index = action->original_index();
- auto itr = action2promises_.find(action_index);
- if (itr != action2promises_.end()) {
- if (itr->second.isFulfilled()) {
- return;
- }
- }
- ThrowableWithExtraContext twec(ew, current_time, extras);
- AddAction2Error(action_index, twec);
- action2promises_[action_index].setException(
- RetriesExhaustedException(tries - 1, action2errors_[action_index]));
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailAll(
- const std::vector<std::shared_ptr<Action>> &actions, int32_t tries,
- const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
- for (const auto action : actions) {
- FailOne(action, tries, ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
- }
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailAll(
- const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
- for (const auto action : actions) {
- auto action_index = action->original_index();
- auto itr = action2promises_.find(action_index);
- if (itr->second.isFulfilled()) {
- return;
- }
- action2promises_[action_index].setException(
- RetriesExhaustedException(tries, action2errors_[action_index]));
- }
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddAction2Error(
- uint64_t action_index, const ThrowableWithExtraContext &twec) {
- auto erritr = action2errors_.find(action_index);
- if (erritr != action2errors_.end()) {
- erritr->second->push_back(twec);
- } else {
- action2errors_[action_index] = std::make_shared<std::vector<ThrowableWithExtraContext>>();
- action2errors_[action_index]->push_back(twec);
- }
- return;
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnError(const ActionsByRegion &actions_by_region,
- int32_t tries,
- const folly::exception_wrapper &ew,
- std::shared_ptr<ServerName> server_name) {
- std::vector<std::shared_ptr<Action>> copied_actions;
- std::vector<std::shared_ptr<RegionRequest>> region_requests;
- for (const auto &action_by_region : actions_by_region) {
- region_requests.push_back(action_by_region.second);
- for (const auto &action : action_by_region.second->actions()) {
- copied_actions.push_back(action);
- }
- }
-
- LogException(tries, region_requests, ew, server_name);
- if ((tries >= max_attempts_) || !ExceptionUtil::ShouldRetry(ew)) {
- FailAll(copied_actions, tries, ew, server_name);
- return;
- }
- AddError(copied_actions, ew, server_name);
- TryResubmit(copied_actions, tries);
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::TryResubmit(
- const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
- int64_t delay_ns;
- if (operation_timeout_ns_.count() > 0) {
- int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
- if (max_delay_ns <= 0) {
- FailAll(actions, tries);
- return;
- }
- delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1));
- } else {
- delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1);
- }
-
- conn_->retry_executor()->add([=]() {
- retry_timer_->scheduleTimeoutFn(
- [=]() { conn_->cpu_executor()->add([=]() { GroupAndSend(actions, tries + 1); }); },
- milliseconds(TimeUtil::ToMillis(delay_ns)));
- });
-}
-
-template <typename REQ, typename RESP>
-Future<std::vector<Try<std::shared_ptr<RegionLocation>>>>
-AsyncBatchRpcRetryingCaller<REQ, RESP>::GetRegionLocations(
- const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns) {
- auto locs = std::vector<Future<std::shared_ptr<RegionLocation>>>{};
- for (auto const &action : actions) {
- locs.push_back(location_cache_->LocateRegion(*table_name_, action->action()->row(),
- RegionLocateType::kCurrent, locate_timeout_ns));
- }
-
- return collectAll(locs);
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::GroupAndSend(
- const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
- int64_t locate_timeout_ns;
- if (operation_timeout_ns_.count() > 0) {
- locate_timeout_ns = RemainingTimeNs();
- if (locate_timeout_ns <= 0) {
- FailAll(actions, tries);
- return;
- }
- } else {
- locate_timeout_ns = -1L;
- }
-
- GetRegionLocations(actions, locate_timeout_ns)
- .then([=](std::vector<Try<std::shared_ptr<RegionLocation>>> &loc) {
- std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
- ActionsByServer actions_by_server;
- std::vector<std::shared_ptr<Action>> locate_failed;
-
- for (uint64_t i = 0; i < loc.size(); ++i) {
- auto action = actions[i];
- if (loc[i].hasValue()) {
- auto region_loc = loc[i].value();
- // Add it to actions_by_server;
- auto search =
- actions_by_server.find(std::make_shared<ServerName>(region_loc->server_name()));
- if (search != actions_by_server.end()) {
- search->second->AddActionsByRegion(region_loc, action);
- } else {
- auto server_request = std::make_shared<ServerRequest>(region_loc);
- server_request->AddActionsByRegion(region_loc, action);
- auto server_name = std::make_shared<ServerName>(region_loc->server_name());
- actions_by_server[server_name] = server_request;
- }
- VLOG(5) << "rowkey [" << action->action()->row() << "] of table["
- << table_name_->ShortDebugString() << "] found in ["
- << region_loc->region_name() << "]; RS["
- << region_loc->server_name().host_name() << ":"
- << region_loc->server_name().port() << "];";
- } else if (loc[i].hasException()) {
- folly::exception_wrapper ew = loc[i].exception();
- VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString()
- << "for index:" << i << "; tries: " << tries
- << "; max_attempts_: " << max_attempts_;
- // We might receive runtime error from location-cache.cc too, we are doing FailOne and
- // continue next one
- if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) {
- FailOne(action, tries, ew, TimeUtil::GetNowNanos(), ew.what().toStdString());
- } else {
- AddError(action, loc[i].exception(), nullptr);
- locate_failed.push_back(action);
- }
- }
- }
- if (!actions_by_server.empty()) {
- Send(actions_by_server, tries);
- }
-
- if (!locate_failed.empty()) {
- TryResubmit(locate_failed, tries);
- }
- })
- .onError([=](const folly::exception_wrapper &ew) {
- VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString()
- << "tries: " << tries << "; max_attempts_: " << max_attempts_;
- std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
- if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) {
- FailAll(actions, tries, ew, nullptr);
- } else {
- TryResubmit(actions, tries);
- }
- });
- return;
-}
-
-template <typename REQ, typename RESP>
-Future<std::vector<Try<std::unique_ptr<Response>>>>
-AsyncBatchRpcRetryingCaller<REQ, RESP>::GetMultiResponse(const ActionsByServer &actions_by_server) {
- auto multi_calls = std::vector<Future<std::unique_ptr<hbase::Response>>>{};
- auto user = User::defaultUser();
- for (const auto &action_by_server : actions_by_server) {
- std::unique_ptr<Request> multi_req =
- RequestConverter::ToMultiRequest(action_by_server.second->actions_by_region());
- auto host = action_by_server.first->host_name();
- int port = action_by_server.first->port();
- multi_calls.push_back(
- rpc_client_->AsyncCall(host, port, std::move(multi_req), user, "ClientService"));
- }
- return collectAll(multi_calls);
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::Send(const ActionsByServer &actions_by_server,
- int32_t tries) {
- int64_t remaining_ns;
- if (operation_timeout_ns_.count() > 0) {
- remaining_ns = RemainingTimeNs();
- if (remaining_ns <= 0) {
- std::vector<std::shared_ptr<Action>> failed_actions;
- for (const auto &action_by_server : actions_by_server) {
- for (auto &value : action_by_server.second->actions_by_region()) {
- for (const auto &failed_action : value.second->actions()) {
- failed_actions.push_back(failed_action);
- }
- }
- }
- FailAll(failed_actions, tries);
- return;
- }
- } else {
- remaining_ns = std::numeric_limits<int64_t>::max();
- }
-
- std::vector<std::shared_ptr<Request>> multi_reqv;
- for (const auto &action_by_server : actions_by_server)
- multi_reqv.push_back(
- std::move(RequestConverter::ToMultiRequest(action_by_server.second->actions_by_region())));
-
- GetMultiResponse(actions_by_server)
- .then([=](const std::vector<Try<std::unique_ptr<hbase::Response>>> &completed_responses) {
- std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
- uint64_t num = 0;
- for (const auto &action_by_server : actions_by_server) {
- if (completed_responses[num].hasValue()) {
- auto multi_response =
- ResponseConverter::GetResults(multi_reqv[num], *completed_responses[num].value(),
- action_by_server.second->actions_by_region());
- OnComplete(action_by_server.second->actions_by_region(), tries, action_by_server.first,
- std::move(multi_response));
- } else if (completed_responses[num].hasException()) {
- folly::exception_wrapper ew = completed_responses[num].exception();
- VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString()
- << " from server for action index:" << num;
- OnError(action_by_server.second->actions_by_region(), tries, ew,
- action_by_server.first);
- }
- num++;
- }
- })
- .onError([=](const folly::exception_wrapper &ew) {
- VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString();
- std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
- for (const auto &action_by_server : actions_by_server) {
- OnError(action_by_server.second->actions_by_region(), tries, ew, action_by_server.first);
- }
- });
- return;
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnComplete(
- const ActionsByRegion &actions_by_region, int32_t tries,
- const std::shared_ptr<ServerName> server_name,
- const std::unique_ptr<hbase::MultiResponse> multi_response) {
- std::vector<std::shared_ptr<Action>> failed_actions;
- const auto region_results = multi_response->RegionResults();
- for (const auto &action_by_region : actions_by_region) {
- auto region_result_itr = region_results.find(action_by_region.first);
- if (region_result_itr != region_results.end()) {
- for (const auto &action : action_by_region.second->actions()) {
- OnComplete(action, action_by_region.second, tries, server_name, region_result_itr->second,
- failed_actions);
- }
- } else if (region_result_itr == region_results.end()) {
- auto region_exc = multi_response->RegionException(action_by_region.first);
- if (region_exc == nullptr) {
- // FailAll actions for this particular region as inconsistent server response. So we raise
- // this exception to the application
- std::string err_msg = "Invalid response: Server " + server_name->ShortDebugString() +
- " sent us neither results nor exceptions for " +
- action_by_region.first;
- VLOG(1) << err_msg;
- auto ew = folly::make_exception_wrapper<std::runtime_error>(err_msg);
- FailAll(action_by_region.second->actions(), tries, ew, server_name);
- } else {
- // Eg: org.apache.hadoop.hbase.NotServingRegionException:
- LogException(tries, action_by_region.second, *region_exc, server_name);
- if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(*region_exc)) {
- FailAll(action_by_region.second->actions(), tries, *region_exc, server_name);
- return;
- }
- location_cache_->UpdateCachedLocation(*action_by_region.second->region_location(),
- *region_exc);
- AddError(action_by_region.second->actions(), *region_exc, server_name);
- for (const auto &action : action_by_region.second->actions()) {
- failed_actions.push_back(action);
- }
- }
- }
- }
- if (!failed_actions.empty()) {
- TryResubmit(failed_actions, tries);
- }
-
- return;
-}
-
-template <typename REQ, typename RESP>
-void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnComplete(
- const std::shared_ptr<Action> &action, const std::shared_ptr<RegionRequest> &region_request,
- int32_t tries, const std::shared_ptr<ServerName> &server_name,
- const std::shared_ptr<RegionResult> &region_result,
- std::vector<std::shared_ptr<Action>> &failed_actions) {
- std::string err_msg;
- try {
- auto result_or_exc = region_result->ResultOrException(action->original_index());
- auto result = std::get<0>(*result_or_exc);
- auto exc = std::get<1>(*result_or_exc);
- if (exc != nullptr) {
- LogException(tries, region_request, *exc, server_name);
- if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(*exc)) {
- FailOne(action, tries, *exc, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
- } else {
- failed_actions.push_back(action);
- }
- } else if (result != nullptr) {
- action2promises_[action->original_index()].setValue(std::move(result));
- } else {
- std::string err_msg = "Invalid response: Server " + server_name->ShortDebugString() +
- " sent us neither results nor exceptions for request @ index " +
- std::to_string(action->original_index()) + ", row " +
- action->action()->row() + " of " +
- region_request->region_location()->region_name();
- VLOG(1) << err_msg;
- auto ew = folly::make_exception_wrapper<std::runtime_error>(err_msg);
- AddError(action, ew, server_name);
- failed_actions.push_back(action);
- }
- } catch (const std::out_of_range &oor) {
- // This should never occur. Error in logic. Throwing std::runtime_error from here. Will be
- // retried or failed
- std::string err_msg = "ResultOrException not present @ index " +
- std::to_string(action->original_index()) + ", row " +
- action->action()->row() + " of " +
- region_request->region_location()->region_name();
- throw std::runtime_error(err_msg);
- }
- return;
-}
-
-template class AsyncBatchRpcRetryingCaller<std::shared_ptr<hbase::Row>,
- std::shared_ptr<hbase::Result>>;
-} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-batch-rpc-retrying-caller.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.h b/hbase-native-client/core/async-batch-rpc-retrying-caller.h
deleted file mode 100644
index 9194b04..0000000
--- a/hbase-native-client/core/async-batch-rpc-retrying-caller.h
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include <folly/ExceptionWrapper.h>
-#include <folly/Format.h>
-#include <folly/Try.h>
-#include <folly/futures/Future.h>
-#include <folly/futures/Promise.h>
-#include <folly/io/IOBuf.h>
-#include <folly/io/async/HHWheelTimer.h>
-#include <wangle/concurrent/CPUThreadPoolExecutor.h>
-
-#include <algorithm>
-#include <chrono>
-#include <functional>
-#include <map>
-#include <memory>
-#include <mutex>
-#include <stdexcept>
-#include <string>
-#include <tuple>
-#include <type_traits>
-#include <unordered_map>
-#include <utility>
-#include <vector>
-
-#include "connection/rpc-client.h"
-#include "core/action.h"
-#include "core/async-connection.h"
-#include "core/location-cache.h"
-#include "core/multi-response.h"
-#include "core/region-location.h"
-#include "core/region-request.h"
-#include "core/region-result.h"
-#include "core/request-converter.h"
-#include "core/response-converter.h"
-#include "core/result.h"
-#include "core/row.h"
-#include "core/server-request.h"
-#include "exceptions/exception.h"
-#include "if/Client.pb.h"
-#include "if/HBase.pb.h"
-#include "security/user.h"
-#include "utils/connection-util.h"
-#include "utils/sys-util.h"
-#include "utils/time-util.h"
-
-namespace hbase {
-/* Equals function for ServerName */
-struct ServerNameEquals {
- bool operator()(const std::shared_ptr<pb::ServerName> &lhs,
- const std::shared_ptr<pb::ServerName> &rhs) const {
- return (lhs->start_code() == rhs->start_code() && lhs->host_name() == rhs->host_name() &&
- lhs->port() == rhs->port());
- }
-};
-
-struct ServerNameHash {
- /** hash */
- std::size_t operator()(const std::shared_ptr<pb::ServerName> &sn) const {
- std::size_t h = 0;
- boost::hash_combine(h, sn->start_code());
- boost::hash_combine(h, sn->host_name());
- boost::hash_combine(h, sn->port());
- return h;
- }
-};
-
-template <typename REQ, typename RESP>
-class AsyncBatchRpcRetryingCaller {
- public:
- using ActionsByServer =
- std::unordered_map<std::shared_ptr<pb::ServerName>, std::shared_ptr<ServerRequest>,
- ServerNameHash, ServerNameEquals>;
- using ActionsByRegion = ServerRequest::ActionsByRegion;
-
- AsyncBatchRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
- std::shared_ptr<folly::HHWheelTimer> retry_timer,
- std::shared_ptr<pb::TableName> table_name,
- const std::vector<REQ> &actions, std::chrono::nanoseconds pause_ns,
- int32_t max_attempts, std::chrono::nanoseconds operation_timeout_ns,
- std::chrono::nanoseconds rpc_timeout_ns,
- int32_t start_log_errors_count);
-
- ~AsyncBatchRpcRetryingCaller();
-
- folly::Future<std::vector<folly::Try<RESP>>> Call();
-
- private:
- int64_t RemainingTimeNs();
-
- void LogException(int32_t tries, std::shared_ptr<RegionRequest> region_request,
- const folly::exception_wrapper &ew,
- std::shared_ptr<pb::ServerName> server_name);
-
- void LogException(int32_t tries,
- const std::vector<std::shared_ptr<RegionRequest>> &region_requests,
- const folly::exception_wrapper &ew,
- std::shared_ptr<pb::ServerName> server_name);
-
- const std::string GetExtraContextForError(std::shared_ptr<pb::ServerName> server_name);
-
- void AddError(const std::shared_ptr<Action> &action, const folly::exception_wrapper &ew,
- std::shared_ptr<pb::ServerName> server_name);
-
- void AddError(const std::vector<std::shared_ptr<Action>> &actions,
- const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name);
-
- void FailOne(const std::shared_ptr<Action> &action, int32_t tries,
- const folly::exception_wrapper &ew, int64_t current_time, const std::string extras);
-
- void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries,
- const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name);
-
- void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries);
-
- void AddAction2Error(uint64_t action_index, const ThrowableWithExtraContext &twec);
-
- void OnError(const ActionsByRegion &actions_by_region, int32_t tries,
- const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name);
-
- void TryResubmit(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries);
-
- folly::Future<std::vector<folly::Try<std::shared_ptr<RegionLocation>>>> GetRegionLocations(
- const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns);
-
- void GroupAndSend(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries);
-
- folly::Future<std::vector<folly::Try<std::unique_ptr<Response>>>> GetMultiResponse(
- const ActionsByServer &actions_by_server);
-
- void Send(const ActionsByServer &actions_by_server, int32_t tries);
-
- void OnComplete(const ActionsByRegion &actions_by_region, int32_t tries,
- const std::shared_ptr<pb::ServerName> server_name,
- const std::unique_ptr<MultiResponse> multi_results);
-
- void OnComplete(const std::shared_ptr<Action> &action,
- const std::shared_ptr<RegionRequest> &region_request, int32_t tries,
- const std::shared_ptr<pb::ServerName> &server_name,
- const std::shared_ptr<RegionResult> &region_result,
- std::vector<std::shared_ptr<Action>> &failed_actions);
-
- private:
- std::shared_ptr<folly::HHWheelTimer> retry_timer_;
- std::shared_ptr<hbase::AsyncConnection> conn_;
- std::shared_ptr<pb::TableName> table_name_;
- std::vector<std::shared_ptr<Action>> actions_;
- std::chrono::nanoseconds pause_ns_;
- int32_t max_attempts_ = 0;
- std::chrono::nanoseconds operation_timeout_ns_;
- std::chrono::nanoseconds rpc_timeout_ns_;
- int32_t start_log_errors_count_ = 0;
-
- int64_t start_ns_ = TimeUtil::GetNowNanos();
- int32_t tries_ = 1;
- std::map<uint64_t, folly::Promise<RESP>> action2promises_;
- std::vector<folly::Future<RESP>> action2futures_;
- std::map<uint64_t, std::shared_ptr<std::vector<ThrowableWithExtraContext>>> action2errors_;
-
- std::shared_ptr<AsyncRegionLocator> location_cache_ = nullptr;
- std::shared_ptr<RpcClient> rpc_client_ = nullptr;
- std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_pool_ = nullptr;
-
- std::recursive_mutex multi_mutex_;
-};
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-batch-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-test.cc b/hbase-native-client/core/async-batch-rpc-retrying-test.cc
deleted file mode 100644
index 00cf2b8..0000000
--- a/hbase-native-client/core/async-batch-rpc-retrying-test.cc
+++ /dev/null
@@ -1,577 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <folly/Logging.h>
-#include <folly/Memory.h>
-#include <folly/futures/Future.h>
-#include <folly/io/async/EventBase.h>
-#include <folly/io/async/ScopedEventBaseThread.h>
-#include <gtest/gtest.h>
-#include <wangle/concurrent/IOThreadPoolExecutor.h>
-
-#include <chrono>
-#include <functional>
-#include <string>
-
-#include "connection/rpc-client.h"
-#include "core/async-batch-rpc-retrying-caller.h"
-#include "core/async-connection.h"
-#include "core/async-rpc-retrying-caller-factory.h"
-#include "core/client.h"
-#include "core/connection-configuration.h"
-#include "core/keyvalue-codec.h"
-#include "core/region-location.h"
-#include "core/result.h"
-#include "exceptions/exception.h"
-#include "test-util/test-util.h"
-#include "utils/time-util.h"
-
-using hbase::AsyncRpcRetryingCallerFactory;
-using hbase::AsyncConnection;
-using hbase::AsyncRegionLocator;
-using hbase::ConnectionConfiguration;
-using hbase::Configuration;
-using hbase::HBaseRpcController;
-using hbase::RegionLocation;
-using hbase::RegionLocateType;
-using hbase::RpcClient;
-using hbase::RequestConverter;
-using hbase::ResponseConverter;
-using hbase::Put;
-using hbase::TimeUtil;
-using hbase::Client;
-using hbase::security::User;
-
-using std::chrono::nanoseconds;
-using std::chrono::milliseconds;
-
-using namespace hbase;
-
-using folly::exception_wrapper;
-
-/**
- * Fixture shared by every test in this file. SetUpTestCase() starts a mini cluster through
- * TestUtil and creates the pre-split table ("split-table1", family "d") that the tests use.
- */
-class AsyncBatchRpcRetryTest : public ::testing::Test {
- public:
-  static std::unique_ptr<hbase::TestUtil> test_util;
-  static std::string tableName;
-
-  static void SetUpTestCase() {
-    google::InstallFailureSignalHandler();
-
-    // The cluster has to exist before the table can be created against it.
-    test_util = std::make_unique<hbase::TestUtil>();
-    test_util->StartMiniCluster(2);
-
-    tableName = "split-table1";
-    std::vector<std::string> split_keys{"test0",   "test100", "test200", "test300", "test400",
-                                        "test500", "test600", "test700", "test800", "test900"};
-    test_util->CreateTable(tableName, "d", split_keys);
-  }
-};
-
-// Out-of-class definitions of the fixture's static members.
-std::unique_ptr<hbase::TestUtil> AsyncBatchRpcRetryTest::test_util = nullptr;
-std::string AsyncBatchRpcRetryTest::tableName;
-
-/**
- * Test double for AsyncRegionLocator that answers LocateRegion() from an in-memory
- * row -> RegionLocation map instead of asking the cluster. It also owns the per-row
- * try / allowed-failure counters used by the failure-injecting subclasses.
- */
-class AsyncRegionLocatorBase : public AsyncRegionLocator {
- public:
-  AsyncRegionLocatorBase() {}
-  explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location)
-      : region_location_(region_location) {}
-  virtual ~AsyncRegionLocatorBase() = default;
-
-  /**
-   * Returns an already-fulfilled future holding the location registered for `row`.
-   * The table name, locate type and timeout arguments are ignored. std::map::at throws
-   * std::out_of_range when `row` was never registered via set_region_location().
-   */
-  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &,
-                                                                     const std::string &row,
-                                                                     const RegionLocateType,
-                                                                     const int64_t) override {
-    folly::Promise<std::shared_ptr<RegionLocation>> promise;
-    promise.setValue(region_locations_.at(row));
-    return promise.getFuture();
-  }
-
-  // Replaces the single stored location (not consulted by LocateRegion() above).
-  virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) {
-    region_location_ = region_location;
-  }
-
-  // Merges reg_locs into the row -> location map; existing rows are overwritten.
-  virtual void set_region_location(
-      const std::map<std::string, std::shared_ptr<RegionLocation>> &reg_locs) {
-    // Iterate by const reference: each element holds a string and a shared_ptr.
-    for (const auto &reg_loc : reg_locs) {
-      region_locations_[reg_loc.first] = reg_loc.second;
-    }
-  }
-
-  // Intentionally a no-op: the mock never invalidates its locations.
-  void UpdateCachedLocation(const RegionLocation &rl, const folly::exception_wrapper &ew) override {
-  }
-
- protected:
-  std::shared_ptr<RegionLocation> region_location_;
-  std::map<std::string, std::shared_ptr<RegionLocation>> region_locations_;
-  // Per-row number of LocateRegion() calls seen so far.
-  std::map<std::string, uint32_t> mtries_;
-  // Per-row number of calls that should fail before the real location is returned.
-  std::map<std::string, uint32_t> mnum_fails_;
-
-  /**
-   * Seeds both counter maps with one entry per registered row. Only runs while both maps are
-   * still empty, so calling it on every lookup does not reset the counters.
-   */
-  void InitRetryMaps(uint32_t num_fails) {
-    if (mtries_.empty() && mnum_fails_.empty()) {
-      for (const auto &reg_loc : region_locations_) {
-        mtries_[reg_loc.first] = 0;
-        mnum_fails_[reg_loc.first] = num_fails;
-      }
-    }
-  }
-};
-
-// Locator that adds nothing to AsyncRegionLocatorBase: every lookup is answered straight
-// from the registered row -> location map.
-class MockAsyncRegionLocator : public AsyncRegionLocatorBase {
- public:
-  MockAsyncRegionLocator() : AsyncRegionLocatorBase() {}
-
-  explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
-      : AsyncRegionLocatorBase(region_location) {}
-
-  ~MockAsyncRegionLocator() override = default;
-};
-
-/**
- * Locator that, for each row, answers the first num_fails lookups with a location whose
- * region name is bogus ("whatever-region-name") while keeping the real region info and
- * server name. Later lookups fall through to AsyncRegionLocatorBase and return the
- * registered location.
- */
-class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase {
- private:
-  // Number of wrong answers to hand out per row; stays 0 when built from a RegionLocation.
-  uint32_t num_fails_ = 0;
-
- public:
-  explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails)
-      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
-  explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
-      : AsyncRegionLocatorBase(region_location) {}
-  virtual ~MockWrongRegionAsyncRegionLocator() {}
-
-  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
-      const hbase::pb::TableName &tn, const std::string &row,
-      const RegionLocateType locate_type = RegionLocateType::kCurrent,
-      const int64_t locate_ns = 0) override {
-    InitRetryMaps(num_fails_);
-    auto &tries = mtries_[row];
-    const auto &num_fails = mnum_fails_[row];
-    if (++tries > num_fails) {
-      // Failure budget for this row is used up: return the real location.
-      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
-    }
-
-    /* set random region name, simulating invalid region */
-    const auto &real_location = region_locations_.at(row);
-    auto result = std::make_shared<RegionLocation>(
-        "whatever-region-name", real_location->region_info(), real_location->server_name());
-
-    folly::Promise<std::shared_ptr<RegionLocation>> promise;
-    promise.setValue(result);
-    return promise.getFuture();
-  }
-};
-
-/**
- * Locator that, for each row, fails the first num_fails lookups with a std::runtime_error
- * delivered through the returned future. Later lookups fall through to
- * AsyncRegionLocatorBase and return the registered location.
- */
-class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {
- private:
-  // Number of failed lookups to inject per row; stays 0 when built from a RegionLocation.
-  uint32_t num_fails_ = 0;
-
- public:
-  explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
-      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
-  explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
-      : AsyncRegionLocatorBase(region_location) {}
-  virtual ~MockFailingAsyncRegionLocator() {}
-
-  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
-      const hbase::pb::TableName &tn, const std::string &row,
-      const RegionLocateType locate_type = RegionLocateType::kCurrent,
-      const int64_t locate_ns = 0) override {
-    InitRetryMaps(num_fails_);
-    auto &tries = mtries_[row];
-    const auto &num_fails = mnum_fails_[row];
-    if (++tries > num_fails) {
-      // Failure budget for this row is used up: return the real location.
-      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
-    }
-
-    // Still within the failure budget: complete the future exceptionally.
-    folly::Promise<std::shared_ptr<RegionLocation>> promise;
-    promise.setException(std::runtime_error{"Failed to look up region location"});
-    return promise.getFuture();
-  }
-};
-
-/**
- * AsyncConnection stand-in whose collaborators (configuration, timer, executors, RPC client,
- * region locator) are all injected through the constructor, so tests can swap in a mock
- * region locator.
- *
- * Init() must be called after the object is owned by a std::shared_ptr, because it calls
- * shared_from_this() to build the caller factory.
- */
-class MockAsyncConnection : public AsyncConnection,
-                            public std::enable_shared_from_this<MockAsyncConnection> {
- public:
-  // The initializer list follows member declaration order, which is the order the members
-  // are actually initialized in (avoids -Wreorder).
-  MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf,
-                      std::shared_ptr<folly::HHWheelTimer> retry_timer,
-                      std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
-                      std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
-                      std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor,
-                      std::shared_ptr<RpcClient> rpc_client,
-                      std::shared_ptr<AsyncRegionLocator> region_locator)
-      : retry_timer_(retry_timer),
-        conn_conf_(conn_conf),
-        rpc_client_(rpc_client),
-        region_locator_(region_locator),
-        cpu_executor_(cpu_executor),
-        io_executor_(io_executor),
-        retry_executor_(retry_executor) {}
-  ~MockAsyncConnection() {}
-
-  // Creates the retrying-caller factory bound to this connection and its retry timer.
-  void Init() {
-    caller_factory_ =
-        std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
-  }
-
-  // No Configuration object is kept by the mock.
-  std::shared_ptr<Configuration> conf() override { return nullptr; }
-  std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; }
-  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
-    return caller_factory_;
-  }
-  std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
-  std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; }
-  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
-  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
-  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
-    return retry_executor_;
-  }
-
-  // Tears down the timer first, then stops the retry, IO and CPU executors in that order.
-  void Close() override {
-    retry_timer_->destroy();
-    retry_executor_->stop();
-    io_executor_->stop();
-    cpu_executor_->stop();
-  }
-  std::shared_ptr<HBaseRpcController> CreateRpcController() override {
-    return std::make_shared<HBaseRpcController>();
-  }
-
- private:
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-  std::shared_ptr<ConnectionConfiguration> conn_conf_;
-  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;  // set by Init()
-  std::shared_ptr<RpcClient> rpc_client_;
-  std::shared_ptr<AsyncRegionLocator> region_locator_;
-  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
-  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
-  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
-};
-
-/**
- * Minimal table facade used by the tests: it exposes only Batch(), which builds a batch
- * retrying caller from the connection's caller factory and configuration.
- */
-class MockRawAsyncTableImpl {
- public:
-  explicit MockRawAsyncTableImpl(std::shared_ptr<MockAsyncConnection> conn,
-                                 std::shared_ptr<hbase::pb::TableName> tn)
-      : conn_(conn), tn_(tn) {}
-  virtual ~MockRawAsyncTableImpl() = default;
-
-  /* implement this in real RawAsyncTableImpl. */
-  /**
-   * Submits `rows` as one batch and returns a future with one Try per submitted action.
-   * Timeouts, pause, attempt limit and logging threshold come from the connection
-   * configuration.
-   */
-  template <typename REQ, typename RESP>
-  folly::Future<std::vector<folly::Try<RESP>>> Batch(const std::vector<REQ> &rows) {
-    auto batch_builder = conn_->caller_factory()->Batch<REQ, RESP>();
-    auto conn_conf = conn_->connection_conf();
-
-    auto retrying_caller = batch_builder->table(tn_)
-                               ->actions(std::make_shared<std::vector<REQ>>(rows))
-                               ->rpc_timeout(conn_conf->read_rpc_timeout())
-                               ->operation_timeout(conn_conf->operation_timeout())
-                               ->pause(conn_conf->pause())
-                               ->max_attempts(conn_conf->max_retries())
-                               ->start_log_errors_count(conn_conf->start_log_errors_count())
-                               ->Build();
-
-    // The pass-through continuation captures the caller so that it stays alive until the
-    // batch has completed.
-    return retrying_caller->Call().then([retrying_caller](auto r) { return r; });
-  }
-
- private:
-  std::shared_ptr<MockAsyncConnection> conn_;
-  std::shared_ptr<hbase::pb::TableName> tn_;
-};
-
-/**
- * Assembles a MockAsyncConnection around the given region locator.
- *
- * The IO executor is borrowed from `client`; the CPU executor (4 threads), the retry executor
- * (1 thread), the RPC client and the retry timer are created here. The caller is still
- * expected to invoke Init() on the returned connection.
- */
-std::shared_ptr<MockAsyncConnection> getAsyncConnection(
-    Client &client, uint32_t operation_timeout_millis, uint32_t tries,
-    std::shared_ptr<AsyncRegionLocatorBase> region_locator) {
-  /* executors */
-  auto cpu_pool = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
-  auto io_pool = client.async_connection()->io_executor();
-  auto retry_pool = std::make_shared<wangle::IOThreadPoolExecutor>(1);
-
-  /* rpc channel */
-  auto kv_codec = std::make_shared<hbase::KeyValueCodec>();
-  auto rpc_client = std::make_shared<RpcClient>(io_pool, cpu_pool, kv_codec,
-                                                AsyncBatchRpcRetryTest::test_util->conf());
-
-  /* timer driving the retries, bound to the retry executor's event base */
-  std::shared_ptr<folly::HHWheelTimer> retry_timer(
-      folly::HHWheelTimer::newTimer(retry_pool->getEventBase()));
-
-  /* connection configuration */
-  auto conf = std::make_shared<ConnectionConfiguration>(
-      TimeUtil::SecondsToNanos(20),                       // connect_timeout
-      TimeUtil::MillisToNanos(operation_timeout_millis),  // operation_timeout
-      TimeUtil::SecondsToNanos(60),                       // rpc_timeout
-      TimeUtil::MillisToNanos(100),                       // pause
-      tries,                                              // max retries
-      1);                                                 // start log errors count
-
-  return std::make_shared<MockAsyncConnection>(conf, retry_timer, cpu_pool, io_pool, retry_pool,
-                                               rpc_client, region_locator);
-}
-
-/**
- * Wraps a copy of every action in a std::shared_ptr<hbase::Row>, preserving order.
- * ACTION must be copy-constructible and convertible to hbase::Row through a shared_ptr.
- */
-template <typename ACTION>
-std::vector<std::shared_ptr<hbase::Row>> getRows(std::vector<ACTION> actions) {
-  std::vector<std::shared_ptr<hbase::Row>> rows;
-  rows.reserve(actions.size());
-  // Iterate by const reference: make_shared already makes the one copy that is needed.
-  for (const auto &action : actions) {
-    std::shared_ptr<hbase::Row> srow = std::make_shared<ACTION>(action);
-    rows.push_back(srow);
-  }
-  return rows;
-}
-
-/**
- * Unwraps the per-action Try objects of a batch call into plain results.
- *
- * Values are appended in order. The first Try that holds an exception is logged together
- * with the row of the action at the same index, and its folly::exception_wrapper is thrown.
- * A Try that holds neither a value nor an exception is skipped.
- */
-template <typename REQ, typename RESP>
-std::vector<std::shared_ptr<hbase::Result>> getResults(std::vector<REQ> &actions,
-                                                       std::vector<folly::Try<RESP>> &tresults) {
-  std::vector<std::shared_ptr<hbase::Result>> results{};
-  results.reserve(tresults.size());
-  uint64_t num = 0;
-  // Iterate by const reference so that the Try objects are not copied.
-  for (const auto &tresult : tresults) {
-    if (tresult.hasValue()) {
-      results.push_back(tresult.value());
-    } else if (tresult.hasException()) {
-      folly::exception_wrapper ew = tresult.exception();
-      LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for "
-                 << actions[num].row();
-      throw ew;
-    }
-    ++num;
-  }
-  return results;
-}
-
-/**
- * For i in [0, num_rows): appends an ACTION for row "test<i>" to `actions` and records the
- * region location that `table` reports for that row.
- *
- * @return map from row key to its region location
- */
-template <typename ACTION>
-std::map<std::string, std::shared_ptr<RegionLocation>> getRegionLocationsAndActions(
-    uint64_t num_rows, std::vector<ACTION> &actions, std::shared_ptr<Table> table) {
-  std::map<std::string, std::shared_ptr<RegionLocation>> locations_by_row;
-  uint64_t idx = 0;
-  while (idx < num_rows) {
-    const auto row_key = "test" + std::to_string(idx);
-    ACTION row_action(row_key);
-    actions.push_back(row_action);
-    locations_by_row[row_key] = table->GetRegionLocation(row_key);
-    ++idx;
-  }
-  return locations_by_row;
-}
-
-/**
- * Writes num_rows rows ("test<i>" with column d:<i> = "value<i>") through a regular client,
- * then reads them back with one batched Get issued through a MockAsyncConnection wired to
- * `region_locator`, and verifies the row key and value of every result.
- *
- * `tries` is passed on as the connection's max retries and `operation_timeout_millis` as both
- * the operation timeout and the wait on the batch future. `table_name` and `split_regions`
- * are currently unused: the table is always AsyncBatchRpcRetryTest::tableName.
- * An exception held by any per-row Try is rethrown by getResults().
- */
-void runMultiGets(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
-                  const std::string &table_name, bool split_regions, uint32_t tries = 3,
-                  uint32_t operation_timeout_millis = 600000, uint64_t num_rows = 1000) {
-  // Create TableName and Row to be fetched from HBase
-  auto tn = folly::to<hbase::pb::TableName>(AsyncBatchRpcRetryTest::tableName);
-
-  // Create a client
-  Client client(*AsyncBatchRpcRetryTest::test_util->conf());
-
-  // Get connection to HBase Table
-  std::shared_ptr<Table> table = client.Table(tn);
-
-  // Seed the rows that the batched Get is expected to return.
-  for (uint64_t i = 0; i < num_rows; i++) {
-    table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
-                                                         "value" + std::to_string(i)));
-  }
-  std::vector<hbase::Get> gets;
-  auto region_locations = getRegionLocationsAndActions<hbase::Get>(num_rows, gets, table);
-
-  /* set region locator */
-  region_locator->set_region_location(region_locations);
-
-  /* init hbase client connection */
-  auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator);
-  conn->Init();
-
-  /* init retry caller factory */
-  auto tableImpl =
-      std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
-
-  std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Get>(gets);
-  auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get(
-      milliseconds(operation_timeout_millis));
-  ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
-
-  auto results = getResults<hbase::Get, std::shared_ptr<Result>>(gets, tresults);
-  // Test the values, should be same as in put executed on hbase shell
-  ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty.";
-  // The loop below indexes results[0 .. num_rows), so a short result vector has to fail the
-  // test here instead of being read out of bounds.
-  ASSERT_EQ(num_rows, results.size()) << "Expected one result per requested row.";
-  for (uint64_t i = 0; i < num_rows; ++i) {
-    ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row()
-                                        << " must not be empty";
-    EXPECT_EQ("test" + std::to_string(i), results[i]->Row());
-    EXPECT_EQ("value" + std::to_string(i), results[i]->Value("d", std::to_string(i)).value());
-  }
-
-  table->Close();
-  client.Close();
-  conn->Close();
-}
-
-/**
- * Builds num_rows Put actions for rows "test<i>" and submits them as one batch through a
- * MockAsyncConnection wired to `region_locator`. Only checks that the batch produced
- * results; an exception held by any per-row Try is rethrown by getResults().
- *
- * `table_name` and `split_regions` are currently unused: the table is always
- * AsyncBatchRpcRetryTest::tableName.
- */
-void runMultiPuts(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
-                  const std::string &table_name, bool split_regions, uint32_t tries = 3,
-                  uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 1000) {
-  // Regular client and table handle, used to resolve the real region locations.
-  auto tn = folly::to<hbase::pb::TableName>(AsyncBatchRpcRetryTest::tableName);
-  Client client(*AsyncBatchRpcRetryTest::test_util->conf());
-  std::shared_ptr<Table> table = client.Table(tn);
-
-  // Actions to submit, plus the row -> location map that is handed to the mock locator.
-  std::vector<hbase::Put> puts;
-  auto locations_by_row = getRegionLocationsAndActions<hbase::Put>(num_rows, puts, table);
-  region_locator->set_region_location(locations_by_row);
-
-  // Mock connection using the injected locator.
-  auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator);
-  conn->Init();
-
-  auto table_name_ptr = std::make_shared<hbase::pb::TableName>(tn);
-  auto tableImpl = std::make_shared<MockRawAsyncTableImpl>(conn, table_name_ptr);
-
-  // Run the batch and wait for it, bounded by the operation timeout.
-  std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Put>(puts);
-  auto batch_future = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows);
-  auto tresults = batch_future.get(milliseconds(operation_timeout_millis));
-  ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
-
-  auto results = getResults<hbase::Put, std::shared_ptr<Result>>(puts, tresults);
-  ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty.";
-
-  table->Close();
-  client.Close();
-  conn->Close();
-}
-
-// Baseline Get case: the locator always answers from the registered locations.
-TEST_F(AsyncBatchRpcRetryTest, MultiGets) {
-  auto locator = std::make_shared<MockAsyncRegionLocator>();
-  runMultiGets(locator, "table1", false);
-}
-
-// The first 3 lookups per row return a bogus region name; with tries = 5 the batched Get is
-// expected to succeed in the end.
-TEST_F(AsyncBatchRpcRetryTest, HandleException) {
-  auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(3);
-  runMultiGets(locator, "table2", false, 5);
-}
-
-// The first 4 lookups per row return a bogus region name; with the default tries (3) the
-// batched Get is expected to throw.
-TEST_F(AsyncBatchRpcRetryTest, FailWithException) {
-  auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(4);
-  EXPECT_ANY_THROW(runMultiGets(locator, "table3", false));
-}
-
-// The first 3 region lookups per row fail with an exception; with the default tries (3) the
-// batched Get is expected to succeed in the end.
-TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookup) {
-  auto locator = std::make_shared<MockFailingAsyncRegionLocator>(3);
-  runMultiGets(locator, "table4", false);
-}
-
-// Tests the region location lookup failing 4 times per row with tries set to 3, so the
-// batched Get is expected to throw an exception
-TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookup) {
- std::shared_ptr<AsyncRegionLocatorBase> region_locator(
- std::make_shared<MockFailingAsyncRegionLocator>(4));
- EXPECT_ANY_THROW(runMultiGets(region_locator, "table5", false, 3));
-}
-
-// Region lookups fail 6 times per row while the operation timeout is only 100 ms (tries = 5,
-// 1000 rows); the batched Get is expected to throw.
-TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeout) {
-  auto locator = std::make_shared<MockFailingAsyncRegionLocator>(6);
-  EXPECT_ANY_THROW(runMultiGets(locator, "table6", false, 5, 100, 1000));
-}
-
-//////////////////////
-// Baseline Put case: the locator always answers from the registered locations.
-TEST_F(AsyncBatchRpcRetryTest, MultiPuts) {
-  auto locator = std::make_shared<MockAsyncRegionLocator>();
-  runMultiPuts(locator, "table1", false);
-}
-
-// The first 3 lookups per row return a bogus region name; with tries = 5 the batched Put is
-// expected to succeed in the end.
-TEST_F(AsyncBatchRpcRetryTest, PutsHandleException) {
-  auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(3);
-  runMultiPuts(locator, "table2", false, 5);
-}
-
-// The first 4 lookups per row return a bogus region name; with the default tries (3) the
-// batched Put is expected to throw.
-TEST_F(AsyncBatchRpcRetryTest, PutsFailWithException) {
-  auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(4);
-  EXPECT_ANY_THROW(runMultiPuts(locator, "table3", false));
-}
-
-// The first 3 region lookups per row fail with an exception; with the default tries (3) the
-// batched Put is expected to succeed in the end.
-TEST_F(AsyncBatchRpcRetryTest, PutsHandleExceptionFromRegionLocationLookup) {
-  auto locator = std::make_shared<MockFailingAsyncRegionLocator>(3);
-  runMultiPuts(locator, "table4", false);
-}
-
-// Tests the region location lookup failing 4 times per row with tries set to 3, so the
-// batched Put is expected to throw an exception
-TEST_F(AsyncBatchRpcRetryTest, PutsFailWithExceptionFromRegionLocationLookup) {
- std::shared_ptr<AsyncRegionLocatorBase> region_locator(
- std::make_shared<MockFailingAsyncRegionLocator>(4));
- EXPECT_ANY_THROW(runMultiPuts(region_locator, "table5", false, 3));
-}
-
-// Region lookups fail 6 times per row while the operation timeout is only 100 ms (tries = 5,
-// 1000 rows); the batched Put is expected to throw.
-TEST_F(AsyncBatchRpcRetryTest, PutsFailWithOperationTimeout) {
-  auto locator = std::make_shared<MockFailingAsyncRegionLocator>(6);
-  EXPECT_ANY_THROW(runMultiPuts(locator, "table6", false, 5, 100, 1000));
-}
-
- // Baseline Get case, invoked with split_regions = true.
- TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) {
-   auto locator = std::make_shared<MockAsyncRegionLocator>();
-   runMultiGets(locator, "table7", true);
- }
-
- // The first 3 lookups per row return a bogus region name; with tries = 5 the batched Get
- // (split_regions = true) is expected to succeed in the end.
- TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) {
-   auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(3);
-   runMultiGets(locator, "table8", true, 5);
- }
-
- // The first 4 lookups per row return a bogus region name; with the default tries (3) the
- // batched Get (split_regions = true) is expected to throw.
- TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) {
-   auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(4);
-   EXPECT_ANY_THROW(runMultiGets(locator, "table9", true));
- }
-
- // The first 3 region lookups per row fail with an exception; with the default tries (3) the
- // batched Get (split_regions = true) is expected to succeed in the end.
- TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) {
-   auto locator = std::make_shared<MockFailingAsyncRegionLocator>(3);
-   runMultiGets(locator, "table10", true);
- }
-
- // Tests the region location lookup failing 4 times per row with tries set to 3, so the
- // batched Get is expected to throw an exception
- TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) {
- std::shared_ptr<AsyncRegionLocatorBase> region_locator(
- std::make_shared<MockFailingAsyncRegionLocator>(4));
- EXPECT_ANY_THROW(runMultiGets(region_locator, "table11", true, 3));
- }
-
- // Region lookups fail 6 times per row while the operation timeout is only 100 ms (tries = 5,
- // 1000 rows, split_regions = true); the batched Get is expected to throw.
- TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) {
-   auto locator = std::make_shared<MockFailingAsyncRegionLocator>(6);
-   EXPECT_ANY_THROW(runMultiGets(locator, "table12", true, 5, 100, 1000));
- }
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-client-scanner.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-client-scanner.cc b/hbase-native-client/core/async-client-scanner.cc
deleted file mode 100644
index 720ab25..0000000
--- a/hbase-native-client/core/async-client-scanner.cc
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/async-client-scanner.h"
-
-#include <algorithm>
-#include <iterator>
-#include <limits>
-#include <stdexcept>
-
-namespace hbase {
-
-// Stores the scan parameters, creates an empty result cache and derives the attempt limit
-// from max_retries. No RPC is issued until Start() is called.
-AsyncClientScanner::AsyncClientScanner(
-    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<Scan> scan,
-    std::shared_ptr<pb::TableName> table_name, std::shared_ptr<RawScanResultConsumer> consumer,
-    nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos,
-    nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
-    : conn_(conn),
-      scan_(scan),
-      table_name_(table_name),
-      results_cache_(std::make_shared<ScanResultCache>()),
-      consumer_(consumer),
-      pause_(pause),
-      max_retries_(max_retries),
-      scan_timeout_nanos_(scan_timeout_nanos),
-      rpc_timeout_nanos_(rpc_timeout_nanos),
-      start_log_errors_count_(start_log_errors_count),
-      max_attempts_(ConnectionUtils::Retries2Attempts(max_retries)) {}
-
-// Public entry point: begins the scan by opening the first scanner.
-void AsyncClientScanner::Start() {
-  OpenScanner();
-}
-
-// Sends the open-scanner ScanRequest for the region described by `loc` to its server and
-// wraps the RPC response in an OpenScannerResponse. Each call bumps open_scanner_tries_.
-folly::Future<std::shared_ptr<OpenScannerResponse>> AsyncClientScanner::CallOpenScanner(
-    std::shared_ptr<hbase::RpcClient> rpc_client,
-    std::shared_ptr<hbase::HBaseRpcController> controller,
-    std::shared_ptr<hbase::RegionLocation> loc) {
-  ++open_scanner_tries_;
-
-  auto scan_request =
-      RequestConverter::ToScanRequest(*scan_, loc->region_name(), scan_->Caching(), false);
-
-  // Captured by the continuation so that the scanner stays alive until the response arrives.
-  auto keep_alive = shared_from_this();
-  VLOG(5) << "Calling RPC Client to open the scanner for region:" << loc->DebugString();
-
-  auto on_response = [keep_alive, loc, controller,
-                      rpc_client](const std::unique_ptr<Response>& presp) {
-    VLOG(5) << "Scan Response:" << presp->DebugString();
-    return std::make_shared<OpenScannerResponse>(rpc_client, presp, loc, controller);
-  };
-
-  return rpc_client
-      ->AsyncCall(loc->server_name().host_name(), loc->server_name().port(),
-                  std::move(scan_request), security::User::defaultUser(), "ClientService")
-      .then(on_response);
-}
-
-// Opens a scanner on the region located from the scan's start row, using a single-RPC
-// retrying caller. On success the scan proceeds via StartScan(); on failure the error is
-// handed to the consumer.
-void AsyncClientScanner::OpenScanner() {
-  auto self(shared_from_this());
-  open_scanner_tries_ = 1;
-
-  auto caller = conn_->caller_factory()
-                    ->Single<std::shared_ptr<OpenScannerResponse>>()
-                    ->table(table_name_)
-                    ->row(scan_->StartRow())
-                    ->locate_type(GetLocateType(*scan_))
-                    ->rpc_timeout(rpc_timeout_nanos_)
-                    ->operation_timeout(scan_timeout_nanos_)
-                    ->pause(pause_)
-                    ->max_retries(max_retries_)
-                    ->start_log_errors_count(start_log_errors_count_)
-                    // The action is stored in the caller and invoked after this function has
-                    // returned, so capture explicitly (no blanket [&]) and hold `self` to keep
-                    // the scanner alive for as long as the action exists.
-                    ->action([this, self](std::shared_ptr<hbase::HBaseRpcController> controller,
-                                          std::shared_ptr<hbase::RegionLocation> loc,
-                                          std::shared_ptr<hbase::RpcClient> rpc_client)
-                                 -> folly::Future<std::shared_ptr<OpenScannerResponse>> {
-                      return CallOpenScanner(rpc_client, controller, loc);
-                    })
-                    ->Build();
-
-  caller->Call()
-      .then([this, self](std::shared_ptr<OpenScannerResponse> resp) {
-        VLOG(3) << "Opened scanner with id:" << resp->scan_resp_->scanner_id()
-                << ", region:" << resp->region_location_->DebugString() << ", starting scan";
-        StartScan(resp);
-      })
-      .onError([this, self](const folly::exception_wrapper& e) {
-        VLOG(3) << "Open scan request received error:" << e.what();
-        consumer_->OnError(e);
-      })
-      // Pass-through continuation whose only job is to keep `caller` (and `self`) alive until
-      // the chain has completed.
-      .then([caller, self](const auto r) { return r; });
-}
-
-// Continues the scan on the region whose scanner was just opened. When the scan caller
-// reports that more data remains, the next scanner is opened; otherwise the consumer is
-// told the scan is complete. Errors are forwarded to the consumer.
-void AsyncClientScanner::StartScan(std::shared_ptr<OpenScannerResponse> resp) {
-  auto self(shared_from_this());
-
-  const auto scanner_id = resp->scan_resp_->scanner_id();
-  const auto lease_timeout = TimeUtil::MillisToNanos(resp->scan_resp_->ttl());
-
-  auto scan_caller = conn_->caller_factory()
-                         ->Scan()
-                         ->scanner_id(scanner_id)
-                         ->region_location(resp->region_location_)
-                         ->scanner_lease_timeout(lease_timeout)
-                         ->scan(scan_)
-                         ->rpc_client(resp->rpc_client_)
-                         ->consumer(consumer_)
-                         ->results_cache(results_cache_)
-                         ->rpc_timeout(rpc_timeout_nanos_)
-                         ->scan_timeout(scan_timeout_nanos_)
-                         ->pause(pause_)
-                         ->max_retries(max_retries_)
-                         ->start_log_errors_count(start_log_errors_count_)
-                         ->Build();
-
-  scan_caller->Start(resp->controller_, resp->scan_resp_, resp->cell_scanner_)
-      .then([scan_caller, self](const bool has_more) {
-        if (!has_more) {
-          self->consumer_->OnComplete();
-          return;
-        }
-        // open the next scanner on the next region.
-        self->OpenScanner();
-      })
-      .onError([scan_caller, self](const folly::exception_wrapper& e) {
-        self->consumer_->OnError(e);
-      })
-      // Pass-through continuation that keeps the caller and the scanner alive until the
-      // chain has completed.
-      .then([scan_caller, self](const auto r) { return r; });
-}
-
-// Chooses how the region for the scan's start row is located. Always kCurrent for now; the
-// `scan` argument is not inspected yet.
-RegionLocateType AsyncClientScanner::GetLocateType(const Scan& scan) {
-  // TODO: Scan::IncludeStartRow() and Scan::IncludeStopRow() do not exist in C++ yet. Once
-  // they do, return other RegionLocateTypes here (see ConnectionUtils.java #getLocateType()).
-  // TODO: Return other RegionLocateTypes once reversed scans are implemented.
-  return RegionLocateType::kCurrent;
-}
-
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-client-scanner.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-client-scanner.h b/hbase-native-client/core/async-client-scanner.h
deleted file mode 100644
index 8663468..0000000
--- a/hbase-native-client/core/async-client-scanner.h
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/Format.h>
-#include <folly/Logging.h>
-#include <folly/futures/Future.h>
-#include <folly/io/async/EventBase.h>
-#include <folly/io/async/HHWheelTimer.h>
-
-#include <algorithm>
-#include <chrono>
-#include <functional>
-#include <memory>
-#include <string>
-#include <type_traits>
-#include <utility>
-#include <vector>
-
-#include "connection/rpc-client.h"
-#include "core/async-connection.h"
-#include "core/async-rpc-retrying-caller-factory.h"
-#include "core/async-rpc-retrying-caller.h"
-#include "core/hbase-rpc-controller.h"
-#include "core/raw-scan-result-consumer.h"
-#include "core/region-location.h"
-#include "core/request-converter.h"
-#include "core/response-converter.h"
-#include "core/result.h"
-#include "core/scan-result-cache.h"
-#include "core/scan.h"
-#include "exceptions/exception.h"
-#include "if/Client.pb.h"
-#include "if/HBase.pb.h"
-#include "utils/connection-util.h"
-#include "utils/sys-util.h"
-#include "utils/time-util.h"
-
-using std::chrono::nanoseconds;
-using std::chrono::milliseconds;
-
-namespace hbase {
-/**
- * Plain bundle of everything produced by a successful open-scanner RPC: the RPC client and
- * controller used, the region that was opened, the ScanResponse message and the cell
- * scanner taken from the raw response.
- */
-class OpenScannerResponse {
- public:
-  OpenScannerResponse(std::shared_ptr<hbase::RpcClient> rpc_client,
-                      const std::unique_ptr<Response>& resp,
-                      std::shared_ptr<RegionLocation> region_location,
-                      std::shared_ptr<hbase::HBaseRpcController> controller)
-      : rpc_client_(rpc_client),
-        // The response message is taken to be a pb::ScanResponse (unchecked static cast).
-        scan_resp_(std::static_pointer_cast<pb::ScanResponse>(resp->resp_msg())),
-        region_location_(region_location),
-        controller_(controller),
-        cell_scanner_(resp->cell_scanner()) {}
-
-  std::shared_ptr<hbase::RpcClient> rpc_client_;
-  std::shared_ptr<pb::ScanResponse> scan_resp_;
-  std::shared_ptr<RegionLocation> region_location_;
-  std::shared_ptr<hbase::HBaseRpcController> controller_;
-  std::shared_ptr<CellScanner> cell_scanner_;
-};
-
-/**
- * Drives a scan asynchronously: opens a scanner, runs the scan on that region, and opens the
- * next scanner while more data remains. Results and errors are delivered to the
- * RawScanResultConsumer.
- *
- * Instances must be created through Create(): the constructor is private and the
- * implementation relies on shared_from_this().
- */
-class AsyncClientScanner : public std::enable_shared_from_this<AsyncClientScanner> {
- public:
-  // Forwards its arguments to the private constructor and returns the new scanner owned by
-  // a shared_ptr.
-  template <typename... Args>
-  static std::shared_ptr<AsyncClientScanner> Create(Args&&... args) {
-    return std::shared_ptr<AsyncClientScanner>(
-        new AsyncClientScanner(std::forward<Args>(args)...));
-  }
-
-  // Begins the scan.
-  void Start();
-
- private:
-  // ---- methods ----
-  AsyncClientScanner(std::shared_ptr<AsyncConnection> conn, std::shared_ptr<Scan> scan,
-                     std::shared_ptr<pb::TableName> table_name,
-                     std::shared_ptr<RawScanResultConsumer> consumer, nanoseconds pause,
-                     uint32_t max_retries, nanoseconds scan_timeout_nanos,
-                     nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count);
-
-  // Issues the open-scanner RPC for the given region location.
-  folly::Future<std::shared_ptr<OpenScannerResponse>> CallOpenScanner(
-      std::shared_ptr<hbase::RpcClient> rpc_client,
-      std::shared_ptr<hbase::HBaseRpcController> controller,
-      std::shared_ptr<hbase::RegionLocation> loc);
-
-  // Locates the region and opens a scanner on it, with retries.
-  void OpenScanner();
-
-  // Runs the scan on the region whose scanner was just opened.
-  void StartScan(std::shared_ptr<OpenScannerResponse> resp);
-
-  RegionLocateType GetLocateType(const Scan& scan);
-
-  // ---- data ----
-  std::shared_ptr<AsyncConnection> conn_;
-  std::shared_ptr<Scan> scan_;
-  std::shared_ptr<pb::TableName> table_name_;
-  std::shared_ptr<ScanResultCache> results_cache_;
-  std::shared_ptr<RawScanResultConsumer> consumer_;
-  nanoseconds pause_;
-  uint32_t max_retries_;
-  nanoseconds scan_timeout_nanos_;
-  nanoseconds rpc_timeout_nanos_;
-  uint32_t start_log_errors_count_;
-  uint32_t max_attempts_;
-  uint32_t open_scanner_tries_ = 0;
-};
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-connection.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-connection.cc b/hbase-native-client/core/async-connection.cc
deleted file mode 100644
index 850fb8f..0000000
--- a/hbase-native-client/core/async-connection.cc
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/async-connection.h"
-#include "core/async-rpc-retrying-caller-factory.h"
-
-namespace hbase {
-
-void AsyncConnectionImpl::Init() {
- connection_conf_ = std::make_shared<hbase::ConnectionConfiguration>(*conf_);
- // start thread pools
- auto io_threads = conf_->GetInt(kClientIoThreadPoolSize, sysconf(_SC_NPROCESSORS_ONLN));
- auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * sysconf(_SC_NPROCESSORS_ONLN));
- cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(cpu_threads);
- io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(io_threads);
- /*
- * We need a retry_executor for a thread pool of size 1 due to a possible bug in wangle/folly.
- * Otherwise, Assertion 'isInEventBaseThread()' always fails. See the comments
- * in async-rpc-retrying-caller.cc.
- */
- retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
- retry_timer_ = folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
-
- std::shared_ptr<Codec> codec = nullptr;
- if (conf_->Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) ==
- std::string(KeyValueCodec::kJavaClassName)) {
- codec = std::make_shared<hbase::KeyValueCodec>();
- } else {
- LOG(WARNING) << "Not using RPC Cell Codec";
- }
- rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, cpu_executor_, codec, conf_,
- connection_conf_->connect_timeout());
- location_cache_ = std::make_shared<hbase::LocationCache>(conf_, io_executor_, cpu_executor_,
- rpc_client_->connection_pool());
- caller_factory_ =
- std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
-}
-
-// We can't have the threads continue running after everything is done
-// that leads to an error.
-AsyncConnectionImpl::~AsyncConnectionImpl() { Close(); }
-
-void AsyncConnectionImpl::Close() {
- if (is_closed_) return;
-
- cpu_executor_->stop();
- io_executor_->stop();
- retry_executor_->stop();
- retry_timer_->destroy();
- if (rpc_client_.get()) rpc_client_->Close();
- is_closed_ = true;
-}
-
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-connection.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-connection.h b/hbase-native-client/core/async-connection.h
deleted file mode 100644
index 7b260a5..0000000
--- a/hbase-native-client/core/async-connection.h
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include <folly/futures/Future.h>
-#include <folly/io/IOBuf.h>
-#include <wangle/concurrent/CPUThreadPoolExecutor.h>
-#include <wangle/concurrent/IOThreadPoolExecutor.h>
-
-#include <memory>
-#include <string>
-#include <utility>
-
-#include "connection/rpc-client.h"
-#include "core/async-region-locator.h"
-#include "core/configuration.h"
-#include "core/connection-configuration.h"
-#include "core/hbase-configuration-loader.h"
-#include "core/hbase-rpc-controller.h"
-#include "core/keyvalue-codec.h"
-#include "core/location-cache.h"
-#include "if/Cell.pb.h"
-#include "serde/table-name.h"
-
-namespace hbase {
-
-class AsyncRpcRetryingCallerFactory;
-
-class AsyncConnection {
- public:
- AsyncConnection() {}
- virtual ~AsyncConnection() {}
- virtual std::shared_ptr<Configuration> conf() = 0;
- virtual std::shared_ptr<ConnectionConfiguration> connection_conf() = 0;
- virtual std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() = 0;
- virtual std::shared_ptr<RpcClient> rpc_client() = 0;
- virtual std::shared_ptr<AsyncRegionLocator> region_locator() = 0;
- virtual std::shared_ptr<HBaseRpcController> CreateRpcController() = 0;
- virtual std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() = 0;
- virtual std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() = 0;
- virtual std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() = 0;
- virtual void Close() = 0;
-};
-
-class AsyncConnectionImpl : public AsyncConnection,
- public std::enable_shared_from_this<AsyncConnectionImpl> {
- public:
- virtual ~AsyncConnectionImpl();
-
- // See https://mortoray.com/2013/08/02/safely-using-enable_shared_from_this/
- template <typename... T>
- static std::shared_ptr<AsyncConnectionImpl> Create(T&&... all) {
- auto conn =
- std::shared_ptr<AsyncConnectionImpl>(new AsyncConnectionImpl(std::forward<T>(all)...));
- conn->Init();
- return conn;
- }
-
- std::shared_ptr<Configuration> conf() override { return conf_; }
- std::shared_ptr<ConnectionConfiguration> connection_conf() override { return connection_conf_; }
- std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
- return caller_factory_;
- }
- std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
- std::shared_ptr<LocationCache> location_cache() { return location_cache_; }
- std::shared_ptr<AsyncRegionLocator> region_locator() override { return location_cache_; }
- std::shared_ptr<HBaseRpcController> CreateRpcController() override {
- return std::make_shared<HBaseRpcController>();
- }
- std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
- std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
- std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
- return retry_executor_;
- }
-
- void Close() override;
-
- protected:
- AsyncConnectionImpl() {}
-
- private:
- /** Parameter name for HBase client IO thread pool size. Defaults to num cpus */
- static constexpr const char* kClientIoThreadPoolSize = "hbase.client.io.thread.pool.size";
- /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */
- static constexpr const char* kClientCpuThreadPoolSize = "hbase.client.cpu.thread.pool.size";
- /** The RPC codec to encode cells. For now it is KeyValueCodec */
- static constexpr const char* kRpcCodec = "hbase.client.rpc.codec";
-
- std::shared_ptr<Configuration> conf_;
- std::shared_ptr<ConnectionConfiguration> connection_conf_;
- std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
- std::shared_ptr<folly::HHWheelTimer> retry_timer_;
- std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
- std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
- std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
- std::shared_ptr<LocationCache> location_cache_;
- std::shared_ptr<RpcClient> rpc_client_;
- bool is_closed_ = false;
-
- private:
- explicit AsyncConnectionImpl(std::shared_ptr<Configuration> conf) : conf_(conf) {}
- void Init();
-};
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-region-locator.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-region-locator.h b/hbase-native-client/core/async-region-locator.h
deleted file mode 100644
index f75cb7e..0000000
--- a/hbase-native-client/core/async-region-locator.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include <folly/ExceptionWrapper.h>
-#include <folly/futures/Future.h>
-#include <memory>
-#include <string>
-
-#include "core/region-location.h"
-#include "if/Client.pb.h"
-#include "serde/region-info.h"
-#include "serde/server-name.h"
-#include "serde/table-name.h"
-
-namespace hbase {
-
-class AsyncRegionLocator {
- public:
- AsyncRegionLocator() {}
- virtual ~AsyncRegionLocator() = default;
-
- /**
- * The only method clients should use for meta lookups. If corresponding
- * location is cached, it's returned from the cache, otherwise lookup
- * in meta table is done, location is cached and then returned.
- * It's expected that tiny fraction of invocations incurs meta scan.
- * This method is to look up non-meta regions; use LocateMeta() to get the
- * location of hbase:meta region.
- *
- * @param tn Table name of the table to look up. This object must live until
- * after the future is returned
- *
- * @param row of the table to look up. This object must live until after the
- * future is returned
- */
- virtual folly::Future<std::shared_ptr<RegionLocation>> LocateRegion(
- const hbase::pb::TableName &tn, const std::string &row,
- const RegionLocateType locate_type = RegionLocateType::kCurrent,
- const int64_t locate_ns = 0) = 0;
- /**
- * Update cached region location, possibly using the information from exception.
- */
- virtual void UpdateCachedLocation(const RegionLocation &loc,
- const folly::exception_wrapper &error) = 0;
-};
-
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-rpc-retrying-caller-factory.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.cc b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc
deleted file mode 100644
index 0ac9cac..0000000
--- a/hbase-native-client/core/async-rpc-retrying-caller-factory.cc
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/async-rpc-retrying-caller-factory.h"
-
-namespace hbase {} // namespace hbase