You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:46:50 UTC
[hbase] 122/133: HBASE-18507 [C++] Support for MultiPuts in
AsyncBatchRpcRetryingCaller class (Sudeep Sunthankar)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 18eba56d1b37233ea74c4f1a023d2c1410c0db01
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Fri Sep 1 10:55:57 2017 -0700
HBASE-18507 [C++] Support for MultiPuts in AsyncBatchRpcRetryingCaller class (Sudeep Sunthankar)
---
hbase-native-client/core/action.h | 9 +-
.../core/async-batch-rpc-retrying-caller.cc | 128 ++++++----
.../core/async-batch-rpc-retrying-caller.h | 12 +-
.../core/async-batch-rpc-retrying-test.cc | 275 +++++++++++++++------
.../core/async-rpc-retrying-caller-factory.h | 20 +-
hbase-native-client/core/client-test.cc | 148 +++++++++++
hbase-native-client/core/raw-async-table.cc | 31 ++-
hbase-native-client/core/raw-async-table.h | 7 +-
hbase-native-client/core/request-converter.cc | 18 +-
hbase-native-client/core/table.cc | 13 +
hbase-native-client/core/table.h | 6 +-
11 files changed, 506 insertions(+), 161 deletions(-)
diff --git a/hbase-native-client/core/action.h b/hbase-native-client/core/action.h
index 21a0181..a00f079 100644
--- a/hbase-native-client/core/action.h
+++ b/hbase-native-client/core/action.h
@@ -20,22 +20,21 @@
#pragma once
#include <memory>
-#include "core/get.h"
+#include "core/row.h"
namespace hbase {
-
class Action {
public:
- Action(std::shared_ptr<hbase::Get> action, int32_t original_index)
+ Action(std::shared_ptr<hbase::Row> action, int32_t original_index)
: action_(action), original_index_(original_index) {}
~Action() {}
int32_t original_index() const { return original_index_; }
- std::shared_ptr<hbase::Get> action() const { return action_; }
+ std::shared_ptr<hbase::Row> action() const { return action_; }
private:
- std::shared_ptr<hbase::Get> action_;
+ std::shared_ptr<hbase::Row> action_;
int32_t original_index_;
int64_t nonce_ = -1;
int32_t replica_id_ = -1;
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
index 0d67b17..dfbf7e7 100644
--- a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
+++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
@@ -32,11 +32,12 @@ using std::chrono::milliseconds;
namespace hbase {
-AsyncBatchRpcRetryingCaller::AsyncBatchRpcRetryingCaller(
+template <typename REQ, typename RESP>
+AsyncBatchRpcRetryingCaller<REQ, RESP>::AsyncBatchRpcRetryingCaller(
std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
- std::shared_ptr<TableName> table_name, const std::vector<hbase::Get> &actions,
- nanoseconds pause_ns, int32_t max_attempts, nanoseconds operation_timeout_ns,
- nanoseconds rpc_timeout_ns, int32_t start_log_errors_count)
+ std::shared_ptr<TableName> table_name, const std::vector<REQ> &actions, nanoseconds pause_ns,
+ int32_t max_attempts, nanoseconds operation_timeout_ns, nanoseconds rpc_timeout_ns,
+ int32_t start_log_errors_count)
: conn_(conn),
retry_timer_(retry_timer),
table_name_(table_name),
@@ -56,29 +57,31 @@ AsyncBatchRpcRetryingCaller::AsyncBatchRpcRetryingCaller(
max_attempts_ = ConnectionUtils::Retries2Attempts(max_attempts);
uint32_t index = 0;
for (auto row : actions) {
- actions_.push_back(std::make_shared<Action>(std::make_shared<hbase::Get>(row), index));
- Promise<std::shared_ptr<Result>> prom{};
- action2promises_.insert(
- std::pair<uint64_t, Promise<std::shared_ptr<Result>>>(index, std::move(prom)));
+ actions_.push_back(std::make_shared<Action>(row, index));
+ Promise<RESP> prom{};
+ action2promises_.insert(std::pair<uint64_t, Promise<RESP>>(index, std::move(prom)));
action2futures_.push_back(action2promises_[index++].getFuture());
}
}
-AsyncBatchRpcRetryingCaller::~AsyncBatchRpcRetryingCaller() {}
+template <typename REQ, typename RESP>
+AsyncBatchRpcRetryingCaller<REQ, RESP>::~AsyncBatchRpcRetryingCaller() {}
-Future<std::vector<Try<std::shared_ptr<Result>>>> AsyncBatchRpcRetryingCaller::Call() {
+template <typename REQ, typename RESP>
+Future<std::vector<Try<RESP>>> AsyncBatchRpcRetryingCaller<REQ, RESP>::Call() {
GroupAndSend(actions_, 1);
return collectAll(action2futures_);
}
-int64_t AsyncBatchRpcRetryingCaller::RemainingTimeNs() {
+template <typename REQ, typename RESP>
+int64_t AsyncBatchRpcRetryingCaller<REQ, RESP>::RemainingTimeNs() {
return operation_timeout_ns_.count() - (TimeUtil::GetNowNanos() - start_ns_);
}
-void AsyncBatchRpcRetryingCaller::LogException(int32_t tries,
- std::shared_ptr<RegionRequest> region_request,
- const folly::exception_wrapper &ew,
- std::shared_ptr<ServerName> server_name) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::LogException(
+ int32_t tries, std::shared_ptr<RegionRequest> region_request,
+ const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
if (tries > start_log_errors_count_) {
std::string regions;
regions += region_request->region_location()->region_name() + ", ";
@@ -88,7 +91,8 @@ void AsyncBatchRpcRetryingCaller::LogException(int32_t tries,
}
}
-void AsyncBatchRpcRetryingCaller::LogException(
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::LogException(
int32_t tries, const std::vector<std::shared_ptr<RegionRequest>> &region_requests,
const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
if (tries > start_log_errors_count_) {
@@ -102,29 +106,35 @@ void AsyncBatchRpcRetryingCaller::LogException(
}
}
-const std::string AsyncBatchRpcRetryingCaller::GetExtraContextForError(
+template <typename REQ, typename RESP>
+const std::string AsyncBatchRpcRetryingCaller<REQ, RESP>::GetExtraContextForError(
std::shared_ptr<ServerName> server_name) {
return server_name ? server_name->ShortDebugString() : "";
}
-void AsyncBatchRpcRetryingCaller::AddError(const std::shared_ptr<Action> &action,
- const folly::exception_wrapper &ew,
- std::shared_ptr<ServerName> server_name) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddError(const std::shared_ptr<Action> &action,
+ const folly::exception_wrapper &ew,
+ std::shared_ptr<ServerName> server_name) {
ThrowableWithExtraContext twec(ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
AddAction2Error(action->original_index(), twec);
}
-void AsyncBatchRpcRetryingCaller::AddError(const std::vector<std::shared_ptr<Action>> &actions,
- const folly::exception_wrapper &ew,
- std::shared_ptr<ServerName> server_name) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddError(
+ const std::vector<std::shared_ptr<Action>> &actions, const folly::exception_wrapper &ew,
+ std::shared_ptr<ServerName> server_name) {
for (const auto action : actions) {
AddError(action, ew, server_name);
}
}
-void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action, int32_t tries,
- const folly::exception_wrapper &ew, int64_t current_time,
- const std::string extras) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailOne(const std::shared_ptr<Action> &action,
+ int32_t tries,
+ const folly::exception_wrapper &ew,
+ int64_t current_time,
+ const std::string extras) {
auto action_index = action->original_index();
auto itr = action2promises_.find(action_index);
if (itr != action2promises_.end()) {
@@ -138,16 +148,18 @@ void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action,
RetriesExhaustedException(tries - 1, action2errors_[action_index]));
}
-void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Action>> &actions,
- int32_t tries, const folly::exception_wrapper &ew,
- std::shared_ptr<ServerName> server_name) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailAll(
+ const std::vector<std::shared_ptr<Action>> &actions, int32_t tries,
+ const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
for (const auto action : actions) {
FailOne(action, tries, ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
}
}
-void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Action>> &actions,
- int32_t tries) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailAll(
+ const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
for (const auto action : actions) {
auto action_index = action->original_index();
auto itr = action2promises_.find(action_index);
@@ -159,8 +171,9 @@ void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Acti
}
}
-void AsyncBatchRpcRetryingCaller::AddAction2Error(uint64_t action_index,
- const ThrowableWithExtraContext &twec) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddAction2Error(
+ uint64_t action_index, const ThrowableWithExtraContext &twec) {
auto erritr = action2errors_.find(action_index);
if (erritr != action2errors_.end()) {
erritr->second->push_back(twec);
@@ -171,9 +184,11 @@ void AsyncBatchRpcRetryingCaller::AddAction2Error(uint64_t action_index,
return;
}
-void AsyncBatchRpcRetryingCaller::OnError(const ActionsByRegion &actions_by_region, int32_t tries,
- const folly::exception_wrapper &ew,
- std::shared_ptr<ServerName> server_name) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnError(const ActionsByRegion &actions_by_region,
+ int32_t tries,
+ const folly::exception_wrapper &ew,
+ std::shared_ptr<ServerName> server_name) {
std::vector<std::shared_ptr<Action>> copied_actions;
std::vector<std::shared_ptr<RegionRequest>> region_requests;
for (const auto &action_by_region : actions_by_region) {
@@ -192,8 +207,9 @@ void AsyncBatchRpcRetryingCaller::OnError(const ActionsByRegion &actions_by_regi
TryResubmit(copied_actions, tries);
}
-void AsyncBatchRpcRetryingCaller::TryResubmit(const std::vector<std::shared_ptr<Action>> &actions,
- int32_t tries) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::TryResubmit(
+ const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
int64_t delay_ns;
if (operation_timeout_ns_.count() > 0) {
int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
@@ -213,9 +229,10 @@ void AsyncBatchRpcRetryingCaller::TryResubmit(const std::vector<std::shared_ptr<
});
}
+template <typename REQ, typename RESP>
Future<std::vector<Try<std::shared_ptr<RegionLocation>>>>
-AsyncBatchRpcRetryingCaller::GetRegionLocations(const std::vector<std::shared_ptr<Action>> &actions,
- int64_t locate_timeout_ns) {
+AsyncBatchRpcRetryingCaller<REQ, RESP>::GetRegionLocations(
+ const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns) {
auto locs = std::vector<Future<std::shared_ptr<RegionLocation>>>{};
for (auto const &action : actions) {
locs.push_back(location_cache_->LocateRegion(*table_name_, action->action()->row(),
@@ -225,8 +242,9 @@ AsyncBatchRpcRetryingCaller::GetRegionLocations(const std::vector<std::shared_pt
return collectAll(locs);
}
-void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr<Action>> &actions,
- int32_t tries) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::GroupAndSend(
+ const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
int64_t locate_timeout_ns;
if (operation_timeout_ns_.count() > 0) {
locate_timeout_ns = RemainingTimeNs();
@@ -300,8 +318,9 @@ void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr
return;
}
-Future<std::vector<Try<std::unique_ptr<Response>>>> AsyncBatchRpcRetryingCaller::GetMultiResponse(
- const ActionsByServer &actions_by_server) {
+template <typename REQ, typename RESP>
+Future<std::vector<Try<std::unique_ptr<Response>>>>
+AsyncBatchRpcRetryingCaller<REQ, RESP>::GetMultiResponse(const ActionsByServer &actions_by_server) {
auto multi_calls = std::vector<Future<std::unique_ptr<hbase::Response>>>{};
auto user = User::defaultUser();
for (const auto &action_by_server : actions_by_server) {
@@ -315,7 +334,9 @@ Future<std::vector<Try<std::unique_ptr<Response>>>> AsyncBatchRpcRetryingCaller:
return collectAll(multi_calls);
}
-void AsyncBatchRpcRetryingCaller::Send(const ActionsByServer &actions_by_server, int32_t tries) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::Send(const ActionsByServer &actions_by_server,
+ int32_t tries) {
int64_t remaining_ns;
if (operation_timeout_ns_.count() > 0) {
remaining_ns = RemainingTimeNs();
@@ -371,7 +392,8 @@ void AsyncBatchRpcRetryingCaller::Send(const ActionsByServer &actions_by_server,
return;
}
-void AsyncBatchRpcRetryingCaller::OnComplete(
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnComplete(
const ActionsByRegion &actions_by_region, int32_t tries,
const std::shared_ptr<ServerName> server_name,
const std::unique_ptr<hbase::MultiResponse> multi_response) {
@@ -418,12 +440,12 @@ void AsyncBatchRpcRetryingCaller::OnComplete(
return;
}
-void AsyncBatchRpcRetryingCaller::OnComplete(const std::shared_ptr<Action> &action,
- const std::shared_ptr<RegionRequest> &region_request,
- int32_t tries,
- const std::shared_ptr<ServerName> &server_name,
- const std::shared_ptr<RegionResult> &region_result,
- std::vector<std::shared_ptr<Action>> &failed_actions) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnComplete(
+ const std::shared_ptr<Action> &action, const std::shared_ptr<RegionRequest> &region_request,
+ int32_t tries, const std::shared_ptr<ServerName> &server_name,
+ const std::shared_ptr<RegionResult> &region_result,
+ std::vector<std::shared_ptr<Action>> &failed_actions) {
std::string err_msg;
try {
auto result_or_exc = region_result->ResultOrException(action->original_index());
@@ -461,4 +483,6 @@ void AsyncBatchRpcRetryingCaller::OnComplete(const std::shared_ptr<Action> &acti
return;
}
+template class AsyncBatchRpcRetryingCaller<std::shared_ptr<hbase::Row>,
+ std::shared_ptr<hbase::Result>>;
} /* namespace hbase */
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.h b/hbase-native-client/core/async-batch-rpc-retrying-caller.h
index 194c439..9194b04 100644
--- a/hbase-native-client/core/async-batch-rpc-retrying-caller.h
+++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.h
@@ -84,6 +84,7 @@ struct ServerNameHash {
}
};
+template <typename REQ, typename RESP>
class AsyncBatchRpcRetryingCaller {
public:
using ActionsByServer =
@@ -94,15 +95,14 @@ class AsyncBatchRpcRetryingCaller {
AsyncBatchRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
std::shared_ptr<folly::HHWheelTimer> retry_timer,
std::shared_ptr<pb::TableName> table_name,
- const std::vector<hbase::Get> &actions,
- std::chrono::nanoseconds pause_ns, int32_t max_attempts,
- std::chrono::nanoseconds operation_timeout_ns,
+ const std::vector<REQ> &actions, std::chrono::nanoseconds pause_ns,
+ int32_t max_attempts, std::chrono::nanoseconds operation_timeout_ns,
std::chrono::nanoseconds rpc_timeout_ns,
int32_t start_log_errors_count);
~AsyncBatchRpcRetryingCaller();
- folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Call();
+ folly::Future<std::vector<folly::Try<RESP>>> Call();
private:
int64_t RemainingTimeNs();
@@ -172,8 +172,8 @@ class AsyncBatchRpcRetryingCaller {
int64_t start_ns_ = TimeUtil::GetNowNanos();
int32_t tries_ = 1;
- std::map<uint64_t, folly::Promise<std::shared_ptr<Result>>> action2promises_;
- std::vector<folly::Future<std::shared_ptr<Result>>> action2futures_;
+ std::map<uint64_t, folly::Promise<RESP>> action2promises_;
+ std::vector<folly::Future<RESP>> action2futures_;
std::map<uint64_t, std::shared_ptr<std::vector<ThrowableWithExtraContext>>> action2errors_;
std::shared_ptr<AsyncRegionLocator> location_cache_ = nullptr;
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-test.cc b/hbase-native-client/core/async-batch-rpc-retrying-test.cc
index cad03e1..b8a0b81 100644
--- a/hbase-native-client/core/async-batch-rpc-retrying-test.cc
+++ b/hbase-native-client/core/async-batch-rpc-retrying-test.cc
@@ -230,7 +230,12 @@ class MockAsyncConnection : public AsyncConnection,
return retry_executor_;
}
- void Close() override {}
+ void Close() override {
+ retry_timer_->destroy();
+ retry_executor_->stop();
+ io_executor_->stop();
+ cpu_executor_->stop();
+ }
std::shared_ptr<HBaseRpcController> CreateRpcController() override {
return std::make_shared<HBaseRpcController>();
}
@@ -254,15 +259,15 @@ class MockRawAsyncTableImpl {
virtual ~MockRawAsyncTableImpl() = default;
/* implement this in real RawAsyncTableImpl. */
- folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Gets(
- const std::vector<hbase::Get> &gets) {
+ template <typename REQ, typename RESP>
+ folly::Future<std::vector<folly::Try<RESP>>> Batch(const std::vector<REQ> &rows) {
/* init request caller builder */
- auto builder = conn_->caller_factory()->Batch();
+ auto builder = conn_->caller_factory()->Batch<REQ, RESP>();
/* call with retry to get result */
auto async_caller =
builder->table(tn_)
- ->actions(std::make_shared<std::vector<hbase::Get>>(gets))
+ ->actions(std::make_shared<std::vector<REQ>>(rows))
->rpc_timeout(conn_->connection_conf()->read_rpc_timeout())
->operation_timeout(conn_->connection_conf()->operation_timeout())
->pause(conn_->connection_conf()->pause())
@@ -278,9 +283,7 @@ class MockRawAsyncTableImpl {
std::shared_ptr<hbase::pb::TableName> tn_;
};
-void runMultiTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
- const std::string &table_name, bool split_regions, uint32_t tries = 3,
- uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 1000) {
+std::string createTestTable(bool split_regions, const std::string &table_name) {
std::vector<std::string> keys{"test0", "test100", "test200", "test300", "test400",
"test500", "test600", "test700", "test800", "test900"};
std::string tableName = (split_regions) ? ("split-" + table_name) : table_name;
@@ -289,30 +292,12 @@ void runMultiTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
} else {
AsyncBatchRpcRetryTest::test_util->CreateTable(tableName, "d");
}
+ return tableName;
+}
- // Create TableName and Row to be fetched from HBase
- auto tn = folly::to<hbase::pb::TableName>(tableName);
-
- // Create a client
- Client client(*AsyncBatchRpcRetryTest::test_util->conf());
-
- // Get connection to HBase Table
- auto table = client.Table(tn);
-
- for (uint64_t i = 0; i < num_rows; i++) {
- table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
- "value" + std::to_string(i)));
- }
-
- std::map<std::string, std::shared_ptr<RegionLocation>> region_locations;
- std::vector<hbase::Get> gets;
- for (uint64_t i = 0; i < num_rows; ++i) {
- auto row = "test" + std::to_string(i);
- hbase::Get get(row);
- gets.push_back(get);
- region_locations[row] = table->GetRegionLocation(row);
- }
-
+std::shared_ptr<MockAsyncConnection> getAsyncConnection(
+ Client &client, uint32_t operation_timeout_millis, uint32_t tries,
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator) {
/* init region location and rpc channel */
auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
auto io_executor_ = client.async_connection()->io_executor();
@@ -332,35 +317,90 @@ void runMultiTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
tries, // max retries
1); // start log errors count
- /* set region locator */
- region_locator->set_region_location(region_locations);
-
- /* init hbase client connection */
- auto conn = std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_,
- io_executor_, retry_executor_, rpc_client,
- region_locator);
- conn->Init();
-
- /* init retry caller factory */
- auto tableImpl =
- std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
+ return std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_,
+ io_executor_, retry_executor_, rpc_client,
+ region_locator);
+}
- auto tresults = tableImpl->Gets(gets).get(milliseconds(operation_timeout_millis));
+template <typename ACTION>
+std::vector<std::shared_ptr<hbase::Row>> getRows(std::vector<ACTION> actions) {
+ std::vector<std::shared_ptr<hbase::Row>> rows;
+ for (auto action : actions) {
+ std::shared_ptr<hbase::Row> srow = std::make_shared<ACTION>(action);
+ rows.push_back(srow);
+ }
+ return rows;
+}
- ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
+template <typename REQ, typename RESP>
+std::vector<std::shared_ptr<hbase::Result>> getResults(std::vector<REQ> &actions,
+ std::vector<folly::Try<RESP>> &tresults) {
std::vector<std::shared_ptr<hbase::Result>> results{};
- uint32_t num = 0;
+ uint64_t num = 0;
for (auto tresult : tresults) {
if (tresult.hasValue()) {
results.push_back(tresult.value());
} else if (tresult.hasException()) {
folly::exception_wrapper ew = tresult.exception();
- LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for " << gets[num].row();
+ LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for "
+ << actions[num].row();
throw ew;
}
++num;
}
+ return results;
+}
+
+template <typename ACTION>
+std::map<std::string, std::shared_ptr<RegionLocation>> getRegionLocationsAndActions(
+ uint64_t num_rows, std::vector<ACTION> &actions, std::shared_ptr<Table> table) {
+ std::map<std::string, std::shared_ptr<RegionLocation>> region_locations;
+ for (uint64_t i = 0; i < num_rows; ++i) {
+ auto row = "test" + std::to_string(i);
+ ACTION action(row);
+ actions.push_back(action);
+ region_locations[row] = table->GetRegionLocation(row);
+ }
+ return region_locations;
+}
+
+void runMultiGets(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
+ const std::string &table_name, bool split_regions, uint32_t tries = 3,
+ uint32_t operation_timeout_millis = 600000, uint64_t num_rows = 1000) {
+ auto tableName = createTestTable(split_regions, table_name);
+ // Create TableName and Row to be fetched from HBase
+ auto tn = folly::to<hbase::pb::TableName>(tableName);
+
+ // Create a client
+ Client client(*AsyncBatchRpcRetryTest::test_util->conf());
+
+ // Get connection to HBase Table
+ std::shared_ptr<Table> table = client.Table(tn);
+
+ for (uint64_t i = 0; i < num_rows; i++) {
+ table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
+ "value" + std::to_string(i)));
+ }
+ std::vector<hbase::Get> gets;
+ auto region_locations = getRegionLocationsAndActions<hbase::Get>(num_rows, gets, table);
+
+ /* set region locator */
+ region_locator->set_region_location(region_locations);
+ /* init hbase client connection */
+ auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator);
+ conn->Init();
+
+ /* init retry caller factory */
+ auto tableImpl =
+ std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
+
+ std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Get>(gets);
+ auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get(
+ milliseconds(operation_timeout_millis));
+ ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
+
+ auto results = getResults<hbase::Get, std::shared_ptr<Result>>(gets, tresults);
// Test the values, should be same as in put executed on hbase shell
ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty.";
uint32_t i = 0;
@@ -371,101 +411,184 @@ void runMultiTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
EXPECT_EQ("value" + std::to_string(i), results[i]->Value("d", std::to_string(i)).value());
}
- retry_timer->destroy();
table->Close();
client.Close();
- retry_executor_->stop();
- io_executor_->stop();
- cpu_executor_->stop();
+ conn->Close();
+}
+
+void runMultiPuts(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
+ const std::string &table_name, bool split_regions, uint32_t tries = 3,
+ uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 1000) {
+ auto tableName = createTestTable(split_regions, table_name);
+
+ // Create TableName and Row to be fetched from HBase
+ auto tn = folly::to<hbase::pb::TableName>(tableName);
+
+ // Create a client
+ Client client(*AsyncBatchRpcRetryTest::test_util->conf());
+
+ // Get connection to HBase Table
+ std::shared_ptr<Table> table = client.Table(tn);
+
+ std::vector<hbase::Put> puts;
+ auto region_locations = getRegionLocationsAndActions<hbase::Put>(num_rows, puts, table);
+
+ /* set region locator */
+ region_locator->set_region_location(region_locations);
+
+ /* init hbase client connection */
+ auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator);
+ conn->Init();
+
+ /* init retry caller factory */
+ auto tableImpl =
+ std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
+
+ std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Put>(puts);
+ auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get(
+ milliseconds(operation_timeout_millis));
+ ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
+
+ auto results = getResults<hbase::Put, std::shared_ptr<Result>>(puts, tresults);
+ // Test the values, should be same as in put executed on hbase shell
+ ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty.";
+
+ table->Close();
+ client.Close();
+ conn->Close();
}
// Test successful case
TEST_F(AsyncBatchRpcRetryTest, MultiGets) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockAsyncRegionLocator>());
- runMultiTest(region_locator, "table1", false);
+ runMultiGets(region_locator, "table1", false);
}
// Tests the RPC failing 3 times, then succeeding
TEST_F(AsyncBatchRpcRetryTest, HandleException) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
- runMultiTest(region_locator, "table2", false, 5);
+ runMultiGets(region_locator, "table2", false, 5);
}
// Tests the RPC failing 4 times, throwing an exception
TEST_F(AsyncBatchRpcRetryTest, FailWithException) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
- EXPECT_ANY_THROW(runMultiTest(region_locator, "table3", false));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table3", false));
}
// Tests the region location lookup failing 3 times, then succeeding
TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookup) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockFailingAsyncRegionLocator>(3));
- runMultiTest(region_locator, "table4", false);
+ runMultiGets(region_locator, "table4", false);
}
// Tests the region location lookup failing 5 times, throwing an exception
TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookup) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockFailingAsyncRegionLocator>(4));
- EXPECT_ANY_THROW(runMultiTest(region_locator, "table5", false, 3));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table5", false, 3));
}
// Tests hitting operation timeout, thus not retrying anymore
TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeout) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockFailingAsyncRegionLocator>(6));
- EXPECT_ANY_THROW(runMultiTest(region_locator, "table6", false, 5, 100, 1000));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table6", false, 5, 100, 1000));
}
-/*
- TODO: Below tests are failing with frequently with segfaults coming from
- JNI internals indicating that we are doing something wrong in the JNI boundary.
- However, we were not able to debug furhter yet. Disable the tests for now, and
- come back later to fix the issue.
-
+//////////////////////
// Test successful case
-TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) {
+TEST_F(AsyncBatchRpcRetryTest, MultiPuts) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockAsyncRegionLocator>());
- runMultiTest(region_locator, "table7", true);
+ runMultiPuts(region_locator, "table1", false);
}
// Tests the RPC failing 3 times, then succeeding
-TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) {
+TEST_F(AsyncBatchRpcRetryTest, PutsHandleException) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
- runMultiTest(region_locator, "table8", true, 5);
+ runMultiPuts(region_locator, "table2", false, 5);
}
// Tests the RPC failing 4 times, throwing an exception
-TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) {
+TEST_F(AsyncBatchRpcRetryTest, PutsFailWithException) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
- EXPECT_ANY_THROW(runMultiTest(region_locator, "table9", true));
+ EXPECT_ANY_THROW(runMultiPuts(region_locator, "table3", false));
}
// Tests the region location lookup failing 3 times, then succeeding
-TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) {
+TEST_F(AsyncBatchRpcRetryTest, PutsHandleExceptionFromRegionLocationLookup) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockFailingAsyncRegionLocator>(3));
- runMultiTest(region_locator, "table10", true);
+ runMultiPuts(region_locator, "table4", false);
}
// Tests the region location lookup failing 5 times, throwing an exception
-TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) {
+TEST_F(AsyncBatchRpcRetryTest, PutsFailWithExceptionFromRegionLocationLookup) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockFailingAsyncRegionLocator>(4));
- EXPECT_ANY_THROW(runMultiTest(region_locator, "table11", true, 3));
+ EXPECT_ANY_THROW(runMultiPuts(region_locator, "table5", false, 3));
}
// Tests hitting operation timeout, thus not retrying anymore
-TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) {
+TEST_F(AsyncBatchRpcRetryTest, PutsFailWithOperationTimeout) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockFailingAsyncRegionLocator>(6));
- EXPECT_ANY_THROW(runMultiTest(region_locator, "table12", true, 5, 100, 1000));
+ EXPECT_ANY_THROW(runMultiPuts(region_locator, "table6", false, 5, 100, 1000));
}
-*/
+
+//////////////////////
+/*
+ TODO: Below tests are failing with frequently with segfaults coming from
+ JNI internals indicating that we are doing something wrong in the JNI boundary.
+ However, we were not able to debug furhter yet. Disable the tests for now, and
+ come back later to fix the issue.
+
+ // Test successful case
+ TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockAsyncRegionLocator>());
+ runMultiGets(region_locator, "table7", true);
+ }
+
+ // Tests the RPC failing 3 times, then succeeding
+ TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
+ runMultiGets(region_locator, "table8", true, 5);
+ }
+
+ // Tests the RPC failing 4 times, throwing an exception
+ TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table9", true));
+ }
+
+ // Tests the region location lookup failing 3 times, then succeeding
+ TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(3));
+ runMultiGets(region_locator, "table10", true);
+ }
+
+ // Tests the region location lookup failing 5 times, throwing an exception
+ TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(4));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table11", true, 3));
+ }
+
+ // Tests hitting operation timeout, thus not retrying anymore
+ TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(6));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table12", true, 5, 100, 1000));
+ }
+ */
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
index 1af6e72..188f469 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller-factory.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
@@ -135,7 +135,8 @@ class SingleRequestCallerBuilder
Callable<RESP> callable_;
}; // end of SingleRequestCallerBuilder
-class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilder> {
+template <typename REQ, typename RESP>
+class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilder<REQ, RESP>> {
public:
explicit BatchCallerBuilder(std::shared_ptr<AsyncConnection> conn,
std::shared_ptr<folly::HHWheelTimer> retry_timer)
@@ -143,14 +144,14 @@ class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilde
virtual ~BatchCallerBuilder() = default;
- typedef std::shared_ptr<BatchCallerBuilder> SharedThisPtr;
+ typedef std::shared_ptr<BatchCallerBuilder<REQ, RESP>> SharedThisPtr;
SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) {
table_name_ = table_name;
return shared_this();
}
- SharedThisPtr actions(std::shared_ptr<std::vector<hbase::Get>> actions) {
+ SharedThisPtr actions(std::shared_ptr<std::vector<REQ>> actions) {
actions_ = actions;
return shared_this();
}
@@ -180,10 +181,10 @@ class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilde
return shared_this();
}
- folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Call() { return Build()->Call(); }
+ folly::Future<std::vector<folly::Try<RESP>>> Call() { return Build()->Call(); }
- std::shared_ptr<AsyncBatchRpcRetryingCaller> Build() {
- return std::make_shared<AsyncBatchRpcRetryingCaller>(
+ std::shared_ptr<AsyncBatchRpcRetryingCaller<REQ, RESP>> Build() {
+ return std::make_shared<AsyncBatchRpcRetryingCaller<REQ, RESP>>(
conn_, retry_timer_, table_name_, *actions_, pause_ns_, max_attempts_,
operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_);
}
@@ -197,7 +198,7 @@ class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilde
std::shared_ptr<AsyncConnection> conn_;
std::shared_ptr<folly::HHWheelTimer> retry_timer_;
std::shared_ptr<hbase::pb::TableName> table_name_ = nullptr;
- std::shared_ptr<std::vector<hbase::Get>> actions_ = nullptr;
+ std::shared_ptr<std::vector<REQ>> actions_ = nullptr;
std::chrono::nanoseconds pause_ns_;
int32_t max_attempts_ = 0;
std::chrono::nanoseconds operation_timeout_nanos_;
@@ -329,8 +330,9 @@ class AsyncRpcRetryingCallerFactory {
return std::make_shared<SingleRequestCallerBuilder<RESP>>(conn_, retry_timer_);
}
- std::shared_ptr<BatchCallerBuilder> Batch() {
- return std::make_shared<BatchCallerBuilder>(conn_, retry_timer_);
+ template <typename REQ, typename RESP>
+ std::shared_ptr<BatchCallerBuilder<REQ, RESP>> Batch() {
+ return std::make_shared<BatchCallerBuilder<REQ, RESP>>(conn_, retry_timer_);
}
std::shared_ptr<ScanCallerBuilder> Scan() {
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index 1c9b709..3f72880 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -547,3 +547,151 @@ TEST_F(ClientTest, MultiGetsWithRegionSplits) {
table->Close();
client.Close();
}
+
+// Builds num_rows Puts and writes them to table_name with a single Table::Put call.
+// Row i gets key "test<i>", with "<i>" and "value<i>" passed to AddColumn after "d"
+// ("d" is presumably the column family; the tests create the table with "d").
+void PerformMultiPuts(uint64_t num_rows, std::shared_ptr<hbase::Client> client,
+                      const std::string &table_name) {
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto table = client->Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  // Accumulate every Put first so they are all sent in one batch.
+  std::vector<hbase::Put> batch;
+  uint64_t row = 0;
+  while (row < num_rows) {
+    std::string suffix = std::to_string(row);
+    hbase::Put put("test" + suffix);
+    put.AddColumn("d", suffix, "value" + suffix);
+    batch.push_back(put);
+    ++row;
+  }
+  table->Put(batch);
+}
+
+// Sends an already-built batch of Puts through table->Put().
+// Neither argument is modified here, so both are taken by const reference: the
+// batch cannot be altered by accident and no shared_ptr reference count is touched.
+void PerformMultiPuts(const std::vector<hbase::Put> &puts,
+                      const std::shared_ptr<Table> &table) {
+  table->Put(puts);
+}
+
+// Round trip: batch-Put a set of rows, batch-Get the same rows, then check the
+// results with TestMultiResults.
+TEST_F(ClientTest, MultiGetsWithMultiPuts) {
+  // Create table "t" through the test utility ("d" is the name the puts write under).
+  std::string table_name = "t";
+  ClientTest::test_util->CreateTable(table_name, "d");
+
+  // Client parameters are set before the client is constructed from the test conf.
+  SetClientParams();
+  hbase::Client client(*ClientTest::test_util->conf());
+
+  // Write the rows through the multi-put path.
+  uint64_t row_count = 50000;
+  PerformMultiPuts(row_count, std::make_shared<hbase::Client>(client), table_name);
+
+  // Open the table on the original client for the read side.
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  // One Get per row written above.
+  std::vector<hbase::Get> gets;
+  MakeGets(row_count, "test", gets);
+
+  auto results = table->Get(gets);
+  TestMultiResults(row_count, results, gets);
+
+  table->Close();
+  client.Close();
+}
+
+// Same round trip as MultiGetsWithMultiPuts, but the table is created with an
+// explicit key list (presumably region split points, going by the test name).
+TEST_F(ClientTest, MultiGetsWithMultiPutsAndSplitRegions) {
+  std::vector<std::string> keys{"test0", "test100", "test200", "test300", "test400",
+                                "test500", "test600", "test700", "test800", "test900"};
+  std::string table_name = "t";
+  ClientTest::test_util->CreateTable(table_name, "d", keys);
+
+  // Client parameters are set before the client is constructed from the test conf.
+  SetClientParams();
+  hbase::Client client(*ClientTest::test_util->conf());
+
+  // Write the rows through the multi-put path.
+  uint64_t row_count = 50000;
+  PerformMultiPuts(row_count, std::make_shared<hbase::Client>(client), table_name);
+
+  // Open the table on the original client for the read side.
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  // One Get per row written above.
+  std::vector<hbase::Get> gets;
+  MakeGets(row_count, "test", gets);
+
+  auto results = table->Get(gets);
+  TestMultiResults(row_count, results, gets);
+
+  table->Close();
+  client.Close();
+}
+
+// Writes 80000 rows in batches of at most 10000 Puts per Table::Put call.
+TEST_F(ClientTest, MultiPuts) {
+  // Create table "t" through the test utility.
+  std::string table_name = "t";
+  ClientTest::test_util->CreateTable(table_name, "d");
+
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+
+  // Client parameters are set before the client is constructed from the test conf.
+  SetClientParams();
+  hbase::Client client(*ClientTest::test_util->conf());
+  std::shared_ptr<Table> table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  uint64_t total_rows = 80000;
+  uint64_t batch_size = 10000;
+  std::vector<hbase::Put> batch;
+  uint64_t next_row = 0;
+  while (next_row < total_rows) {
+    batch.clear();
+    // This batch covers [next_row, batch_end); the last batch is clipped to total_rows.
+    uint64_t batch_end =
+        (total_rows - next_row < batch_size) ? total_rows : next_row + batch_size;
+    for (; next_row < batch_end; ++next_row) {
+      std::string suffix = std::to_string(next_row);
+      hbase::Put put("test" + suffix);
+      put.AddColumn("d", suffix, "value" + suffix);
+      batch.push_back(put);
+    }
+    PerformMultiPuts(batch, table);
+  }
+  table->Close();
+  client.Close();
+}
+
+// Same batched write as MultiPuts, but the table is created with an explicit key
+// list (presumably region split points, going by the test name).
+TEST_F(ClientTest, MultiPutsWithRegionSplits) {
+  std::vector<std::string> keys{"test0", "test100", "test200", "test300", "test400",
+                                "test500", "test600", "test700", "test800", "test900"};
+  std::string table_name = "t";
+  ClientTest::test_util->CreateTable(table_name, "d", keys);
+
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+
+  // Client parameters are set before the client is constructed from the test conf.
+  SetClientParams();
+  hbase::Client client(*ClientTest::test_util->conf());
+  std::shared_ptr<Table> table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  uint64_t total_rows = 80000;
+  uint64_t batch_size = 10000;
+  std::vector<hbase::Put> batch;
+  uint64_t next_row = 0;
+  while (next_row < total_rows) {
+    batch.clear();
+    // This batch covers [next_row, batch_end); the last batch is clipped to total_rows.
+    uint64_t batch_end =
+        (total_rows - next_row < batch_size) ? total_rows : next_row + batch_size;
+    for (; next_row < batch_end; ++next_row) {
+      std::string suffix = std::to_string(next_row);
+      hbase::Put put("test" + suffix);
+      put.AddColumn("d", suffix, "value" + suffix);
+      batch.push_back(put);
+    }
+    PerformMultiPuts(batch, table);
+  }
+  table->Close();
+  client.Close();
+}
diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc
index 53ab526..409883f 100644
--- a/hbase-native-client/core/raw-async-table.cc
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -197,18 +197,26 @@ folly::Future<std::shared_ptr<Result>> RawAsyncTable::Append(const hbase::Append
return caller->Call().then([caller](const auto r) { return r; });
}
+
+// Multi-get: wraps a copy of every Get in a shared_ptr<Row> and submits them as
+// one batch, using the connection's read RPC timeout.
folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Get(
const std::vector<hbase::Get>& gets) {
- return this->Batch(gets);
+  std::vector<std::shared_ptr<hbase::Row>> rows;
+  rows.reserve(gets.size());
+  // Iterate by const reference: make_shared already makes the one copy we need,
+  // so a by-value loop variable would copy each Get twice.
+  for (const auto& get : gets) {
+    rows.push_back(std::make_shared<hbase::Get>(get));
+  }
+  return this->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(
+      rows, connection_conf_->read_rpc_timeout());
}
-folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Batch(
- const std::vector<hbase::Get>& gets) {
+template <typename REQ, typename RESP>
+folly::Future<std::vector<folly::Try<RESP>>> RawAsyncTable::Batch(
+ const std::vector<REQ>& rows, std::chrono::nanoseconds timeout) {
auto caller = connection_->caller_factory()
- ->Batch()
+ ->Batch<REQ, RESP>()
->table(table_name_)
- ->actions(std::make_shared<std::vector<hbase::Get>>(gets))
- ->rpc_timeout(connection_conf_->read_rpc_timeout())
+ ->actions(std::make_shared<std::vector<REQ>>(rows))
+ ->rpc_timeout(timeout)
->operation_timeout(connection_conf_->operation_timeout())
->pause(connection_conf_->pause())
->max_attempts(connection_conf_->max_retries())
@@ -237,4 +245,15 @@ std::shared_ptr<hbase::Scan> RawAsyncTable::SetDefaultScanConfig(const hbase::Sc
}
return new_scan;
}
+
+// Multi-put: wraps a copy of every Put in a shared_ptr<Row> and submits them as
+// one batch, using the connection's write RPC timeout.
+folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Put(
+    const std::vector<hbase::Put>& puts) {
+  std::vector<std::shared_ptr<hbase::Row>> rows;
+  rows.reserve(puts.size());
+  // Iterate by const reference: make_shared already makes the one copy we need,
+  // so a by-value loop variable would copy each Put twice.
+  for (const auto& put : puts) {
+    rows.push_back(std::make_shared<hbase::Put>(put));
+  }
+  return this->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(
+      rows, connection_conf_->write_rpc_timeout());
+}
} // namespace hbase
diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h
index e651f8a..97eef7f 100644
--- a/hbase-native-client/core/raw-async-table.h
+++ b/hbase-native-client/core/raw-async-table.h
@@ -83,8 +83,11 @@ class RawAsyncTable {
folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Get(
const std::vector<hbase::Get>& gets);
- folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Batch(
- const std::vector<hbase::Get>& gets);
+ template <typename REQ, typename RESP>
+ folly::Future<std::vector<folly::Try<RESP>>> Batch(const std::vector<REQ>& rows,
+ std::chrono::nanoseconds timeout);
+ folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Put(
+ const std::vector<hbase::Put>& puts);
private:
/* Data */
diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc
index 47c09d1..f48f228 100644
--- a/hbase-native-client/core/request-converter.cc
+++ b/hbase-native-client/core/request-converter.cc
@@ -173,14 +173,23 @@ std::unique_ptr<Request> RequestConverter::ToMultiRequest(
auto pb_action = pb_region_action->add_action();
auto pget = region_action->action();
// We store only hbase::Get in hbase::Action as of now. It will be changed later on.
- CHECK(pget) << "Unexpected. action can't be null";
- auto pb_get = RequestConverter::ToGet(*pget);
- pb_action->set_allocated_get(pb_get.release());
+ CHECK(pget) << "Unexpected. action can't be null.";
+ std::string error_msg("");
+ if (typeid(*pget) == typeid(hbase::Get)) {
+ auto getp = dynamic_cast<hbase::Get *>(pget.get());
+ pb_action->set_allocated_get(RequestConverter::ToGet(*getp).release());
+ } else if (typeid(*pget) == typeid(hbase::Put)) {
+ auto putp = dynamic_cast<hbase::Put *>(pget.get());
+ pb_action->set_allocated_mutation(
+ RequestConverter::ToMutation(MutationType::MutationProto_MutationType_PUT, *putp, -1)
+ .release());
+ } else {
+ throw std::runtime_error("Unexpected action type encountered.");
+ }
pb_action->set_index(action_num);
action_num++;
}
}
-
return pb_req;
}
@@ -355,4 +364,5 @@ std::unique_ptr<Request> RequestConverter::AppendToMutateRequest(const Append &a
VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString();
return pb_req;
}
+
} /* namespace hbase */
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index 3b7a87b..f93a029 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -128,4 +128,17 @@ std::vector<std::shared_ptr<hbase::Result>> Table::Get(const std::vector<hbase::
return results;
}
+// Blocking multi-put. Submits all puts through the async table, waits on the
+// returned future with get(operation_timeout()), and rethrows the first
+// per-row exception found in the results after logging it.
+void Table::Put(const std::vector<hbase::Put> &puts) {
+  auto tresults = async_table_->Put(puts).get(operation_timeout());
+  // Position of the current result; assumes results come back in the same order
+  // as puts (TODO confirm against the batch caller).
+  uint32_t num = 0;
+  for (auto &tresult : tresults) {
+    if (tresult.hasException()) {
+      LOG(ERROR) << "Caught exception:- " << tresult.exception().what() << " for "
+                 << puts[num].row();
+      throw tresult.exception();
+    }
+    // Advance for every result, not only on failure, so the log names the
+    // row that actually failed rather than always puts[0].
+    ++num;
+  }
+}
+
} /* namespace hbase */
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index cc37182..6340494 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -119,11 +119,15 @@ class Table {
* @param - append Append object to perform HBase Append operation.
*/
std::shared_ptr<hbase::Result> Append(const hbase::Append &append);
- // TODO: Batch Puts
std::shared_ptr<ResultScanner> Scan(const hbase::Scan &scan);
/**
+ * @brief - Multi Puts.
+ * @param - puts vector of hbase::Put.
+ */
+ void Put(const std::vector<hbase::Put> &puts);
+ /**
* @brief - Close the client connection.
*/
void Close();