You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/05/16 18:43:18 UTC
hbase git commit: HBASE-17576 [C++] Implement request retry mechanism
over RPC for Multi calls. (Sudeep Sunthankar)
Repository: hbase
Updated Branches:
refs/heads/HBASE-14850 018f1eab2 -> ccfc68251
HBASE-17576 [C++] Implement request retry mechanism over RPC for Multi calls. (Sudeep Sunthankar)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ccfc6825
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ccfc6825
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ccfc6825
Branch: refs/heads/HBASE-14850
Commit: ccfc68251658f4b30081553be62797d18540a8b4
Parents: 018f1ea
Author: Enis Soztutar <en...@apache.org>
Authored: Tue May 16 11:43:04 2017 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue May 16 11:43:04 2017 -0700
----------------------------------------------------------------------
hbase-native-client/core/BUCK | 2 +
hbase-native-client/core/action.h | 13 +-
.../core/async-batch-rpc-retrying-caller.cc | 501 +++++++++++++++++++
.../core/async-batch-rpc-retrying-caller.h | 183 +++++++
.../core/async-rpc-retrying-caller-factory.h | 84 +++-
.../core/async-rpc-retrying-caller.cc | 4 +-
.../core/async-rpc-retrying-test.cc | 4 +-
hbase-native-client/core/client-test.cc | 53 ++
hbase-native-client/core/raw-async-table.cc | 22 +-
hbase-native-client/core/raw-async-table.h | 9 +-
hbase-native-client/core/request-converter.cc | 13 +-
hbase-native-client/core/simple-client.cc | 22 +-
hbase-native-client/core/table.cc | 18 +-
hbase-native-client/core/table.h | 3 +-
14 files changed, 897 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 412ee3b..e9fc716 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -53,6 +53,7 @@ cxx_library(
"region-result.h",
"row.h",
"server-request.h",
+ "async-batch-rpc-retrying-caller.h",
],
srcs=[
"async-connection.cc",
@@ -77,6 +78,7 @@ cxx_library(
"zk-util.cc",
"multi-response.cc",
"region-result.cc",
+ "async-batch-rpc-retrying-caller.cc",
],
deps=[
"//exceptions:exceptions",
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/action.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/action.h b/hbase-native-client/core/action.h
index 3511683..21a0181 100644
--- a/hbase-native-client/core/action.h
+++ b/hbase-native-client/core/action.h
@@ -20,24 +20,23 @@
#pragma once
#include <memory>
-#include "core/row.h"
+#include "core/get.h"
-using hbase::Row;
namespace hbase {
class Action {
public:
- Action(std::shared_ptr<Row> action, int original_index)
+ Action(std::shared_ptr<hbase::Get> action, int32_t original_index)
: action_(action), original_index_(original_index) {}
~Action() {}
- int64_t original_index() const { return original_index_; }
+ int32_t original_index() const { return original_index_; }
- std::shared_ptr<Row> action() const { return action_; }
+ std::shared_ptr<hbase::Get> action() const { return action_; }
private:
- std::shared_ptr<Row> action_;
- int64_t original_index_;
+ std::shared_ptr<hbase::Get> action_;
+ int32_t original_index_;
int64_t nonce_ = -1;
int32_t replica_id_ = -1;
};
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
new file mode 100644
index 0000000..f3be637
--- /dev/null
+++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/async-batch-rpc-retrying-caller.h"
+#include <glog/logging.h>
+#include <limits>
+
+using folly::Future;
+using folly::Promise;
+using folly::Try;
+
+using folly::Future;
+using folly::Promise;
+using folly::Try;
+using hbase::Action;
+using hbase::LocationCache;
+using hbase::MultiResponse;
+using hbase::RegionLocation;
+using hbase::RegionRequest;
+using hbase::RequestConverter;
+using hbase::Result;
+using hbase::RpcClient;
+using hbase::ServerRequest;
+using hbase::pb::ServerName;
+using hbase::pb::TableName;
+using hbase::security::User;
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+using wangle::CPUThreadPoolExecutor;
+
+namespace hbase {
+
+/**
+ * Builds a batch caller for the given Gets.
+ *
+ * For every Get an Action (carrying its position in @p actions), a promise and
+ * the matching future are created; the futures are what Call() hands back.
+ *
+ * @param conn connection providing the region locator, RPC client and CPU executor
+ * @param retry_timer timer intended for scheduling retries
+ * @param table_name table the Gets are issued against
+ * @param actions the Gets to execute; each one is copied into a shared Action
+ * @param pause_ns base pause used to compute the retry back-off
+ * @param max_attempts number of retries, converted with ConnectionUtils::Retries2Attempts
+ * @param operation_timeout_ns overall budget; values <= 0 disable the budget checks
+ * @param rpc_timeout_ns per-RPC timeout (stored only, not used in this file)
+ * @param start_log_errors_count failures are logged once tries exceeds this value
+ */
+AsyncBatchRpcRetryingCaller::AsyncBatchRpcRetryingCaller(
+    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
+    std::shared_ptr<TableName> table_name, const std::vector<hbase::Get> &actions,
+    nanoseconds pause_ns, int32_t max_attempts, nanoseconds operation_timeout_ns,
+    nanoseconds rpc_timeout_ns, int32_t start_log_errors_count)
+    // Initializers are listed in member declaration order, which is the order
+    // in which they actually run.
+    : retry_timer_(retry_timer),
+      conn_(conn),
+      table_name_(table_name),
+      pause_ns_(pause_ns),
+      max_attempts_(ConnectionUtils::Retries2Attempts(max_attempts)),
+      operation_timeout_ns_(operation_timeout_ns),
+      rpc_timeout_ns_(rpc_timeout_ns),
+      start_log_errors_count_(start_log_errors_count) {
+  CHECK(conn_ != nullptr);
+  CHECK(retry_timer_ != nullptr);
+  location_cache_ = conn_->region_locator();
+  rpc_client_ = conn_->rpc_client();
+  cpu_pool_ = conn_->cpu_executor();
+  CHECK(location_cache_ != nullptr);
+  CHECK(rpc_client_ != nullptr);
+  CHECK(cpu_pool_ != nullptr);
+
+  actions_.reserve(actions.size());
+  action2futures_.reserve(actions.size());
+  uint32_t index = 0;
+  // Iterate by const reference: the only copy needed is the one made into the shared Get.
+  for (const auto &row : actions) {
+    actions_.push_back(std::make_shared<Action>(std::make_shared<hbase::Get>(row), index));
+    auto inserted = action2promises_.emplace(index, Promise<std::shared_ptr<Result>>{});
+    action2futures_.push_back(inserted.first->second.getFuture());
+    ++index;
+  }
+}
+
+/** Nothing to release explicitly; members clean up after themselves. */
+AsyncBatchRpcRetryingCaller::~AsyncBatchRpcRetryingCaller() = default;
+
+/**
+ * Kicks off the first attempt for all actions.
+ *
+ * @return a future completing once every per-action future has completed,
+ *         carrying one Try per action
+ */
+Future<std::vector<Try<std::shared_ptr<Result>>>> AsyncBatchRpcRetryingCaller::Call() {
+  constexpr int32_t kFirstTry = 1;
+  GroupAndSend(actions_, kFirstTry);
+  return collectAll(action2futures_);
+}
+
+/** @return operation timeout minus the time elapsed since start_ns_, in nanoseconds. */
+int64_t AsyncBatchRpcRetryingCaller::RemainingTimeNs() {
+  const auto elapsed_ns = TimeUtil::GetNowNanos() - start_ns_;
+  return operation_timeout_ns_.count() - elapsed_ns;
+}
+
+/**
+ * Emits a warning about a failed batch for a single region.
+ * Nothing is logged until tries exceeds start_log_errors_count_.
+ */
+void AsyncBatchRpcRetryingCaller::LogException(int32_t tries,
+                                               std::shared_ptr<RegionRequest> region_request,
+                                               std::shared_ptr<std::exception> &error,
+                                               std::shared_ptr<ServerName> server_name) {
+  if (tries <= start_log_errors_count_) {
+    return;
+  }
+  const std::string region_list = region_request->region_location()->region_name() + ", ";
+  LOG(WARNING) << "Process batch for " << region_list << " in " << table_name_->namespace_() << ":"
+               << table_name_->qualifier() << " from " << server_name->host_name()
+               << " failed, tries=" << tries << ":- " << error->what();
+}
+
+/**
+ * Emits a warning about a failed batch spanning several regions.
+ * Nothing is logged until tries exceeds start_log_errors_count_.
+ *
+ * @param tries current attempt number
+ * @param region_requests requests whose region names are listed in the message
+ * @param error the failure; its what() text is appended to the message
+ * @param server_name server the batch was sent to
+ */
+void AsyncBatchRpcRetryingCaller::LogException(
+    int32_t tries, std::vector<std::shared_ptr<RegionRequest>> &region_requests,
+    std::shared_ptr<std::exception> &error, std::shared_ptr<ServerName> server_name) {
+  if (tries > start_log_errors_count_) {
+    std::string regions;
+    for (const auto &region_request : region_requests) {
+      regions += region_request->region_location()->region_name() + ", ";
+    }
+    // Same ":- " separator as the single-region overload, so the try count and
+    // the error text do not run together.
+    LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":"
+                 << table_name_->qualifier() << " from " << server_name->host_name()
+                 << " failed, tries=" << tries << ":- " << error->what();
+  }
+}
+
+/** @return the server's short debug string, or an empty string when no server is given. */
+const std::string AsyncBatchRpcRetryingCaller::GetExtraContextForError(
+    std::shared_ptr<ServerName> server_name) {
+  if (!server_name) {
+    return "";
+  }
+  return server_name->ShortDebugString();
+}
+
+// TODO HBASE-17800 pass folly ew instead of std::exception
+/**
+ * Records a failure for one action, stamped with the current time and the
+ * server context. Note that @p error is not used yet: the recorded
+ * exception_wrapper is default-constructed.
+ */
+void AsyncBatchRpcRetryingCaller::AddError(const std::shared_ptr<Action> &action,
+                                           std::shared_ptr<std::exception> error,
+                                           std::shared_ptr<ServerName> server_name) {
+  folly::exception_wrapper wrapped;
+  ThrowableWithExtraContext entry(wrapped, TimeUtil::GetNowNanos(),
+                                  GetExtraContextForError(server_name));
+  const auto index = action->original_index();
+  AddAction2Error(index, entry);
+}
+
+/** Records the same failure for each of the given actions via the single-action overload. */
+void AsyncBatchRpcRetryingCaller::AddError(const std::vector<std::shared_ptr<Action>> &actions,
+                                           std::shared_ptr<std::exception> error,
+                                           std::shared_ptr<ServerName> server_name) {
+  for (const auto &one_action : actions) {
+    AddError(one_action, error, server_name);
+  }
+}
+
+// TODO HBASE-17800 pass folly ew instead of std::exception
+/**
+ * Fails the promise of a single action with a RetriesExhaustedException.
+ *
+ * Does nothing when the action's promise is already fulfilled or when no
+ * promise is registered for the action. Otherwise one more error entry is
+ * recorded (with a default-constructed exception_wrapper; @p error is not
+ * used yet) and the accumulated errors are attached to the exception.
+ *
+ * @param action action to fail
+ * @param tries current attempt number; tries - 1 is reported
+ * @param error the failure (currently unused, see the TODO above)
+ * @param current_time timestamp stored with the error entry
+ * @param extras extra context stored with the error entry
+ */
+void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action, int32_t tries,
+                                          std::shared_ptr<std::exception> error,
+                                          int64_t current_time, const std::string extras) {
+  auto action_index = action->original_index();
+  auto itr = action2promises_.find(action_index);
+  if (itr == action2promises_.end()) {
+    // Using operator[] here would silently create a promise nobody listens to.
+    VLOG(8) << "No promise registered for action index " << action_index;
+    return;
+  }
+  if (itr->second.isFulfilled()) {
+    return;
+  }
+  folly::exception_wrapper ew;
+  ThrowableWithExtraContext twec(ew, current_time, extras);
+  AddAction2Error(action_index, twec);
+  itr->second.setException(RetriesExhaustedException(tries - 1, action2errors_[action_index]));
+}
+
+/**
+ * Fails every given action through FailOne, stamping each with the current
+ * time and the server context.
+ */
+void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Action>> &actions,
+                                          int32_t tries, std::shared_ptr<std::exception> error,
+                                          std::shared_ptr<ServerName> server_name) {
+  for (const auto &one_action : actions) {
+    FailOne(one_action, tries, error, TimeUtil::GetNowNanos(),
+            GetExtraContextForError(server_name));
+  }
+}
+
+/**
+ * Fails every not-yet-fulfilled action with a RetriesExhaustedException built
+ * from the errors recorded so far for that action.
+ *
+ * Actions whose promise is already fulfilled, or for which no promise is
+ * registered, are skipped; the remaining actions are still processed.
+ *
+ * @param actions actions to fail
+ * @param tries current attempt number; tries - 1 is reported
+ */
+void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Action>> &actions,
+                                          int32_t tries) {
+  for (const auto &action : actions) {
+    auto action_index = action->original_index();
+    auto itr = action2promises_.find(action_index);
+    // Skip (rather than return) so one completed action cannot leave the
+    // promises of the following actions pending forever.
+    if (itr == action2promises_.end() || itr->second.isFulfilled()) {
+      continue;
+    }
+    // Make sure an (empty) error list exists instead of handing out the null
+    // pointer that operator[] would default-insert.
+    auto &errors = action2errors_[action_index];
+    if (!errors) {
+      errors = std::make_shared<std::vector<ThrowableWithExtraContext>>();
+    }
+    itr->second.setException(RetriesExhaustedException(tries - 1, errors));
+  }
+}
+
+/**
+ * Appends an error entry to the list kept for the given action index,
+ * creating the list on first use.
+ *
+ * An entry may already exist but hold a null list, because
+ * action2errors_[index] lookups elsewhere in this file default-insert one;
+ * that case is treated like a missing entry.
+ */
+void AsyncBatchRpcRetryingCaller::AddAction2Error(uint64_t action_index,
+                                                  const ThrowableWithExtraContext &twec) {
+  auto &errors = action2errors_[action_index];
+  if (!errors) {
+    errors = std::make_shared<std::vector<ThrowableWithExtraContext>>();
+  }
+  errors->push_back(twec);
+}
+
+/**
+ * Handles a failure affecting all regions of one server request: logs it,
+ * then either fails every contained action (attempts exhausted) or records
+ * the error and hands the actions to TryResubmit.
+ */
+void AsyncBatchRpcRetryingCaller::OnError(const ActionsByRegion &actions_by_region, int32_t tries,
+                                          std::shared_ptr<std::exception> exc,
+                                          std::shared_ptr<ServerName> server_name) {
+  std::vector<std::shared_ptr<RegionRequest>> affected_requests;
+  std::vector<std::shared_ptr<Action>> affected_actions;
+  for (const auto &entry : actions_by_region) {
+    affected_requests.push_back(entry.second);
+    // Concurrent
+    const auto &region_actions = entry.second->actions();
+    affected_actions.insert(affected_actions.end(), region_actions.begin(),
+                            region_actions.end());
+  }
+  // TODO HBASE-17800 for exc check with DoNotRetryIOException
+  LogException(tries, affected_requests, exc, server_name);
+  const bool attempts_exhausted = tries >= max_attempts_;
+  if (attempts_exhausted) {
+    FailAll(affected_actions, tries, exc, server_name);
+  } else {
+    AddError(affected_actions, exc, server_name);
+    TryResubmit(affected_actions, tries);
+  }
+}
+
+// Computes the back-off delay for the next attempt of the given actions.
+// When an operation timeout is configured and the remaining budget (minus
+// ConnectionUtils::kSleepDeltaNs) is used up, the actions are failed instead.
+// The retry itself is currently NOT scheduled (see the TODO at the end), so
+// the computed delay_ns is unused.
+void AsyncBatchRpcRetryingCaller::TryResubmit(std::vector<std::shared_ptr<Action>> actions,
+ int32_t tries) {
+ int64_t delay_ns;
+ if (operation_timeout_ns_.count() > 0) {
+ // Leave kSleepDeltaNs of head-room in the remaining budget.
+ int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
+ if (max_delay_ns <= 0) {
+ VLOG(8) << "Fail All from onError";
+ FailAll(actions, tries);
+ return;
+ }
+ // Never sleep longer than the remaining budget allows.
+ delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1));
+ } else {
+ delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1);
+ }
+ // TODO This gives segfault @ present, when retried
+ // NOTE(review): the lambda below captures `actions` and `tries` by reference
+ // although both are by-value parameters of this function, so they would dangle
+ // by the time the timer fires -- presumably the cause of the segfault; confirm.
+ // retry_timer_->scheduleTimeoutFn([&]() { GroupAndSend(actions, tries + 1); },
+ // milliseconds(TimeUtil::ToMillis(delay_ns)));
+}
+
+/**
+ * Starts one region lookup per action (keyed by the action's row) and
+ * returns a future that completes when all lookups have completed, with one
+ * Try per action in the same order as @p actions.
+ */
+Future<std::vector<Try<std::shared_ptr<RegionLocation>>>>
+AsyncBatchRpcRetryingCaller::GetRegionLocations(const std::vector<std::shared_ptr<Action>> &actions,
+                                                int64_t locate_timeout_ns) {
+  std::vector<Future<std::shared_ptr<RegionLocation>>> pending_lookups;
+  for (const auto &one_action : actions) {
+    auto lookup = location_cache_->LocateRegion(*table_name_, one_action->action()->row(),
+                                                RegionLocateType::kCurrent, locate_timeout_ns);
+    pending_lookups.push_back(std::move(lookup));
+  }
+
+  return collectAll(pending_lookups);
+}
+
+/**
+ * Locates the region of every action, groups the located actions by server
+ * and sends one multi request per server; actions whose lookup failed are
+ * handed to TryResubmit.
+ *
+ * If an operation timeout is configured and already exhausted, the given
+ * actions are failed immediately.
+ *
+ * @param actions actions to locate and send
+ * @param tries current attempt number (1 for the first attempt)
+ */
+void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr<Action>> &actions,
+                                               int32_t tries) {
+  int64_t locate_timeout_ns;
+  if (operation_timeout_ns_.count() > 0) {
+    locate_timeout_ns = RemainingTimeNs();
+    if (locate_timeout_ns <= 0) {
+      // Fail only the actions this call is responsible for.
+      FailAll(actions, tries);
+      return;
+    }
+  } else {
+    locate_timeout_ns = -1L;
+  }
+
+  // `actions` and `tries` are captured by value: the continuation may run after
+  // this function has returned, when references to the parameters would dangle.
+  // NOTE(review): `this` must still outlive the continuation -- confirm at the call site.
+  GetRegionLocations(actions, locate_timeout_ns)
+      .then([this, actions, tries](std::vector<Try<std::shared_ptr<RegionLocation>>> &loc) {
+        std::lock_guard<std::mutex> lock(multi_mutex_);
+        ActionsByServer actions_by_server;
+        std::vector<std::shared_ptr<Action>> locate_failed;
+
+        for (uint64_t i = 0; i < loc.size(); ++i) {
+          auto action = actions[i];
+          if (loc[i].hasValue()) {
+            auto region_loc = loc[i].value();
+            // Add it to actions_by_server;
+            // Concurrent
+            auto server_name = std::make_shared<ServerName>(region_loc->server_name());
+            auto search = actions_by_server.find(server_name);
+            if (search != actions_by_server.end()) {
+              search->second->AddActionsByRegion(region_loc, action);
+            } else {
+              // Create new key
+              auto server_request = std::make_shared<ServerRequest>(region_loc);
+              server_request->AddActionsByRegion(region_loc, action);
+              actions_by_server[server_name] = server_request;
+            }
+            // A located action is sent, not resubmitted, so it must not be
+            // added to locate_failed.
+            VLOG(8) << "row [" << action->action()->row() << "] of table["
+                    << table_name_->namespace_() << ":" << table_name_->qualifier()
+                    << " found in region [" << region_loc->region_name() << "]; host["
+                    << region_loc->server_name().host_name() << "]; port["
+                    << region_loc->server_name().port() << "];";
+          } else if (loc[i].hasException()) {
+            VLOG(8) << "Exception occured while locating region:- "
+                    << loc[i].exception().getCopied()->what() << " for action index " << i;
+            // TODO Feedback needed, Java API only identifies DoNotRetryIOException
+            // We might receive runtime error from location-cache.cc too, we are treating both same
+            if (loc[i].exception().is_compatible_with<std::runtime_error>()) {
+              FailOne(action, tries, nullptr, TimeUtil::GetNowNanos(),
+                      loc[i].exception().what().toStdString());
+              // Only this action is failed; keep processing the others so
+              // their promises still get completed.
+              continue;
+            }
+            // TODO HBASE-17800 for exc check with DoNotRetryIOException
+            /*
+            else if (loc[i].exception().is_compatible_with<hbase::DoNotRetryIOException>()) {
+              int64_t current_time = 0;
+              std::string extra = "";
+              FailOne(action, tries, nullptr, TimeUtil::GetNowNanos(),
+                      loc[i].exception().what().toStdString());
+              return;
+            }*/
+            AddError(action, std::make_shared<std::exception>(*loc[i].exception().getCopied()),
+                     nullptr);
+            locate_failed.push_back(action);
+          }
+        }
+
+        if (!actions_by_server.empty()) {
+          Send(actions_by_server, tries);
+        }
+
+        if (!locate_failed.empty()) {
+          TryResubmit(locate_failed, tries);
+        }
+      })
+      .onError([this](const folly::exception_wrapper &ew) {
+        std::lock_guard<std::mutex> lock(multi_mutex_);
+        VLOG(8) << "GetRegionLocations() exception: " << ew.what().toStdString();
+      });
+  return;
+}
+
+/**
+ * Issues one asynchronous "ClientService" multi call per server entry, as the
+ * default user, and returns a future that completes when all calls have
+ * completed. Results are in the iteration order of @p actions_by_server.
+ */
+Future<std::vector<Try<std::unique_ptr<Response>>>> AsyncBatchRpcRetryingCaller::GetMultiResponse(
+    const ActionsByServer &actions_by_server) {
+  // Concurrent.
+  std::vector<Future<std::unique_ptr<hbase::Response>>> pending_calls;
+  auto user = User::defaultUser();
+  for (const auto &server_entry : actions_by_server) {
+    const auto &server = server_entry.first;
+    std::unique_ptr<Request> multi_req =
+        RequestConverter::ToMultiRequest(server_entry.second->actions_by_region());
+    auto host = server->host_name();
+    int port = server->port();
+    auto call = rpc_client_->AsyncCall(host, port, std::move(multi_req), user, "ClientService");
+    pending_calls.push_back(std::move(call));
+  }
+  return collectAll(pending_calls);
+}
+
+/**
+ * Sends one multi request per server and dispatches every response (or
+ * failure) to the actions of the server it belongs to.
+ *
+ * If an operation timeout is configured and already exhausted, all contained
+ * actions are failed instead of being sent.
+ *
+ * @param actions_by_server server -> request mapping built by GroupAndSend
+ * @param tries current attempt number
+ */
+void AsyncBatchRpcRetryingCaller::Send(ActionsByServer &actions_by_server, int32_t tries) {
+  int64_t remaining_ns;
+  if (operation_timeout_ns_.count() > 0) {
+    remaining_ns = RemainingTimeNs();
+    if (remaining_ns <= 0) {
+      std::vector<std::shared_ptr<Action>> failed_actions;
+      for (const auto &action_by_server : actions_by_server) {
+        // Concurrent
+        for (const auto &value : action_by_server.second->actions_by_region()) {
+          // Concurrent
+          for (const auto &failed_action : value.second->actions()) {
+            failed_actions.push_back(failed_action);
+          }
+        }
+      }
+      FailAll(failed_actions, tries);
+      return;
+    }
+  } else {
+    // Computed for parity with the timeout branch; not yet applied to the RPC.
+    remaining_ns = std::numeric_limits<int64_t>::max();
+  }
+
+  // Snapshot the per-server state in the map's iteration order. GetMultiResponse
+  // walks the same, unmodified map, so response i belongs to entry i of these
+  // vectors. (A by-value copy of the unordered_map inside the lambda would not
+  // be guaranteed to iterate in the same order.)
+  std::vector<std::shared_ptr<ServerName>> server_names;
+  std::vector<std::shared_ptr<ServerRequest>> server_requests;
+  std::vector<std::shared_ptr<Request>> multi_reqv;
+  for (const auto &action_by_server : actions_by_server) {
+    server_names.push_back(action_by_server.first);
+    server_requests.push_back(action_by_server.second);
+    multi_reqv.push_back(
+        RequestConverter::ToMultiRequest(action_by_server.second->actions_by_region()));
+  }
+
+  GetMultiResponse(actions_by_server)
+      .then([this, tries, server_names, server_requests, multi_reqv](
+                const std::vector<Try<std::unique_ptr<hbase::Response>>> &completed_responses) {
+        std::lock_guard<std::mutex> lock(multi_mutex_);
+        for (uint64_t num = 0; num < completed_responses.size() && num < server_names.size();
+             ++num) {
+          const auto &server_name = server_names[num];
+          const auto &server_request = server_requests[num];
+          if (completed_responses[num].hasValue()) {
+            // Each response is consumed exactly once, by the server that produced it.
+            auto multi_response =
+                ResponseConverter::GetResults(multi_reqv[num], *completed_responses[num].value());
+            OnComplete(server_request->actions_by_region(), tries, server_name,
+                       std::move(multi_response));
+          } else if (completed_responses[num].hasException()) {
+            auto exc = completed_responses[num].exception().getCopied();
+            VLOG(8) << "Received exception: " << exc->what()
+                    << " from server for action index " << num;
+            // Route the failure through OnError (as the onError handler below
+            // does) so the affected actions are failed or recorded for retry
+            // instead of being dropped.
+            OnError(server_request->actions_by_region(), tries,
+                    std::make_shared<std::exception>(*exc), server_name);
+          }
+        }
+      })
+      .onError([this, tries, server_names, server_requests](const folly::exception_wrapper &ew) {
+        auto exc = ew.getCopied();
+        VLOG(8) << "GetMultiResponse() exception: " << ew.what().toStdString();
+        std::lock_guard<std::mutex> lock(multi_mutex_);
+        for (uint64_t num = 0; num < server_names.size(); ++num) {
+          OnError(server_requests[num]->actions_by_region(), tries,
+                  std::make_shared<std::exception>(*exc), server_names[num]);
+        }
+      });
+  return;
+}
+
+/**
+ * Processes the multi response of one server, region by region.
+ *
+ * A region with results has each of its actions completed individually. A
+ * region without results is treated as failed: its region-level exception
+ * (or a synthetic "Invalid response" error when the server sent none) is
+ * recorded, and the actions are either failed (attempts exhausted) or
+ * collected for resubmission. One failing region does not stop the
+ * processing of the remaining regions.
+ *
+ * @param actions_by_region region -> request mapping that was sent to the server
+ * @param tries current attempt number
+ * @param server_name server that produced the response
+ * @param multi_response parsed response
+ */
+void AsyncBatchRpcRetryingCaller::OnComplete(
+    const ActionsByRegion &actions_by_region, int32_t tries,
+    const std::shared_ptr<ServerName> server_name,
+    const std::unique_ptr<hbase::MultiResponse> multi_response) {
+  std::vector<std::shared_ptr<Action>> failed_actions;
+  const auto &region_results = multi_response->RegionResults();
+  for (const auto &action_by_region : actions_by_region) {
+    // Concurrent
+    const auto &region_actions = action_by_region.second->actions();
+    auto region_result_itr = region_results.find(action_by_region.first);
+    if (region_result_itr != region_results.end()) {
+      for (const auto &action : region_actions) {
+        OnComplete(action, action_by_region.second, tries, server_name, region_result_itr->second,
+                   failed_actions);
+      }
+      continue;
+    }
+
+    VLOG(8) << "Region " << action_by_region.first << " not found in MultiResults.";
+    std::shared_ptr<std::exception> region_exc =
+        multi_response->RegionException(action_by_region.first);
+    if (region_exc == nullptr) {
+      VLOG(8) << "Server sent us neither results nor exceptions for " << action_by_region.first;
+      // Keep the dynamic type so what() still reports the message.
+      region_exc = std::make_shared<std::runtime_error>("Invalid response");
+      // TODO: raise this exception to the application
+    } else {
+      // TODO HBASE-17800 for exc check with DoNotRetryIOException
+      LogException(tries, action_by_region.second, region_exc, server_name);
+      location_cache_->UpdateCachedLocation(*action_by_region.second->region_location(),
+                                            *region_exc);
+      if (tries >= max_attempts_) {
+        FailAll(region_actions, tries, region_exc, server_name);
+        // Move on to the next region; the others still need handling.
+        continue;
+      }
+    }
+    AddError(region_actions, region_exc, server_name);
+    for (const auto &action : region_actions) {
+      failed_actions.push_back(action);
+    }
+  }
+  if (!failed_actions.empty()) {
+    TryResubmit(failed_actions, tries);
+  }
+  return;
+}
+
+/**
+ * Completes a single action from its region's result.
+ *
+ * - exception present: logged; the action is failed when attempts are
+ *   exhausted, otherwise appended to @p failed_actions
+ * - result present: the action's promise is fulfilled with it
+ * - neither: an "Invalid response" error is recorded and the action is
+ *   appended to @p failed_actions
+ *
+ * @param action action being completed
+ * @param region_request request the action was part of (used for logging)
+ * @param tries current attempt number
+ * @param server_name server that produced the result
+ * @param region_result per-region results, looked up by the action's original index
+ * @param failed_actions output list of actions the caller should resubmit
+ * @throws std::runtime_error when the lookup reports std::out_of_range
+ */
+void AsyncBatchRpcRetryingCaller::OnComplete(const std::shared_ptr<Action> &action,
+                                             const std::shared_ptr<RegionRequest> &region_request,
+                                             int32_t tries,
+                                             const std::shared_ptr<ServerName> &server_name,
+                                             const std::shared_ptr<RegionResult> &region_result,
+                                             std::vector<std::shared_ptr<Action>> &failed_actions) {
+  try {
+    auto result_or_exc = region_result->ResultOrException(action->original_index());
+    auto result = std::get<0>(*result_or_exc);
+    auto exc = std::get<1>(*result_or_exc);
+    if (exc != nullptr) {
+      LogException(tries, region_request, exc, server_name);
+      if (tries >= max_attempts_) {
+        FailOne(action, tries, exc, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
+      } else {
+        failed_actions.push_back(action);
+      }
+    } else if (result != nullptr) {
+      action2promises_[action->original_index()].setValue(std::move(result));
+    } else {
+      VLOG(8) << "Server " << server_name->ShortDebugString()
+              << " sent us neither results nor exceptions for request @ index "
+              << action->original_index() << ", row " << action->action()->row() << " of "
+              << region_request->region_location()->region_name();
+      AddError(action, std::make_shared<std::runtime_error>("Invalid response"), server_name);
+      failed_actions.push_back(action);
+    }
+  } catch (const std::out_of_range &oor) {
+    // TODO Feedback needed. Should we retry for the specific index again ?
+    // This should never occur, so we are throwing a std::runtime_error from here
+    VLOG(8) << "No ResultOrException found @ index " << action->original_index() << ", row "
+            << action->action()->row() << " of "
+            << region_request->region_location()->region_name();
+    throw std::runtime_error("ResultOrException not present @ index " +
+                             std::to_string(action->original_index()));
+  }
+  return;
+}
+} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/async-batch-rpc-retrying-caller.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.h b/hbase-native-client/core/async-batch-rpc-retrying-caller.h
new file mode 100644
index 0000000..6803a0e
--- /dev/null
+++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.h
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <folly/Format.h>
+#include <folly/futures/Future.h>
+#include <folly/futures/Promise.h>
+#include <folly/futures/Try.h>
+#include <folly/io/IOBuf.h>
+#include <folly/io/async/HHWheelTimer.h>
+#include <wangle/concurrent/CPUThreadPoolExecutor.h>
+
+#include <algorithm>
+#include <chrono>
+#include <functional>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <stdexcept>
+#include <string>
+#include <tuple>
+#include <type_traits>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "connection/rpc-client.h"
+#include "core/action.h"
+#include "core/async-connection.h"
+#include "core/location-cache.h"
+#include "core/multi-response.h"
+#include "core/region-location.h"
+#include "core/region-request.h"
+#include "core/region-result.h"
+#include "core/request-converter.h"
+#include "core/response-converter.h"
+#include "core/result.h"
+#include "core/row.h"
+#include "core/server-request.h"
+#include "exceptions/exception.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+#include "security/user.h"
+#include "utils/connection-util.h"
+#include "utils/sys-util.h"
+#include "utils/time-util.h"
+
+namespace hbase {
+/*
+ * Equality functor for shared ServerName pointers: two servers are equal when
+ * start code, host name and port all match. Neither pointer may be null.
+ */
+struct ServerNameEquals {
+  bool operator()(const std::shared_ptr<ServerName> &lhs,
+                  const std::shared_ptr<ServerName> &rhs) const {
+    if (lhs->start_code() != rhs->start_code()) {
+      return false;
+    }
+    if (lhs->host_name() != rhs->host_name()) {
+      return false;
+    }
+    return lhs->port() == rhs->port();
+  }
+};
+
+/*
+ * Hash functor for shared ServerName pointers, combining start code, host
+ * name and port (the same fields ServerNameEquals compares).
+ */
+struct ServerNameHash {
+  /** hash */
+  std::size_t operator()(const std::shared_ptr<ServerName> &sn) const {
+    std::size_t seed = 0;
+    const auto &server = *sn;
+    boost::hash_combine(seed, server.start_code());
+    boost::hash_combine(seed, server.host_name());
+    boost::hash_combine(seed, server.port());
+    return seed;
+  }
+};
+
+/**
+ * Executes a batch of Gets as multi RPCs: locates the region of every Get,
+ * groups them by server, sends one request per server and completes one
+ * future per Get. Failures are recorded per action and reported through a
+ * RetriesExhaustedException.
+ */
+class AsyncBatchRpcRetryingCaller {
+ public:
+  using ActionsByServer =
+      std::unordered_map<std::shared_ptr<ServerName>, std::shared_ptr<ServerRequest>,
+                         ServerNameHash, ServerNameEquals>;
+  using ActionsByRegion = ServerRequest::ActionsByRegion;
+
+  AsyncBatchRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
+                              std::shared_ptr<folly::HHWheelTimer> retry_timer,
+                              std::shared_ptr<pb::TableName> table_name,
+                              const std::vector<hbase::Get> &actions, nanoseconds pause_ns,
+                              int32_t max_attempts, nanoseconds operation_timeout_ns,
+                              nanoseconds rpc_timeout_ns, int32_t start_log_errors_count);
+
+  ~AsyncBatchRpcRetryingCaller();
+
+  /** Starts the batch; the returned future carries one Try per Get. */
+  folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Call();
+
+ private:
+  /** Operation timeout minus the time elapsed since construction, in nanoseconds. */
+  int64_t RemainingTimeNs();
+
+  /** Logs a failed batch for one region once tries exceeds start_log_errors_count_. */
+  void LogException(int32_t tries, std::shared_ptr<RegionRequest> region_request,
+                    std::shared_ptr<std::exception> &error,
+                    std::shared_ptr<ServerName> server_name);
+
+  /** Logs a failed batch for several regions once tries exceeds start_log_errors_count_. */
+  void LogException(int32_t tries, std::vector<std::shared_ptr<RegionRequest>> &region_requests,
+                    std::shared_ptr<std::exception> &error,
+                    std::shared_ptr<ServerName> server_name);
+
+  /** Short debug string of the server, or an empty string when none is given. */
+  const std::string GetExtraContextForError(std::shared_ptr<ServerName> server_name);
+
+  /** Records an error entry for one action. */
+  void AddError(const std::shared_ptr<Action> &action, std::shared_ptr<std::exception> error,
+                std::shared_ptr<ServerName> server_name);
+
+  /** Records an error entry for each of the given actions. */
+  void AddError(const std::vector<std::shared_ptr<Action>> &actions,
+                std::shared_ptr<std::exception> error, std::shared_ptr<ServerName> server_name);
+
+  /** Fails the promise of one action unless it is already fulfilled. */
+  void FailOne(const std::shared_ptr<Action> &action, int32_t tries,
+               std::shared_ptr<std::exception> error, int64_t current_time,
+               const std::string extras);
+
+  /** Fails the given actions, recording the error and server context for each. */
+  void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries,
+               std::shared_ptr<std::exception> error, std::shared_ptr<ServerName> server_name);
+
+  /** Fails the given actions with the errors recorded so far. */
+  void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries);
+
+  /** Appends an error entry to the list kept for the given action index. */
+  void AddAction2Error(uint64_t action_index, const ThrowableWithExtraContext &twec);
+
+  /** Handles a failure affecting all regions of one server request. */
+  void OnError(const ActionsByRegion &actions_by_region, int32_t tries,
+               std::shared_ptr<std::exception> exc, std::shared_ptr<ServerName> server_name);
+
+  /** Computes the retry delay, or fails the actions when the time budget is used up. */
+  void TryResubmit(std::vector<std::shared_ptr<Action>> actions, int32_t tries);
+
+  /** Starts one region lookup per action. */
+  folly::Future<std::vector<folly::Try<std::shared_ptr<RegionLocation>>>> GetRegionLocations(
+      const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns);
+
+  /** Locates regions, groups the actions by server and sends them. */
+  void GroupAndSend(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries);
+
+  /** Issues one multi call per server. */
+  folly::Future<std::vector<folly::Try<std::unique_ptr<Response>>>> GetMultiResponse(
+      const ActionsByServer &actions_by_server);
+
+  /** Sends the grouped requests and dispatches the responses. */
+  void Send(ActionsByServer &actions_by_server, int32_t tries);
+
+  /** Processes the multi response of one server, region by region. */
+  void OnComplete(const ActionsByRegion &actions_by_region, int32_t tries,
+                  const std::shared_ptr<ServerName> server_name,
+                  const std::unique_ptr<MultiResponse> multi_results);
+
+  /** Completes a single action from its region's result. */
+  void OnComplete(const std::shared_ptr<Action> &action,
+                  const std::shared_ptr<RegionRequest> &region_request, int32_t tries,
+                  const std::shared_ptr<ServerName> &server_name,
+                  const std::shared_ptr<RegionResult> &region_result,
+                  std::vector<std::shared_ptr<Action>> &failed_actions);
+
+ private:
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
+  std::shared_ptr<hbase::AsyncConnection> conn_;
+  std::shared_ptr<pb::TableName> table_name_;
+  std::vector<std::shared_ptr<Action>> actions_;
+  nanoseconds pause_ns_;
+  int32_t max_attempts_ = 0;
+  nanoseconds operation_timeout_ns_;
+  nanoseconds rpc_timeout_ns_;
+  int32_t start_log_errors_count_ = 0;
+
+  // Captured at construction; RemainingTimeNs() measures elapsed time from here.
+  int64_t start_ns_ = TimeUtil::GetNowNanos();
+  int32_t tries_ = 1;
+  // Keyed by the action's original index.
+  std::map<uint64_t, folly::Promise<std::shared_ptr<Result>>> action2promises_;
+  std::vector<folly::Future<std::shared_ptr<Result>>> action2futures_;
+  std::map<uint64_t, std::shared_ptr<std::vector<ThrowableWithExtraContext>>> action2errors_;
+
+  std::shared_ptr<AsyncRegionLocator> location_cache_ = nullptr;
+  std::shared_ptr<RpcClient> rpc_client_ = nullptr;
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_pool_ = nullptr;
+
+  // Held by the asynchronous continuations in GroupAndSend() and Send().
+  std::mutex multi_mutex_;
+};
+
+} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/async-rpc-retrying-caller-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
index 5a80a06..f1ffdac 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller-factory.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
@@ -19,20 +19,19 @@
#pragma once
#include <folly/Logging.h>
-#include <folly/io/IOBuf.h>
#include <folly/io/async/EventBase.h>
#include <chrono>
#include <memory>
#include <string>
+#include <vector>
#include "connection/rpc-client.h"
+#include "core/async-batch-rpc-retrying-caller.h"
#include "core/async-rpc-retrying-caller.h"
+#include "core/row.h"
#include "if/Client.pb.h"
#include "if/HBase.pb.h"
-using hbase::pb::TableName;
-using std::chrono::nanoseconds;
-
namespace hbase {
class AsyncConnection;
@@ -58,7 +57,7 @@ class SingleRequestCallerBuilder
typedef SingleRequestCallerBuilder<RESP> GenenericThisType;
typedef std::shared_ptr<GenenericThisType> SharedThisPtr;
- SharedThisPtr table(std::shared_ptr<TableName> table_name) {
+ SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) {
table_name_ = table_name;
return shared_this();
}
@@ -119,7 +118,7 @@ class SingleRequestCallerBuilder
private:
std::shared_ptr<AsyncConnection> conn_;
std::shared_ptr<folly::HHWheelTimer> retry_timer_;
- std::shared_ptr<TableName> table_name_;
+ std::shared_ptr<pb::TableName> table_name_;
nanoseconds rpc_timeout_nanos_;
nanoseconds operation_timeout_nanos_;
nanoseconds pause_;
@@ -130,6 +129,75 @@ class SingleRequestCallerBuilder
Callable<RESP> callable_;
}; // end of SingleRequestCallerBuilder
+/**
+ * Fluent builder for AsyncBatchRpcRetryingCaller.
+ *
+ * Every setter stores its argument and returns a shared_ptr to this builder so
+ * that calls can be chained. Because the setters go through shared_from_this(),
+ * the builder must itself be owned by a std::shared_ptr (for example, created
+ * with std::make_shared).
+ *
+ * The timeouts and the pause default to zero and the counters default to 0 when
+ * the corresponding setter is not called; actions() must be called before
+ * Build() or Call().
+ */
+class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilder> {
+ public:
+  explicit BatchCallerBuilder(std::shared_ptr<AsyncConnection> conn,
+                              std::shared_ptr<folly::HHWheelTimer> retry_timer)
+      : conn_(conn), retry_timer_(retry_timer) {}
+
+  virtual ~BatchCallerBuilder() = default;
+
+  typedef std::shared_ptr<BatchCallerBuilder> SharedThisPtr;
+
+  /** Sets the table the batch is executed against. */
+  SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) {
+    table_name_ = table_name;
+    return shared_this();
+  }
+
+  /** Sets the Gets that make up the batch. Must not be null by the time Build() runs. */
+  SharedThisPtr actions(std::shared_ptr<std::vector<hbase::Get>> actions) {
+    actions_ = actions;
+    return shared_this();
+  }
+
+  /** Sets the operation timeout handed to the caller. */
+  SharedThisPtr operation_timeout(nanoseconds operation_timeout_nanos) {
+    operation_timeout_nanos_ = operation_timeout_nanos;
+    return shared_this();
+  }
+
+  /** Sets the rpc timeout handed to the caller. */
+  SharedThisPtr rpc_timeout(nanoseconds rpc_timeout_nanos) {
+    rpc_timeout_nanos_ = rpc_timeout_nanos;
+    return shared_this();
+  }
+
+  /** Sets the pause handed to the caller. */
+  SharedThisPtr pause(nanoseconds pause_ns) {
+    pause_ns_ = pause_ns;
+    return shared_this();
+  }
+
+  /** Sets the maximum number of attempts handed to the caller. */
+  SharedThisPtr max_attempts(int32_t max_attempts) {
+    max_attempts_ = max_attempts;
+    return shared_this();
+  }
+
+  /** Sets the start_log_errors_count value handed to the caller. */
+  SharedThisPtr start_log_errors_count(int32_t start_log_errors_count) {
+    start_log_errors_count_ = start_log_errors_count;
+    return shared_this();
+  }
+
+  /** Convenience for Build()->Call(). */
+  folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Call() { return Build()->Call(); }
+
+  /**
+   * Creates the caller from the values collected so far.
+   * Aborts (CHECK) when no actions were supplied, instead of dereferencing a
+   * null pointer.
+   */
+  std::shared_ptr<AsyncBatchRpcRetryingCaller> Build() {
+    CHECK(actions_) << "BatchCallerBuilder: actions() must be set before Build()";
+    return std::make_shared<AsyncBatchRpcRetryingCaller>(
+        conn_, retry_timer_, table_name_, *actions_, pause_ns_, max_attempts_,
+        operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_);
+  }
+
+ private:
+  SharedThisPtr shared_this() {
+    return std::enable_shared_from_this<BatchCallerBuilder>::shared_from_this();
+  }
+
+ private:
+  std::shared_ptr<AsyncConnection> conn_;
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
+  std::shared_ptr<hbase::pb::TableName> table_name_ = nullptr;
+  std::shared_ptr<std::vector<hbase::Get>> actions_ = nullptr;
+  // Durations are zero-initialized: a default-constructed std::chrono duration
+  // member would otherwise hold an indeterminate value when its setter is skipped.
+  nanoseconds pause_ns_{0};
+  int32_t max_attempts_ = 0;
+  nanoseconds operation_timeout_nanos_{0};
+  nanoseconds rpc_timeout_nanos_{0};
+  int32_t start_log_errors_count_ = 0;
+};
class AsyncRpcRetryingCallerFactory {
private:
std::shared_ptr<AsyncConnection> conn_;
@@ -146,6 +214,10 @@ class AsyncRpcRetryingCallerFactory {
std::shared_ptr<SingleRequestCallerBuilder<RESP>> Single() {
return std::make_shared<SingleRequestCallerBuilder<RESP>>(conn_, retry_timer_);
}
+
+ /**
+  * Creates a new BatchCallerBuilder that uses this factory's connection and
+  * retry timer. A fresh builder is allocated on every call.
+  */
+ std::shared_ptr<BatchCallerBuilder> Batch() {
+ return std::make_shared<BatchCallerBuilder>(conn_, retry_timer_);
+ }
};
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/async-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc
index 7e211f7..f8b237b 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.cc
+++ b/hbase-native-client/core/async-rpc-retrying-caller.cc
@@ -143,9 +143,7 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
*/
conn_->retry_executor()->add([&]() {
retry_timer_->scheduleTimeoutFn(
- [this]() {
- conn_->cpu_executor()->add([&]() { LocateThenCall(); });
- },
+ [this]() { conn_->cpu_executor()->add([&]() { LocateThenCall(); }); },
milliseconds(TimeUtil::ToMillis(delay_ns)));
});
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/async-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc
index ff28e79..487c34c 100644
--- a/hbase-native-client/core/async-rpc-retrying-test.cc
+++ b/hbase-native-client/core/async-rpc-retrying-test.cc
@@ -159,9 +159,9 @@ class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {
uint32_t num_fails_ = 0;
public:
- explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
+ explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
: AsyncRegionLocatorBase(), num_fails_(num_fails) {}
- explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+ explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
: AsyncRegionLocatorBase(region_location) {}
virtual ~MockFailingAsyncRegionLocator() {}
folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/client-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index 274168f..1c6ec4a 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -234,3 +234,56 @@ TEST_F(ClientTest, PutsWithTimestamp) {
table->Close();
client.Close();
}
+
+// Writes num_rows rows one Put at a time, reads them back with a single
+// multi-Get and checks every returned Result. Two extra Gets are appended: a
+// duplicate of an existing row (must be found) and a row that was never
+// written (must come back empty).
+TEST_F(ClientTest, MultiGets) {
+  // Create the table used by this test
+  ClientTest::test_util->CreateTable("t", "d");
+
+  // Create TableName for the table the rows are written to and read from
+  auto tn = folly::to<hbase::pb::TableName>("t");
+
+  // Create a client
+  hbase::Client client(*ClientTest::test_util->conf());
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  uint64_t num_rows = 10000;
+  // Perform Puts
+  for (uint64_t i = 0; i < num_rows; i++) {
+    table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
+                                                         "value" + std::to_string(i)));
+  }
+
+  // Build one Get per row written above
+  std::vector<hbase::Get> gets;
+  for (uint64_t i = 0; i < num_rows; ++i) {
+    auto row = "test" + std::to_string(i);
+    hbase::Get get(row);
+    gets.push_back(get);
+  }
+  // One row that is requested a second time and one row that does not exist
+  gets.push_back(hbase::Get("test2"));
+  gets.push_back(hbase::Get("testextra"));
+
+  auto results = table->Get(gets);
+
+  // Every Get must have produced exactly one Result; this also guarantees that
+  // the indexing below stays within bounds.
+  ASSERT_TRUE(!results.empty()) << "Result vector shouldn't be empty.";
+  ASSERT_EQ(gets.size(), results.size()) << "Expected one Result per Get.";
+
+  // Test the values, should be the same as the ones written by the Puts above
+  uint64_t i = 0;
+  for (; i < num_rows; ++i) {
+    ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row()
+                                        << " must not be empty";
+    EXPECT_EQ("test" + std::to_string(i), results[i]->Row());
+    EXPECT_EQ("value" + std::to_string(i), *results[i]->Value("d", std::to_string(i)).get());
+  }
+  // Row test2 is requested twice, so the duplicate Get must also find it
+  ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row() << " must not be empty";
+
+  // Row testextra was never written, so its Result must be empty
+  ++i;
+  ASSERT_TRUE(results[i]->IsEmpty()) << "Result for Get " << gets[i].row() << " must be empty";
+
+  table->Close();
+  client.Close();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/raw-async-table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc
index 2bc9f36..9e0d4a3 100644
--- a/hbase-native-client/core/raw-async-table.cc
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -91,4 +91,24 @@ Future<Unit> RawAsyncTable::Put(const hbase::Put& put) {
return caller->Call().then([caller](const auto r) { return r; });
}
-} /* namespace hbase */
+/**
+ * Multi-Get entry point: hands the complete list of Gets to Batch() and
+ * returns its future unchanged.
+ */
+Future<std::vector<Try<std::shared_ptr<Result>>>> RawAsyncTable::Get(
+    const std::vector<hbase::Get>& gets) {
+  return Batch(gets);
+}
+
+/**
+ * Configures a batch caller for the given Gets from the connection
+ * configuration (read rpc timeout, operation timeout, pause, max retries and
+ * start_log_errors_count) and starts it.
+ *
+ * @param gets the Gets to execute; they are copied into a shared vector
+ * @return the future produced by the caller's Call()
+ */
+Future<std::vector<Try<std::shared_ptr<Result>>>> RawAsyncTable::Batch(
+    const std::vector<hbase::Get>& gets) {
+  // The builder receives its own shared copy of the requested Gets.
+  auto actions_copy = std::make_shared<std::vector<hbase::Get>>(gets);
+
+  auto builder = connection_->caller_factory()->Batch();
+  builder->table(table_name_)
+      ->actions(actions_copy)
+      ->rpc_timeout(connection_conf_->read_rpc_timeout())
+      ->operation_timeout(connection_conf_->operation_timeout())
+      ->pause(connection_conf_->pause())
+      ->max_attempts(connection_conf_->max_retries())
+      ->start_log_errors_count(connection_conf_->start_log_errors_count());
+  auto batch_caller = builder->Build();
+
+  // The continuation holds a copy of the shared_ptr, presumably so that the
+  // caller object outlives this function while the call is in flight.
+  return batch_caller->Call().then([batch_caller](auto batch_results) { return batch_results; });
+}
+} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/raw-async-table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h
index 978a2b8..e26d46e 100644
--- a/hbase-native-client/core/raw-async-table.h
+++ b/hbase-native-client/core/raw-async-table.h
@@ -20,11 +20,11 @@
#include <folly/futures/Future.h>
#include <folly/futures/Unit.h>
-
#include <chrono>
#include <memory>
#include <string>
-
+#include <vector>
+#include "core/async-batch-rpc-retrying-caller.h"
#include "core/async-connection.h"
#include "core/async-rpc-retrying-caller-factory.h"
#include "core/async-rpc-retrying-caller.h"
@@ -34,6 +34,7 @@
#include "core/result.h"
using folly::Future;
+using folly::Try;
using folly::Unit;
using hbase::pb::TableName;
using std::chrono::nanoseconds;
@@ -59,6 +60,9 @@ class RawAsyncTable {
Future<Unit> Put(const hbase::Put& put);
void Close() {}
+ /**
+  * Asynchronous multi-Get. Forwards the given Gets to Batch(). The returned
+  * future yields a vector of folly::Try, each holding either a Result or an
+  * exception.
+  */
+ Future<std::vector<Try<std::shared_ptr<Result>>>> Get(const std::vector<hbase::Get>& gets);
+ /**
+  * Executes the given Gets through a batch retrying caller obtained from the
+  * connection's caller factory.
+  */
+ Future<std::vector<Try<std::shared_ptr<Result>>>> Batch(const std::vector<hbase::Get>& gets);
+
private:
/* Data */
std::shared_ptr<AsyncConnection> connection_;
@@ -78,5 +82,4 @@ class RawAsyncTable {
std::shared_ptr<SingleRequestCallerBuilder<RESP>> CreateCallerBuilder(std::string row,
nanoseconds rpc_timeout);
};
-
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/request-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc
index 4c12ee7..c90e1ab 100644
--- a/hbase-native-client/core/request-converter.cc
+++ b/hbase-native-client/core/request-converter.cc
@@ -50,7 +50,6 @@ std::unique_ptr<Request> RequestConverter::ToGetRequest(const Get &get,
auto pb_msg = std::static_pointer_cast<GetRequest>(pb_req->req_msg());
RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
pb_msg->set_allocated_get((RequestConverter::ToGet(get)).release());
-
return pb_req;
}
@@ -114,12 +113,12 @@ std::unique_ptr<Request> RequestConverter::ToMultiRequest(
int action_num = 0;
for (const auto &region_action : action_by_region.second->actions()) {
auto pb_action = pb_region_action->add_action();
- auto action = region_action->action();
- if (auto pget = std::dynamic_pointer_cast<Get>(action)) {
- auto pb_get = RequestConverter::ToGet(*pget.get());
- pb_action->set_allocated_get(pb_get.release());
- pb_action->set_index(action_num);
- }
+ auto pget = region_action->action();
+ // We store only hbase::Get in hbase::Action as of now. It will be changed later on.
+ CHECK(pget) << "Unexpected. action can't be null";
+ auto pb_get = RequestConverter::ToGet(*pget);
+ pb_action->set_allocated_get(pb_get.release());
+ pb_action->set_index(action_num);
action_num++;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/simple-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index b417353..3a7d62b 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -26,6 +26,7 @@
#include <iostream>
#include <thread>
+#include "connection/rpc-client.h"
#include "core/client.h"
#include "core/get.h"
#include "core/put.h"
@@ -75,14 +76,16 @@ int main(int argc, char *argv[]) {
conf->SetInt("hbase.client.cpu.thread.pool.size", FLAGS_threads);
auto row = FLAGS_row;
+
auto tn = std::make_shared<TableName>(folly::to<TableName>(FLAGS_table));
auto num_puts = FLAGS_num_rows;
auto client = std::make_unique<Client>(*conf);
auto table = client->Table(*tn);
- // Do the Put requests
auto start_ns = TimeUtil::GetNowNanos();
+
+ // Do the Put requests
for (uint64_t i = 0; i < num_puts; i++) {
table->Put(*MakePut(Row(FLAGS_row, i)));
}
@@ -102,6 +105,23 @@ int main(int argc, char *argv[]) {
LOG(INFO) << "Successfully sent " << num_puts << " Get requests in "
<< TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ // Do the Multi-Gets
+ std::vector<hbase::Get> gets;
+ for (uint64_t i = 0; i < num_puts; ++i) {
+ hbase::Get get(Row(FLAGS_row, i));
+ gets.push_back(get);
+ }
+
+ start_ns = TimeUtil::GetNowNanos();
+ auto results = table->Get(gets);
+
+ if (FLAGS_display_results) {
+ for (const auto &result : results) LOG(INFO) << result->DebugString();
+ }
+
+ LOG(INFO) << "Successfully sent " << gets.size() << " Multi-Get requests in "
+ << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+
table->Close();
client->Close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index 8ace4af..a2f31d9 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -19,7 +19,6 @@
#include "core/table.h"
-#include <folly/futures/Future.h>
#include <chrono>
#include <limits>
#include <utility>
@@ -33,7 +32,6 @@
#include "serde/server-name.h"
#include "utils/time-util.h"
-using folly::Future;
using hbase::pb::TableName;
using hbase::security::User;
using std::chrono::milliseconds;
@@ -69,4 +67,20 @@ std::shared_ptr<RegionLocation> Table::GetRegionLocation(const std::string &row)
return async_connection_->region_locator()->LocateRegion(*table_name_, row).get();
}
+/**
+ * Synchronous multi-Get.
+ *
+ * Waits on the asynchronous multi-Get with operation_timeout() and unwraps the
+ * returned Try objects into plain Results.
+ *
+ * @param gets the Gets to execute
+ * @return the Results of the successful Gets, in the order they were returned
+ * @throws whatever the failed Try's exception wrapper yields, for the first
+ *         entry that holds an exception; the failure is logged first
+ */
+std::vector<std::shared_ptr<hbase::Result>> Table::Get(const std::vector<hbase::Get> &gets) {
+  auto tresults = async_table_->Get(gets).get(operation_timeout());
+  std::vector<std::shared_ptr<hbase::Result>> results{};
+  results.reserve(tresults.size());
+  // Position of the current entry. It advances on every iteration so that the
+  // error message names the Get that actually failed rather than always the
+  // first one. Assumes tresults is in the same order as gets - TODO confirm.
+  uint32_t num = 0;
+  for (auto &tresult : tresults) {
+    if (tresult.hasValue()) {
+      results.push_back(tresult.value());
+    } else if (tresult.hasException()) {
+      LOG(ERROR) << "Caught exception:- " << tresult.exception().getCopied()->what() << " for "
+                 << gets[num].row();
+      // NOTE(review): this throws whatever getCopied() returns; if that is a
+      // pointer in the folly version in use, callers have to catch a pointer
+      // and its lifetime should be checked. Kept as-is to preserve the thrown type.
+      throw tresult.exception().getCopied();
+    }
+    ++num;
+  }
+  return results;
+}
+
} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index cbb95b7..142baae 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -54,8 +54,7 @@ class Table {
*/
std::shared_ptr<hbase::Result> Get(const hbase::Get &get);
- // TODO: next jira
- // std::vector<std::unique_ptr<hbase::Result>> Get(const std::vector<hbase::Get> &gets);
+ /**
+  * @brief - Returns the Results for a list of Gets, issued as one multi-Get.
+  * Waits for the asynchronous call using the operation timeout and throws on
+  * the first Get whose outcome is an exception.
+  * @param - gets Get objects to perform the queries
+  */
+ std::vector<std::shared_ptr<hbase::Result>> Get(const std::vector<hbase::Get> &gets);
/**
* @brief - Puts some data in the table.