You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:46:30 UTC
[hbase] 102/133: HBASE-18061 [C++] Fix retry logic in multi-get
calls (Sudeep Sunthankar)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit bca3f8f98fcbba0b36033f6dffa2d3a774c54c7f
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Wed Jul 19 11:50:40 2017 -0700
HBASE-18061 [C++] Fix retry logic in multi-get calls (Sudeep Sunthankar)
---
hbase-native-client/core/BUCK | 11 +
.../core/async-batch-rpc-retrying-caller.cc | 241 +++++------
.../core/async-batch-rpc-retrying-caller.h | 28 +-
.../core/async-batch-rpc-retrying-test.cc | 463 +++++++++++++++++++++
hbase-native-client/core/client-test.cc | 91 +++-
hbase-native-client/core/multi-response.cc | 34 +-
hbase-native-client/core/multi-response.h | 14 +-
hbase-native-client/core/region-result.cc | 2 +-
hbase-native-client/core/region-result.h | 5 +-
hbase-native-client/core/response-converter.cc | 50 ++-
hbase-native-client/core/response-converter.h | 8 +-
hbase-native-client/core/server-request.h | 10 +-
12 files changed, 753 insertions(+), 204 deletions(-)
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 464c010..f9db0bd 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -323,6 +323,17 @@ cxx_test(
":core",
],
run_test_separately=True,)
+cxx_test(
+ name="multi-retry-test",
+ srcs=[
+ "async-batch-rpc-retrying-test.cc",
+ ],
+ deps=[
+ ":core",
+ "//test-util:test-util",
+ "//exceptions:exceptions",
+ ],
+ run_test_separately=True,)
cxx_binary(
name="simple-client",
srcs=[
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
index 05290f5..0d67b17 100644
--- a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
+++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
@@ -77,20 +77,20 @@ int64_t AsyncBatchRpcRetryingCaller::RemainingTimeNs() {
void AsyncBatchRpcRetryingCaller::LogException(int32_t tries,
std::shared_ptr<RegionRequest> region_request,
- std::shared_ptr<std::exception> &error,
+ const folly::exception_wrapper &ew,
std::shared_ptr<ServerName> server_name) {
if (tries > start_log_errors_count_) {
std::string regions;
regions += region_request->region_location()->region_name() + ", ";
LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":"
<< table_name_->qualifier() << " from " << server_name->host_name()
- << " failed, tries=" << tries << ":- " << error->what();
+ << " failed, tries=" << tries << ":- " << ew.what().toStdString();
}
}
void AsyncBatchRpcRetryingCaller::LogException(
- int32_t tries, std::vector<std::shared_ptr<RegionRequest>> &region_requests,
- std::shared_ptr<std::exception> &error, std::shared_ptr<ServerName> server_name) {
+ int32_t tries, const std::vector<std::shared_ptr<RegionRequest>> &region_requests,
+ const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
if (tries > start_log_errors_count_) {
std::string regions;
for (const auto region_request : region_requests) {
@@ -98,7 +98,7 @@ void AsyncBatchRpcRetryingCaller::LogException(
}
LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":"
<< table_name_->qualifier() << " from " << server_name->host_name()
- << " failed, tries=" << tries << error->what();
+ << " failed, tries=" << tries << ew.what().toStdString();
}
}
@@ -107,27 +107,24 @@ const std::string AsyncBatchRpcRetryingCaller::GetExtraContextForError(
return server_name ? server_name->ShortDebugString() : "";
}
-// TODO HBASE-17800 pass folly ew instead of std::exception
void AsyncBatchRpcRetryingCaller::AddError(const std::shared_ptr<Action> &action,
- std::shared_ptr<std::exception> error,
+ const folly::exception_wrapper &ew,
std::shared_ptr<ServerName> server_name) {
- folly::exception_wrapper ew;
ThrowableWithExtraContext twec(ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
AddAction2Error(action->original_index(), twec);
}
void AsyncBatchRpcRetryingCaller::AddError(const std::vector<std::shared_ptr<Action>> &actions,
- std::shared_ptr<std::exception> error,
+ const folly::exception_wrapper &ew,
std::shared_ptr<ServerName> server_name) {
for (const auto action : actions) {
- AddError(action, error, server_name);
+ AddError(action, ew, server_name);
}
}
-// TODO HBASE-17800 pass folly ew instead of std::exception
void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action, int32_t tries,
- std::shared_ptr<std::exception> error,
- int64_t current_time, const std::string extras) {
+ const folly::exception_wrapper &ew, int64_t current_time,
+ const std::string extras) {
auto action_index = action->original_index();
auto itr = action2promises_.find(action_index);
if (itr != action2promises_.end()) {
@@ -135,7 +132,6 @@ void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action,
return;
}
}
- folly::exception_wrapper ew;
ThrowableWithExtraContext twec(ew, current_time, extras);
AddAction2Error(action_index, twec);
action2promises_[action_index].setException(
@@ -143,10 +139,10 @@ void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action,
}
void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Action>> &actions,
- int32_t tries, std::shared_ptr<std::exception> error,
+ int32_t tries, const folly::exception_wrapper &ew,
std::shared_ptr<ServerName> server_name) {
for (const auto action : actions) {
- FailOne(action, tries, error, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
+ FailOne(action, tries, ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
}
}
@@ -159,7 +155,7 @@ void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Acti
return;
}
action2promises_[action_index].setException(
- RetriesExhaustedException(tries - 1, action2errors_[action_index]));
+ RetriesExhaustedException(tries, action2errors_[action_index]));
}
}
@@ -176,34 +172,32 @@ void AsyncBatchRpcRetryingCaller::AddAction2Error(uint64_t action_index,
}
void AsyncBatchRpcRetryingCaller::OnError(const ActionsByRegion &actions_by_region, int32_t tries,
- std::shared_ptr<std::exception> exc,
+ const folly::exception_wrapper &ew,
std::shared_ptr<ServerName> server_name) {
std::vector<std::shared_ptr<Action>> copied_actions;
std::vector<std::shared_ptr<RegionRequest>> region_requests;
for (const auto &action_by_region : actions_by_region) {
region_requests.push_back(action_by_region.second);
- // Concurrent
for (const auto &action : action_by_region.second->actions()) {
copied_actions.push_back(action);
}
}
- // TODO HBASE-17800 for exc check with DoNotRetryIOException
- LogException(tries, region_requests, exc, server_name);
- if (tries >= max_attempts_) {
- FailAll(copied_actions, tries, exc, server_name);
+
+ LogException(tries, region_requests, ew, server_name);
+ if ((tries >= max_attempts_) || !ExceptionUtil::ShouldRetry(ew)) {
+ FailAll(copied_actions, tries, ew, server_name);
return;
}
- AddError(copied_actions, exc, server_name);
+ AddError(copied_actions, ew, server_name);
TryResubmit(copied_actions, tries);
}
-void AsyncBatchRpcRetryingCaller::TryResubmit(std::vector<std::shared_ptr<Action>> actions,
+void AsyncBatchRpcRetryingCaller::TryResubmit(const std::vector<std::shared_ptr<Action>> &actions,
int32_t tries) {
int64_t delay_ns;
if (operation_timeout_ns_.count() > 0) {
int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
if (max_delay_ns <= 0) {
- VLOG(8) << "Fail All from onError";
FailAll(actions, tries);
return;
}
@@ -211,9 +205,12 @@ void AsyncBatchRpcRetryingCaller::TryResubmit(std::vector<std::shared_ptr<Action
} else {
delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1);
}
- // TODO This gives segfault @ present, when retried
- // retry_timer_->scheduleTimeoutFn([&]() { GroupAndSend(actions, tries + 1); },
- // milliseconds(TimeUtil::ToMillis(delay_ns)));
+
+ conn_->retry_executor()->add([=]() {
+ retry_timer_->scheduleTimeoutFn(
+ [=]() { conn_->cpu_executor()->add([=]() { GroupAndSend(actions, tries + 1); }); },
+ milliseconds(TimeUtil::ToMillis(delay_ns)));
+ });
}
Future<std::vector<Try<std::shared_ptr<RegionLocation>>>>
@@ -234,7 +231,7 @@ void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr
if (operation_timeout_ns_.count() > 0) {
locate_timeout_ns = RemainingTimeNs();
if (locate_timeout_ns <= 0) {
- FailAll(actions_, tries);
+ FailAll(actions, tries);
return;
}
} else {
@@ -242,8 +239,8 @@ void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr
}
GetRegionLocations(actions, locate_timeout_ns)
- .then([&](std::vector<Try<std::shared_ptr<RegionLocation>>> &loc) {
- std::lock_guard<std::mutex> lock(multi_mutex_);
+ .then([=](std::vector<Try<std::shared_ptr<RegionLocation>>> &loc) {
+ std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
ActionsByServer actions_by_server;
std::vector<std::shared_ptr<Action>> locate_failed;
@@ -252,50 +249,36 @@ void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr
if (loc[i].hasValue()) {
auto region_loc = loc[i].value();
// Add it to actions_by_server;
- // Concurrent
auto search =
actions_by_server.find(std::make_shared<ServerName>(region_loc->server_name()));
if (search != actions_by_server.end()) {
search->second->AddActionsByRegion(region_loc, action);
} else {
- // Create new key
auto server_request = std::make_shared<ServerRequest>(region_loc);
server_request->AddActionsByRegion(region_loc, action);
auto server_name = std::make_shared<ServerName>(region_loc->server_name());
actions_by_server[server_name] = server_request;
}
- locate_failed.push_back(action);
- VLOG(8) << "row [" << action->action()->row() << "] of table["
- << table_name_->namespace_() << ":" << table_name_->qualifier()
- << " found in region [" << region_loc->region_name() << "]; host["
- << region_loc->server_name().host_name() << "]; port["
+ VLOG(5) << "rowkey [" << action->action()->row() << "] of table["
+ << table_name_->ShortDebugString() << "] found in ["
+ << region_loc->region_name() << "]; RS["
+ << region_loc->server_name().host_name() << ":"
<< region_loc->server_name().port() << "];";
} else if (loc[i].hasException()) {
- VLOG(8) << "Exception occured while locating region:- "
- << loc[i].exception().getCopied()->what() << " for action index " << i;
- // TODO Feedback needed, Java API only identifies DoNotRetryIOException
- // We might receive runtime error from location-cache.cc too, we are treating both same
- if (loc[i].exception().is_compatible_with<std::runtime_error>()) {
- std::string extra = "";
- FailOne(action, tries, nullptr, TimeUtil::GetNowNanos(),
- loc[i].exception().what().toStdString());
- return;
+ folly::exception_wrapper ew = loc[i].exception();
+ VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString()
+ << "for index:" << i << "; tries: " << tries
+ << "; max_attempts_: " << max_attempts_;
+ // We might receive runtime error from location-cache.cc too, we are doing FailOne and
+ // continue next one
+ if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) {
+ FailOne(action, tries, ew, TimeUtil::GetNowNanos(), ew.what().toStdString());
+ } else {
+ AddError(action, loc[i].exception(), nullptr);
+ locate_failed.push_back(action);
}
- // TODO HBASE-17800 for exc check with DoNotRetryIOException
- /*
- else if (loc[i].exception().is_compatible_with<hbase::DoNotRetryIOException>()) {
- int64_t current_time = 0;
- std::string extra = "";
- FailOne(action, tries, nullptr, TimeUtil::GetNowNanos(),
- loc[i].exception().what().toStdString());
- return;
- }*/
- AddError(action, std::make_shared<std::exception>(*loc[i].exception().getCopied()),
- nullptr);
- locate_failed.push_back(action);
}
}
-
if (!actions_by_server.empty()) {
Send(actions_by_server, tries);
}
@@ -304,17 +287,21 @@ void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr
TryResubmit(locate_failed, tries);
}
})
- .onError([&](const folly::exception_wrapper &ew) {
- std::lock_guard<std::mutex> lock(multi_mutex_);
- auto exc = ew.getCopied();
- VLOG(8) << "GetRegionLocations() exception: " << ew.what().toStdString();
+ .onError([=](const folly::exception_wrapper &ew) {
+ VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString()
+ << "tries: " << tries << "; max_attempts_: " << max_attempts_;
+ std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
+ if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) {
+ FailAll(actions, tries, ew, nullptr);
+ } else {
+ TryResubmit(actions, tries);
+ }
});
return;
}
Future<std::vector<Try<std::unique_ptr<Response>>>> AsyncBatchRpcRetryingCaller::GetMultiResponse(
const ActionsByServer &actions_by_server) {
- // Concurrent.
auto multi_calls = std::vector<Future<std::unique_ptr<hbase::Response>>>{};
auto user = User::defaultUser();
for (const auto &action_by_server : actions_by_server) {
@@ -328,16 +315,14 @@ Future<std::vector<Try<std::unique_ptr<Response>>>> AsyncBatchRpcRetryingCaller:
return collectAll(multi_calls);
}
-void AsyncBatchRpcRetryingCaller::Send(ActionsByServer &actions_by_server, int32_t tries) {
+void AsyncBatchRpcRetryingCaller::Send(const ActionsByServer &actions_by_server, int32_t tries) {
int64_t remaining_ns;
if (operation_timeout_ns_.count() > 0) {
remaining_ns = RemainingTimeNs();
if (remaining_ns <= 0) {
std::vector<std::shared_ptr<Action>> failed_actions;
for (const auto &action_by_server : actions_by_server) {
- // Concurrent
for (auto &value : action_by_server.second->actions_by_region()) {
- // Concurrent
for (const auto &failed_action : value.second->actions()) {
failed_actions.push_back(failed_action);
}
@@ -357,30 +342,30 @@ void AsyncBatchRpcRetryingCaller::Send(ActionsByServer &actions_by_server, int32
GetMultiResponse(actions_by_server)
.then([=](const std::vector<Try<std::unique_ptr<hbase::Response>>> &completed_responses) {
- std::lock_guard<std::mutex> lock(multi_mutex_);
- for (uint64_t num = 0; num < completed_responses.size(); ++num) {
+ std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
+ uint64_t num = 0;
+ for (const auto &action_by_server : actions_by_server) {
if (completed_responses[num].hasValue()) {
auto multi_response =
- ResponseConverter::GetResults(multi_reqv[num], *completed_responses[num].value());
- for (const auto &action_by_server : actions_by_server) {
- OnComplete(action_by_server.second->actions_by_region(), tries,
- action_by_server.first, std::move(multi_response));
- }
+ ResponseConverter::GetResults(multi_reqv[num], *completed_responses[num].value(),
+ action_by_server.second->actions_by_region());
+ OnComplete(action_by_server.second->actions_by_region(), tries, action_by_server.first,
+ std::move(multi_response));
} else if (completed_responses[num].hasException()) {
- VLOG(8) << "Received exception: "
- << completed_responses[num].exception().getCopied()->what()
- << " from server for action index " << num;
- // TODO: we should call OnError here as well.
+ folly::exception_wrapper ew = completed_responses[num].exception();
+ VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString()
+ << " from server for action index:" << num;
+ OnError(action_by_server.second->actions_by_region(), tries, ew,
+ action_by_server.first);
}
+ num++;
}
})
.onError([=](const folly::exception_wrapper &ew) {
- auto exc = ew.getCopied();
- VLOG(8) << "GetMultiResponse() exception: " << ew.what().toStdString();
- std::lock_guard<std::mutex> lock(multi_mutex_);
+ VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString();
+ std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
for (const auto &action_by_server : actions_by_server) {
- OnError(action_by_server.second->actions_by_region(), tries,
- std::make_shared<std::exception>(*exc), action_by_server.first);
+ OnError(action_by_server.second->actions_by_region(), tries, ew, action_by_server.first);
}
});
return;
@@ -391,46 +376,35 @@ void AsyncBatchRpcRetryingCaller::OnComplete(
const std::shared_ptr<ServerName> server_name,
const std::unique_ptr<hbase::MultiResponse> multi_response) {
std::vector<std::shared_ptr<Action>> failed_actions;
+ const auto region_results = multi_response->RegionResults();
for (const auto &action_by_region : actions_by_region) {
- auto region_result_itr = multi_response->RegionResults().find(action_by_region.first);
- if (region_result_itr == multi_response->RegionResults().end()) {
- VLOG(8) << "Region " << action_by_region.first << " not found in MultiResults.";
- // TODO Feedback needed Should we throw from here or continue for next action_by_region ?
- // Throwing at present as this looks like an inconsistency
- // Concurrent
- auto exc = std::make_shared<std::runtime_error>("Invalid search for region " +
- action_by_region.first + " in multi results");
- FailAll(action_by_region.second->actions(), tries, exc, server_name);
- return;
- // std::runtime_error(
- // "Invalid search for region " + action_by_region.first + " in multi results");
- }
- if (region_result_itr != multi_response->RegionResults().end()) {
- // Concurrent
+ auto region_result_itr = region_results.find(action_by_region.first);
+ if (region_result_itr != region_results.end()) {
for (const auto &action : action_by_region.second->actions()) {
OnComplete(action, action_by_region.second, tries, server_name, region_result_itr->second,
failed_actions);
}
- } else {
+ } else if (region_result_itr == region_results.end()) {
auto region_exc = multi_response->RegionException(action_by_region.first);
- std::shared_ptr<std::exception> pexc;
if (region_exc == nullptr) {
- VLOG(8) << "Server sent us neither results nor exceptions for " << action_by_region.first;
- pexc = std::make_shared<std::exception>(std::runtime_error("Invalid response"));
- // TODO: raise this exception to the application
+ // FailAll actions for this particular region as inconsistent server response. So we raise
+ // this exception to the application
+ std::string err_msg = "Invalid response: Server " + server_name->ShortDebugString() +
+ " sent us neither results nor exceptions for " +
+ action_by_region.first;
+ VLOG(1) << err_msg;
+ auto ew = folly::make_exception_wrapper<std::runtime_error>(err_msg);
+ FailAll(action_by_region.second->actions(), tries, ew, server_name);
} else {
- // TODO HBASE-17800 for exc check with DoNotRetryIOException
- LogException(tries, action_by_region.second, region_exc, server_name);
- location_cache_->UpdateCachedLocation(*action_by_region.second->region_location(),
- *region_exc);
- std::string row_name;
- if (tries >= max_attempts_) {
- // Concurrent
- FailAll(action_by_region.second->actions(), tries, region_exc, server_name);
+ // Eg: org.apache.hadoop.hbase.NotServingRegionException:
+ LogException(tries, action_by_region.second, *region_exc, server_name);
+ if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(*region_exc)) {
+ FailAll(action_by_region.second->actions(), tries, *region_exc, server_name);
return;
}
- // Concurrent
- AddError(action_by_region.second->actions(), region_exc, server_name);
+ location_cache_->UpdateCachedLocation(*action_by_region.second->region_location(),
+ *region_exc);
+ AddError(action_by_region.second->actions(), *region_exc, server_name);
for (const auto &action : action_by_region.second->actions()) {
failed_actions.push_back(action);
}
@@ -440,6 +414,7 @@ void AsyncBatchRpcRetryingCaller::OnComplete(
if (!failed_actions.empty()) {
TryResubmit(failed_actions, tries);
}
+
return;
}
@@ -454,34 +429,36 @@ void AsyncBatchRpcRetryingCaller::OnComplete(const std::shared_ptr<Action> &acti
auto result_or_exc = region_result->ResultOrException(action->original_index());
auto result = std::get<0>(*result_or_exc);
auto exc = std::get<1>(*result_or_exc);
- std::shared_ptr<std::exception> pexc;
if (exc != nullptr) {
- LogException(tries, region_request, exc, server_name);
- if (tries >= max_attempts_) {
- FailOne(action, tries, exc, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
+ LogException(tries, region_request, *exc, server_name);
+ if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(*exc)) {
+ FailOne(action, tries, *exc, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
} else {
failed_actions.push_back(action);
}
} else if (result != nullptr) {
action2promises_[action->original_index()].setValue(std::move(result));
} else {
- VLOG(8) << "Server " << server_name->ShortDebugString()
- << " sent us neither results nor exceptions for request @ index "
- << action->original_index() << ", row " << action->action()->row() << " of "
- << region_request->region_location()->region_name();
- err_msg = "Invalid response";
- AddError(action, std::make_shared<std::runtime_error>(err_msg), server_name);
+ std::string err_msg = "Invalid response: Server " + server_name->ShortDebugString() +
+ " sent us neither results nor exceptions for request @ index " +
+ std::to_string(action->original_index()) + ", row " +
+ action->action()->row() + " of " +
+ region_request->region_location()->region_name();
+ VLOG(1) << err_msg;
+ auto ew = folly::make_exception_wrapper<std::runtime_error>(err_msg);
+ AddError(action, ew, server_name);
failed_actions.push_back(action);
}
} catch (const std::out_of_range &oor) {
- // TODO Feedback needed. Should we retry for he specific index again ?
- // This should never occur, so we are throwing a std::runtime_error from here
- VLOG(8) << "No ResultOrException found @ index " << action->original_index() << ", row "
- << action->action()->row() << " of "
- << region_request->region_location()->region_name();
- throw std::runtime_error("ResultOrException not present @ index " +
- std::to_string(action->original_index()));
+ // This should never occur. Error in logic. Throwing std::runtime_error from here. Will be
+ // retried or failed
+ std::string err_msg = "ResultOrException not present @ index " +
+ std::to_string(action->original_index()) + ", row " +
+ action->action()->row() + " of " +
+ region_request->region_location()->region_name();
+ throw std::runtime_error(err_msg);
}
return;
}
+
} /* namespace hbase */
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.h b/hbase-native-client/core/async-batch-rpc-retrying-caller.h
index 29a0e6a..4c04b91 100644
--- a/hbase-native-client/core/async-batch-rpc-retrying-caller.h
+++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.h
@@ -19,6 +19,7 @@
#pragma once
+#include <folly/ExceptionWrapper.h>
#include <folly/Format.h>
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>
@@ -107,36 +108,36 @@ class AsyncBatchRpcRetryingCaller {
int64_t RemainingTimeNs();
void LogException(int32_t tries, std::shared_ptr<RegionRequest> region_request,
- std::shared_ptr<std::exception> &error,
+ const folly::exception_wrapper &ew,
std::shared_ptr<pb::ServerName> server_name);
- void LogException(int32_t tries, std::vector<std::shared_ptr<RegionRequest>> &region_requests,
- std::shared_ptr<std::exception> &error,
+ void LogException(int32_t tries,
+ const std::vector<std::shared_ptr<RegionRequest>> &region_requests,
+ const folly::exception_wrapper &ew,
std::shared_ptr<pb::ServerName> server_name);
const std::string GetExtraContextForError(std::shared_ptr<pb::ServerName> server_name);
- void AddError(const std::shared_ptr<Action> &action, std::shared_ptr<std::exception> error,
+ void AddError(const std::shared_ptr<Action> &action, const folly::exception_wrapper &ew,
std::shared_ptr<pb::ServerName> server_name);
void AddError(const std::vector<std::shared_ptr<Action>> &actions,
- std::shared_ptr<std::exception> error, std::shared_ptr<pb::ServerName> server_name);
+ const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name);
void FailOne(const std::shared_ptr<Action> &action, int32_t tries,
- std::shared_ptr<std::exception> error, int64_t current_time,
- const std::string extras);
+ const folly::exception_wrapper &ew, int64_t current_time, const std::string extras);
void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries,
- std::shared_ptr<std::exception> error, std::shared_ptr<pb::ServerName> server_name);
+ const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name);
void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries);
void AddAction2Error(uint64_t action_index, const ThrowableWithExtraContext &twec);
void OnError(const ActionsByRegion &actions_by_region, int32_t tries,
- std::shared_ptr<std::exception> exc, std::shared_ptr<pb::ServerName> server_name);
+ const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name);
- void TryResubmit(std::vector<std::shared_ptr<Action>> actions, int32_t tries);
+ void TryResubmit(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries);
folly::Future<std::vector<folly::Try<std::shared_ptr<RegionLocation>>>> GetRegionLocations(
const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns);
@@ -146,7 +147,7 @@ class AsyncBatchRpcRetryingCaller {
folly::Future<std::vector<folly::Try<std::unique_ptr<Response>>>> GetMultiResponse(
const ActionsByServer &actions_by_server);
- void Send(ActionsByServer &actions_by_server, int32_t tries);
+ void Send(const ActionsByServer &actions_by_server, int32_t tries);
void OnComplete(const ActionsByRegion &actions_by_region, int32_t tries,
const std::shared_ptr<pb::ServerName> server_name,
@@ -179,7 +180,6 @@ class AsyncBatchRpcRetryingCaller {
std::shared_ptr<RpcClient> rpc_client_ = nullptr;
std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_pool_ = nullptr;
- std::mutex multi_mutex_;
+ std::recursive_mutex multi_mutex_;
};
-
-} /* namespace hbase */
+} // namespace hbase
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-test.cc b/hbase-native-client/core/async-batch-rpc-retrying-test.cc
new file mode 100644
index 0000000..c186276
--- /dev/null
+++ b/hbase-native-client/core/async-batch-rpc-retrying-test.cc
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Logging.h>
+#include <folly/Memory.h>
+#include <folly/futures/Future.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/ScopedEventBaseThread.h>
+#include <gtest/gtest.h>
+#include <wangle/concurrent/IOThreadPoolExecutor.h>
+
+#include <chrono>
+#include <functional>
+#include <string>
+
+#include "connection/rpc-client.h"
+#include "core/async-batch-rpc-retrying-caller.h"
+#include "core/async-connection.h"
+#include "core/async-rpc-retrying-caller-factory.h"
+#include "core/client.h"
+#include "core/connection-configuration.h"
+#include "core/keyvalue-codec.h"
+#include "core/region-location.h"
+#include "core/result.h"
+#include "exceptions/exception.h"
+#include "test-util/test-util.h"
+#include "utils/time-util.h"
+
+using hbase::AsyncRpcRetryingCallerFactory;
+using hbase::AsyncConnection;
+using hbase::AsyncRegionLocator;
+using hbase::ConnectionConfiguration;
+using hbase::Configuration;
+using hbase::HBaseRpcController;
+using hbase::RegionLocation;
+using hbase::RegionLocateType;
+using hbase::RpcClient;
+using hbase::RequestConverter;
+using hbase::ResponseConverter;
+using hbase::Put;
+using hbase::TimeUtil;
+using hbase::Client;
+using hbase::security::User;
+
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+using namespace hbase;
+
+using folly::exception_wrapper;
+
+class AsyncBatchRpcRetryTest : public ::testing::Test {
+ public:
+ static std::unique_ptr<hbase::TestUtil> test_util;
+ static void SetUpTestCase() {
+ google::InstallFailureSignalHandler();
+ test_util = std::make_unique<hbase::TestUtil>();
+ test_util->StartMiniCluster(2);
+ }
+};
+std::unique_ptr<hbase::TestUtil> AsyncBatchRpcRetryTest::test_util = nullptr;
+
+class AsyncRegionLocatorBase : public AsyncRegionLocator {
+ public:
+ AsyncRegionLocatorBase() {}
+ explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location)
+ : region_location_(region_location) {}
+ virtual ~AsyncRegionLocatorBase() = default;
+
+ folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &,
+ const std::string &row,
+ const RegionLocateType,
+ const int64_t) override {
+ folly::Promise<std::shared_ptr<RegionLocation>> promise;
+ promise.setValue(region_locations_.at(row));
+ return promise.getFuture();
+ }
+
+ virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) {
+ region_location_ = region_location;
+ }
+
+ virtual void set_region_location(
+ const std::map<std::string, std::shared_ptr<RegionLocation>> &reg_locs) {
+ for (auto reg_loc : reg_locs) {
+ region_locations_[reg_loc.first] = reg_loc.second;
+ }
+ }
+
+ void UpdateCachedLocation(const RegionLocation &rl, const folly::exception_wrapper &ew) override {
+ }
+
+ protected:
+ std::shared_ptr<RegionLocation> region_location_;
+ std::map<std::string, std::shared_ptr<RegionLocation>> region_locations_;
+ std::map<std::string, uint32_t> mtries_;
+ std::map<std::string, uint32_t> mnum_fails_;
+
+ void InitRetryMaps(uint32_t num_fails) {
+ if (mtries_.size() == 0 && mnum_fails_.size() == 0) {
+ for (auto reg_loc : region_locations_) {
+ mtries_[reg_loc.first] = 0;
+ mnum_fails_[reg_loc.first] = num_fails;
+ }
+ }
+ }
+};
+
+class MockAsyncRegionLocator : public AsyncRegionLocatorBase {
+ public:
+ MockAsyncRegionLocator() : AsyncRegionLocatorBase() {}
+ explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+ : AsyncRegionLocatorBase(region_location) {}
+ virtual ~MockAsyncRegionLocator() {}
+};
+
+class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase {
+ private:
+ uint32_t counter_ = 0;
+ uint32_t num_fails_ = 0;
+ uint32_t tries_ = 0;
+
+ public:
+ explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails)
+ : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+ explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+ : AsyncRegionLocatorBase(region_location) {}
+ virtual ~MockWrongRegionAsyncRegionLocator() {}
+
+ folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+ const hbase::pb::TableName &tn, const std::string &row,
+ const RegionLocateType locate_type = RegionLocateType::kCurrent,
+ const int64_t locate_ns = 0) override {
+ InitRetryMaps(num_fails_);
+ auto &tries = mtries_[row];
+ auto &num_fails = mnum_fails_[row];
+ if (++tries > num_fails) {
+ return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+ }
+
+ folly::Promise<std::shared_ptr<RegionLocation>> promise;
+ /* set random region name, simulating invalid region */
+ auto result = std::make_shared<RegionLocation>("whatever-region-name",
+ region_locations_.at(row)->region_info(),
+ region_locations_.at(row)->server_name());
+ promise.setValue(result);
+ return promise.getFuture();
+ }
+};
+
+ /**
+  * Region locator mock whose first lookups of a row fail with an exception; once the
+  * row's try counter exceeds its stored failure count the lookup is delegated to
+  * AsyncRegionLocatorBase.
+  */
+ class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {
+  private:
+   // Passed to InitRetryMaps(); stays 0 when the region_location constructor is used.
+   uint32_t num_fails_ = 0;
+
+  public:
+   explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
+       : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+   explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+       : AsyncRegionLocatorBase(region_location) {}
+   virtual ~MockFailingAsyncRegionLocator() {}
+
+   /**
+    * Increments the try counter of `row`. While the counter does not exceed the value
+    * stored for the row in mnum_fails_, returns a future that already holds a
+    * std::runtime_error. Otherwise forwards all arguments to
+    * AsyncRegionLocatorBase::LocateRegion.
+    */
+   folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+       const hbase::pb::TableName &tn, const std::string &row,
+       const RegionLocateType locate_type = RegionLocateType::kCurrent,
+       const int64_t locate_ns = 0) override {
+     InitRetryMaps(num_fails_);
+     auto &tries = mtries_[row];
+     auto &num_fails = mnum_fails_[row];
+     if (++tries > num_fails) {
+       return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+     }
+
+     folly::Promise<std::shared_ptr<RegionLocation>> promise;
+     promise.setException(std::runtime_error{"Failed to look up region location"});
+     return promise.getFuture();
+   }
+ };
+
+ /**
+  * AsyncConnection test double assembled from externally supplied parts (configuration,
+  * timer, executors, rpc client and region locator). The accessors hand back exactly the
+  * objects given to the constructor; conf() returns nullptr and Close() does nothing.
+  */
+ class MockAsyncConnection : public AsyncConnection,
+                             public std::enable_shared_from_this<MockAsyncConnection> {
+  public:
+   MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf,
+                       std::shared_ptr<folly::HHWheelTimer> retry_timer,
+                       std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
+                       std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+                       std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor,
+                       std::shared_ptr<RpcClient> rpc_client,
+                       std::shared_ptr<AsyncRegionLocator> region_locator)
+       // Written in member declaration order, which is the order C++ actually initializes
+       // members in; caller_factory_ is left empty until Init().
+       : retry_timer_(retry_timer),
+         conn_conf_(conn_conf),
+         rpc_client_(rpc_client),
+         region_locator_(region_locator),
+         cpu_executor_(cpu_executor),
+         io_executor_(io_executor),
+         retry_executor_(retry_executor) {}
+   ~MockAsyncConnection() {}
+
+   /**
+    * Creates the retrying-caller factory. Uses shared_from_this(), so the object must
+    * already be owned by a std::shared_ptr when this is called.
+    */
+   void Init() {
+     caller_factory_ =
+         std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
+   }
+
+   std::shared_ptr<Configuration> conf() override { return nullptr; }
+   std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; }
+   std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
+     return caller_factory_;
+   }
+   std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
+   std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; }
+   std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
+   std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
+   std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
+     return retry_executor_;
+   }
+
+   void Close() override {}
+   std::shared_ptr<HBaseRpcController> CreateRpcController() override {
+     return std::make_shared<HBaseRpcController>();
+   }
+
+  private:
+   std::shared_ptr<folly::HHWheelTimer> retry_timer_;
+   std::shared_ptr<ConnectionConfiguration> conn_conf_;
+   std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
+   std::shared_ptr<RpcClient> rpc_client_;
+   std::shared_ptr<AsyncRegionLocator> region_locator_;
+   std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
+   std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
+   std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
+ };
+
+ /**
+  * Minimal stand-in for a raw async table: it owns a mock connection plus a table name
+  * and offers a single batched Gets() call built on the connection's caller factory.
+  */
+ class MockRawAsyncTableImpl {
+  public:
+   explicit MockRawAsyncTableImpl(std::shared_ptr<MockAsyncConnection> conn,
+                                  std::shared_ptr<hbase::pb::TableName> tn)
+       : conn_(conn), tn_(tn) {}
+   virtual ~MockRawAsyncTableImpl() = default;
+
+   /* implement this in real RawAsyncTableImpl. */
+   /**
+    * Builds a batch caller for `gets`, configured from the connection configuration
+    * (read rpc timeout, operation timeout, pause, max retries, start-log-errors count),
+    * and starts it.
+    *
+    * @param gets the Get actions to send; the vector is copied into the caller
+    * @return future holding one folly::Try per entry produced by the caller
+    */
+   folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Gets(
+       const std::vector<hbase::Get> &gets) {
+     auto actions = std::make_shared<std::vector<hbase::Get>>(gets);
+
+     auto batch_caller =
+         conn_->caller_factory()
+             ->Batch()
+             ->table(tn_)
+             ->actions(actions)
+             ->rpc_timeout(conn_->connection_conf()->read_rpc_timeout())
+             ->operation_timeout(conn_->connection_conf()->operation_timeout())
+             ->pause(conn_->connection_conf()->pause())
+             ->max_attempts(conn_->connection_conf()->max_retries())
+             ->start_log_errors_count(conn_->connection_conf()->start_log_errors_count())
+             ->Build();
+
+     // The continuation captures the caller so that it stays alive until the call completes.
+     return batch_caller->Call().then([batch_caller](auto r) { return r; });
+   }
+
+  private:
+   std::shared_ptr<MockAsyncConnection> conn_;
+   std::shared_ptr<hbase::pb::TableName> tn_;
+ };
+
+ /**
+  * End-to-end driver shared by the batch retry tests. It creates a table, writes num_rows
+  * rows "test<i>", then reads all of them back with one batched Gets() issued through a
+  * MockAsyncConnection that uses the supplied region locator, and checks every result.
+  *
+  * @param region_locator locator installed in the mock connection; it is given the real
+  *        location of every row via set_region_location() before the batch is sent
+  * @param table_name base table name; prefixed with "split-" when split_regions is true
+  * @param split_regions when true the table is created with the split keys listed below
+  * @param tries passed to ConnectionConfiguration as the max retries value
+  * @param operation_timeout_millis operation timeout of the connection and also the upper
+  *        bound for waiting on the batch future
+  * @param num_rows number of rows written and fetched
+  *
+  * If any Get carries an exception, its folly::exception_wrapper is thrown; in that case
+  * the cleanup at the end of the function is not reached.
+  */
+ void runMultiTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
+                   const std::string &table_name, bool split_regions, uint32_t tries = 3,
+                   uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 10000) {
+   std::vector<std::string> keys{"test0",   "test100", "test200", "test300", "test400",
+                                 "test500", "test600", "test700", "test800", "test900"};
+   std::string tableName = (split_regions) ? ("split-" + table_name) : table_name;
+   if (split_regions)
+     AsyncBatchRpcRetryTest::test_util->CreateTable(tableName, "d", keys);
+   else
+     AsyncBatchRpcRetryTest::test_util->CreateTable(tableName, "d");
+
+   // Create TableName and Row to be fetched from HBase
+   auto tn = folly::to<hbase::pb::TableName>(tableName);
+
+   // Create a client
+   Client client(*AsyncBatchRpcRetryTest::test_util->conf());
+
+   // Get connection to HBase Table
+   auto table = client.Table(tn);
+   ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+   for (uint64_t i = 0; i < num_rows; i++) {
+     table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
+                                                          "value" + std::to_string(i)));
+   }
+
+   // Build one Get per row and remember where each row really lives.
+   std::map<std::string, std::shared_ptr<RegionLocation>> region_locations;
+   std::vector<hbase::Get> gets;
+   gets.reserve(num_rows);
+   for (uint64_t i = 0; i < num_rows; ++i) {
+     auto row = "test" + std::to_string(i);
+     hbase::Get get(row);
+     gets.push_back(get);
+     region_locations[row] = table->GetRegionLocation(row);
+   }
+
+   /* init region location and rpc channel */
+   auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
+   auto io_executor_ = client.async_connection()->io_executor();
+   auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+   auto codec = std::make_shared<hbase::KeyValueCodec>();
+   auto rpc_client =
+       std::make_shared<RpcClient>(io_executor_, codec, AsyncBatchRpcRetryTest::test_util->conf());
+   std::shared_ptr<folly::HHWheelTimer> retry_timer =
+       folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
+
+   /* init connection configuration */
+   auto connection_conf = std::make_shared<ConnectionConfiguration>(
+       TimeUtil::SecondsToNanos(20),                       // connect_timeout
+       TimeUtil::MillisToNanos(operation_timeout_millis),  // operation_timeout
+       TimeUtil::SecondsToNanos(60),                       // rpc_timeout
+       TimeUtil::MillisToNanos(100),                       // pause
+       tries,                                              // max retries
+       1);                                                 // start log errors count
+
+   /* set region locator */
+   region_locator->set_region_location(region_locations);
+
+   /* init hbase client connection */
+   auto conn = std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_,
+                                                     io_executor_, retry_executor_, rpc_client,
+                                                     region_locator);
+   conn->Init();
+
+   /* init retry caller factory */
+   auto tableImpl =
+       std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
+
+   auto tresults = tableImpl->Gets(gets).get(milliseconds(operation_timeout_millis));
+
+   ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
+   std::vector<std::shared_ptr<hbase::Result>> results{};
+   results.reserve(tresults.size());
+   uint32_t num = 0;
+   // Iterate by reference so that each Try (and the shared_ptr inside) is not copied.
+   for (auto &tresult : tresults) {
+     if (tresult.hasValue()) {
+       results.push_back(tresult.value());
+     } else if (tresult.hasException()) {
+       folly::exception_wrapper ew = tresult.exception();
+       LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for " << gets[num].row();
+       throw ew;
+     }
+     ++num;
+   }
+
+   // Test the values, should be same as in put executed on hbase shell
+   ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty.";
+   // The loop below indexes results and gets side by side, so both must have the same length.
+   ASSERT_EQ(gets.size(), results.size()) << "Expected exactly one result per Get.";
+   for (uint32_t i = 0; i < num_rows; ++i) {
+     ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row()
+                                         << " must not be empty";
+     EXPECT_EQ("test" + std::to_string(i), results[i]->Row());
+     EXPECT_EQ("value" + std::to_string(i), results[i]->Value("d", std::to_string(i)).value());
+   }
+
+   retry_timer->destroy();
+   table->Close();
+   client.Close();
+   retry_executor_->stop();
+   io_executor_->stop();
+   cpu_executor_->stop();
+ }
+
+// Test successful case
+ TEST_F(AsyncBatchRpcRetryTest, MultiGets) {
+   // Locator without injected failures, table created without split keys.
+   auto locator = std::make_shared<MockAsyncRegionLocator>();
+   runMultiTest(locator, "table1", false);
+ }
+
+// Tests the RPC failing 3 times, then succeeding
+ TEST_F(AsyncBatchRpcRetryTest, HandleException) {
+   // The locator is built with a failure count of 3 while max retries is 5;
+   // the batch is expected to finish without throwing.
+   auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(3);
+   runMultiTest(locator, "table2", false, 5);
+ }
+
+// Tests the RPC failing 4 times, throwing an exception
+ TEST_F(AsyncBatchRpcRetryTest, FailWithException) {
+   // Failure count 4 with the default max retries of runMultiTest (3): a throw is expected.
+   auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(4);
+   EXPECT_ANY_THROW(runMultiTest(locator, "table3", false));
+ }
+
+// Tests the region location lookup failing 3 times, then succeeding
+ TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookup) {
+   // Lookups fail with an exception; failure count 3, default max retries of runMultiTest.
+   // The batch is expected to finish without throwing.
+   auto locator = std::make_shared<MockFailingAsyncRegionLocator>(3);
+   runMultiTest(locator, "table4", false);
+ }
+
+ // Tests the region location lookup failing 4 times, throwing an exception
+ TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookup) {
+   // Failure count 4 against max retries 3: a throw is expected.
+   auto locator = std::make_shared<MockFailingAsyncRegionLocator>(4);
+   EXPECT_ANY_THROW(runMultiTest(locator, "table5", false, 3));
+ }
+
+// Tests hitting operation timeout, thus not retrying anymore
+ TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeout) {
+   // Failure count 6, max retries 5, operation timeout 100 ms, 10000 rows: a throw is expected.
+   auto locator = std::make_shared<MockFailingAsyncRegionLocator>(6);
+   EXPECT_ANY_THROW(runMultiTest(locator, "table6", false, 5, 100, 10000));
+ }
+
+// Test successful case
+ TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) {
+   // Locator without injected failures; split_regions = true, so the table is created
+   // with split keys.
+   auto locator = std::make_shared<MockAsyncRegionLocator>();
+   runMultiTest(locator, "table1", true);
+ }
+
+// Tests the RPC failing 3 times, then succeeding
+ TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) {
+   // Split-table variant: failure count 3 with max retries 5; no throw is expected.
+   auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(3);
+   runMultiTest(locator, "table2", true, 5);
+ }
+
+// Tests the RPC failing 4 times, throwing an exception
+ TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) {
+   // Split-table variant: failure count 4 with the default max retries of runMultiTest (3);
+   // a throw is expected.
+   auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(4);
+   EXPECT_ANY_THROW(runMultiTest(locator, "table3", true));
+ }
+
+// Tests the region location lookup failing 3 times, then succeeding
+ TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) {
+   // Split-table variant: lookups fail with an exception, failure count 3, default max
+   // retries of runMultiTest; no throw is expected.
+   auto locator = std::make_shared<MockFailingAsyncRegionLocator>(3);
+   runMultiTest(locator, "table4", true);
+ }
+
+ // Tests the region location lookup failing 4 times, throwing an exception
+ TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) {
+   // Split-table variant: failure count 4 against max retries 3; a throw is expected.
+   auto locator = std::make_shared<MockFailingAsyncRegionLocator>(4);
+   EXPECT_ANY_THROW(runMultiTest(locator, "table5", true, 3));
+ }
+
+// Tests hitting operation timeout, thus not retrying anymore
+ TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) {
+   // Split-table variant: failure count 6, max retries 5, operation timeout 100 ms,
+   // 10000 rows; a throw is expected.
+   auto locator = std::make_shared<MockFailingAsyncRegionLocator>(6);
+   EXPECT_ANY_THROW(runMultiTest(locator, "table6", true, 5, 100, 10000));
+ }
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index ba213bd..9efe0b6 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -435,29 +435,27 @@ TEST_F(ClientTest, PutsWithTimestamp) {
client.Close();
}
-TEST_F(ClientTest, MultiGets) {
- // Using TestUtil to populate test data
- ClientTest::test_util->CreateTable("t", "d");
-
- // Create TableName and Row to be fetched from HBase
- auto tn = folly::to<hbase::pb::TableName>("t");
-
- // Create a client
- hbase::Client client(*ClientTest::test_util->conf());
+// Sets the client options used by the multi-get tests on the shared test configuration:
+// cpu thread pool size, operation timeout, retry count and start-log-errors counter.
+void SetClientParams() {
+  auto conf = ClientTest::test_util->conf();
+  conf->SetInt("hbase.client.cpu.thread.pool.size", 6);
+  conf->SetInt("hbase.client.operation.timeout", 600000);
+  conf->SetInt("hbase.client.retries.number", 7);
+  conf->SetInt("hbase.client.start.log.errors.counter", 1);
+}
- // Get connection to HBase Table
- auto table = client.Table(tn);
+// Writes num_rows rows into table_name through the given client. Row i has key "test<i>"
+// and one cell in family "d" with qualifier "<i>" and value "value<i>".
+// The ASSERT_TRUE aborts this helper when no table handle could be obtained.
+void PerformPuts(uint64_t num_rows, std::shared_ptr<hbase::Client> client,
+ const std::string &table_name) {
+ auto tn = folly::to<hbase::pb::TableName>(table_name);
+ auto table = client->Table(tn);
ASSERT_TRUE(table) << "Unable to get connection to Table.";
-
- uint64_t num_rows = 10000;
// Perform Puts
for (uint64_t i = 0; i < num_rows; i++) {
table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
"value" + std::to_string(i)));
}
+}
+void MakeGets(uint64_t num_rows, const std::string &row_prefix, std::vector<hbase::Get> &gets) {
// Perform the Gets
- std::vector<hbase::Get> gets;
for (uint64_t i = 0; i < num_rows; ++i) {
auto row = "test" + std::to_string(i);
hbase::Get get(row);
@@ -465,9 +463,10 @@ TEST_F(ClientTest, MultiGets) {
}
gets.push_back(hbase::Get("test2"));
gets.push_back(hbase::Get("testextra"));
+}
- auto results = table->Get(gets);
-
+void TestMultiResults(uint64_t num_rows, const std::vector<std::shared_ptr<hbase::Result>> &results,
+ const std::vector<hbase::Get> &gets) {
// Test the values, should be same as in put executed on hbase shell
ASSERT_TRUE(!results.empty()) << "Result vector shouldn't be empty.";
@@ -483,6 +482,66 @@ TEST_F(ClientTest, MultiGets) {
++i;
ASSERT_TRUE(results[i]->IsEmpty()) << "Result for Get " << gets[i].row() << " must be empty";
+}
+
+TEST_F(ClientTest, MultiGets) {
+  const std::string table_name{"t"};
+  const uint64_t num_rows = 50000;
+
+  // Using TestUtil to populate test data
+  ClientTest::test_util->CreateTable(table_name, "d");
+
+  // The client options must be in the configuration before the client is constructed from it.
+  SetClientParams();
+  hbase::Client client(*ClientTest::test_util->conf());
+
+  // The rows are written through a copy of the client held in a shared_ptr.
+  PerformPuts(num_rows, std::make_shared<hbase::Client>(client), table_name);
+
+  // Get connection to HBase Table
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  // Fetch everything with a single multi-get and validate the results.
+  std::vector<hbase::Get> gets;
+  MakeGets(num_rows, "test", gets);
+  auto results = table->Get(gets);
+  TestMultiResults(num_rows, results, gets);
+
+  table->Close();
+  client.Close();
+}
+
+TEST_F(ClientTest, MultiGetsWithRegionSplits) {
+ // Using TestUtil to populate test data
+ std::vector<std::string> keys{"test0", "test100", "test200", "test300", "test400",
+ "test500", "test600", "test700", "test800", "test900"};
+ std::string table_name = "t";
+ ClientTest::test_util->CreateTable(table_name, "d", keys);
+
+ // Create TableName and Row to be fetched from HBase
+ auto tn = folly::to<hbase::pb::TableName>(table_name);
+
+ SetClientParams();
+
+ // Create a client
+ hbase::Client client(*ClientTest::test_util->conf());
+
+ uint64_t num_rows = 50000;
+ PerformPuts(num_rows, std::make_shared<hbase::Client>(client), table_name);
+
+ // Get connection to HBase Table
+ auto table = client.Table(tn);
+ ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+ std::vector<hbase::Get> gets;
+ MakeGets(num_rows, "test", gets);
+
+ auto results = table->Get(gets);
+
+ TestMultiResults(num_rows, results, gets);
table->Close();
client.Close();
diff --git a/hbase-native-client/core/multi-response.cc b/hbase-native-client/core/multi-response.cc
index f620c98..a4c2108 100644
--- a/hbase-native-client/core/multi-response.cc
+++ b/hbase-native-client/core/multi-response.cc
@@ -18,6 +18,7 @@
*/
#include "core/multi-response.h"
+#include <glog/logging.h>
#include "core/region-result.h"
using hbase::pb::RegionLoadStats;
@@ -36,35 +37,38 @@ int MultiResponse::Size() const {
+// Stores one result-or-exception entry under region_name. The first entry for a region
+// creates its RegionResult; later entries are added to the existing one.
void MultiResponse::AddRegionResult(const std::string& region_name, int32_t original_index,
std::shared_ptr<Result> result,
- std::shared_ptr<std::exception> exc) {
- bool region_found = false;
- for (auto itr = results_.begin(); itr != results_.end(); ++itr) {
- if (itr->first == region_name) {
- region_found = true;
- itr->second->AddResultOrException(original_index, result, exc);
- break;
- }
- }
- if (!region_found) {
+ std::shared_ptr<folly::exception_wrapper> exc) {
+ auto itr = results_.find(region_name);
+ if (itr == results_.end()) {
auto region_result = std::make_shared<RegionResult>();
region_result->AddResultOrException(original_index, result, exc);
results_[region_name] = region_result;
+ } else {
+ itr->second->AddResultOrException(original_index, result, exc);
}
}
+// Records `exception` as the region-level failure of region_name, replacing any exception
+// stored for that region before.
void MultiResponse::AddRegionException(const std::string& region_name,
- std::shared_ptr<std::exception> exception) {
- exceptions_[region_name] = exception;
+                                       std::shared_ptr<folly::exception_wrapper> exception) {
+  VLOG(8) << "Store Region Exception:- " << exception->what() << "; Region[" << region_name << "];";
+  // operator[] inserts the key when it is missing and overwrites the stored pointer otherwise,
+  // so no separate find/else branch (or scratch exception_wrapper) is needed.
+  exceptions_[region_name] = exception;
}
+// Returns the exception stored for region_name.
+// NOTE(review): std::map::at throws std::out_of_range when the region has no entry, while
+// the declaration comment in multi-response.h says "Null otherwise" -- confirm which
+// behaviour is intended.
-std::shared_ptr<std::exception> MultiResponse::RegionException(
+std::shared_ptr<folly::exception_wrapper> MultiResponse::RegionException(
const std::string& region_name) const {
auto find = exceptions_.at(region_name);
return find;
}
+// Returns a read-only reference to the region name -> exception map of this response.
-const std::map<std::string, std::shared_ptr<std::exception> >& MultiResponse::RegionExceptions()
- const {
+const std::map<std::string, std::shared_ptr<folly::exception_wrapper> >&
+MultiResponse::RegionExceptions() const {
return exceptions_;
}
diff --git a/hbase-native-client/core/multi-response.h b/hbase-native-client/core/multi-response.h
index 96883fd..d38cfd6 100644
--- a/hbase-native-client/core/multi-response.h
+++ b/hbase-native-client/core/multi-response.h
@@ -20,6 +20,7 @@
#pragma once
#include <core/region-result.h>
+#include <folly/ExceptionWrapper.h>
#include <exception>
#include <map>
#include <memory>
@@ -46,17 +47,18 @@ class MultiResponse {
* @param resOrEx the result or error; will be empty for successful Put and Delete actions.
*/
void AddRegionResult(const std::string& region_name, int32_t original_index,
- std::shared_ptr<Result> result, std::shared_ptr<std::exception> exc);
+ std::shared_ptr<Result> result,
+ std::shared_ptr<folly::exception_wrapper> exc);
void AddRegionException(const std::string& region_name,
- std::shared_ptr<std::exception> exception);
+ std::shared_ptr<folly::exception_wrapper> exception);
/**
* @return the exception for the region, if any. Null otherwise.
*/
- std::shared_ptr<std::exception> RegionException(const std::string& region_name) const;
+ std::shared_ptr<folly::exception_wrapper> RegionException(const std::string& region_name) const;
- const std::map<std::string, std::shared_ptr<std::exception>>& RegionExceptions() const;
+ const std::map<std::string, std::shared_ptr<folly::exception_wrapper>>& RegionExceptions() const;
void AddStatistic(const std::string& region_name, std::shared_ptr<pb::RegionLoadStats> stat);
@@ -66,12 +68,12 @@ class MultiResponse {
private:
// map of regionName to map of Results by the original index for that Result
- std::map<std::string, std::shared_ptr<hbase::RegionResult>> results_;
+ std::map<std::string, std::shared_ptr<RegionResult>> results_;
/**
* The server can send us a failure for the region itself, instead of individual failure.
* It's a part of the protobuf definition.
*/
- std::map<std::string, std::shared_ptr<std::exception>> exceptions_;
+ std::map<std::string, std::shared_ptr<folly::exception_wrapper>> exceptions_;
};
} /* namespace hbase */
diff --git a/hbase-native-client/core/region-result.cc b/hbase-native-client/core/region-result.cc
index 05ab274..206c876 100644
--- a/hbase-native-client/core/region-result.cc
+++ b/hbase-native-client/core/region-result.cc
@@ -30,7 +30,7 @@ RegionResult::RegionResult() {}
RegionResult::~RegionResult() {}
void RegionResult::AddResultOrException(int32_t index, std::shared_ptr<hbase::Result> result,
- std::shared_ptr<std::exception> exc) {
+ std::shared_ptr<folly::exception_wrapper> exc) {
auto index_found = result_or_excption_.find(index);
if (index_found == result_or_excption_.end()) {
result_or_excption_[index] = std::make_tuple(result ? result : nullptr, exc ? exc : nullptr);
diff --git a/hbase-native-client/core/region-result.h b/hbase-native-client/core/region-result.h
index cfd9e5a..b961634 100644
--- a/hbase-native-client/core/region-result.h
+++ b/hbase-native-client/core/region-result.h
@@ -19,6 +19,7 @@
#pragma once
+#include <folly/ExceptionWrapper.h>
#include <map>
#include <memory>
#include <string>
@@ -29,13 +30,13 @@
namespace hbase {
using ResultOrExceptionTuple =
- std::tuple<std::shared_ptr<hbase::Result>, std::shared_ptr<std::exception>>;
+ std::tuple<std::shared_ptr<hbase::Result>, std::shared_ptr<folly::exception_wrapper>>;
class RegionResult {
public:
RegionResult();
void AddResultOrException(int32_t index, std::shared_ptr<hbase::Result> result,
- std::shared_ptr<std::exception> exc);
+ std::shared_ptr<folly::exception_wrapper> exc);
void set_stat(std::shared_ptr<pb::RegionLoadStats> stat);
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index 4f9bfb1..960c487 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -18,7 +18,6 @@
*/
#include "core/response-converter.h"
-
#include <glog/logging.h>
#include <stdexcept>
#include <string>
@@ -125,8 +124,9 @@ std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(
return results;
}
-std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_ptr<Request> req,
- const Response& resp) {
+std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(
+ std::shared_ptr<Request> req, const Response& resp,
+ const ServerRequest::ActionsByRegion& actions_by_region) {
auto multi_req = std::static_pointer_cast<hbase::pb::MultiRequest>(req->req_msg());
auto multi_resp = std::static_pointer_cast<hbase::pb::MultiResponse>(resp.resp_msg());
VLOG(3) << "GetResults:" << multi_resp->ShortDebugString();
@@ -148,11 +148,10 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_
auto region_name = rs.value();
if (action_result.has_exception()) {
- if (action_result.exception().has_value()) {
- auto exc = std::make_shared<hbase::IOException>(action_result.exception().value());
- VLOG(8) << "Store Region Exception:- " << exc->what();
- multi_response->AddRegionException(region_name, exc);
- }
+ auto ew = ResponseConverter::GetRemoteException(action_result.exception());
+ VLOG(8) << "Store Remote Region Exception:- " << ew->what().toStdString() << "; Region["
+ << region_name << "];";
+ multi_response->AddRegionException(region_name, ew);
continue;
}
@@ -163,14 +162,16 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_
" for region " + actions.region().value());
}
+ auto multi_actions = actions_by_region.at(region_name)->actions();
+ uint64_t multi_actions_num = 0;
for (hbase::pb::ResultOrException roe : action_result.resultorexception()) {
std::shared_ptr<Result> result;
- std::shared_ptr<std::exception> exc;
+ std::shared_ptr<folly::exception_wrapper> ew;
if (roe.has_exception()) {
- if (roe.exception().has_value()) {
- exc = std::make_shared<hbase::IOException>(roe.exception().value());
- VLOG(8) << "Store ResultOrException:- " << exc->what();
- }
+ auto ew = ResponseConverter::GetRemoteException(roe.exception());
+ VLOG(8) << "Store Remote Region Exception:- " << ew->what().toStdString() << "; Region["
+ << region_name << "];";
+ multi_response->AddRegionException(region_name, ew);
} else if (roe.has_result()) {
result = ToResult(roe.result(), resp.cell_scanner());
} else if (roe.has_service_result()) {
@@ -183,7 +184,11 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_
result = std::make_shared<Result>(empty_cells, multi_resp->processed() ? true : false,
false, false);
}
- multi_response->AddRegionResult(region_name, roe.index(), std::move(result), exc);
+ // We add the original index of the multi-action so that when populating the response back we
+ // do it as per the action index
+ multi_response->AddRegionResult(
+ region_name, multi_actions[multi_actions_num]->original_index(), std::move(result), ew);
+ multi_actions_num++;
}
}
@@ -196,4 +201,21 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_
}
return multi_response;
}
+
+// Converts a protobuf name/value pair describing a remote failure into a RemoteException
+// wrapped in a shared folly::exception_wrapper. The pair's name is used as the exception
+// class name and its value as the stack trace; a missing field becomes an empty string.
+// Hostname and port are set to "" and 0.
+std::shared_ptr<folly::exception_wrapper> ResponseConverter::GetRemoteException(
+    const hbase::pb::NameBytesPair& exc_resp) {
+  std::string exception_class_name = exc_resp.has_name() ? exc_resp.name() : "";
+  std::string stack_trace = exc_resp.has_value() ? exc_resp.value() : "";
+
+  // The message is the class name directly followed by the stack trace, with no separator.
+  std::string what = exception_class_name + stack_trace;
+
+  RemoteException remote_exception(what);
+  remote_exception.set_exception_class_name(exception_class_name)
+      ->set_stack_trace(stack_trace)
+      ->set_hostname("")
+      ->set_port(0);
+
+  return std::make_shared<folly::exception_wrapper>(
+      folly::make_exception_wrapper<RemoteException>(remote_exception));
+}
} /* namespace hbase */
diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h
index 2f8f279..edd4165 100644
--- a/hbase-native-client/core/response-converter.h
+++ b/hbase-native-client/core/response-converter.h
@@ -25,6 +25,7 @@
#include "connection/response.h"
#include "core/multi-response.h"
#include "core/result.h"
+#include "core/server-request.h"
#include "if/Client.pb.h"
#include "serde/cell-scanner.h"
@@ -56,12 +57,15 @@ class ResponseConverter {
static std::vector<std::shared_ptr<Result>> FromScanResponse(
const std::shared_ptr<pb::ScanResponse> resp, std::shared_ptr<CellScanner> cell_scanner);
- static std::unique_ptr<hbase::MultiResponse> GetResults(std::shared_ptr<Request> req,
- const Response& resp);
+ static std::unique_ptr<hbase::MultiResponse> GetResults(
+ std::shared_ptr<Request> req, const Response& resp,
+ const ServerRequest::ActionsByRegion& actions_by_region);
private:
// Constructor not required. We have all static methods to extract response from PB messages.
ResponseConverter();
+ static std::shared_ptr<folly::exception_wrapper> GetRemoteException(
+ const hbase::pb::NameBytesPair& exc_resp);
};
} /* namespace hbase */
diff --git a/hbase-native-client/core/server-request.h b/hbase-native-client/core/server-request.h
index 7f31c2b..85df9ed 100644
--- a/hbase-native-client/core/server-request.h
+++ b/hbase-native-client/core/server-request.h
@@ -44,8 +44,14 @@ class ServerRequest {
+  // Adds `action` to the RegionRequest of the region named by region_location, creating
+  // that RegionRequest first when the region has not been seen yet.
void AddActionsByRegion(std::shared_ptr<RegionLocation> region_location,
std::shared_ptr<Action> action) {
auto region_name = region_location->region_name();
- auto itr = actions_by_region_.at(region_name);
- itr->AddAction(action);
+    auto search = actions_by_region_.find(region_name);
+    if (search == actions_by_region_.end()) {
+      // Fill the new request before storing it, so the map is searched only once more
+      // instead of once per operator[] call.
+      auto region_request = std::make_shared<RegionRequest>(region_location);
+      region_request->AddAction(action);
+      actions_by_region_[region_name] = region_request;
+    } else {
+      search->second->AddAction(action);
+    }
}
const ActionsByRegion &actions_by_region() const { return actions_by_region_; }