You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/09/15 21:20:26 UTC
[09/25] hbase git commit: HBASE-18725 [C++] Install header files as
well as library
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-batch-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-batch-rpc-retrying-test.cc b/hbase-native-client/src/hbase/client/async-batch-rpc-retrying-test.cc
new file mode 100644
index 0000000..b0f4afb
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-batch-rpc-retrying-test.cc
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Logging.h>
+#include <folly/Memory.h>
+#include <folly/futures/Future.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/ScopedEventBaseThread.h>
+#include <gtest/gtest.h>
+#include <wangle/concurrent/IOThreadPoolExecutor.h>
+
+#include <chrono>
+#include <functional>
+#include <string>
+
+#include "hbase/connection/rpc-client.h"
+#include "hbase/client/async-batch-rpc-retrying-caller.h"
+#include "hbase/client/async-connection.h"
+#include "hbase/client/async-rpc-retrying-caller-factory.h"
+#include "hbase/client/client.h"
+#include "hbase/client/connection-configuration.h"
+#include "hbase/client/keyvalue-codec.h"
+#include "hbase/client/region-location.h"
+#include "hbase/client/result.h"
+#include "hbase/exceptions/exception.h"
+#include "hbase/test-util/test-util.h"
+#include "hbase/utils/time-util.h"
+
+using hbase::AsyncRpcRetryingCallerFactory;
+using hbase::AsyncConnection;
+using hbase::AsyncRegionLocator;
+using hbase::ConnectionConfiguration;
+using hbase::Configuration;
+using hbase::HBaseRpcController;
+using hbase::RegionLocation;
+using hbase::RegionLocateType;
+using hbase::RpcClient;
+using hbase::RequestConverter;
+using hbase::ResponseConverter;
+using hbase::Put;
+using hbase::TimeUtil;
+using hbase::Client;
+using hbase::security::User;
+
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+using namespace hbase;
+
+using folly::exception_wrapper;
+
+class AsyncBatchRpcRetryTest : public ::testing::Test {  // fixture used by the TEST_F cases
+ public:
+ static std::unique_ptr<hbase::TestUtil> test_util;  // created once in SetUpTestCase()
+ static std::string tableName;  // name of the table created in SetUpTestCase()
+
+ static void SetUpTestCase() {
+ google::InstallFailureSignalHandler();
+ test_util = std::make_unique<hbase::TestUtil>();
+ test_util->StartMiniCluster(2);  // presumably 2 region servers -- TODO confirm
+ std::vector<std::string> keys{"test0", "test100", "test200", "test300", "test400",
+ "test500", "test600", "test700", "test800", "test900"};  // presumably split keys -- TODO confirm
+ tableName = "split-table1";
+ test_util->CreateTable(tableName, "d", keys);  // "d": presumably the column family -- TODO confirm
+ }
+};
+std::unique_ptr<hbase::TestUtil> AsyncBatchRpcRetryTest::test_util = nullptr;
+std::string AsyncBatchRpcRetryTest::tableName;
+
+// Test double for AsyncRegionLocator: serves locations from an in-memory, per-row map.
+class AsyncRegionLocatorBase : public AsyncRegionLocator {
+ public:
+  AsyncRegionLocatorBase() {}
+  explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location)
+      : region_location_(region_location) {}
+  virtual ~AsyncRegionLocatorBase() = default;
+
+  // Resolves to the location registered for `row` (map::at throws if absent); other args unused.
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+      const hbase::pb::TableName &, const std::string &row, const RegionLocateType,
+      const int64_t) override {
+    folly::Promise<std::shared_ptr<RegionLocation>> promise;
+    promise.setValue(region_locations_.at(row));
+    return promise.getFuture();
+  }
+
+  virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) {
+    region_location_ = region_location;
+  }
+
+  // Merges `reg_locs` (row -> location) into the map; existing rows are overwritten.
+  virtual void set_region_location(
+      const std::map<std::string, std::shared_ptr<RegionLocation>> &reg_locs) {
+    for (const auto &reg_loc : reg_locs) region_locations_[reg_loc.first] = reg_loc.second;
+  }
+
+  // Intentionally a no-op: this mock keeps no location cache.
+  void UpdateCachedLocation(const RegionLocation &, const folly::exception_wrapper &) override {}
+
+ protected:
+  std::shared_ptr<RegionLocation> region_location_;
+  std::map<std::string, std::shared_ptr<RegionLocation>> region_locations_;
+  std::map<std::string, uint32_t> mtries_;      // per-row counter, seeded to 0
+  std::map<std::string, uint32_t> mnum_fails_;  // per-row failure budget, seeded to num_fails
+
+  // Seeds both retry maps once (only while both are empty) from the rows registered so far.
+  void InitRetryMaps(uint32_t num_fails) {
+    if (!mtries_.empty() || !mnum_fails_.empty()) return;
+    for (const auto &reg_loc : region_locations_) {
+      mtries_[reg_loc.first] = 0;
+      mnum_fails_[reg_loc.first] = num_fails;
+    }
+  }
+};
+
+// Locator that always succeeds: every behaviour is inherited from AsyncRegionLocatorBase.
+class MockAsyncRegionLocator : public AsyncRegionLocatorBase {
+ public:
+  MockAsyncRegionLocator() = default;
+  explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> location)
+      : AsyncRegionLocatorBase(location) {}
+};
+
+// Locator that answers the first `num_fails` lookups of each row with a location whose
+// region name is bogus (server and region info stay correct), then returns the real one.
+class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase {
+ private:
+  uint32_t num_fails_ = 0;  // bogus answers per row; 0 when built from a RegionLocation
+
+ public:
+  explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails)
+      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+  explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  virtual ~MockWrongRegionAsyncRegionLocator() {}
+
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+      const hbase::pb::TableName &tn, const std::string &row,
+      const RegionLocateType locate_type = RegionLocateType::kCurrent,
+      const int64_t locate_ns = 0) override {
+    InitRetryMaps(num_fails_);
+    // operator[] default-inserts 0 for unseeded rows, so they never take the failing path.
+    auto &tries = mtries_[row];
+    const auto &num_fails = mnum_fails_[row];
+    if (++tries > num_fails) {
+      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+    }
+
+    /* keep the real server and region info, but fake the region name */
+    const auto &real_location = region_locations_.at(row);
+    folly::Promise<std::shared_ptr<RegionLocation>> promise;
+    promise.setValue(std::make_shared<RegionLocation>(
+        "whatever-region-name", real_location->region_info(), real_location->server_name()));
+    return promise.getFuture();
+  }
+};
+
+// Locator whose first `num_fails` lookups of each row fail; later ones use the base class.
+class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {
+ private:
+  uint32_t num_fails_ = 0;  // failing lookups per row; 0 when built from a RegionLocation
+
+ public:
+  explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
+      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+  explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  virtual ~MockFailingAsyncRegionLocator() {}
+
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+      const hbase::pb::TableName &tn, const std::string &row,
+      const RegionLocateType locate_type = RegionLocateType::kCurrent,
+      const int64_t locate_ns = 0) override {
+    InitRetryMaps(num_fails_);
+    auto &tries = mtries_[row];
+    const auto &num_fails = mnum_fails_[row];
+    if (++tries > num_fails) {
+      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+    }
+
+    folly::Promise<std::shared_ptr<RegionLocation>> promise;
+    promise.setException(std::runtime_error{"Failed to look up region location"});
+    return promise.getFuture();
+  }
+};
+
+// AsyncConnection stub assembled from injected collaborators; conf() always returns nullptr.
+class MockAsyncConnection : public AsyncConnection,
+                            public std::enable_shared_from_this<MockAsyncConnection> {
+ public:
+  MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf,
+                      std::shared_ptr<folly::HHWheelTimer> retry_timer,
+                      std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
+                      std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+                      std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor,
+                      std::shared_ptr<RpcClient> rpc_client,
+                      std::shared_ptr<AsyncRegionLocator> region_locator)
+      : retry_timer_(retry_timer),  // initializers follow member declaration order (-Wreorder)
+        conn_conf_(conn_conf),
+        rpc_client_(rpc_client),
+        region_locator_(region_locator),
+        cpu_executor_(cpu_executor),
+        io_executor_(io_executor),
+        retry_executor_(retry_executor) {}
+  void Init() {  // call only once a shared_ptr owns this object: uses shared_from_this()
+    caller_factory_ =
+        std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
+  }
+
+  std::shared_ptr<Configuration> conf() override { return nullptr; }
+  std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; }
+  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
+    return caller_factory_;
+  }
+  std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
+  std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; }
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
+  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
+    return retry_executor_;
+  }
+
+  void Close() override {  // timer first, then the retry, IO and CPU executors
+    retry_timer_->destroy();
+    retry_executor_->stop();
+    io_executor_->stop();
+    cpu_executor_->stop();
+  }
+  std::shared_ptr<HBaseRpcController> CreateRpcController() override {
+    return std::make_shared<HBaseRpcController>();
+  }
+
+ private:
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
+  std::shared_ptr<ConnectionConfiguration> conn_conf_;
+  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;  // set by Init()
+  std::shared_ptr<RpcClient> rpc_client_;
+  std::shared_ptr<AsyncRegionLocator> region_locator_;
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
+};
+
+// Minimal stand-in for RawAsyncTableImpl: only the batch entry point is provided.
+class MockRawAsyncTableImpl {
+ public:
+  explicit MockRawAsyncTableImpl(std::shared_ptr<MockAsyncConnection> conn,
+                                 std::shared_ptr<hbase::pb::TableName> tn)
+      : conn_(conn), tn_(tn) {}
+  virtual ~MockRawAsyncTableImpl() = default;
+
+  /* implement this in real RawAsyncTableImpl. */
+  // Runs `rows` through a retrying batch caller configured from the connection settings.
+  template <typename REQ, typename RESP>
+  folly::Future<std::vector<folly::Try<RESP>>> Batch(const std::vector<REQ> &rows) {
+    const auto settings = conn_->connection_conf();
+    auto caller = conn_->caller_factory()
+                      ->Batch<REQ, RESP>()
+                      ->table(tn_)
+                      ->actions(std::make_shared<std::vector<REQ>>(rows))
+                      ->rpc_timeout(settings->read_rpc_timeout())
+                      ->operation_timeout(settings->operation_timeout())
+                      ->pause(settings->pause())
+                      ->max_attempts(settings->max_retries())
+                      ->start_log_errors_count(settings->start_log_errors_count())
+                      ->Build();
+    // The continuation captures `caller` by value so it outlives this function.
+    return caller->Call().then([caller](auto r) { return r; });
+  }
+
+ private:
+  std::shared_ptr<MockAsyncConnection> conn_;
+  std::shared_ptr<hbase::pb::TableName> tn_;
+};
+
+// Builds a MockAsyncConnection around `region_locator`. The IO executor is borrowed from
+// `client`; the CPU executor (4 threads) and retry executor (1 thread) are created here.
+std::shared_ptr<MockAsyncConnection> getAsyncConnection(
+    Client &client, uint32_t operation_timeout_millis, uint32_t tries,
+    std::shared_ptr<AsyncRegionLocatorBase> region_locator) {
+  auto cpu_pool = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
+  auto io_pool = client.async_connection()->io_executor();
+  auto retry_pool = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+  auto kv_codec = std::make_shared<hbase::KeyValueCodec>();
+  auto rpc_client = std::make_shared<RpcClient>(io_pool, cpu_pool, kv_codec,
+                                                AsyncBatchRpcRetryTest::test_util->conf());
+  std::shared_ptr<folly::HHWheelTimer> retry_timer =
+      folly::HHWheelTimer::newTimer(retry_pool->getEventBase());
+
+  /* timeouts and retry policy used by the mock connection */
+  auto connection_conf = std::make_shared<ConnectionConfiguration>(
+      TimeUtil::SecondsToNanos(20),                       // connect_timeout
+      TimeUtil::MillisToNanos(operation_timeout_millis),  // operation_timeout
+      TimeUtil::SecondsToNanos(60),                       // rpc_timeout
+      TimeUtil::MillisToNanos(100),                       // pause
+      tries,                                              // max retries
+      1);                                                 // start log errors count
+
+  return std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_pool, io_pool,
+                                               retry_pool, rpc_client, region_locator);
+}
+
+// Wraps a copy of each action in a std::shared_ptr<hbase::Row>, preserving order. Takes the
+// input by const reference so the whole vector is no longer copied on every call.
+template <typename ACTION>
+std::vector<std::shared_ptr<hbase::Row>> getRows(const std::vector<ACTION> &actions) {
+  std::vector<std::shared_ptr<hbase::Row>> rows;
+  rows.reserve(actions.size());
+  for (const auto &action : actions) rows.push_back(std::make_shared<ACTION>(action));
+  return rows;
+}
+
+// Returns the values in `tresults`; logs and throws the wrapper of the first failed entry.
+template <typename REQ, typename RESP>
+std::vector<std::shared_ptr<hbase::Result>> getResults(std::vector<REQ> &actions,
+                                                       std::vector<folly::Try<RESP>> &tresults) {
+  std::vector<std::shared_ptr<hbase::Result>> results{};
+  for (size_t num = 0; num < tresults.size(); ++num) {
+    auto &tresult = tresults[num];  // by reference: no folly::Try is copied per iteration
+    if (tresult.hasValue()) {
+      results.push_back(tresult.value());
+    } else if (tresult.hasException()) {
+      folly::exception_wrapper ew = tresult.exception();
+      LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for "
+                 << actions[num].row();
+      throw ew;
+    }
+  }
+  return results;
+}
+
+// For each i < num_rows: appends ACTION("test<i>") and maps that key to its region location.
+template <typename ACTION>
+std::map<std::string, std::shared_ptr<RegionLocation>> getRegionLocationsAndActions(
+    uint64_t num_rows, std::vector<ACTION> &actions, std::shared_ptr<Table> table) {
+  std::map<std::string, std::shared_ptr<RegionLocation>> locations_by_row;
+  for (uint64_t idx = 0; idx != num_rows; ++idx) {
+    const std::string row_key = "test" + std::to_string(idx);
+    actions.emplace_back(row_key);
+    locations_by_row[row_key] = table->GetRegionLocation(row_key);
+  }
+  return locations_by_row;
+}
+
+// Writes `num_rows` rows ("test<i>" -> column d:<i> = "value<i>") with blocking Puts, reads
+// them back through one retrying batch Get that uses `region_locator`, and checks each value.
+// NOTE: `table_name` and `split_regions` are accepted but never read; the table is always
+// AsyncBatchRpcRetryTest::tableName. Exceptions from the batch or getResults() propagate.
+void runMultiGets(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
+                  const std::string &table_name, bool split_regions, uint32_t tries = 3,
+                  uint32_t operation_timeout_millis = 600000, uint64_t num_rows = 1000) {
+  auto tn = folly::to<hbase::pb::TableName>(AsyncBatchRpcRetryTest::tableName);
+  Client client(*AsyncBatchRpcRetryTest::test_util->conf());
+  std::shared_ptr<Table> table = client.Table(tn);
+
+  /* seed the table with the rows that will be fetched */
+  for (uint64_t i = 0; i < num_rows; i++) {
+    table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
+                                                         "value" + std::to_string(i)));
+  }
+  std::vector<hbase::Get> gets;
+  auto region_locations = getRegionLocationsAndActions<hbase::Get>(num_rows, gets, table);
+
+  /* set region locator */
+  region_locator->set_region_location(region_locations);
+
+  /* init hbase client connection */
+  auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator);
+  conn->Init();
+
+  /* init retry caller factory */
+  auto tableImpl =
+      std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
+
+  std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Get>(gets);
+  auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get(
+      milliseconds(operation_timeout_millis));
+  ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
+
+  auto results = getResults<hbase::Get, std::shared_ptr<Result>>(gets, tresults);
+  ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty.";
+  // results[i] is indexed below, so a short result list must fail here, not read past the end.
+  ASSERT_EQ(num_rows, results.size()) << "Expected exactly one result per Get.";
+  for (uint64_t i = 0; i < num_rows; ++i) {
+    ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row()
+                                        << " must not be empty";
+    EXPECT_EQ("test" + std::to_string(i), results[i]->Row());
+    EXPECT_EQ("value" + std::to_string(i), results[i]->Value("d", std::to_string(i)).value());
+  }
+
+  table->Close();
+  client.Close();
+  conn->Close();
+}
+
+// Sends `num_rows` Puts (built from the row key only) through one retrying batch call that
+// uses `region_locator`. `table_name` and `split_regions` are accepted but never read.
+void runMultiPuts(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
+                  const std::string &table_name, bool split_regions, uint32_t tries = 3,
+                  uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 1000) {
+  // The target is always the fixture's table.
+  auto tn = folly::to<hbase::pb::TableName>(AsyncBatchRpcRetryTest::tableName);
+
+  // Regular client and table handle; the table is queried for each row's region location.
+  Client client(*AsyncBatchRpcRetryTest::test_util->conf());
+  std::shared_ptr<Table> table = client.Table(tn);
+
+  // Build the Puts and register each row's location with the locator under test.
+  std::vector<hbase::Put> puts;
+  region_locator->set_region_location(
+      getRegionLocationsAndActions<hbase::Put>(num_rows, puts, table));
+
+  // Mock connection wired to that locator.
+  auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator);
+  conn->Init();
+
+  auto impl =
+      std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
+
+  // Run the batch, waiting at most the operation timeout for it to finish.
+  const auto wait_limit = milliseconds(operation_timeout_millis);
+  std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Put>(puts);
+  auto tresults =
+      impl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get(wait_limit);
+  ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
+
+  // getResults() throws on the first failed entry.
+  auto results = getResults<hbase::Put, std::shared_ptr<Result>>(puts, tresults);
+  ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty.";
+
+  table->Close();
+  client.Close();
+  conn->Close();
+}
+
+// Test successful case. Note: runMultiGets never reads its table-name and split_regions arguments.
+TEST_F(AsyncBatchRpcRetryTest, MultiGets) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockAsyncRegionLocator>());
+ runMultiGets(region_locator, "table1", false);
+}
+
+// Tests the RPC failing 3 times (locator returns a bogus region name), then succeeding
+TEST_F(AsyncBatchRpcRetryTest, HandleException) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
+ runMultiGets(region_locator, "table2", false, 5);
+}
+
+// Tests the RPC failing 4 times (bogus region name) with the default 3 tries, throwing an exception
+TEST_F(AsyncBatchRpcRetryTest, FailWithException) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table3", false));
+}
+
+// Tests the region location lookup failing 3 times, then succeeding
+TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookup) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(3));
+ runMultiGets(region_locator, "table4", false);
+}
+
+// Tests the region location lookup failing 4 times, throwing an exception
+TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookup) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(4));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table5", false, 3));
+}
+
+// Tests hitting operation timeout (100 ms here), thus not retrying anymore
+TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeout) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(6));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table6", false, 5, 100, 1000));
+}
+
+////////////////////// Put variants of the tests above
+// Test successful case. Note: runMultiPuts never reads its table-name and split_regions arguments.
+TEST_F(AsyncBatchRpcRetryTest, MultiPuts) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockAsyncRegionLocator>());
+ runMultiPuts(region_locator, "table1", false);
+}
+
+// Tests the RPC failing 3 times (locator returns a bogus region name), then succeeding
+TEST_F(AsyncBatchRpcRetryTest, PutsHandleException) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
+ runMultiPuts(region_locator, "table2", false, 5);
+}
+
+// Tests the RPC failing 4 times (bogus region name) with the default 3 tries, throwing an exception
+TEST_F(AsyncBatchRpcRetryTest, PutsFailWithException) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
+ EXPECT_ANY_THROW(runMultiPuts(region_locator, "table3", false));
+}
+
+// Tests the region location lookup failing 3 times, then succeeding
+TEST_F(AsyncBatchRpcRetryTest, PutsHandleExceptionFromRegionLocationLookup) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(3));
+ runMultiPuts(region_locator, "table4", false);
+}
+
+// Tests the region location lookup failing 4 times, throwing an exception
+TEST_F(AsyncBatchRpcRetryTest, PutsFailWithExceptionFromRegionLocationLookup) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(4));
+ EXPECT_ANY_THROW(runMultiPuts(region_locator, "table5", false, 3));
+}
+
+// Tests hitting operation timeout (100 ms here), thus not retrying anymore
+TEST_F(AsyncBatchRpcRetryTest, PutsFailWithOperationTimeout) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(6));
+ EXPECT_ANY_THROW(runMultiPuts(region_locator, "table6", false, 5, 100, 1000));
+}
+
+ // Test successful case. NOTE: runMultiGets ignores split_regions, so these mirror the Get tests.
+ TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockAsyncRegionLocator>());
+ runMultiGets(region_locator, "table7", true);
+ }
+
+ // Tests the RPC failing 3 times (locator returns a bogus region name), then succeeding
+ TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
+ runMultiGets(region_locator, "table8", true, 5);
+ }
+
+ // Tests the RPC failing 4 times (bogus region name) with the default 3 tries, throwing an exception
+ TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table9", true));
+ }
+
+ // Tests the region location lookup failing 3 times, then succeeding
+ TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(3));
+ runMultiGets(region_locator, "table10", true);
+ }
+
+ // Tests the region location lookup failing 4 times, throwing an exception
+ TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(4));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table11", true, 3));
+ }
+
+ // Tests hitting operation timeout (100 ms here), thus not retrying anymore
+ TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(6));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table12", true, 5, 100, 1000));
+ }
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-client-scanner.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-client-scanner.cc b/hbase-native-client/src/hbase/client/async-client-scanner.cc
new file mode 100644
index 0000000..50c01ee
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-client-scanner.cc
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/async-client-scanner.h"
+
+#include <algorithm>
+#include <iterator>
+#include <limits>
+#include <stdexcept>
+
+namespace hbase {
+
+// Captures the scan settings. Two members are derived rather than copied: results_cache_ is
+// a newly created ScanResultCache and max_attempts_ is the value returned by
+// ConnectionUtils::Retries2Attempts(max_retries).
+AsyncClientScanner::AsyncClientScanner(
+    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<Scan> scan,
+    std::shared_ptr<pb::TableName> table_name, std::shared_ptr<RawScanResultConsumer> consumer,
+    nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos,
+    nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
+    : conn_(conn), scan_(scan),
+      table_name_(table_name), consumer_(consumer),
+      pause_(pause), max_retries_(max_retries),
+      scan_timeout_nanos_(scan_timeout_nanos), rpc_timeout_nanos_(rpc_timeout_nanos),
+      start_log_errors_count_(start_log_errors_count) {
+  // Plain assignments in the body, so they do not depend on member declaration order.
+  max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
+  results_cache_ = std::make_shared<ScanResultCache>();
+}
+
+void AsyncClientScanner::Start() { OpenScanner(); }  // kicks off the scan by opening a scanner
+
+// Sends the scan request that opens a scanner on `loc` and wraps the reply, together with
+// `rpc_client`, `loc` and `controller`, in an OpenScannerResponse. Bumps open_scanner_tries_.
+folly::Future<std::shared_ptr<OpenScannerResponse>> AsyncClientScanner::CallOpenScanner(
+    std::shared_ptr<hbase::RpcClient> rpc_client,
+    std::shared_ptr<hbase::HBaseRpcController> controller,
+    std::shared_ptr<hbase::RegionLocation> loc) {
+  ++open_scanner_tries_;
+  auto req = RequestConverter::ToScanRequest(*scan_, loc->region_name(), scan_->Caching(), false);
+  auto self(shared_from_this());
+  VLOG(5) << "Calling RPC Client to open the scanner for region:" << loc->DebugString();
+  auto on_reply = [self, loc, controller, rpc_client](const std::unique_ptr<Response>& presp) {
+    VLOG(5) << "Scan Response:" << presp->DebugString();
+    return std::make_shared<OpenScannerResponse>(rpc_client, presp, loc, controller);
+  };
+  return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(),
+                               std::move(req), security::User::defaultUser(), "ClientService")
+      .then(std::move(on_reply));
+}
+
+// Opens a scanner for scan_->StartRow(); success goes to StartScan(), errors to consumer_->OnError().
+void AsyncClientScanner::OpenScanner() {
+  auto self(shared_from_this());
+  open_scanner_tries_ = 1;
+  auto caller = conn_->caller_factory()
+                    ->Single<std::shared_ptr<OpenScannerResponse>>()
+                    ->table(table_name_)
+                    ->row(scan_->StartRow())
+                    ->locate_type(GetLocateType(*scan_))
+                    ->rpc_timeout(rpc_timeout_nanos_)
+                    ->operation_timeout(scan_timeout_nanos_)
+                    ->pause(pause_)
+                    ->max_retries(max_retries_)
+                    ->start_log_errors_count(start_log_errors_count_)
+                    // Stored and run after OpenScanner() returns: capture by value, not `[&]`.
+                    ->action([this, self](std::shared_ptr<hbase::HBaseRpcController> controller,
+                                          std::shared_ptr<hbase::RegionLocation> loc,
+                                          std::shared_ptr<hbase::RpcClient> rpc_client)
+                                 -> folly::Future<std::shared_ptr<OpenScannerResponse>> {
+                      return CallOpenScanner(rpc_client, controller, loc);
+                    })
+                    ->Build();
+  caller->Call()
+      .then([this, self](std::shared_ptr<OpenScannerResponse> resp) {
+        VLOG(3) << "Opened scanner with id:" << resp->scan_resp_->scanner_id()
+                << ", region:" << resp->region_location_->DebugString() << ", starting scan";
+        StartScan(resp);
+      })
+      .onError([this, self](const folly::exception_wrapper& e) {
+        VLOG(3) << "Open scan request received error:" << e.what();
+        consumer_->OnError(e);
+      })
+      .then([caller, self](const auto r) { return r; });
+}
+
+// Drives the scan on the region opened by `resp`, then moves to the next region or completes.
+void AsyncClientScanner::StartScan(std::shared_ptr<OpenScannerResponse> resp) {
+  auto self(shared_from_this());
+  auto caller = conn_->caller_factory()
+                    ->Scan()
+                    ->scanner_id(resp->scan_resp_->scanner_id())
+                    ->region_location(resp->region_location_)
+                    ->scanner_lease_timeout(TimeUtil::MillisToNanos(resp->scan_resp_->ttl()))
+                    ->scan(scan_)
+                    ->rpc_client(resp->rpc_client_)
+                    ->consumer(consumer_)
+                    ->results_cache(results_cache_)
+                    ->rpc_timeout(rpc_timeout_nanos_)
+                    ->scan_timeout(scan_timeout_nanos_)
+                    ->pause(pause_)
+                    ->max_retries(max_retries_)
+                    ->start_log_errors_count(start_log_errors_count_)
+                    ->Build();
+
+  caller->Start(resp->controller_, resp->scan_resp_, resp->cell_scanner_)
+      .then([caller, self](const bool has_more) {
+        if (!has_more) {
+          self->consumer_->OnComplete();
+          return;
+        }
+        self->OpenScanner();  // open the next scanner on the next region.
+      })
+      .onError([caller, self](const folly::exception_wrapper& e) { self->consumer_->OnError(e); })
+      .then([caller, self](const auto r) { return r; });
+}
+
+RegionLocateType AsyncClientScanner::GetLocateType(const Scan& scan) {
+ // Always yields kCurrent for now; `scan` is not consulted yet.
+ // TODO: C++ has no Scan::IncludeStartRow() / Scan::IncludeStopRow() yet; once added, return
+ // other RegionLocateTypes here (see ConnectionUtils.java #getLocateType())
+ // TODO: When reversed scans are implemented, return other RegionLocateTypes
+ return RegionLocateType::kCurrent;
+}
+
+} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-connection.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-connection.cc b/hbase-native-client/src/hbase/client/async-connection.cc
new file mode 100644
index 0000000..f1bdebc
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-connection.cc
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/async-connection.h"
+#include "hbase/client/async-rpc-retrying-caller-factory.h"
+
+namespace hbase {
+
+// Builds everything the connection owns: thread pools, retry timer, RPC client, location cache
+// and caller factory. Uses shared_from_this(), so a std::shared_ptr must already own `this`.
+void AsyncConnectionImpl::Init() {
+  connection_conf_ = std::make_shared<hbase::ConnectionConfiguration>(*conf_);
+  // start thread pools; defaults are one IO thread and two CPU threads per online processor
+  const auto online_cpus = sysconf(_SC_NPROCESSORS_ONLN);
+  auto io_pool_size = conf_->GetInt(kClientIoThreadPoolSize, online_cpus);
+  auto cpu_pool_size = conf_->GetInt(kClientCpuThreadPoolSize, 2 * online_cpus);
+  cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(cpu_pool_size);
+  io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(io_pool_size);
+  // The retry executor is a dedicated single-thread pool because of a possible wangle/folly bug:
+  // otherwise Assertion 'isInEventBaseThread()' always fails (see async-rpc-retrying-caller.cc).
+  retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+  retry_timer_ = folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
+
+  std::shared_ptr<Codec> codec = nullptr;  // stays null unless the key-value codec is configured
+  if (conf_->Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) !=
+      std::string(KeyValueCodec::kJavaClassName)) {
+    LOG(WARNING) << "Not using RPC Cell Codec";
+  } else {
+    codec = std::make_shared<hbase::KeyValueCodec>();
+  }
+  rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, cpu_executor_, codec, conf_,
+                                                   connection_conf_->connect_timeout());
+  location_cache_ = std::make_shared<hbase::LocationCache>(conf_, io_executor_, cpu_executor_,
+                                                           rpc_client_->connection_pool());
+  caller_factory_ =
+      std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
+}
+
+// Close() on destruction: the threads must not keep running after everything is done,
+// because that leads to an error.
+AsyncConnectionImpl::~AsyncConnectionImpl() { Close(); }
+
+// Shuts everything down once; null checks keep it safe if a member was never set (no Init()).
+void AsyncConnectionImpl::Close() {
+  if (is_closed_) return;
+  if (cpu_executor_) cpu_executor_->stop();
+  if (io_executor_) io_executor_->stop();
+  if (retry_executor_) retry_executor_->stop();
+  if (retry_timer_) retry_timer_->destroy();
+  if (rpc_client_) rpc_client_->Close();
+  is_closed_ = true;
+}
+
+} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-rpc-retrying-caller-factory.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-rpc-retrying-caller-factory.cc b/hbase-native-client/src/hbase/client/async-rpc-retrying-caller-factory.cc
new file mode 100644
index 0000000..42b4eac
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-rpc-retrying-caller-factory.cc
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/async-rpc-retrying-caller-factory.h"
+
+namespace hbase {} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-rpc-retrying-caller.cc b/hbase-native-client/src/hbase/client/async-rpc-retrying-caller.cc
new file mode 100644
index 0000000..4c39f05
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-rpc-retrying-caller.cc
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/async-rpc-retrying-caller.h"
+
+#include <folly/Conv.h>
+#include <folly/ExceptionWrapper.h>
+#include <folly/Format.h>
+#include <folly/Logging.h>
+#include <folly/Unit.h>
+
+#include "hbase/connection/rpc-client.h"
+#include "hbase/client/async-connection.h"
+#include "hbase/client/hbase-rpc-controller.h"
+#include "hbase/client/region-location.h"
+#include "hbase/client/result.h"
+#include "hbase/exceptions/exception.h"
+#include "hbase/if/HBase.pb.h"
+#include "hbase/utils/connection-util.h"
+#include "hbase/utils/sys-util.h"
+#include "hbase/utils/time-util.h"
+
+using folly::exception_wrapper;
+
+namespace hbase {
+
+// Stores the parameters of one single-row request, creates the promise that Call() hands out
+// and records the start time that RemainingTimeNs() measures against.
+template <typename RESP>
+AsyncSingleRequestRpcRetryingCaller<RESP>::AsyncSingleRequestRpcRetryingCaller(
+    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
+    std::shared_ptr<hbase::pb::TableName> table_name, const std::string& row,
+    RegionLocateType locate_type, Callable<RESP> callable, std::chrono::nanoseconds pause,
+    uint32_t max_retries, std::chrono::nanoseconds operation_timeout_nanos,
+    std::chrono::nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
+    : conn_(conn),
+      retry_timer_(retry_timer),
+      table_name_(table_name),
+      row_(row),
+      locate_type_(locate_type),
+      callable_(callable),
+      pause_(pause),
+      max_retries_(max_retries),
+      operation_timeout_nanos_(operation_timeout_nanos),
+      rpc_timeout_nanos_(rpc_timeout_nanos),
+      start_log_errors_count_(start_log_errors_count),
+      promise_(std::make_shared<folly::Promise<RESP>>()),
+      tries_(1) {
+  // Assigned in the body, i.e. after the initializer list has populated conn_.
+  exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>();
+  max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
+  controller_ = conn_->CreateRpcController();
+  start_ns_ = TimeUtil::GetNowNanos();
+}
+
+// Nothing to release by hand; the members clean up after themselves.
+template <typename RESP>
+AsyncSingleRequestRpcRetryingCaller<RESP>::~AsyncSingleRequestRpcRetryingCaller() = default;
+
+// Starts the request and returns the future backed by promise_, which is completed with the
+// response or with an exception once the caller gives up.
+template <typename RESP>
+folly::Future<RESP> AsyncSingleRequestRpcRetryingCaller<RESP>::Call() {
+  // Take the future before any work that may complete the promise is started.
+  auto response_future = promise_->getFuture();
+  LocateThenCall();
+  return response_future;
+}
+
+// Looks up the region for row_ and, once it is known, issues the RPC through Call(loc). A failed
+// lookup is routed to OnError(), which decides whether to retry.
+template <typename RESP>
+void AsyncSingleRequestRpcRetryingCaller<RESP>::LocateThenCall() {
+  // -1 is what gets passed to the locator when no operation timeout is configured.
+  int64_t locate_timeout_ns = -1L;
+  if (operation_timeout_nanos_.count() > 0) {
+    locate_timeout_ns = RemainingTimeNs();
+    if (locate_timeout_ns <= 0) {
+      // The operation timeout is already used up; fail instead of locating.
+      CompleteExceptionally();
+      return;
+    }
+  }
+
+  conn_->region_locator()
+      ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns)
+      .then([this](std::shared_ptr<RegionLocation> loc) { Call(*loc); })
+      .onError([this](const exception_wrapper& e) {
+        // Builds the log line lazily; OnError() only evaluates it when it decides to log.
+        auto describe_failure = [this, e]() -> std::string {
+          return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" +
+                 table_name_->qualifier() + " failed with e.what()=" +
+                 e.what().toStdString() + ", tries = " + std::to_string(tries_) +
+                 ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " +
+                 TimeUtil::ToMillisStr(operation_timeout_nanos_) + " ms, time elapsed = " +
+                 TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
+        };
+        // No location was obtained, so there is no cached location to update.
+        auto skip_location_update = [](const exception_wrapper& error) {};
+        OnError(e, describe_failure, skip_location_update);
+      });
+}
+
+// Records a failed attempt and either gives up or schedules the next attempt.
+//
+// error: the failure of the current attempt; appended to exceptions_.
+// err_msg: lazily builds the log line for this failure.
+// update_cached_location: invoked with the error before a retry is scheduled.
+//
+// The request is completed exceptionally when the error is not retriable, when the attempt
+// budget is used up, or when the operation timeout leaves no room for another pause.
+template <typename RESP>
+void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
+    const exception_wrapper& error, Supplier<std::string> err_msg,
+    Consumer<exception_wrapper> update_cached_location) {
+  ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos());
+  exceptions_->push_back(twec);
+  // tries_ counts attempts (it starts at 1), so it has to be checked against max_attempts_,
+  // the same bound the log messages report as "maxAttempts". Checking it against max_retries_
+  // stopped one attempt short of the configured number of retries.
+  if (!ExceptionUtil::ShouldRetry(error) || tries_ >= max_attempts_) {
+    CompleteExceptionally();
+    return;
+  }
+
+  if (tries_ > start_log_errors_count_) {
+    LOG(WARNING) << err_msg();
+  } else {
+    VLOG(1) << err_msg();
+  }
+
+  int64_t delay_ns;
+  if (operation_timeout_nanos_.count() > 0) {
+    // Never sleep past the operation deadline (minus a small safety margin).
+    int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
+    if (max_delay_ns <= 0) {
+      CompleteExceptionally();
+      return;
+    }
+    delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1));
+  } else {
+    delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1);
+  }
+  update_cached_location(error);
+  tries_++;
+
+  /*
+   * The HHWheelTimer::scheduleTimeout() fails with an assertion from
+   * EventBase::isInEventBaseThread() if we execute the schedule in a random thread, or one of
+   * the IOThreadPool threads (with num threads > 1). I think there is a bug there in using retry
+   * timer from IOThreadPool threads. It only works when executed from a single-thread pool
+   * (retry_executor() is). However, the scheduled "work" which is the LocateThenCall() should
+   * still happen in a thread pool, that is why we are submitting the work to the CPUThreadPool.
+   * IOThreadPool cannot be used without fixing the blocking call that we do at TCP connection
+   * establishment time (see ConnectionFactory::Connect()), otherwise, the IOThreadPool thread
+   * just hangs because it deadlocks itself.
+   *
+   * These tasks run later on other threads, so they capture exactly what they need (this and
+   * the delay) by value instead of relying on a default capture.
+   */
+  conn_->retry_executor()->add([this, delay_ns]() {
+    retry_timer_->scheduleTimeoutFn(
+        [this]() { conn_->cpu_executor()->add([this]() { LocateThenCall(); }); },
+        std::chrono::milliseconds(TimeUtil::ToMillis(delay_ns)));
+  });
+}
+
+// Issues the RPC against the given region location. The per-call timeout is the RPC timeout,
+// capped by whatever is left of the operation timeout. A successful response fulfils promise_;
+// a failure is handed to OnError() together with a hook that updates the cached location.
+template <typename RESP>
+void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc) {
+  int64_t call_timeout_ns;
+  if (operation_timeout_nanos_.count() > 0) {
+    call_timeout_ns = RemainingTimeNs();
+    if (call_timeout_ns <= 0) {
+      // The operation timeout is already used up.
+      CompleteExceptionally();
+      return;
+    }
+    call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_.count());
+  } else {
+    call_timeout_ns = rpc_timeout_nanos_.count();
+  }
+
+  auto rpc_client = conn_->rpc_client();
+
+  ResetController(controller_, call_timeout_ns);
+
+  // TODO: RegionLocation should propagate through these method chains as a shared_ptr.
+  // Otherwise, it may get deleted underneath us. We are just copying for now.
+  auto loc_ptr = std::make_shared<RegionLocation>(loc);
+  // The continuations below run after this function has returned, so they capture only copies
+  // (loc_ptr, e) and this; nothing is captured by reference.
+  callable_(controller_, loc_ptr, rpc_client)
+      .then([loc_ptr, this](const RESP& resp) { promise_->setValue(resp); })
+      .onError([loc_ptr, this](const exception_wrapper& e) {
+        OnError(
+            e,
+            [loc_ptr, this, e]() -> std::string {
+              return "Call to " + folly::sformat("{0}:{1}", loc_ptr->server_name().host_name(),
+                                                 loc_ptr->server_name().port()) +
+                     " for '" + row_ + "' in " + loc_ptr->DebugString() + " of " +
+                     table_name_->namespace_() + "::" + table_name_->qualifier() +
+                     " failed with e.what()=" + e.what().toStdString() + ", tries = " +
+                     std::to_string(tries_) + ", maxAttempts = " + std::to_string(max_attempts_) +
+                     ", timeout = " + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) +
+                     " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
+            },
+            [loc_ptr, this](const exception_wrapper& error) {
+              conn_->region_locator()->UpdateCachedLocation(*loc_ptr, error);
+            });
+      });
+}
+
+// Fails the pending promise with a RetriesExhaustedException that carries the collected
+// exceptions; tries_ - 1 is passed as the count.
+template <typename RESP>
+void AsyncSingleRequestRpcRetryingCaller<RESP>::CompleteExceptionally() {
+  promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_));
+}
+
+// Returns how many nanoseconds of the operation timeout are left, measured from start_ns_.
+// The result is zero or negative once the timeout has elapsed.
+template <typename RESP>
+int64_t AsyncSingleRequestRpcRetryingCaller<RESP>::RemainingTimeNs() {
+  const auto elapsed_ns = TimeUtil::GetNowNanos() - start_ns_;
+  return operation_timeout_nanos_.count() - elapsed_ns;
+}
+
+// Resets the controller for a new attempt and, for a non-negative timeout_ns, sets its call
+// timeout in milliseconds, clamped to INT_MAX.
+template <typename RESP>
+void AsyncSingleRequestRpcRetryingCaller<RESP>::ResetController(
+    std::shared_ptr<HBaseRpcController> controller, const int64_t& timeout_ns) {
+  controller->Reset();
+  if (timeout_ns < 0) {
+    // A negative timeout leaves the freshly reset controller without a call timeout.
+    return;
+  }
+  const int64_t timeout_ms =
+      std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_ns));
+  controller->set_call_timeout(std::chrono::milliseconds(timeout_ms));
+}
+
+// Explicit instantiations for the linker. Without them, every user of the template would have
+// to #include this .cc file to get the templatized class definitions. OpenScannerResponse is
+// forward-declared here for the shared_ptr instantiation below.
+class OpenScannerResponse;
+template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<hbase::Result>>;
+template class AsyncSingleRequestRpcRetryingCaller<folly::Unit>;
+template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<OpenScannerResponse>>;
+template class AsyncSingleRequestRpcRetryingCaller<bool>;
+} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-rpc-retrying-test.cc b/hbase-native-client/src/hbase/client/async-rpc-retrying-test.cc
new file mode 100644
index 0000000..6782d05
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-rpc-retrying-test.cc
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Logging.h>
+#include <folly/Memory.h>
+#include <folly/futures/Future.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/ScopedEventBaseThread.h>
+#include <gmock/gmock.h>
+#include <google/protobuf/stubs/callback.h>
+#include <wangle/concurrent/IOThreadPoolExecutor.h>
+
+#include <chrono>
+#include <functional>
+#include <string>
+
+#include "hbase/connection/request.h"
+#include "hbase/connection/response.h"
+#include "hbase/connection/rpc-client.h"
+#include "hbase/client/async-connection.h"
+#include "hbase/client/async-rpc-retrying-caller-factory.h"
+#include "hbase/client/async-rpc-retrying-caller.h"
+#include "hbase/client/client.h"
+#include "hbase/client/connection-configuration.h"
+#include "hbase/client/hbase-rpc-controller.h"
+#include "hbase/client/keyvalue-codec.h"
+#include "hbase/client/region-location.h"
+#include "hbase/client/request-converter.h"
+#include "hbase/client/response-converter.h"
+#include "hbase/client/result.h"
+#include "hbase/exceptions/exception.h"
+#include "hbase/if/Client.pb.h"
+#include "hbase/if/HBase.pb.h"
+#include "hbase/test-util/test-util.h"
+#include "hbase/utils/time-util.h"
+
+using hbase::AsyncRpcRetryingCallerFactory;
+using hbase::AsyncConnection;
+using hbase::AsyncRegionLocator;
+using hbase::ConnectionConfiguration;
+using hbase::Configuration;
+using hbase::HBaseRpcController;
+using hbase::RegionLocation;
+using hbase::RegionLocateType;
+using hbase::RpcClient;
+using hbase::RequestConverter;
+using hbase::ResponseConverter;
+using hbase::ReqConverter;
+using hbase::RespConverter;
+using hbase::Put;
+using hbase::TimeUtil;
+using hbase::Client;
+using hbase::security::User;
+
+using ::testing::Return;
+using ::testing::_;
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+using namespace hbase;
+
+using folly::exception_wrapper;
+
+// Test fixture for the retrying-caller tests. It holds the TestUtil whose mini cluster is used
+// by runTest().
+class AsyncRpcRetryTest : public ::testing::Test {
+ public:
+  static std::unique_ptr<hbase::TestUtil> test_util;
+
+  // Installs the failure signal handler and starts the mini cluster.
+  static void SetUpTestCase() {
+    google::InstallFailureSignalHandler();
+    test_util = std::make_unique<hbase::TestUtil>();
+    test_util->StartMiniCluster(2);
+  }
+};
+// Starts out empty; SetUpTestCase() fills it in.
+std::unique_ptr<hbase::TestUtil> AsyncRpcRetryTest::test_util;
+
+// Region locator for tests: every lookup is answered immediately with the stored location, and
+// cache updates are ignored.
+class AsyncRegionLocatorBase : public AsyncRegionLocator {
+ public:
+  AsyncRegionLocatorBase() = default;
+  explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location)
+      : region_location_(region_location) {}
+  virtual ~AsyncRegionLocatorBase() = default;
+
+  // Ignores all arguments and returns an already-fulfilled future holding region_location_.
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &,
+                                                                     const std::string &,
+                                                                     const RegionLocateType,
+                                                                     const int64_t) override {
+    folly::Promise<std::shared_ptr<RegionLocation>> fulfilled;
+    fulfilled.setValue(region_location_);
+    return fulfilled.getFuture();
+  }
+
+  // Replaces the location handed out by LocateRegion().
+  virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) {
+    region_location_ = region_location;
+  }
+
+  // Intentionally a no-op: this locator keeps no cache.
+  void UpdateCachedLocation(const RegionLocation &, const folly::exception_wrapper &) override {}
+
+ protected:
+  std::shared_ptr<RegionLocation> region_location_;
+};
+
+// Locator that always succeeds; it adds nothing to AsyncRegionLocatorBase.
+class MockAsyncRegionLocator : public AsyncRegionLocatorBase {
+ public:
+  MockAsyncRegionLocator() = default;
+  explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  ~MockAsyncRegionLocator() override = default;
+};
+
+// Locator that answers the first lookups with a location carrying a bogus region name and
+// delegates to AsyncRegionLocatorBase afterwards.
+class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase {
+ private:
+  uint32_t tries_ = 0;
+  uint32_t num_fails_ = 0;
+
+ public:
+  explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails)
+      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+  explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  virtual ~MockWrongRegionAsyncRegionLocator() {}
+
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+      const hbase::pb::TableName &tn, const std::string &row,
+      const RegionLocateType locate_type = RegionLocateType::kCurrent,
+      const int64_t locate_ns = 0) override {
+    // tries_ is compared before it is incremented, so the first num_fails_ + 1 lookups get the
+    // bogus location; every later lookup is delegated to the base class, which returns the
+    // correct region location.
+    const bool past_failures = tries_++ > num_fails_;
+    if (past_failures) {
+      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+    }
+    /* keep the real region info and server, but set a random region name to simulate an
+     * invalid region */
+    auto wrong_location = std::make_shared<RegionLocation>(
+        "whatever-region-name", region_location_->region_info(), region_location_->server_name());
+    folly::Promise<std::shared_ptr<RegionLocation>> fulfilled;
+    fulfilled.setValue(wrong_location);
+    return fulfilled.getFuture();
+  }
+};
+
+// Locator whose first lookups fail with an exception and whose later lookups are delegated to
+// AsyncRegionLocatorBase.
+class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {
+ private:
+  uint32_t tries_ = 0;
+  uint32_t num_fails_ = 0;
+
+ public:
+  explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
+      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+  explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  virtual ~MockFailingAsyncRegionLocator() {}
+
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+      const hbase::pb::TableName &tn, const std::string &row,
+      const RegionLocateType locate_type = RegionLocateType::kCurrent,
+      const int64_t locate_ns = 0) override {
+    // tries_ is compared before it is incremented, so the first num_fails_ + 1 lookups fail;
+    // every later lookup is delegated to the base class, which returns the correct region
+    // location.
+    const bool past_failures = tries_++ > num_fails_;
+    if (past_failures) {
+      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+    }
+    folly::Promise<std::shared_ptr<RegionLocation>> failed;
+    failed.setException(std::runtime_error{"Failed to look up region location"});
+    return failed.getFuture();
+  }
+};
+
+// AsyncConnection for tests: all collaborators are injected through the constructor and handed
+// back unchanged by the accessors. Init() must be called after the object is owned by a
+// shared_ptr, because it uses shared_from_this() to build the caller factory.
+class MockAsyncConnection : public AsyncConnection,
+                            public std::enable_shared_from_this<MockAsyncConnection> {
+ public:
+  MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf,
+                      std::shared_ptr<folly::HHWheelTimer> retry_timer,
+                      std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
+                      std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+                      std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor,
+                      std::shared_ptr<RpcClient> rpc_client,
+                      std::shared_ptr<AsyncRegionLocator> region_locator)
+      // Listed in member declaration order, which is the order the members are actually
+      // initialized in (avoids -Wreorder).
+      : retry_timer_(retry_timer),
+        conn_conf_(conn_conf),
+        rpc_client_(rpc_client),
+        region_locator_(region_locator),
+        cpu_executor_(cpu_executor),
+        io_executor_(io_executor),
+        retry_executor_(retry_executor) {}
+  ~MockAsyncConnection() {}
+
+  // Creates the caller factory; it needs a shared_ptr to this connection.
+  void Init() {
+    caller_factory_ =
+        std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
+  }
+
+  // This mock carries no Configuration.
+  std::shared_ptr<Configuration> conf() override { return nullptr; }
+  std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; }
+  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
+    return caller_factory_;
+  }
+  std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
+  std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; }
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
+  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
+    return retry_executor_;
+  }
+
+  // Nothing to close: the mock does not stop the injected executors or client.
+  void Close() override {}
+  std::shared_ptr<HBaseRpcController> CreateRpcController() override {
+    return std::make_shared<HBaseRpcController>();
+  }
+
+ private:
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
+  std::shared_ptr<ConnectionConfiguration> conn_conf_;
+  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
+  std::shared_ptr<RpcClient> rpc_client_;
+  std::shared_ptr<AsyncRegionLocator> region_locator_;
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
+};
+
+// Minimal stand-in for RawAsyncTableImpl: converts a Get into a protobuf request, sends it via
+// the RpcClient and converts the response back.
+template <typename CONN>
+class MockRawAsyncTableImpl {
+ public:
+  explicit MockRawAsyncTableImpl(std::shared_ptr<CONN> conn) : conn_(conn) {}
+  virtual ~MockRawAsyncTableImpl() = default;
+
+  /* implement this in real RawAsyncTableImpl. */
+
+  /* in real RawAsyncTableImpl, this should be private. */
+  // Sends `get` to the server named in `loc` and returns a future for the converted Result.
+  folly::Future<std::shared_ptr<hbase::Result>> GetCall(
+      std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
+      std::shared_ptr<RegionLocation> loc, const hbase::Get &get) {
+    hbase::RpcCall<hbase::Request, hbase::Response> rpc_call = [](
+        std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<RegionLocation> loc,
+        std::shared_ptr<HBaseRpcController> controller,
+        std::unique_ptr<hbase::Request> preq) -> folly::Future<std::unique_ptr<hbase::Response>> {
+      VLOG(1) << "entering MockRawAsyncTableImpl#GetCall, calling AsyncCall, loc:"
+              << loc->DebugString();
+      return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(),
+                                   std::move(preq), User::defaultUser(), "ClientService");
+    };
+
+    return Call<hbase::Get, hbase::Request, hbase::Response, std::shared_ptr<hbase::Result>>(
+        rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest, rpc_call,
+        &hbase::ResponseConverter::FromGetResponse);
+  }
+
+  /* in real RawAsyncTableImpl, this should be private. */
+  // Generic request/response round trip: converts `req`, runs `rpc_call`, and fulfils the
+  // returned future with the converted response or with the RPC's exception.
+  template <typename REQ, typename PREQ, typename PRESP, typename RESP>
+  folly::Future<RESP> Call(std::shared_ptr<hbase::RpcClient> rpc_client,
+                           std::shared_ptr<HBaseRpcController> controller,
+                           std::shared_ptr<RegionLocation> loc, const REQ &req,
+                           ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter,
+                           hbase::RpcCall<PREQ, PRESP> rpc_call,
+                           RespConverter<RESP, PRESP> resp_converter) {
+    // One promise per call, typed on RESP and owned by the continuations. This keeps calls
+    // independent of each other and keeps the continuations from touching `this`.
+    auto promise = std::make_shared<folly::Promise<RESP>>();
+    auto f = promise->getFuture();
+    VLOG(1) << "calling rpc_call";
+    rpc_call(rpc_client, loc, controller, req_converter(req, loc->region_name()))
+        .then([promise, resp_converter](std::unique_ptr<PRESP> presp) {
+          VLOG(1) << "MockRawAsyncTableImpl#call succeded: ";
+          RESP result = resp_converter(*presp);
+          promise->setValue(result);
+        })
+        .onError([promise](const exception_wrapper &e) {
+          VLOG(1) << "entering MockRawAsyncTableImpl#call, exception: " << e.what();
+          VLOG(1) << "entering MockRawAsyncTableImpl#call, error typeinfo: " << typeid(e).name();
+          promise->setException(e);
+        });
+    return f;
+  }
+
+ private:
+  std::shared_ptr<CONN> conn_;
+};
+
+// Runs one Get through the retrying caller against the mini cluster.
+//
+// region_locator: locator under test; its location is set to the row's real location.
+// tableName: table to create and read from ("d" is the column family).
+// operation_timeout_millis: operation timeout given to the connection configuration.
+//
+// Throws whatever the caller's future fails with. In that case the cleanup at the end of the
+// function is not reached.
+void runTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator, std::string tableName,
+             uint32_t operation_timeout_millis = 1200000) {
+  AsyncRpcRetryTest::test_util->CreateTable(tableName, "d");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>(tableName);
+  auto row = "test2";
+
+  // Get to be performed on above HBase Table
+  hbase::Get get(row);
+
+  // Create a client
+  Client client(*(AsyncRpcRetryTest::test_util->conf()));
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+
+  table->Put(Put{"test2"}.AddColumn("d", "2", "value2"));
+  table->Put(Put{"test2"}.AddColumn("d", "extra", "value for extra"));
+
+  /* init region location and rpc channel */
+  auto region_location = table->GetRegionLocation(row);
+
+  auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
+  auto io_executor_ = client.async_connection()->io_executor();
+  // The retry timer is created on this executor's event base.
+  auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+  auto codec = std::make_shared<hbase::KeyValueCodec>();
+  auto rpc_client = std::make_shared<RpcClient>(io_executor_, cpu_executor_, codec,
+                                                AsyncRpcRetryTest::test_util->conf());
+  std::shared_ptr<folly::HHWheelTimer> retry_timer =
+      folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
+
+  /* init connection configuration */
+  auto connection_conf = std::make_shared<ConnectionConfiguration>(
+      TimeUtil::SecondsToNanos(20),                       // connect_timeout
+      TimeUtil::MillisToNanos(operation_timeout_millis),  // operation_timeout
+      TimeUtil::SecondsToNanos(60),                       // rpc_timeout
+      TimeUtil::MillisToNanos(100),                       // pause
+      5,                                                  // max retries
+      9);                                                 // start log errors count
+
+  /* set region locator */
+  region_locator->set_region_location(region_location);
+
+  /* init hbase client connection */
+  auto conn = std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_,
+                                                    io_executor_, retry_executor_, rpc_client,
+                                                    region_locator);
+  conn->Init();
+
+  /* init retry caller factory */
+  auto tableImpl = std::make_shared<MockRawAsyncTableImpl<MockAsyncConnection>>(conn);
+
+  /* init request caller builder */
+  auto builder = conn->caller_factory()->Single<std::shared_ptr<hbase::Result>>();
+
+  /* call with retry to get result */
+
+  // `get` is captured by reference; it outlives the call because this function blocks on the
+  // future below.
+  auto async_caller =
+      builder->table(std::make_shared<hbase::pb::TableName>(tn))
+          ->row(row)
+          ->rpc_timeout(conn->connection_conf()->read_rpc_timeout())
+          ->operation_timeout(conn->connection_conf()->operation_timeout())
+          ->action([tableImpl, &get](std::shared_ptr<hbase::HBaseRpcController> controller,
+                                     std::shared_ptr<hbase::RegionLocation> loc,
+                                     std::shared_ptr<hbase::RpcClient> rpc_client)
+                       -> folly::Future<std::shared_ptr<hbase::Result>> {
+                         return tableImpl->GetCall(rpc_client, controller, loc, get);
+                       })
+          ->Build();
+
+  auto result = async_caller->Call().get(milliseconds(500000));
+
+  // Test the values, should be same as in put executed on hbase shell
+  ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
+  EXPECT_EQ("test2", result->Row());
+  EXPECT_EQ("value2", *(result->Value("d", "2")));
+  EXPECT_EQ("value for extra", *(result->Value("d", "extra")));
+
+  retry_timer->destroy();
+  table->Close();
+  client.Close();
+  retry_executor_->stop();
+}
+
+// Success path: the locator always returns the real region location.
+TEST_F(AsyncRpcRetryTest, TestGetBasic) {
+  auto region_locator = std::make_shared<MockAsyncRegionLocator>();
+  runTest(region_locator, "table1");
+}
+
+// The RPC fails while the locator (num_fails = 3) hands out a wrong region, then succeeds.
+TEST_F(AsyncRpcRetryTest, TestHandleException) {
+  auto region_locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(3);
+  runTest(region_locator, "table2");
+}
+
+// With a wrong-region locator (num_fails = 5) the RPC keeps failing until the caller gives up
+// and runTest throws.
+TEST_F(AsyncRpcRetryTest, TestFailWithException) {
+  auto region_locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(5);
+  EXPECT_ANY_THROW(runTest(region_locator, "table3"));
+}
+
+// The region location lookup fails (locator built with num_fails = 3), then succeeds.
+TEST_F(AsyncRpcRetryTest, TestHandleExceptionFromRegionLocationLookup) {
+  auto region_locator = std::make_shared<MockFailingAsyncRegionLocator>(3);
+  runTest(region_locator, "table4");
+}
+
+// With a failing locator (num_fails = 5) the lookup keeps failing until the caller gives up
+// and runTest throws.
+TEST_F(AsyncRpcRetryTest, TestFailWithExceptionFromRegionLocationLookup) {
+  auto region_locator = std::make_shared<MockFailingAsyncRegionLocator>(5);
+  EXPECT_ANY_THROW(runTest(region_locator, "table5"));
+}
+
+// A 200 ms operation timeout is hit while the lookup is still failing, so the caller stops
+// retrying and runTest throws.
+TEST_F(AsyncRpcRetryTest, TestFailWithOperationTimeout) {
+  auto region_locator = std::make_shared<MockFailingAsyncRegionLocator>(3);
+  EXPECT_ANY_THROW(runTest(region_locator, "table6", 200));
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-scan-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-scan-rpc-retrying-caller.cc b/hbase-native-client/src/hbase/client/async-scan-rpc-retrying-caller.cc
new file mode 100644
index 0000000..2189128
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-scan-rpc-retrying-caller.cc
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/async-scan-rpc-retrying-caller.h"
+
+namespace hbase {
+
+// Builds a resumer bound to the scan caller that Resume() hands the stored response back to.
+ScanResumerImpl::ScanResumerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller)
+    : caller_(caller),
+      mutex_() {
+}
+
+/**
+ * Resumes a scan that was suspended through the ScanController.
+ *
+ * - State kInitialized: Prepare() has not stored a response yet. The state is flipped to
+ *   kResumed so that the later Prepare() call returns false and the scan simply goes on.
+ * - State kResumed: already resumed, nothing to do.
+ * - Otherwise: the response stored by Prepare() is passed back to the caller's
+ *   CompleteOrNext() so that the scan continues.
+ */
+void ScanResumerImpl::Resume() {
+  // The stored response is copied while holding the lock and used only after the lock has been
+  // released, so CompleteOrNext() is never invoked with mutex_ held.
+  std::shared_ptr<pb::ScanResponse> local_resp;
+
+  {
+    std::unique_lock<std::mutex> mlock{mutex_};
+    if (state_ == ScanResumerState::kInitialized) {
+      // user calls this method before we call prepare, so just set the state to
+      // RESUMED, the implementation will just go on.
+      state_ = ScanResumerState::kResumed;
+      return;
+    }
+    if (state_ == ScanResumerState::kResumed) {
+      // already resumed, give up.
+      return;
+    }
+    state_ = ScanResumerState::kResumed;
+    local_resp = resp_;
+  }
+
+  caller_->CompleteOrNext(local_resp);
+}
+
+/**
+ * Stores the response and the number of complete rows so that a later Resume() can continue
+ * the scan.
+ *
+ * @return false if Resume() was already called (state is kResumed); nothing is stored in that
+ *         case. true once the state has been switched to kSuspended.
+ */
+bool ScanResumerImpl::Prepare(std::shared_ptr<pb::ScanResponse> resp, int num_complete_rows) {
+  std::unique_lock<std::mutex> guard(mutex_);
+  const bool already_resumed = (state_ == ScanResumerState::kResumed);
+  if (!already_resumed) {
+    state_ = ScanResumerState::kSuspended;
+    resp_ = resp;
+    num_complete_rows_ = num_complete_rows;
+  }
+  return !already_resumed;
+}
+
+// Builds a controller for the given scan caller; the caller is forwarded to the resumer that
+// Suspend() creates.
+ScanControllerImpl::ScanControllerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller)
+    : caller_(caller) {
+}
+
+// Switches the controller to kSuspended and returns a freshly created resumer for the scan.
+// PreCheck() CHECK-fails when invoked from the wrong thread or when the controller is no longer
+// in the kInitialized state.
+std::shared_ptr<ScanResumer> ScanControllerImpl::Suspend() {
+  PreCheck();
+
+  state_ = ScanControllerState::kSuspended;
+  resumer_ = std::make_shared<ScanResumerImpl>(caller_);
+
+  return resumer_;
+}
+
+// Switches the controller to kTerminated after PreCheck() has validated the calling thread and
+// the current state.
+void ScanControllerImpl::Terminate() {
+  PreCheck();
+
+  state_ = ScanControllerState::kTerminated;
+}
+
+// Marks the controller as kDestroyed and reports the state it was in just before.
+ScanControllerState ScanControllerImpl::Destroy() {
+  const ScanControllerState previous = state_;
+  state_ = ScanControllerState::kDestroyed;
+  return previous;
+}
+
+// Guards Suspend() and Terminate(): CHECK-fails unless the call is made on the thread recorded
+// in caller_thread_id_ and the controller is still in the kInitialized state.
+void ScanControllerImpl::PreCheck() {
+  const auto current_thread_id = std::this_thread::get_id();
+  CHECK(current_thread_id == caller_thread_id_)
+      << "The current thread is" << current_thread_id << ", expected thread is "
+      << caller_thread_id_ << ", you should not call this method outside OnNext or OnHeartbeat";
+
+  const bool still_initialized = (state_ == ScanControllerState::kInitialized);
+  CHECK(still_initialized) << "Invalid Stopper state " << DebugString(state_);
+}
+
+// Maps a ScanControllerState to its enumerator name; any other value yields "UNKNOWN".
+std::string ScanControllerImpl::DebugString(ScanControllerState state) {
+  if (state == ScanControllerState::kInitialized) {
+    return "kInitialized";
+  }
+  if (state == ScanControllerState::kSuspended) {
+    return "kSuspended";
+  }
+  if (state == ScanControllerState::kTerminated) {
+    return "kTerminated";
+  }
+  if (state == ScanControllerState::kDestroyed) {
+    return "kDestroyed";
+  }
+  return "UNKNOWN";
+}
+
+// Maps a ScanResumerState to its enumerator name; any other value yields "UNKNOWN".
+std::string ScanControllerImpl::DebugString(ScanResumerState state) {
+  if (state == ScanResumerState::kInitialized) {
+    return "kInitialized";
+  }
+  if (state == ScanResumerState::kSuspended) {
+    return "kSuspended";
+  }
+  if (state == ScanResumerState::kResumed) {
+    return "kResumed";
+  }
+  return "UNKNOWN";
+}
+
+// Stores the collaborators and settings of one scan caller: the connection, retry timer, RPC
+// client, the Scan itself, the scanner id, the results cache, the consumer, the region
+// location, and the timeout / retry settings. A fresh promise is created for the result
+// reported through Start(), and the attempt counter tries_ starts at 1.
+AsyncScanRpcRetryingCaller::AsyncScanRpcRetryingCaller(
+ std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
+ std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<Scan> scan, int64_t scanner_id,
+ std::shared_ptr<ScanResultCache> results_cache, std::shared_ptr<RawScanResultConsumer> consumer,
+ std::shared_ptr<RegionLocation> region_location, nanoseconds scanner_lease_timeout_nanos,
+ nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos,
+ nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
+ : conn_(conn),
+ retry_timer_(retry_timer),
+ rpc_client_(rpc_client),
+ scan_(scan),
+ scanner_id_(scanner_id),
+ results_cache_(results_cache),
+ consumer_(consumer),
+ region_location_(region_location),
+ scanner_lease_timeout_nanos_(scanner_lease_timeout_nanos),
+ pause_(pause),
+ max_retries_(max_retries),
+ scan_timeout_nanos_(scan_timeout_nanos),
+ rpc_timeout_nanos_(rpc_timeout_nanos),
+ start_log_errors_count_(start_log_errors_count),
+ promise_(std::make_shared<folly::Promise<bool>>()),
+ tries_(1) {
+ // The RPC controller comes from the connection, so conn_ must already be initialized here.
+ controller_ = conn_->CreateRpcController();
+ // Reference point for RemainingTimeNs(); Next() resets it for every new scan call.
+ start_ns_ = TimeUtil::GetNowNanos();
+ // max_attempts_ is derived from max_retries via ConnectionUtils::Retries2Attempts.
+ max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
+ // Errors collected by OnError() and reported through RetriesExhaustedException.
+ exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>();
+}
+
+/**
+ * Entry point of the caller: feeds the response of the open-scanner call into OnComplete() and
+ * returns the future of the internal promise.
+ *
+ * The promise is fulfilled elsewhere in this class with false (CompleteNoMoreResults), with
+ * true (CompleteWithNextStartRow / CompleteWhenError) or with a RetriesExhaustedException
+ * (CompleteExceptionally).
+ */
+folly::Future<bool> AsyncScanRpcRetryingCaller::Start(
+    std::shared_ptr<HBaseRpcController> controller,
+    std::shared_ptr<pb::ScanResponse> open_scan_resp,
+    const std::shared_ptr<CellScanner> cell_scanner) {
+  // Process the already received open-scan response exactly like any later scan response.
+  OnComplete(controller, open_scan_resp, cell_scanner);
+
+  return promise_->getFuture();
+}
+
+// Nanoseconds left of the scan timeout, measured from start_ns_. The result is zero or negative
+// once the timeout has elapsed.
+int64_t AsyncScanRpcRetryingCaller::RemainingTimeNs() {
+  const auto elapsed_ns = TimeUtil::GetNowNanos() - start_ns_;
+  return scan_timeout_nanos_.count() - elapsed_ns;
+}
+
+/**
+ * Handles the response of one scan RPC.
+ *
+ * A failed controller is routed to OnError(). Otherwise the response is converted to results,
+ * passed through the results cache and handed to the consumer (OnNext() when there are results,
+ * OnHeartbeat() for an empty heartbeat response). The consumer may terminate or suspend the
+ * scan through the ScanController it receives:
+ *  - terminated: the scan is completed with "no more results"; the scanner is closed first if
+ *    the region still has results.
+ *  - suspended: processing stops here if the resumer accepts the response; Resume() continues.
+ *  - otherwise: CompleteOrNext() decides whether to finish or to issue the next scan call.
+ *
+ * @param controller   RPC controller of the finished call.
+ * @param resp         Scan response returned by the server.
+ * @param cell_scanner Cell data accompanying the response.
+ */
+void AsyncScanRpcRetryingCaller::OnComplete(std::shared_ptr<HBaseRpcController> controller,
+                                            std::shared_ptr<pb::ScanResponse> resp,
+                                            const std::shared_ptr<CellScanner> cell_scanner) {
+  VLOG(5) << "Scan: OnComplete, scanner_id:" << scanner_id_;
+
+  if (controller->Failed()) {
+    OnError(controller->exception());
+    return;
+  }
+
+  bool is_heartbeat = resp->has_heartbeat_message() && resp->heartbeat_message();
+
+  int64_t num_complete_rows_before = results_cache_->num_complete_rows();
+  try {
+    auto raw_results = ResponseConverter::FromScanResponse(resp, cell_scanner);
+
+    auto results = results_cache_->AddAndGet(raw_results, is_heartbeat);
+
+    auto scan_controller = std::make_shared<ScanControllerImpl>(shared_from_this());
+
+    if (results.size() > 0) {
+      // Remember where to restart from if a later call fails.
+      UpdateNextStartRowWhenError(*results[results.size() - 1]);
+      VLOG(5) << "Calling consumer->OnNext()";
+      consumer_->OnNext(results, scan_controller);
+    } else if (is_heartbeat) {
+      consumer_->OnHeartbeat(scan_controller);
+    }
+
+    // The controller is only valid during the consumer callback; read the decision and
+    // invalidate it.
+    ScanControllerState state = scan_controller->Destroy();
+    if (state == ScanControllerState::kTerminated) {
+      if (resp->has_more_results_in_region() && resp->more_results_in_region()) {
+        // we have more results in region but user request to stop the scan, so we need to close the
+        // scanner explicitly.
+        CloseScanner();
+      }
+      CompleteNoMoreResults();
+      return;
+    }
+
+    int64_t num_complete_rows = results_cache_->num_complete_rows() - num_complete_rows_before;
+    if (state == ScanControllerState::kSuspended) {
+      // Prepare() returns false when the user already called Resume(); in that case just go on.
+      if (scan_controller->resumer()->Prepare(resp, num_complete_rows)) {
+        return;
+      }
+    }
+  } catch (const std::runtime_error& e) {
+    // We can not retry here. The server has responded normally and the call sequence has been
+    // increased so a new scan with the same call sequence will cause an
+    // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
+    LOG(WARNING) << "Received exception in reading the scan response:" << e.what();
+    CompleteWhenError(true);
+    return;
+  }
+
+  CompleteOrNext(resp);
+}
+
+// Decides what follows a processed scan response: finish the whole scan, finish this region,
+// or issue the next scan call on the same scanner.
+void AsyncScanRpcRetryingCaller::CompleteOrNext(std::shared_ptr<pb::ScanResponse> resp) {
+  VLOG(5) << "Scan: CompleteOrNext, scanner_id" << scanner_id_
+          << ", response:" << resp->ShortDebugString();
+
+  const bool scan_exhausted = resp->has_more_results() && !resp->more_results();
+  if (scan_exhausted) {
+    // RS tells us there is no more data for the whole scan
+    CompleteNoMoreResults();
+    return;
+  }
+  // TODO: Implement Scan::limit(), and check the limit here
+
+  const bool region_exhausted =
+      resp->has_more_results_in_region() && !resp->more_results_in_region();
+  if (region_exhausted) {
+    // TODO: check whether Scan is reversed here
+    CompleteWhenNoMoreResultsInRegion();
+    return;
+  }
+
+  Next();
+}
+
+// Fails the promise with a RetriesExhaustedException carrying the collected errors. The results
+// cache is cleared first and, when close_scanner is set, a close-scanner RPC is issued.
+void AsyncScanRpcRetryingCaller::CompleteExceptionally(bool close_scanner) {
+  VLOG(5) << "Scan: CompleteExceptionally";
+
+  results_cache_->Clear();
+  if (close_scanner) {
+    CloseScanner();
+  }
+
+  promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_));
+}
+
+// Fulfills the promise with false: the scan is finished.
+void AsyncScanRpcRetryingCaller::CompleteNoMoreResults() {
+  // Scanners auto-close in master code once the region is exhausted; that may not hold for
+  // branch-1. When backporting, make sure that the scanner is closed.
+  VLOG(5) << "Scan: CompleteNoMoreResults, scanner_id:" << scanner_id_;
+
+  promise_->setValue(false);
+}
+
+// Called when the current region is exhausted: either the whole scan is done, or the scan
+// moves on with the end key of this region as the new start row.
+void AsyncScanRpcRetryingCaller::CompleteWhenNoMoreResultsInRegion() {
+  VLOG(5) << "Scan: CompleteWhenNoMoreResultsInRegion, scanner_id:" << scanner_id_;
+  // Scanners auto-close in master code once the region is exhausted; that may not hold for
+  // branch-1. When backporting, make sure that the scanner is closed.
+  if (NoMoreResultsForScan(*scan_, region_location_->region_info())) {
+    CompleteNoMoreResults();
+    return;
+  }
+
+  CompleteWithNextStartRow(region_location_->region_info().end_key(), true);
+}
+
+// Sets the scan's start row to `row` and fulfills the promise with true. The `inclusive`
+// argument is currently not used (see TODO).
+void AsyncScanRpcRetryingCaller::CompleteWithNextStartRow(std::string row, bool inclusive) {
+  VLOG(5) << "Scan: CompleteWithNextStartRow: region scan is complete, move to next region";
+
+  scan_->SetStartRow(row);
+  // TODO: set inclusive if it is reverse scans
+
+  promise_->setValue(true);
+}
+
+// Remembers the row of the given result as the restart point used by CompleteWhenError(). The
+// row itself is included in the restart only when the result is partial.
+void AsyncScanRpcRetryingCaller::UpdateNextStartRowWhenError(const Result& result) {
+  include_next_start_row_when_error_ = result.Partial();
+  next_start_row_when_error_ = optional<std::string>(result.Row());
+}
+
+// Gives up on the current scanner but not on the scan: clears the results cache, optionally
+// closes the scanner, moves the scan's start row to the remembered restart point (if any) and
+// fulfills the promise with true.
+void AsyncScanRpcRetryingCaller::CompleteWhenError(bool close_scanner) {
+  VLOG(5) << "Scan: CompleteWhenError, scanner_id:" << scanner_id_;
+
+  results_cache_->Clear();
+  if (close_scanner) {
+    CloseScanner();
+  }
+
+  if (next_start_row_when_error_) {
+    // TODO: HBASE-17583 adds include start / stop row to the Scan. Once we rebase and implement
+    // those options in Scan , we can start using that here.
+    const auto& restart_row = *next_start_row_when_error_;
+    scan_->SetStartRow(include_next_start_row_when_error_
+                           ? restart_row
+                           : BytesUtil::CreateClosestRowAfter(restart_row));
+  }
+
+  promise_->setValue(true);
+}
+
+/**
+ * Handles a failed scan RPC.
+ *
+ * The error is recorded and one of the following happens:
+ *  - retries or the scan timeout are exhausted: CompleteExceptionally();
+ *  - the scanner is closed or out of order: CompleteWhenError(), so that the upper layer can
+ *    continue with a new scanner;
+ *  - the error is not retriable: CompleteExceptionally();
+ *  - otherwise: the same call is scheduled again after a pause.
+ *
+ * @param error The exception reported for the failed call.
+ */
+void AsyncScanRpcRetryingCaller::OnError(const folly::exception_wrapper& error) {
+  VLOG(5) << "Scan: OnError, scanner_id:" << scanner_id_;
+  if (tries_ > start_log_errors_count_ || VLOG_IS_ON(5)) {
+    LOG(WARNING) << "Call to " << region_location_->server_name().ShortDebugString()
+                 << " for scanner id = " << scanner_id_ << " for "
+                 << region_location_->region_info().ShortDebugString()
+                 << " failed, , tries = " << tries_ << ", maxAttempts = " << max_attempts_
+                 << ", timeout = " << TimeUtil::ToMillis(scan_timeout_nanos_).count()
+                 << " ms, time elapsed = " << TimeUtil::ElapsedMillis(start_ns_) << " ms"
+                 << error.what().toStdString();
+  }
+
+  bool scanner_closed = ExceptionUtil::IsScannerClosed(error);
+  ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos());
+  exceptions_->push_back(twec);
+  // NOTE(review): the limit checked here is max_retries_ while the log line above reports
+  // max_attempts_ -- confirm which of the two is intended.
+  if (tries_ >= max_retries_) {
+    CompleteExceptionally(!scanner_closed);
+    return;
+  }
+
+  int64_t delay_ns;
+  if (scan_timeout_nanos_.count() > 0) {
+    // Never sleep past the scan timeout.
+    int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
+    if (max_delay_ns <= 0) {
+      CompleteExceptionally(!scanner_closed);
+      return;
+    }
+    delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1));
+  } else {
+    delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1);
+  }
+
+  if (scanner_closed) {
+    CompleteWhenError(false);
+    return;
+  }
+
+  if (ExceptionUtil::IsScannerOutOfOrder(error)) {
+    CompleteWhenError(true);
+    return;
+  }
+  if (!ExceptionUtil::ShouldRetry(error)) {
+    CompleteExceptionally(true);
+    return;
+  }
+  tries_++;
+
+  // Everything the deferred tasks need is captured by value: they run on other executors after
+  // OnError() has returned, so references to the locals `self` and `delay_ns` would dangle.
+  // Holding `self` in each closure also keeps this caller alive until the retry has run.
+  auto self(shared_from_this());
+  conn_->retry_executor()->add([self, delay_ns]() {
+    self->retry_timer_->scheduleTimeoutFn(
+        [self]() { self->conn_->cpu_executor()->add([self]() { self->Call(); }); },
+        std::chrono::milliseconds(TimeUtil::ToMillis(delay_ns)));
+  });
+}
+
+/**
+ * Tells whether the scan is finished once the given region has been exhausted.
+ *
+ * @return true if the region has an empty end key, or if the scan has a non-empty stop row
+ *         that is less than or equal to the region's end key; false otherwise.
+ */
+bool AsyncScanRpcRetryingCaller::NoMoreResultsForScan(const Scan& scan,
+                                                      const pb::RegionInfo& info) {
+  const bool region_has_no_end_key = BytesUtil::IsEmptyStopRow(info.end_key());
+  if (region_has_no_end_key) {
+    return true;
+  }
+
+  const bool scan_has_no_stop_row = BytesUtil::IsEmptyStopRow(scan.StopRow());
+  if (scan_has_no_stop_row) {
+    return false;
+  }
+
+  // The scan ends with this region when
+  // 1. the stop row is less than the end key of the region, or
+  // 2. the stop row equals the end key of the region and the stop row is excluded.
+  // TODO: Scans always exclude StopRow for now, so equality alone is sufficient
+  // (no scan.IncludeStopRow() check yet).
+  const int32_t cmp = BytesUtil::CompareTo(info.end_key(), scan.StopRow());
+  return cmp >= 0;
+}
+
+// Issues the next scan call on the same scanner: advances the call sequence, resets the retry
+// bookkeeping (attempt counter, collected errors, start time) and sends the request.
+void AsyncScanRpcRetryingCaller::Next() {
+  VLOG(5) << "Scan: Next";
+
+  ++next_call_seq_;
+
+  // A new call starts with a clean retry state.
+  tries_ = 1;
+  exceptions_->clear();
+  start_ns_ = TimeUtil::GetNowNanos();
+
+  Call();
+}
+
+// Sends one scan request for the current call sequence. The response is handled by
+// OnComplete() on the connection's cpu executor; a failed call is handled by OnError().
+void AsyncScanRpcRetryingCaller::Call() {
+  VLOG(5) << "Scan: Call";
+  auto self(shared_from_this());
+
+  // Because scans carry a call sequence, an rpc timeout shorter than the scan timeout is of no
+  // use. Should the server not answer in time (unlikely now that there are heartbeats),
+  // resending the next request yields an OutOfOrderScannerNextException, and the only remedy is
+  // to close the scanner and open a new one. Hence the call timeout is whatever is left of the
+  // scan timeout, or 0 when no scan timeout is configured.
+  int64_t call_timeout_nanos = 0L;
+  if (scan_timeout_nanos_.count() > 0) {
+    call_timeout_nanos = RemainingTimeNs();
+    if (call_timeout_nanos <= 0) {
+      CompleteExceptionally(true);
+      return;
+    }
+  }
+
+  ResetController(controller_, call_timeout_nanos);
+
+  auto req =
+      RequestConverter::ToScanRequest(scanner_id_, scan_->Caching(), false, next_call_seq_, false);
+
+  // do the RPC call
+  const auto& server = region_location_->server_name();
+  rpc_client_
+      ->AsyncCall(server.host_name(), server.port(), std::move(req),
+                  security::User::defaultUser(), "ClientService")
+      .via(conn_->cpu_executor().get())
+      .then([self, this](const std::unique_ptr<Response>& resp) {
+        auto scan_resp = std::static_pointer_cast<pb::ScanResponse>(resp->resp_msg());
+        return OnComplete(controller_, scan_resp, resp->cell_scanner());
+      })
+      .onError([self, this](const folly::exception_wrapper& e) { OnError(e); });
+}
+
+// Sends a close-scanner request for scanner_id_ without waiting for the outcome. A failure is
+// only logged, since the scanner is probably closed already.
+void AsyncScanRpcRetryingCaller::CloseScanner() {
+  auto self(shared_from_this());
+  ResetController(controller_, rpc_timeout_nanos_.count());
+
+  VLOG(5) << "Closing scanner with scanner_id:" << folly::to<std::string>(scanner_id_);
+
+  // Do a close scanner RPC. Fire and forget.
+  auto req = RequestConverter::ToScanRequest(scanner_id_, 0, true);
+  const auto& server = region_location_->server_name();
+  rpc_client_
+      ->AsyncCall(server.host_name(), server.port(), std::move(req),
+                  security::User::defaultUser(), "ClientService")
+      .onError([self, this](const folly::exception_wrapper& e) -> std::unique_ptr<Response> {
+        LOG(WARNING) << "Call to " << region_location_->server_name().ShortDebugString()
+                     << " for closing scanner_id = " << folly::to<std::string>(scanner_id_)
+                     << " for " << region_location_->region_info().ShortDebugString()
+                     << " failed, ignore, probably already closed. Exception:"
+                     << e.what().toStdString();
+        return nullptr;
+      });
+}
+
+// Resets the controller and, for a non-negative timeout, sets its call timeout to the given
+// nanoseconds converted to milliseconds and capped at INT_MAX.
+void AsyncScanRpcRetryingCaller::ResetController(std::shared_ptr<HBaseRpcController> controller,
+                                                 const int64_t& timeout_nanos) {
+  controller->Reset();
+  if (timeout_nanos < 0) {
+    return;
+  }
+
+  const auto timeout_ms =
+      std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_nanos));
+  controller->set_call_timeout(milliseconds(timeout_ms));
+}
+
+} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-table-result-scanner.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-table-result-scanner.cc b/hbase-native-client/src/hbase/client/async-table-result-scanner.cc
new file mode 100644
index 0000000..3812b3a
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-table-result-scanner.cc
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/async-table-result-scanner.h"
+
+#include <vector>
+
+namespace hbase {
+// Creates an open scanner with an empty cache. max_cache_size is the cache size at which
+// OnNext() suspends prefetching.
+AsyncTableResultScanner::AsyncTableResultScanner(int64_t max_cache_size)
+    : max_cache_size_(max_cache_size) {
+  cache_size_ = 0;
+  closed_ = false;
+}
+
+// Destruction closes the scanner, see Close().
+AsyncTableResultScanner::~AsyncTableResultScanner() {
+  Close();
+}
+
+/**
+ * Closes the scanner: marks it closed, drops all cached results, wakes up threads blocked in
+ * Next() and resumes a suspended scan so that the scan can observe the closed state.
+ */
+void AsyncTableResultScanner::Close() {
+  std::shared_ptr<ScanResumer> local_resumer = nullptr;
+  {
+    std::unique_lock<std::mutex> mlock(mutex_);
+    closed_ = true;
+    while (!queue_.empty()) {
+      queue_.pop();
+    }
+    cache_size_ = 0;
+    // Take the resumer out while locked; it is used below, after the lock has been released.
+    local_resumer = resumer_;
+    resumer_ = nullptr;
+  }
+  cond_.notify_all();
+
+  // ScanResumer::Resume() must not be called with mutex_ held: the scan RPC continuation may
+  // run in the same thread before Resume() returns and call back into OnNext(), which locks
+  // mutex_ again.
+  if (local_resumer != nullptr) {
+    local_resumer->Resume();
+  }
+}
+
+// Returns the next cached result, blocking while the cache is empty. Returns nullptr once the
+// scanner is closed and the cache is empty, and throws the stored error if one was reported
+// through OnError() while the cache is empty. Taking a result out of the cache may resume a
+// suspended scan once the cache has drained to half of max_cache_size_ or less.
+std::shared_ptr<Result> AsyncTableResultScanner::Next() {
+ VLOG(5) << "AsyncTableResultScanner: Next()";
+
+ std::shared_ptr<Result> result = nullptr;
+ std::shared_ptr<ScanResumer> local_resumer = nullptr;
+ {
+ std::unique_lock<std::mutex> mlock(mutex_);
+ // Wait until a result is available, the scanner is closed or an error has been stored.
+ while (queue_.empty()) {
+ if (closed_) {
+ return nullptr;
+ }
+ if (error_) {
+ // NOTE(review): this throws the error_ object itself (assigned from a
+ // folly::exception_wrapper in OnError), not the exception it wraps -- confirm that callers
+ // catch that type.
+ throw error_;
+ }
+ cond_.wait(mlock);
+ }
+ result = queue_.front();
+ queue_.pop();
+
+ cache_size_ -= EstimatedSizeWithSharedPtr(result);
+ // Hand the resumer over to a local so that Resume() can be called without the lock.
+ if (resumer_ != nullptr && cache_size_ <= max_cache_size_ / 2) {
+ VLOG(1) << std::this_thread::get_id() << " resume scan prefetching";
+ local_resumer = resumer_;
+ resumer_ = nullptr;
+ }
+ }
+
+ // Need to call ScanResumer::Resume() outside of the scope of the mutex. The reason is that
+ // folly/wangle event loop might end up running the attached logic(.then()) at the Scan RPC
+ // in the same event thread before returning from the previous call. This seems like the
+ // wrong thing to do(™), but we cannot fix that now. Since the call back can end up calling
+ // this::OnNext(), we should unlock the mutex.
+ if (local_resumer != nullptr) {
+ local_resumer->Resume();
+ }
+ return result;
+}
+
+// Appends the given results to the cache queue and adds their estimated size to cache_size_.
+// No locking is done here; OnNext() calls this with mutex_ held.
+void AsyncTableResultScanner::AddToCache(const std::vector<std::shared_ptr<Result>> &results) {
+  VLOG(5) << "AsyncTableResultScanner: AddToCache()";
+  // Iterate by reference: copying each shared_ptr just to read it costs an extra atomic
+  // reference-count update per element.
+  for (const auto &r : results) {
+    queue_.push(r);
+    cache_size_ += EstimatedSizeWithSharedPtr(r);
+  }
+}
+
+// Estimated size of the pointed-to object (as reported by its EstimatedSize()) plus the size
+// of the shared_ptr handle itself.
+template <typename T>
+inline size_t AsyncTableResultScanner::EstimatedSizeWithSharedPtr(std::shared_ptr<T> t) {
+  const size_t handle_size = sizeof(std::shared_ptr<T>);
+  return t->EstimatedSize() + handle_size;
+}
+
+// Consumer callback for a batch of results. A closed scanner terminates the scan; otherwise
+// the results are cached, prefetching is suspended once the cache has reached max_cache_size_,
+// and threads waiting in Next() are woken up.
+void AsyncTableResultScanner::OnNext(const std::vector<std::shared_ptr<Result>> &results,
+                                     std::shared_ptr<ScanController> controller) {
+  VLOG(5) << "AsyncTableResultScanner: OnNext()";
+  {
+    std::unique_lock<std::mutex> guard(mutex_);
+    if (closed_) {
+      controller->Terminate();
+      return;
+    }
+
+    AddToCache(results);
+
+    const bool cache_full = cache_size_ >= max_cache_size_;
+    if (cache_full) {
+      StopPrefetch(controller);
+    }
+  }
+  // Notify after the lock has been released.
+  cond_.notify_all();
+}
+
+// Suspends the scan through the controller, keeps the returned resumer for a later Resume()
+// and counts the suspension.
+void AsyncTableResultScanner::StopPrefetch(std::shared_ptr<ScanController> controller) {
+  VLOG(1) << std::this_thread::get_id()
+          << ": stop prefetching when scanning as the cache size "
+          << folly::to<std::string>(cache_size_)
+          << " is greater than the max_cache_size "
+          << folly::to<std::string>(max_cache_size_);
+
+  resumer_ = controller->Suspend();
+  ++num_prefetch_stopped_;
+}
+
+/**
+ * Called for a heartbeat message when not enough cells have accumulated to call OnNext().
+ *
+ * This is the opportunity to stop a slow scan: a closed scanner terminates the scan here.
+ *
+ * @param controller used to suspend or terminate the scan. The controller is only valid
+ *        within the scope of this call; do NOT store it and call it later.
+ */
+void AsyncTableResultScanner::OnHeartbeat(std::shared_ptr<ScanController> controller) {
+  std::lock_guard<std::mutex> guard(mutex_);
+  if (!closed_) {
+    return;
+  }
+  controller->Terminate();
+}
+
+/**
+ * Called when the scan hit an unrecoverable error and has been terminated. OnComplete() is
+ * not called after OnError().
+ *
+ * The error is stored for Next() and waiting threads are woken up.
+ */
+void AsyncTableResultScanner::OnError(const folly::exception_wrapper &error) {
+  LOG(WARNING) << "Scanner received error" << error.what();
+
+  std::lock_guard<std::mutex> guard(mutex_);
+  error_ = error;
+  cond_.notify_all();
+}
+
+/**
+ * Called when the scan has completed normally: marks the scanner closed and wakes up threads
+ * waiting in Next(). Results that are already cached stay available.
+ */
+void AsyncTableResultScanner::OnComplete() {
+  std::lock_guard<std::mutex> guard(mutex_);
+  closed_ = true;
+  cond_.notify_all();
+}
+} // namespace hbase