You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/09/15 21:20:39 UTC
[22/25] hbase git commit: HBASE-18725 [C++] Install header files as
well as library
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-rpc-retrying-caller-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
deleted file mode 100644
index 188f469..0000000
--- a/hbase-native-client/core/async-rpc-retrying-caller-factory.h
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/Logging.h>
-#include <folly/io/async/EventBase.h>
-#include <chrono>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "connection/rpc-client.h"
-#include "core/async-batch-rpc-retrying-caller.h"
-#include "core/async-rpc-retrying-caller.h"
-#include "core/async-scan-rpc-retrying-caller.h"
-#include "core/raw-scan-result-consumer.h"
-#include "core/region-location.h"
-#include "core/row.h"
-#include "core/scan-result-cache.h"
-#include "core/scan.h"
-
-#include "if/Client.pb.h"
-#include "if/HBase.pb.h"
-
-namespace hbase {
-
-class AsyncConnection;
-
-/**
- * Fluent builder for AsyncSingleRequestRpcRetryingCaller<RESP>.
- *
- * The timeouts, pause, retry count and error-logging threshold are seeded from the
- * connection's configuration; every setter overrides one value and returns a shared_ptr to
- * this same builder so that calls can be chained. The setters use shared_from_this(), so the
- * builder has to be owned by a std::shared_ptr (AsyncRpcRetryingCallerFactory::Single()
- * creates it that way).
- */
-template <typename RESP>
-class SingleRequestCallerBuilder
-    : public std::enable_shared_from_this<SingleRequestCallerBuilder<RESP>> {
- public:
-  /**
-   * @param conn connection whose configuration supplies the default settings
-   * @param retry_timer timer handed to the caller for scheduling retries
-   */
-  explicit SingleRequestCallerBuilder(std::shared_ptr<AsyncConnection> conn,
-                                      std::shared_ptr<folly::HHWheelTimer> retry_timer)
-      // Initializers are listed in member declaration order, which is the order in which
-      // they actually run.
-      : conn_(conn),
-        retry_timer_(retry_timer),
-        table_name_(nullptr),
-        rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()),
-        operation_timeout_nanos_(conn->connection_conf()->operation_timeout()),
-        pause_(conn->connection_conf()->pause()),
-        max_retries_(conn->connection_conf()->max_retries()),
-        start_log_errors_count_(conn->connection_conf()->start_log_errors_count()),
-        locate_type_(RegionLocateType::kCurrent) {}
-
-  virtual ~SingleRequestCallerBuilder() = default;
-
-  typedef SingleRequestCallerBuilder<RESP> GenericThisType;
-  typedef std::shared_ptr<GenericThisType> SharedThisPtr;
-
-  /** Sets the table the request is addressed to. */
-  SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) {
-    table_name_ = table_name;
-    return shared_this();
-  }
-
-  /** Overrides the per-RPC timeout. */
-  SharedThisPtr rpc_timeout(std::chrono::nanoseconds rpc_timeout_nanos) {
-    rpc_timeout_nanos_ = rpc_timeout_nanos;
-    return shared_this();
-  }
-
-  /** Overrides the timeout of the whole operation. */
-  SharedThisPtr operation_timeout(std::chrono::nanoseconds operation_timeout_nanos) {
-    operation_timeout_nanos_ = operation_timeout_nanos;
-    return shared_this();
-  }
-
-  /** Overrides the base pause used between retries. */
-  SharedThisPtr pause(std::chrono::nanoseconds pause) {
-    pause_ = pause;
-    return shared_this();
-  }
-
-  /** Overrides the maximum number of retries. */
-  SharedThisPtr max_retries(uint32_t max_retries) {
-    max_retries_ = max_retries;
-    return shared_this();
-  }
-
-  /** Overrides the try count above which failures are logged at WARNING level. */
-  SharedThisPtr start_log_errors_count(uint32_t start_log_errors_count) {
-    start_log_errors_count_ = start_log_errors_count;
-    return shared_this();
-  }
-
-  /** Sets the row used to locate the target region. */
-  SharedThisPtr row(const std::string& row) {
-    row_ = row;
-    return shared_this();
-  }
-
-  /** Overrides the region locate type (defaults to RegionLocateType::kCurrent). */
-  SharedThisPtr locate_type(RegionLocateType locate_type) {
-    locate_type_ = locate_type;
-    return shared_this();
-  }
-
-  /** Sets the callable that performs one RPC attempt. */
-  SharedThisPtr action(Callable<RESP> callable) {
-    callable_ = callable;
-    return shared_this();
-  }
-
-  /**
-   * Builds the caller and starts it.
-   *
-   * The caller registers continuations that capture its raw `this` pointer, and failed
-   * attempts are re-scheduled through a timer, so the caller must stay alive until the
-   * returned future completes. The pass-through continuation below owns a reference to the
-   * caller for exactly that long; without it the caller would be destroyed as soon as this
-   * function returns.
-   */
-  folly::Future<RESP> Call() {
-    auto caller = Build();
-    return caller->Call().then([caller](RESP resp) { return resp; });
-  }
-
-  /** Creates a caller from the current settings without starting it. */
-  std::shared_ptr<AsyncSingleRequestRpcRetryingCaller<RESP>> Build() {
-    return std::make_shared<AsyncSingleRequestRpcRetryingCaller<RESP>>(
-        conn_, retry_timer_, table_name_, row_, locate_type_, callable_, pause_, max_retries_,
-        operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_);
-  }
-
- private:
-  SharedThisPtr shared_this() {
-    return std::enable_shared_from_this<GenericThisType>::shared_from_this();
-  }
-
- private:
-  std::shared_ptr<AsyncConnection> conn_;
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-  std::shared_ptr<pb::TableName> table_name_;
-  std::chrono::nanoseconds rpc_timeout_nanos_;
-  std::chrono::nanoseconds operation_timeout_nanos_;
-  std::chrono::nanoseconds pause_;
-  uint32_t max_retries_;
-  uint32_t start_log_errors_count_;
-  std::string row_;
-  RegionLocateType locate_type_;
-  Callable<RESP> callable_;
-};  // end of SingleRequestCallerBuilder
-
-/**
- * Fluent builder for AsyncBatchRpcRetryingCaller<REQ, RESP>.
- *
- * Every setter stores one value and returns a shared_ptr to this same builder, so the builder
- * has to be owned by a std::shared_ptr. The pause and the two timeouts start out as the values
- * from the connection's configuration (previously they were left uninitialised until a setter
- * was called); max_attempts and start_log_errors_count default to 0.
- */
-template <typename REQ, typename RESP>
-class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilder<REQ, RESP>> {
- public:
-  /**
-   * @param conn connection whose configuration supplies the default pause and timeouts
-   * @param retry_timer timer handed to the caller
-   */
-  explicit BatchCallerBuilder(std::shared_ptr<AsyncConnection> conn,
-                              std::shared_ptr<folly::HHWheelTimer> retry_timer)
-      : conn_(conn),
-        retry_timer_(retry_timer),
-        pause_ns_(conn->connection_conf()->pause()),
-        operation_timeout_nanos_(conn->connection_conf()->operation_timeout()),
-        rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()) {}
-
-  virtual ~BatchCallerBuilder() = default;
-
-  typedef std::shared_ptr<BatchCallerBuilder<REQ, RESP>> SharedThisPtr;
-
-  /** Sets the table the batch is addressed to. */
-  SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) {
-    table_name_ = table_name;
-    return shared_this();
-  }
-
-  /** Sets the actions of the batch; must be called with a non-null pointer before Build(). */
-  SharedThisPtr actions(std::shared_ptr<std::vector<REQ>> actions) {
-    actions_ = actions;
-    return shared_this();
-  }
-
-  /** Overrides the timeout of the whole operation. */
-  SharedThisPtr operation_timeout(std::chrono::nanoseconds operation_timeout_nanos) {
-    operation_timeout_nanos_ = operation_timeout_nanos;
-    return shared_this();
-  }
-
-  /** Overrides the per-RPC timeout. */
-  SharedThisPtr rpc_timeout(std::chrono::nanoseconds rpc_timeout_nanos) {
-    rpc_timeout_nanos_ = rpc_timeout_nanos;
-    return shared_this();
-  }
-
-  /** Overrides the pause passed to the caller. */
-  SharedThisPtr pause(std::chrono::nanoseconds pause_ns) {
-    pause_ns_ = pause_ns;
-    return shared_this();
-  }
-
-  /** Sets the maximum number of attempts (0 unless set). */
-  SharedThisPtr max_attempts(int32_t max_attempts) {
-    max_attempts_ = max_attempts;
-    return shared_this();
-  }
-
-  /** Sets the start_log_errors_count value passed to the caller (0 unless set). */
-  SharedThisPtr start_log_errors_count(int32_t start_log_errors_count) {
-    start_log_errors_count_ = start_log_errors_count;
-    return shared_this();
-  }
-
-  /**
-   * Builds the caller and starts it.
-   * NOTE(review): the built caller is only held by a temporary here -- confirm that
-   * AsyncBatchRpcRetryingCaller keeps itself alive while the call is in flight.
-   */
-  folly::Future<std::vector<folly::Try<RESP>>> Call() { return Build()->Call(); }
-
-  /**
-   * Creates a caller from the current settings without starting it.
-   * Dereferences the actions pointer, so actions() must have been called first.
-   */
-  std::shared_ptr<AsyncBatchRpcRetryingCaller<REQ, RESP>> Build() {
-    return std::make_shared<AsyncBatchRpcRetryingCaller<REQ, RESP>>(
-        conn_, retry_timer_, table_name_, *actions_, pause_ns_, max_attempts_,
-        operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_);
-  }
-
- private:
-  SharedThisPtr shared_this() {
-    return std::enable_shared_from_this<BatchCallerBuilder>::shared_from_this();
-  }
-
- private:
-  std::shared_ptr<AsyncConnection> conn_;
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-  std::shared_ptr<hbase::pb::TableName> table_name_ = nullptr;
-  std::shared_ptr<std::vector<REQ>> actions_ = nullptr;
-  std::chrono::nanoseconds pause_ns_;
-  int32_t max_attempts_ = 0;
-  std::chrono::nanoseconds operation_timeout_nanos_;
-  std::chrono::nanoseconds rpc_timeout_nanos_;
-  int32_t start_log_errors_count_ = 0;
-};
-
-/**
- * Fluent builder for AsyncScanRpcRetryingCaller.
- *
- * The rpc/scan timeouts, pause, retry count and error-logging threshold are seeded from the
- * connection's configuration, the scanner id starts at -1 and the scanner lease timeout at
- * zero (it used to be left uninitialised until its setter was called). Every setter returns a
- * shared_ptr to this same builder, so the builder has to be owned by a std::shared_ptr.
- */
-class ScanCallerBuilder : public std::enable_shared_from_this<ScanCallerBuilder> {
- public:
-  /**
-   * @param conn connection whose configuration supplies the default settings
-   * @param retry_timer timer handed to the caller
-   */
-  explicit ScanCallerBuilder(std::shared_ptr<AsyncConnection> conn,
-                             std::shared_ptr<folly::HHWheelTimer> retry_timer)
-      // Initializers are listed in member declaration order.
-      : conn_(conn),
-        retry_timer_(retry_timer),
-        rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()),
-        scan_timeout_nanos_(conn->connection_conf()->scan_timeout()),
-        scanner_lease_timeout_nanos_(0),
-        pause_(conn->connection_conf()->pause()),
-        max_retries_(conn->connection_conf()->max_retries()),
-        start_log_errors_count_(conn->connection_conf()->start_log_errors_count()),
-        scanner_id_(-1) {}
-
-  virtual ~ScanCallerBuilder() = default;
-
-  typedef ScanCallerBuilder GenericThisType;
-  typedef std::shared_ptr<ScanCallerBuilder> SharedThisPtr;
-
-  /** Sets the RPC client passed to the caller. */
-  SharedThisPtr rpc_client(std::shared_ptr<hbase::RpcClient> rpc_client) {
-    rpc_client_ = rpc_client;
-    return shared_this();
-  }
-
-  /** Overrides the per-RPC timeout. */
-  SharedThisPtr rpc_timeout(std::chrono::nanoseconds rpc_timeout_nanos) {
-    rpc_timeout_nanos_ = rpc_timeout_nanos;
-    return shared_this();
-  }
-
-  /** Overrides the scan timeout. */
-  SharedThisPtr scan_timeout(std::chrono::nanoseconds scan_timeout_nanos) {
-    scan_timeout_nanos_ = scan_timeout_nanos;
-    return shared_this();
-  }
-
-  /** Sets the scanner lease timeout (zero unless set). */
-  SharedThisPtr scanner_lease_timeout(std::chrono::nanoseconds scanner_lease_timeout_nanos) {
-    scanner_lease_timeout_nanos_ = scanner_lease_timeout_nanos;
-    return shared_this();
-  }
-
-  /** Overrides the pause passed to the caller. */
-  SharedThisPtr pause(std::chrono::nanoseconds pause) {
-    pause_ = pause;
-    return shared_this();
-  }
-
-  /** Overrides the maximum number of retries. */
-  SharedThisPtr max_retries(uint32_t max_retries) {
-    max_retries_ = max_retries;
-    return shared_this();
-  }
-
-  /** Overrides the start_log_errors_count value passed to the caller. */
-  SharedThisPtr start_log_errors_count(uint32_t start_log_errors_count) {
-    start_log_errors_count_ = start_log_errors_count;
-    return shared_this();
-  }
-
-  /** Sets the region location passed to the caller. */
-  SharedThisPtr region_location(std::shared_ptr<RegionLocation> region_location) {
-    region_location_ = region_location;
-    return shared_this();
-  }
-
-  /** Sets the scanner id (-1 unless set). */
-  SharedThisPtr scanner_id(int64_t scanner_id) {
-    scanner_id_ = scanner_id;
-    return shared_this();
-  }
-
-  /** Sets the scan passed to the caller. */
-  SharedThisPtr scan(std::shared_ptr<Scan> scan) {
-    scan_ = scan;
-    return shared_this();
-  }
-
-  /** Sets the result cache passed to the caller. */
-  SharedThisPtr results_cache(std::shared_ptr<ScanResultCache> results_cache) {
-    results_cache_ = results_cache;
-    return shared_this();
-  }
-
-  /** Sets the consumer passed to the caller. */
-  SharedThisPtr consumer(std::shared_ptr<RawScanResultConsumer> consumer) {
-    consumer_ = consumer;
-    return shared_this();
-  }
-
-  /** Creates a caller from the current settings without starting it. */
-  std::shared_ptr<AsyncScanRpcRetryingCaller> Build() {
-    return std::make_shared<AsyncScanRpcRetryingCaller>(
-        conn_, retry_timer_, rpc_client_, scan_, scanner_id_, results_cache_, consumer_,
-        region_location_, scanner_lease_timeout_nanos_, pause_, max_retries_, scan_timeout_nanos_,
-        rpc_timeout_nanos_, start_log_errors_count_);
-  }
-
- private:
-  SharedThisPtr shared_this() {
-    return std::enable_shared_from_this<GenericThisType>::shared_from_this();
-  }
-
- private:
-  std::shared_ptr<AsyncConnection> conn_;
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-  std::shared_ptr<hbase::RpcClient> rpc_client_;
-  std::shared_ptr<Scan> scan_;
-  std::chrono::nanoseconds rpc_timeout_nanos_;
-  std::chrono::nanoseconds scan_timeout_nanos_;
-  std::chrono::nanoseconds scanner_lease_timeout_nanos_;
-  std::chrono::nanoseconds pause_;
-  uint32_t max_retries_;
-  uint32_t start_log_errors_count_;
-  std::shared_ptr<RegionLocation> region_location_;
-  int64_t scanner_id_;
-  std::shared_ptr<RawScanResultConsumer> consumer_;
-  std::shared_ptr<ScanResultCache> results_cache_;
-};  // end of ScanCallerBuilder
-
-/**
- * Hands out caller builders that all share one connection and one retry timer.
- */
-class AsyncRpcRetryingCallerFactory {
- private:
-  std::shared_ptr<AsyncConnection> conn_;
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-
- public:
-  explicit AsyncRpcRetryingCallerFactory(std::shared_ptr<AsyncConnection> conn,
-                                         std::shared_ptr<folly::HHWheelTimer> retry_timer)
-      : conn_(conn), retry_timer_(retry_timer) {}
-
-  virtual ~AsyncRpcRetryingCallerFactory() = default;
-
-  /** New builder for a single-request caller producing RESP. */
-  template <typename RESP>
-  std::shared_ptr<SingleRequestCallerBuilder<RESP>> Single() {
-    using Builder = SingleRequestCallerBuilder<RESP>;
-    return std::make_shared<Builder>(conn_, retry_timer_);
-  }
-
-  /** New builder for a batch caller over REQ actions producing RESP results. */
-  template <typename REQ, typename RESP>
-  std::shared_ptr<BatchCallerBuilder<REQ, RESP>> Batch() {
-    using Builder = BatchCallerBuilder<REQ, RESP>;
-    return std::make_shared<Builder>(conn_, retry_timer_);
-  }
-
-  /** New builder for a scan caller. */
-  std::shared_ptr<ScanCallerBuilder> Scan() {
-    auto builder = std::make_shared<ScanCallerBuilder>(conn_, retry_timer_);
-    return builder;
-  }
-};
-
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc
deleted file mode 100644
index 8e60991..0000000
--- a/hbase-native-client/core/async-rpc-retrying-caller.cc
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/async-rpc-retrying-caller.h"
-
-#include <folly/Conv.h>
-#include <folly/ExceptionWrapper.h>
-#include <folly/Format.h>
-#include <folly/Logging.h>
-#include <folly/Unit.h>
-
-#include "connection/rpc-client.h"
-#include "core/async-connection.h"
-#include "core/hbase-rpc-controller.h"
-#include "core/region-location.h"
-#include "core/result.h"
-#include "exceptions/exception.h"
-#include "if/HBase.pb.h"
-#include "utils/connection-util.h"
-#include "utils/sys-util.h"
-#include "utils/time-util.h"
-
-using folly::exception_wrapper;
-
-namespace hbase {
-
-// Stores the settings, creates the promise, the RPC controller and the list that collects the
-// errors of failed tries, records the start time and derives the attempt count from
-// max_retries. Counting of tries starts at 1.
-template <typename RESP>
-AsyncSingleRequestRpcRetryingCaller<RESP>::AsyncSingleRequestRpcRetryingCaller(
-    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
-    std::shared_ptr<hbase::pb::TableName> table_name, const std::string& row,
-    RegionLocateType locate_type, Callable<RESP> callable, std::chrono::nanoseconds pause,
-    uint32_t max_retries, std::chrono::nanoseconds operation_timeout_nanos,
-    std::chrono::nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
-    : conn_(conn),
-      retry_timer_(retry_timer),
-      table_name_(table_name),
-      row_(row),
-      locate_type_(locate_type),
-      callable_(callable),
-      pause_(pause),
-      max_retries_(max_retries),
-      operation_timeout_nanos_(operation_timeout_nanos),
-      rpc_timeout_nanos_(rpc_timeout_nanos),
-      start_log_errors_count_(start_log_errors_count),
-      promise_(std::make_shared<folly::Promise<RESP>>()),
-      controller_(conn_->CreateRpcController()),
-      start_ns_(TimeUtil::GetNowNanos()),
-      tries_(1),
-      exceptions_(std::make_shared<std::vector<ThrowableWithExtraContext>>()),
-      max_attempts_(ConnectionUtils::Retries2Attempts(max_retries)) {}
-
-// Nothing to release by hand; all members clean up after themselves.
-template <typename RESP>
-AsyncSingleRequestRpcRetryingCaller<RESP>::~AsyncSingleRequestRpcRetryingCaller() = default;
-
-// Starts the operation and returns the future that will receive its outcome.
-template <typename RESP>
-folly::Future<RESP> AsyncSingleRequestRpcRetryingCaller<RESP>::Call() {
-  // The future is taken from the promise before any work is started.
-  folly::Future<RESP> result = promise_->getFuture();
-  LocateThenCall();
-  return result;
-}
-
-// Looks up the region for row_ and, once it is known, issues the call against it. A failed
-// lookup is routed through OnError() with no cached location to update.
-template <typename RESP>
-void AsyncSingleRequestRpcRetryingCaller<RESP>::LocateThenCall() {
-  // -1 is passed to the locator when no operation timeout is configured.
-  int64_t locate_timeout_ns = -1L;
-  if (operation_timeout_nanos_.count() > 0) {
-    locate_timeout_ns = RemainingTimeNs();
-    if (locate_timeout_ns <= 0) {
-      // The operation's time budget is already used up.
-      CompleteExceptionally();
-      return;
-    }
-  }
-
-  conn_->region_locator()
-      ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns)
-      .then([this](std::shared_ptr<RegionLocation> located) { this->Call(*located); })
-      .onError([this](const exception_wrapper& e) {
-        // Built lazily: OnError() decides whether the message is needed at all.
-        auto describe_failure = [this, e]() -> std::string {
-          return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" +
-                 table_name_->qualifier() + " failed with e.what()=" +
-                 e.what().toStdString() + ", tries = " + std::to_string(tries_) +
-                 ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " +
-                 TimeUtil::ToMillisStr(operation_timeout_nanos_) + " ms, time elapsed = " +
-                 TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
-        };
-        auto skip_cache_update = [](const exception_wrapper& error) {};
-        this->OnError(e, describe_failure, skip_cache_update);
-      });
-}
-
-template <typename RESP>
-void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
- const exception_wrapper& error, Supplier<std::string> err_msg,
- Consumer<exception_wrapper> update_cached_location) {
- ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos());
- exceptions_->push_back(twec);
- if (!ExceptionUtil::ShouldRetry(error) || tries_ >= max_retries_) {
- CompleteExceptionally();
- return;
- }
-
- if (tries_ > start_log_errors_count_) {
- LOG(WARNING) << err_msg();
- } else {
- VLOG(1) << err_msg();
- }
-
- int64_t delay_ns;
- if (operation_timeout_nanos_.count() > 0) {
- int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
- if (max_delay_ns <= 0) {
- CompleteExceptionally();
- return;
- }
- delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1));
- } else {
- delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1);
- }
- update_cached_location(error);
- tries_++;
-
- /*
- * The HHWheelTimer::scheduleTimeout() fails with an assertion from
- * EventBase::isInEventBaseThread() if we execute the schedule in a random thread, or one of
- * the IOThreadPool threads (with num threads > 1). I think there is a bug there in using retry
- * timer from IOThreadPool threads. It only works when executed from a single-thread pool
- * (retry_executor() is). However, the scheduled "work" which is the LocateThenCall() should
- * still happen in a thread pool, that is why we are submitting the work to the CPUThreadPool.
- * IOThreadPool cannot be used without fixing the blocking call that we do at TCP connection
- * establishment time (see ConnectionFactory::Connect()), otherwise, the IOThreadPool thread
- * just hangs because it deadlocks itself.
- */
- conn_->retry_executor()->add([=]() {
- retry_timer_->scheduleTimeoutFn(
- [=]() { conn_->cpu_executor()->add([&]() { LocateThenCall(); }); },
- std::chrono::milliseconds(TimeUtil::ToMillis(delay_ns)));
- });
-}
-
-// Issues one try against the given region location: picks the call timeout, resets the
-// controller, invokes the callable and either fulfils the promise with the response or hands
-// the failure to OnError().
-template <typename RESP>
-void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc) {
-  // The RPC timeout applies unless less time than that is left for the whole operation.
-  int64_t call_timeout_ns = rpc_timeout_nanos_.count();
-  if (operation_timeout_nanos_.count() > 0) {
-    const int64_t remaining_ns = RemainingTimeNs();
-    if (remaining_ns <= 0) {
-      CompleteExceptionally();
-      return;
-    }
-    call_timeout_ns = std::min(remaining_ns, call_timeout_ns);
-  }
-
-  std::shared_ptr<RpcClient> rpc_client = conn_->rpc_client();
-
-  ResetController(controller_, call_timeout_ns);
-
-  // TODO: RegionLocation should propagate through these method chains as a shared_ptr.
-  // Otherwise, it may get deleted underneath us. We are just copying for now.
-  auto loc_ptr = std::make_shared<RegionLocation>(loc);
-  callable_(controller_, loc_ptr, rpc_client)
-      .then([loc_ptr, this](const RESP& resp) {
-        // resp is a const reference, so the value is copied into the promise.
-        promise_->setValue(resp);
-      })
-      .onError([loc_ptr, this](const exception_wrapper& e) {
-        auto describe_failure = [loc_ptr, this, e]() -> std::string {
-          return "Call to " + folly::sformat("{0}:{1}", loc_ptr->server_name().host_name(),
-                                             loc_ptr->server_name().port()) +
-                 " for '" + row_ + "' in " + loc_ptr->DebugString() + " of " +
-                 table_name_->namespace_() + "::" + table_name_->qualifier() +
-                 " failed with e.what()=" + e.what().toStdString() + ", tries = " +
-                 std::to_string(tries_) + ", maxAttempts = " + std::to_string(max_attempts_) +
-                 ", timeout = " + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) +
-                 " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
-        };
-        auto refresh_location = [loc_ptr, this](const exception_wrapper& error) {
-          conn_->region_locator()->UpdateCachedLocation(*loc_ptr, error);
-        };
-        this->OnError(e, describe_failure, refresh_location);
-      });
-}
-
-// Fails the promise with a RetriesExhaustedException built from tries_ - 1 and the errors
-// collected so far.
-template <typename RESP>
-void AsyncSingleRequestRpcRetryingCaller<RESP>::CompleteExceptionally() {
-  promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_));
-}
-
-// Operation timeout minus the time elapsed since the caller was constructed, in nanoseconds;
-// zero or negative once the budget is used up.
-template <typename RESP>
-int64_t AsyncSingleRequestRpcRetryingCaller<RESP>::RemainingTimeNs() {
-  const auto elapsed_ns = TimeUtil::GetNowNanos() - start_ns_;
-  return operation_timeout_nanos_.count() - elapsed_ns;
-}
-
-// Resets the controller and, for a non-negative timeout, sets its call timeout to the
-// timeout converted to milliseconds, capped at INT_MAX.
-template <typename RESP>
-void AsyncSingleRequestRpcRetryingCaller<RESP>::ResetController(
-    std::shared_ptr<HBaseRpcController> controller, const int64_t& timeout_ns) {
-  controller->Reset();
-  if (timeout_ns < 0) {
-    // A negative timeout leaves the freshly reset controller untouched.
-    return;
-  }
-  const int64_t timeout_ms =
-      std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_ns));
-  controller->set_call_timeout(std::chrono::milliseconds(timeout_ms));
-}
-
-// Explicit instantiations for the linker. Otherwise, every user of the templatized class would
-// have to #include this .cc file to get the member definitions. Only the RESP types listed
-// below are instantiated here.
-class OpenScannerResponse;
-template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<hbase::Result>>;
-template class AsyncSingleRequestRpcRetryingCaller<folly::Unit>;
-template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<OpenScannerResponse>>;
-template class AsyncSingleRequestRpcRetryingCaller<bool>;
-} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-rpc-retrying-caller.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h
deleted file mode 100644
index c7e28d0..0000000
--- a/hbase-native-client/core/async-rpc-retrying-caller.h
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/ExceptionWrapper.h>
-#include <folly/futures/Future.h>
-#include <folly/io/async/EventBase.h>
-#include <folly/io/async/HHWheelTimer.h>
-
-#include <algorithm>
-#include <chrono>
-#include <functional>
-#include <memory>
-#include <string>
-#include <type_traits>
-#include <utility>
-#include <vector>
-#include "core/async-connection.h"
-#include "core/hbase-rpc-controller.h"
-#include "core/region-location.h"
-#include "exceptions/exception.h"
-#include "if/HBase.pb.h"
-
-namespace hbase {
-
-template <typename T>  // Supplier: callable taking no arguments and returning a T.
-using Supplier = std::function<T()>;
-// Consumer: callable taking a T by value and returning nothing.
-template <typename T>
-using Consumer = std::function<void(T)>;
-// ReqConverter: callable taking a const S& plus const references to each I, returning an R.
-template <typename R, typename S, typename... I>
-using ReqConverter = std::function<R(const S&, const I&...)>;
-// RespConverter: callable taking a const S& and returning an R.
-template <typename R, typename S>
-using RespConverter = std::function<R(const S&)>;
-// RpcCallback: callable taking a const RESP& and returning nothing.
-template <typename RESP>
-using RpcCallback = std::function<void(const RESP&)>;
-// RpcCall: callable taking (rpc client, region location, controller, request) and returning a future of the response.
-template <typename REQ, typename RESP>
-using RpcCall = std::function<folly::Future<std::unique_ptr<RESP>>(
- std::shared_ptr<RpcClient>, std::shared_ptr<RegionLocation>,
- std::shared_ptr<HBaseRpcController>, std::unique_ptr<REQ>)>;
-// Callable: callable taking (controller, region location, rpc client) and returning a future of RESP.
-template <typename RESP>
-using Callable =
- std::function<folly::Future<RESP>(std::shared_ptr<HBaseRpcController>,
- std::shared_ptr<RegionLocation>, std::shared_ptr<RpcClient>)>;
-
-template <typename RESP>  // RESP is the value type of the future returned by Call().
-class AsyncSingleRequestRpcRetryingCaller {  // Locates the region for one row, runs the callable against it, retries on error.
- public:
- AsyncSingleRequestRpcRetryingCaller(
- std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
- std::shared_ptr<hbase::pb::TableName> table_name, const std::string& row,
- RegionLocateType locate_type, Callable<RESP> callable, std::chrono::nanoseconds pause,
- uint32_t max_retries, std::chrono::nanoseconds operation_timeout_nanos,
- std::chrono::nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count);
-// Virtual destructor; declared here, defined out of line.
- virtual ~AsyncSingleRequestRpcRetryingCaller();
-// Starts the operation and returns the future that receives the response or the failure.
- folly::Future<RESP> Call();
-// Everything below is internal to the retry loop.
- private:
- void LocateThenCall();  // Locates the region for row_, then issues the call against it.
-// Records the error, then gives up or schedules another LocateThenCall() after a pause.
- void OnError(const folly::exception_wrapper& error, Supplier<std::string> err_msg,
- Consumer<folly::exception_wrapper> update_cached_location);
-// Issues one try against the given region location.
- void Call(const RegionLocation& loc);
-// Fails the promise with a RetriesExhaustedException carrying the collected errors.
- void CompleteExceptionally();
-// Operation timeout minus the time elapsed since construction, in nanoseconds.
- int64_t RemainingTimeNs();
-// Resets the controller; a non-negative timeout_ns is set as its call timeout (in ms, capped at INT_MAX).
- static void ResetController(std::shared_ptr<HBaseRpcController> controller,
- const int64_t& timeout_ns);
-// State: constructor arguments first, then values created in the constructor.
- private:
- std::shared_ptr<AsyncConnection> conn_;
- std::shared_ptr<folly::HHWheelTimer> retry_timer_;  // used to schedule the pause before a retry
- std::shared_ptr<hbase::pb::TableName> table_name_;
- std::string row_;  // row used to locate the region
- RegionLocateType locate_type_;
- Callable<RESP> callable_;  // performs one RPC attempt
- std::chrono::nanoseconds pause_;  // base value for the pause between tries
- uint32_t max_retries_;  // try limit checked in OnError()
- std::chrono::nanoseconds operation_timeout_nanos_;  // values <= 0 disable the operation deadline
- std::chrono::nanoseconds rpc_timeout_nanos_;
- uint32_t start_log_errors_count_;  // failures are logged at WARNING once tries_ exceeds this
- std::shared_ptr<folly::Promise<RESP>> promise_;  // backs the future returned by Call()
- std::shared_ptr<HBaseRpcController> controller_;  // reset before every try
- uint64_t start_ns_;  // TimeUtil::GetNowNanos() taken in the constructor
- uint32_t tries_;  // starts at 1, incremented before each retry
- std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_;  // errors of all failed tries
- uint32_t max_attempts_;  // ConnectionUtils::Retries2Attempts(max_retries); appears in log messages
-};
-} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc
deleted file mode 100644
index 2eb82a9..0000000
--- a/hbase-native-client/core/async-rpc-retrying-test.cc
+++ /dev/null
@@ -1,420 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <folly/Logging.h>
-#include <folly/Memory.h>
-#include <folly/futures/Future.h>
-#include <folly/io/async/EventBase.h>
-#include <folly/io/async/ScopedEventBaseThread.h>
-#include <gmock/gmock.h>
-#include <google/protobuf/stubs/callback.h>
-#include <wangle/concurrent/IOThreadPoolExecutor.h>
-
-#include <chrono>
-#include <functional>
-#include <string>
-
-#include "connection/request.h"
-#include "connection/response.h"
-#include "connection/rpc-client.h"
-#include "core/async-connection.h"
-#include "core/async-rpc-retrying-caller-factory.h"
-#include "core/async-rpc-retrying-caller.h"
-#include "core/client.h"
-#include "core/connection-configuration.h"
-#include "core/hbase-rpc-controller.h"
-#include "core/keyvalue-codec.h"
-#include "core/region-location.h"
-#include "core/request-converter.h"
-#include "core/response-converter.h"
-#include "core/result.h"
-#include "exceptions/exception.h"
-#include "if/Client.pb.h"
-#include "if/HBase.pb.h"
-#include "test-util/test-util.h"
-#include "utils/time-util.h"
-
-using hbase::AsyncRpcRetryingCallerFactory;
-using hbase::AsyncConnection;
-using hbase::AsyncRegionLocator;
-using hbase::ConnectionConfiguration;
-using hbase::Configuration;
-using hbase::HBaseRpcController;
-using hbase::RegionLocation;
-using hbase::RegionLocateType;
-using hbase::RpcClient;
-using hbase::RequestConverter;
-using hbase::ResponseConverter;
-using hbase::ReqConverter;
-using hbase::RespConverter;
-using hbase::Put;
-using hbase::TimeUtil;
-using hbase::Client;
-using hbase::security::User;
-
-using ::testing::Return;
-using ::testing::_;
-using std::chrono::nanoseconds;
-using std::chrono::milliseconds;
-
-using namespace hbase;
-
-using folly::exception_wrapper;
-
-class AsyncRpcRetryTest : public ::testing::Test {
- public:
- static std::unique_ptr<hbase::TestUtil> test_util;
-
- static void SetUpTestCase() {
- google::InstallFailureSignalHandler();
- test_util = std::make_unique<hbase::TestUtil>();
- test_util->StartMiniCluster(2);
- }
-};
-std::unique_ptr<hbase::TestUtil> AsyncRpcRetryTest::test_util = nullptr;
-
-class AsyncRegionLocatorBase : public AsyncRegionLocator {
- public:
- AsyncRegionLocatorBase() {}
- explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location)
- : region_location_(region_location) {}
- virtual ~AsyncRegionLocatorBase() = default;
-
- folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &,
- const std::string &,
- const RegionLocateType,
- const int64_t) override {
- folly::Promise<std::shared_ptr<RegionLocation>> promise;
- promise.setValue(region_location_);
- return promise.getFuture();
- }
-
- virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) {
- region_location_ = region_location;
- }
-
- void UpdateCachedLocation(const RegionLocation &, const folly::exception_wrapper &) override {}
-
- protected:
- std::shared_ptr<RegionLocation> region_location_;
-};
-
-class MockAsyncRegionLocator : public AsyncRegionLocatorBase {
- public:
- MockAsyncRegionLocator() : AsyncRegionLocatorBase() {}
- explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
- : AsyncRegionLocatorBase(region_location) {}
- virtual ~MockAsyncRegionLocator() {}
-};
-
-class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase {
- private:
- uint32_t tries_ = 0;
- uint32_t num_fails_ = 0;
-
- public:
- explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails)
- : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
- explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
- : AsyncRegionLocatorBase(region_location) {}
- virtual ~MockWrongRegionAsyncRegionLocator() {}
-
- folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
- const hbase::pb::TableName &tn, const std::string &row,
- const RegionLocateType locate_type = RegionLocateType::kCurrent,
- const int64_t locate_ns = 0) override {
- // Fail for num_fails_ times, then delegate to the super class which will give the correct
- // region location.
- if (tries_++ > num_fails_) {
- return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
- }
- folly::Promise<std::shared_ptr<RegionLocation>> promise;
- /* set random region name, simulating invalid region */
- auto result = std::make_shared<RegionLocation>(
- "whatever-region-name", region_location_->region_info(), region_location_->server_name());
- promise.setValue(result);
- return promise.getFuture();
- }
-};
-
-class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {
- private:
- uint32_t tries_ = 0;
- uint32_t num_fails_ = 0;
-
- public:
- explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
- : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
- explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
- : AsyncRegionLocatorBase(region_location) {}
- virtual ~MockFailingAsyncRegionLocator() {}
- folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
- const hbase::pb::TableName &tn, const std::string &row,
- const RegionLocateType locate_type = RegionLocateType::kCurrent,
- const int64_t locate_ns = 0) override {
- // Fail for num_fails_ times, then delegate to the super class which will give the correct
- // region location.
- if (tries_++ > num_fails_) {
- return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
- }
- folly::Promise<std::shared_ptr<RegionLocation>> promise;
- promise.setException(std::runtime_error{"Failed to look up region location"});
- return promise.getFuture();
- }
-};
-
-class MockAsyncConnection : public AsyncConnection,
- public std::enable_shared_from_this<MockAsyncConnection> {
- public:
- MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf,
- std::shared_ptr<folly::HHWheelTimer> retry_timer,
- std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
- std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
- std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor,
- std::shared_ptr<RpcClient> rpc_client,
- std::shared_ptr<AsyncRegionLocator> region_locator)
- : conn_conf_(conn_conf),
- retry_timer_(retry_timer),
- cpu_executor_(cpu_executor),
- io_executor_(io_executor),
- retry_executor_(retry_executor),
- rpc_client_(rpc_client),
- region_locator_(region_locator) {}
- ~MockAsyncConnection() {}
- void Init() {
- caller_factory_ =
- std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
- }
-
- std::shared_ptr<Configuration> conf() override { return nullptr; }
- std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; }
- std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
- return caller_factory_;
- }
- std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
- std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; }
- std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
- std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
- std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
- return retry_executor_;
- }
-
- void Close() override {}
- std::shared_ptr<HBaseRpcController> CreateRpcController() override {
- return std::make_shared<HBaseRpcController>();
- }
-
- private:
- std::shared_ptr<folly::HHWheelTimer> retry_timer_;
- std::shared_ptr<ConnectionConfiguration> conn_conf_;
- std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
- std::shared_ptr<RpcClient> rpc_client_;
- std::shared_ptr<AsyncRegionLocator> region_locator_;
- std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
- std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
- std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
-};
-
-template <typename CONN>
-class MockRawAsyncTableImpl {
- public:
- explicit MockRawAsyncTableImpl(std::shared_ptr<CONN> conn) : conn_(conn) {}
- virtual ~MockRawAsyncTableImpl() = default;
-
- /* implement this in real RawAsyncTableImpl. */
-
- /* in real RawAsyncTableImpl, this should be private. */
- folly::Future<std::shared_ptr<hbase::Result>> GetCall(
- std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
- std::shared_ptr<RegionLocation> loc, const hbase::Get &get) {
- hbase::RpcCall<hbase::Request, hbase::Response> rpc_call = [](
- std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<RegionLocation> loc,
- std::shared_ptr<HBaseRpcController> controller,
- std::unique_ptr<hbase::Request> preq) -> folly::Future<std::unique_ptr<hbase::Response>> {
- VLOG(1) << "entering MockRawAsyncTableImpl#GetCall, calling AsyncCall, loc:"
- << loc->DebugString();
- return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(),
- std::move(preq), User::defaultUser(), "ClientService");
- };
-
- return Call<hbase::Get, hbase::Request, hbase::Response, std::shared_ptr<hbase::Result>>(
- rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest, rpc_call,
- &hbase::ResponseConverter::FromGetResponse);
- }
-
- /* in real RawAsyncTableImpl, this should be private. */
- template <typename REQ, typename PREQ, typename PRESP, typename RESP>
- folly::Future<RESP> Call(std::shared_ptr<hbase::RpcClient> rpc_client,
- std::shared_ptr<HBaseRpcController> controller,
- std::shared_ptr<RegionLocation> loc, const REQ &req,
- ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter,
- hbase::RpcCall<PREQ, PRESP> rpc_call,
- RespConverter<RESP, PRESP> resp_converter) {
- promise_ = std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>();
- auto f = promise_->getFuture();
- VLOG(1) << "calling rpc_call";
- rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name())))
- .then([&, this, resp_converter](std::unique_ptr<PRESP> presp) {
- VLOG(1) << "MockRawAsyncTableImpl#call succeded: ";
- RESP result = resp_converter(*presp);
- promise_->setValue(result);
- })
- .onError([this](const exception_wrapper &e) {
- VLOG(1) << "entering MockRawAsyncTableImpl#call, exception: " << e.what();
- VLOG(1) << "entering MockRawAsyncTableImpl#call, error typeinfo: " << typeid(e).name();
- promise_->setException(e);
- });
- return f;
- }
-
- private:
- std::shared_ptr<CONN> conn_;
- std::shared_ptr<folly::Promise<std::shared_ptr<hbase::Result>>> promise_;
-};
-
-void runTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator, std::string tableName,
- uint32_t operation_timeout_millis = 1200000) {
- AsyncRpcRetryTest::test_util->CreateTable(tableName, "d");
-
- // Create TableName and Row to be fetched from HBase
- auto tn = folly::to<hbase::pb::TableName>(tableName);
- auto row = "test2";
-
- // Get to be performed on above HBase Table
- hbase::Get get(row);
-
- // Create a client
- Client client(*(AsyncRpcRetryTest::test_util->conf()));
-
- // Get connection to HBase Table
- auto table = client.Table(tn);
-
- table->Put(Put{"test2"}.AddColumn("d", "2", "value2"));
- table->Put(Put{"test2"}.AddColumn("d", "extra", "value for extra"));
-
- /* init region location and rpc channel */
- auto region_location = table->GetRegionLocation(row);
-
- // auto io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(4);
- auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
- auto io_executor_ = client.async_connection()->io_executor();
- auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
- auto codec = std::make_shared<hbase::KeyValueCodec>();
- auto rpc_client = std::make_shared<RpcClient>(io_executor_, cpu_executor_, codec,
- AsyncRpcRetryTest::test_util->conf());
- // auto retry_event_base_ = std::make_shared<folly::ScopedEventBaseThread>(true);
- std::shared_ptr<folly::HHWheelTimer> retry_timer =
- folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
-
- /* init connection configuration */
- auto connection_conf = std::make_shared<ConnectionConfiguration>(
- TimeUtil::SecondsToNanos(20), // connect_timeout
- TimeUtil::MillisToNanos(operation_timeout_millis), // operation_timeout
- TimeUtil::SecondsToNanos(60), // rpc_timeout
- TimeUtil::MillisToNanos(100), // pause
- 5, // max retries
- 9); // start log errors count
-
- /* set region locator */
- region_locator->set_region_location(region_location);
-
- /* init hbase client connection */
- auto conn = std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_,
- io_executor_, retry_executor_, rpc_client,
- region_locator);
- conn->Init();
-
- /* init retry caller factory */
- auto tableImpl = std::make_shared<MockRawAsyncTableImpl<MockAsyncConnection>>(conn);
-
- /* init request caller builder */
- auto builder = conn->caller_factory()->Single<std::shared_ptr<hbase::Result>>();
-
- /* call with retry to get result */
-
- auto async_caller =
- builder->table(std::make_shared<hbase::pb::TableName>(tn))
- ->row(row)
- ->rpc_timeout(conn->connection_conf()->read_rpc_timeout())
- ->operation_timeout(conn->connection_conf()->operation_timeout())
- ->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller,
- std::shared_ptr<hbase::RegionLocation> loc,
- std::shared_ptr<hbase::RpcClient> rpc_client)
- -> folly::Future<std::shared_ptr<hbase::Result>> {
- return tableImpl->GetCall(rpc_client, controller, loc, get);
- })
- ->Build();
-
- auto promise = std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>();
-
- auto result = async_caller->Call().get(milliseconds(500000));
-
- // Test the values, should be same as in put executed on hbase shell
- ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
- EXPECT_EQ("test2", result->Row());
- EXPECT_EQ("value2", *(result->Value("d", "2")));
- EXPECT_EQ("value for extra", *(result->Value("d", "extra")));
-
- retry_timer->destroy();
- table->Close();
- client.Close();
- retry_executor_->stop();
-}
-
-// Test successful case
-TEST_F(AsyncRpcRetryTest, TestGetBasic) {
- std::shared_ptr<AsyncRegionLocatorBase> region_locator(
- std::make_shared<MockAsyncRegionLocator>());
- runTest(region_locator, "table1");
-}
-
-// Tests the RPC failing 3 times, then succeeding
-TEST_F(AsyncRpcRetryTest, TestHandleException) {
- std::shared_ptr<AsyncRegionLocatorBase> region_locator(
- std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
- runTest(region_locator, "table2");
-}
-
-// Tests the RPC failing 5 times, throwing an exception
-TEST_F(AsyncRpcRetryTest, TestFailWithException) {
- std::shared_ptr<AsyncRegionLocatorBase> region_locator(
- std::make_shared<MockWrongRegionAsyncRegionLocator>(5));
- EXPECT_ANY_THROW(runTest(region_locator, "table3"));
-}
-
-// Tests the region location lookup failing 3 times, then succeeding
-TEST_F(AsyncRpcRetryTest, TestHandleExceptionFromRegionLocationLookup) {
- std::shared_ptr<AsyncRegionLocatorBase> region_locator(
- std::make_shared<MockFailingAsyncRegionLocator>(3));
- runTest(region_locator, "table4");
-}
-
-// Tests the region location lookup failing 5 times, throwing an exception
-TEST_F(AsyncRpcRetryTest, TestFailWithExceptionFromRegionLocationLookup) {
- std::shared_ptr<AsyncRegionLocatorBase> region_locator(
- std::make_shared<MockFailingAsyncRegionLocator>(5));
- EXPECT_ANY_THROW(runTest(region_locator, "table5"));
-}
-
-// Tests hitting operation timeout, thus not retrying anymore
-TEST_F(AsyncRpcRetryTest, TestFailWithOperationTimeout) {
- std::shared_ptr<AsyncRegionLocatorBase> region_locator(
- std::make_shared<MockFailingAsyncRegionLocator>(3));
- EXPECT_ANY_THROW(runTest(region_locator, "table6", 200));
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-scan-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-scan-rpc-retrying-caller.cc b/hbase-native-client/core/async-scan-rpc-retrying-caller.cc
deleted file mode 100644
index a1e8362..0000000
--- a/hbase-native-client/core/async-scan-rpc-retrying-caller.cc
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/async-scan-rpc-retrying-caller.h"
-
-namespace hbase {
-
-ScanResumerImpl::ScanResumerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller)
- : caller_(caller), mutex_() {}
-
-void ScanResumerImpl::Resume() {
- // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
- // just return at the first if condition without loading the resp and numValidResuls field. If
- // resume is called after suspend, then it is also safe to just reference resp and
- // numValidResults after the synchronized block as no one will change it anymore.
- std::shared_ptr<pb::ScanResponse> local_resp;
- int64_t local_num_complete_rows;
-
- {
- std::unique_lock<std::mutex> mlock{mutex_};
- if (state_ == ScanResumerState::kInitialized) {
- // user calls this method before we call prepare, so just set the state to
- // RESUMED, the implementation will just go on.
- state_ = ScanResumerState::kResumed;
- return;
- }
- if (state_ == ScanResumerState::kResumed) {
- // already resumed, give up.
- return;
- }
- state_ = ScanResumerState::kResumed;
- local_resp = resp_;
- local_num_complete_rows = num_complete_rows_;
- }
-
- caller_->CompleteOrNext(local_resp);
-}
-
-bool ScanResumerImpl::Prepare(std::shared_ptr<pb::ScanResponse> resp, int num_complete_rows) {
- std::unique_lock<std::mutex> mlock(mutex_);
- if (state_ == ScanResumerState::kResumed) {
- // user calls resume before we actually suspend the scan, just continue;
- return false;
- }
- state_ = ScanResumerState::kSuspended;
- resp_ = resp;
- num_complete_rows_ = num_complete_rows;
-
- return true;
-}
-
-ScanControllerImpl::ScanControllerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller)
- : caller_(caller) {}
-
-std::shared_ptr<ScanResumer> ScanControllerImpl::Suspend() {
- PreCheck();
- state_ = ScanControllerState::kSuspended;
- resumer_ = std::make_shared<ScanResumerImpl>(caller_);
- return resumer_;
-}
-
-void ScanControllerImpl::Terminate() {
- PreCheck();
- state_ = ScanControllerState::kTerminated;
-}
-
-// return the current state, and set the state to DESTROYED.
-ScanControllerState ScanControllerImpl::Destroy() {
- ScanControllerState state = state_;
- state_ = ScanControllerState::kDestroyed;
- return state;
-}
-
-void ScanControllerImpl::PreCheck() {
- CHECK(std::this_thread::get_id() == caller_thread_id_)
- << "The current thread is" << std::this_thread::get_id() << ", expected thread is "
- << caller_thread_id_ << ", you should not call this method outside OnNext or OnHeartbeat";
-
- CHECK(state_ == ScanControllerState::kInitialized) << "Invalid Stopper state "
- << DebugString(state_);
-}
-
-std::string ScanControllerImpl::DebugString(ScanControllerState state) {
- switch (state) {
- case ScanControllerState::kInitialized:
- return "kInitialized";
- case ScanControllerState::kSuspended:
- return "kSuspended";
- case ScanControllerState::kTerminated:
- return "kTerminated";
- case ScanControllerState::kDestroyed:
- return "kDestroyed";
- default:
- return "UNKNOWN";
- }
-}
-
-std::string ScanControllerImpl::DebugString(ScanResumerState state) {
- switch (state) {
- case ScanResumerState::kInitialized:
- return "kInitialized";
- case ScanResumerState::kSuspended:
- return "kSuspended";
- case ScanResumerState::kResumed:
- return "kResumed";
- default:
- return "UNKNOWN";
- }
-}
-
-AsyncScanRpcRetryingCaller::AsyncScanRpcRetryingCaller(
- std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
- std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<Scan> scan, int64_t scanner_id,
- std::shared_ptr<ScanResultCache> results_cache, std::shared_ptr<RawScanResultConsumer> consumer,
- std::shared_ptr<RegionLocation> region_location, nanoseconds scanner_lease_timeout_nanos,
- nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos,
- nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
- : conn_(conn),
- retry_timer_(retry_timer),
- rpc_client_(rpc_client),
- scan_(scan),
- scanner_id_(scanner_id),
- results_cache_(results_cache),
- consumer_(consumer),
- region_location_(region_location),
- scanner_lease_timeout_nanos_(scanner_lease_timeout_nanos),
- pause_(pause),
- max_retries_(max_retries),
- scan_timeout_nanos_(scan_timeout_nanos),
- rpc_timeout_nanos_(rpc_timeout_nanos),
- start_log_errors_count_(start_log_errors_count),
- promise_(std::make_shared<folly::Promise<bool>>()),
- tries_(1) {
- controller_ = conn_->CreateRpcController();
- start_ns_ = TimeUtil::GetNowNanos();
- max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
- exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>();
-}
-
-folly::Future<bool> AsyncScanRpcRetryingCaller::Start(
- std::shared_ptr<HBaseRpcController> controller,
- std::shared_ptr<pb::ScanResponse> open_scan_resp,
- const std::shared_ptr<CellScanner> cell_scanner) {
- OnComplete(controller, open_scan_resp, cell_scanner);
- return promise_->getFuture();
-}
-
-int64_t AsyncScanRpcRetryingCaller::RemainingTimeNs() {
- return scan_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_);
-}
-
-void AsyncScanRpcRetryingCaller::OnComplete(std::shared_ptr<HBaseRpcController> controller,
- std::shared_ptr<pb::ScanResponse> resp,
- const std::shared_ptr<CellScanner> cell_scanner) {
- VLOG(5) << "Scan: OnComplete, scanner_id:" << scanner_id_;
-
- if (controller->Failed()) {
- OnError(controller->exception());
- return;
- }
-
- bool is_heartbeat = resp->has_heartbeat_message() && resp->heartbeat_message();
-
- int64_t num_complete_rows_before = results_cache_->num_complete_rows();
- try {
- auto raw_results = ResponseConverter::FromScanResponse(resp, cell_scanner);
-
- auto results = results_cache_->AddAndGet(raw_results, is_heartbeat);
-
- auto scan_controller = std::make_shared<ScanControllerImpl>(shared_from_this());
-
- if (results.size() > 0) {
- UpdateNextStartRowWhenError(*results[results.size() - 1]);
- VLOG(5) << "Calling consumer->OnNext()";
- consumer_->OnNext(results, scan_controller);
- } else if (is_heartbeat) {
- consumer_->OnHeartbeat(scan_controller);
- }
-
- ScanControllerState state = scan_controller->Destroy();
- if (state == ScanControllerState::kTerminated) {
- if (resp->has_more_results_in_region() && !resp->more_results_in_region()) {
- // we have more results in region but user request to stop the scan, so we need to close the
- // scanner explicitly.
- CloseScanner();
- }
- CompleteNoMoreResults();
- return;
- }
-
- int64_t num_complete_rows = results_cache_->num_complete_rows() - num_complete_rows_before;
- if (state == ScanControllerState::kSuspended) {
- if (scan_controller->resumer()->Prepare(resp, num_complete_rows)) {
- return;
- }
- }
- } catch (const std::runtime_error& e) {
- // We can not retry here. The server has responded normally and the call sequence has been
- // increased so a new scan with the same call sequence will cause an
- // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
- LOG(WARNING) << "Received exception in reading the scan response:" << e.what();
- CompleteWhenError(true);
- return;
- }
-
- CompleteOrNext(resp);
-}
-
-void AsyncScanRpcRetryingCaller::CompleteOrNext(std::shared_ptr<pb::ScanResponse> resp) {
- VLOG(5) << "Scan: CompleteOrNext, scanner_id" << scanner_id_
- << ", response:" << resp->ShortDebugString();
-
- if (resp->has_more_results() && !resp->more_results()) {
- // RS tells us there is no more data for the whole scan
- CompleteNoMoreResults();
- return;
- }
- // TODO: Implement Scan::limit(), and check the limit here
-
- if (resp->has_more_results_in_region() && !resp->more_results_in_region()) {
- // TODO: check whether Scan is reversed here
- CompleteWhenNoMoreResultsInRegion();
- return;
- }
- Next();
-}
-
-void AsyncScanRpcRetryingCaller::CompleteExceptionally(bool close_scanner) {
- VLOG(5) << "Scan: CompleteExceptionally";
- results_cache_->Clear();
- if (close_scanner) {
- CloseScanner();
- }
- this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_));
-}
-
-void AsyncScanRpcRetryingCaller::CompleteNoMoreResults() {
- // In master code, scanners auto-close if we have exhausted the region. It may not be the case
- // in branch-1 code. If this is backported, make sure that the scanner is closed.
- VLOG(5) << "Scan: CompleteNoMoreResults, scanner_id:" << scanner_id_;
- promise_->setValue(false);
-}
-
-void AsyncScanRpcRetryingCaller::CompleteWhenNoMoreResultsInRegion() {
- VLOG(5) << "Scan: CompleteWhenNoMoreResultsInRegion, scanner_id:" << scanner_id_;
- // In master code, scanners auto-close if we have exhausted the region. It may not be the case
- // in branch-1 code. If this is backported, make sure that the scanner is closed.
- if (NoMoreResultsForScan(*scan_, region_location_->region_info())) {
- CompleteNoMoreResults();
- } else {
- CompleteWithNextStartRow(region_location_->region_info().end_key(), true);
- }
-}
-
-void AsyncScanRpcRetryingCaller::CompleteWithNextStartRow(std::string row, bool inclusive) {
- VLOG(5) << "Scan: CompleteWithNextStartRow: region scan is complete, move to next region";
- scan_->SetStartRow(row);
- // TODO: set inclusive if it is reverse scans
- promise_->setValue(true);
-}
-
-void AsyncScanRpcRetryingCaller::UpdateNextStartRowWhenError(const Result& result) {
- next_start_row_when_error_ = optional<std::string>(result.Row());
- include_next_start_row_when_error_ = result.Partial();
-}
-
-void AsyncScanRpcRetryingCaller::CompleteWhenError(bool close_scanner) {
- VLOG(5) << "Scan: CompleteWhenError, scanner_id:" << scanner_id_;
- results_cache_->Clear();
- if (close_scanner) {
- CloseScanner();
- }
- if (next_start_row_when_error_) {
- // TODO: HBASE-17583 adds include start / stop row to the Scan. Once we rebase and implement
- // those options in Scan , we can start using that here.
- scan_->SetStartRow(include_next_start_row_when_error_
- ? *next_start_row_when_error_
- : BytesUtil::CreateClosestRowAfter(*next_start_row_when_error_));
- }
- promise_->setValue(true);
-}
-
-void AsyncScanRpcRetryingCaller::OnError(const folly::exception_wrapper& error) {
- VLOG(5) << "Scan: OnError, scanner_id:" << scanner_id_;
- if (tries_ > start_log_errors_count_ || VLOG_IS_ON(5)) {
- LOG(WARNING) << "Call to " << region_location_->server_name().ShortDebugString()
- << " for scanner id = " << scanner_id_ << " for "
- << region_location_->region_info().ShortDebugString()
- << " failed, , tries = " << tries_ << ", maxAttempts = " << max_attempts_
- << ", timeout = " << TimeUtil::ToMillis(scan_timeout_nanos_).count()
- << " ms, time elapsed = " << TimeUtil::ElapsedMillis(start_ns_) << " ms"
- << error.what().toStdString();
- }
-
- bool scanner_closed = ExceptionUtil::IsScannerClosed(error);
- ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos());
- exceptions_->push_back(twec);
- if (tries_ >= max_retries_) {
- CompleteExceptionally(!scanner_closed);
- return;
- }
-
- int64_t delay_ns;
- if (scan_timeout_nanos_.count() > 0) {
- int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
- if (max_delay_ns <= 0) {
- CompleteExceptionally(!scanner_closed);
- return;
- }
- delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1));
- } else {
- delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1);
- }
-
- if (scanner_closed) {
- CompleteWhenError(false);
- return;
- }
-
- if (ExceptionUtil::IsScannerOutOfOrder(error)) {
- CompleteWhenError(true);
- return;
- }
- if (!ExceptionUtil::ShouldRetry(error)) {
- CompleteExceptionally(true);
- return;
- }
- tries_++;
-
- auto self(shared_from_this());
- conn_->retry_executor()->add([&]() {
- retry_timer_->scheduleTimeoutFn(
- [self]() { self->conn_->cpu_executor()->add([&]() { self->Call(); }); },
- std::chrono::milliseconds(TimeUtil::ToMillis(delay_ns)));
- });
-}
-
-bool AsyncScanRpcRetryingCaller::NoMoreResultsForScan(const Scan& scan,
- const pb::RegionInfo& info) {
- if (BytesUtil::IsEmptyStopRow(info.end_key())) {
- return true;
- }
- if (BytesUtil::IsEmptyStopRow(scan.StopRow())) {
- return false;
- }
- int32_t c = BytesUtil::CompareTo(info.end_key(), scan.StopRow());
- // 1. if our stop row is less than the endKey of the region
- // 2. if our stop row is equal to the endKey of the region and we do not include the stop row
- // for scan.
- return c > 0 ||
- (c == 0 /* && !scan.IncludeStopRow()*/); // TODO: Scans always exclude StopRow for now.
-}
-
-void AsyncScanRpcRetryingCaller::Next() {
- VLOG(5) << "Scan: Next";
- next_call_seq_++;
- tries_ = 1;
- exceptions_->clear();
- start_ns_ = TimeUtil::GetNowNanos();
- Call();
-}
-
-void AsyncScanRpcRetryingCaller::Call() {
- VLOG(5) << "Scan: Call";
- auto self(shared_from_this());
- // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
- // less than the scan timeout. If the server does not respond in time(usually this will not
- // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
- // resending the next request and the only way to fix this is to close the scanner and open a
- // new one.
- int64_t call_timeout_nanos;
- if (scan_timeout_nanos_.count() > 0) {
- int64_t remaining_nanos = scan_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_);
- if (remaining_nanos <= 0) {
- CompleteExceptionally(true);
- return;
- }
- call_timeout_nanos = remaining_nanos;
- } else {
- call_timeout_nanos = 0L;
- }
-
- ResetController(controller_, call_timeout_nanos);
-
- auto req =
- RequestConverter::ToScanRequest(scanner_id_, scan_->Caching(), false, next_call_seq_, false);
-
- // do the RPC call
- rpc_client_
- ->AsyncCall(region_location_->server_name().host_name(),
- region_location_->server_name().port(), std::move(req),
- security::User::defaultUser(), "ClientService")
- .via(conn_->cpu_executor().get())
- .then([self, this](const std::unique_ptr<Response>& resp) {
- auto scan_resp = std::static_pointer_cast<pb::ScanResponse>(resp->resp_msg());
- return OnComplete(controller_, scan_resp, resp->cell_scanner());
- })
- .onError([self, this](const folly::exception_wrapper& e) { OnError(e); });
-}
-
-void AsyncScanRpcRetryingCaller::CloseScanner() {
- auto self(shared_from_this());
- ResetController(controller_, rpc_timeout_nanos_.count());
-
- VLOG(5) << "Closing scanner with scanner_id:" << folly::to<std::string>(scanner_id_);
-
- // Do a close scanner RPC. Fire and forget.
- auto req = RequestConverter::ToScanRequest(scanner_id_, 0, true);
- rpc_client_
- ->AsyncCall(region_location_->server_name().host_name(),
- region_location_->server_name().port(), std::move(req),
- security::User::defaultUser(), "ClientService")
- .onError([self, this](const folly::exception_wrapper& e) -> std::unique_ptr<Response> {
- LOG(WARNING) << "Call to " + region_location_->server_name().ShortDebugString() +
- " for closing scanner_id = " + folly::to<std::string>(scanner_id_) +
- " for " + region_location_->region_info().ShortDebugString() +
- " failed, ignore, probably already closed. Exception:" +
- e.what().toStdString();
- return nullptr;
- });
-}
-
-void AsyncScanRpcRetryingCaller::ResetController(std::shared_ptr<HBaseRpcController> controller,
- const int64_t& timeout_nanos) {
- controller->Reset();
- if (timeout_nanos >= 0) {
- controller->set_call_timeout(
- milliseconds(std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_nanos))));
- }
-}
-
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-scan-rpc-retrying-caller.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-scan-rpc-retrying-caller.h b/hbase-native-client/core/async-scan-rpc-retrying-caller.h
deleted file mode 100644
index 9555e80..0000000
--- a/hbase-native-client/core/async-scan-rpc-retrying-caller.h
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/Conv.h>
-#include <folly/ExceptionWrapper.h>
-#include <folly/Format.h>
-#include <folly/Logging.h>
-#include <folly/futures/Future.h>
-#include <folly/io/async/EventBase.h>
-#include <folly/io/async/HHWheelTimer.h>
-
-#include <algorithm>
-#include <chrono>
-#include <functional>
-#include <memory>
-#include <string>
-#include <type_traits>
-#include <utility>
-#include <vector>
-
-#include "connection/rpc-client.h"
-#include "core/async-connection.h"
-#include "core/hbase-rpc-controller.h"
-#include "core/raw-scan-result-consumer.h"
-#include "core/region-location.h"
-#include "core/request-converter.h"
-#include "core/response-converter.h"
-#include "core/result.h"
-#include "core/scan-result-cache.h"
-#include "core/scan.h"
-#include "exceptions/exception.h"
-#include "if/Client.pb.h"
-#include "if/HBase.pb.h"
-#include "utils/bytes-util.h"
-#include "utils/connection-util.h"
-#include "utils/optional.h"
-#include "utils/sys-util.h"
-#include "utils/time-util.h"
-
-using std::chrono::nanoseconds;
-using std::chrono::milliseconds;
-
-namespace hbase {
-
-class AsyncScanRpcRetryingCaller;
-
-// The resume method is allowed to be called in another thread, so here we also use the
-// ResumerState to prevent races. The initial state is INITIALIZED, and in most cases, when back
-// from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
-// and when the user calls the resume method, we will change the state to RESUMED. But the resume
-// method could be called in another thread, and in fact, the user could just do this:
-// controller.suspend().resume()
-// This is strange but valid. This means the scan could be resumed before we call the prepare
-// method to do the actual suspend work. So in the resume method, we will check if the state is
-// INITIALIZED; if it is, then we will just set the state to RESUMED and return. And in the prepare
-// method, if the state is RESUMED already, we will just return and let the scan go on.
-// Notice that the public methods of this class are supposed to be called by the upper layer only,
-// and package private methods can only be called within the implementation of
-// AsyncScanSingleRegionRpcRetryingCaller.
-// TODO: Unlike the Java counterpart, we do not do scan lease renewals in a background thread.
-// Since there is also no async scan API exposed to the users, the only ScanResultConsumer is the
-// AsyncTableResultScanner, which will only pause the scanner if the result cache is maxed. The
-// application is expected to consume the scan results before the scanner lease times out.
-// ScanResumer implementation bound to one AsyncScanRpcRetryingCaller (held via shared_ptr).
-// NOTE(review): this class uses std::mutex, but the header's include list has no <mutex>;
-// it presumably arrives transitively -- confirm.
-class ScanResumerImpl : public ScanResumer {
- public:
- explicit ScanResumerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller);
-
- virtual ~ScanResumerImpl() = default;
-
- /**
- * Resume the scan. You are free to call it multiple times, but only the first call will take
- * effect.
- */
- void Resume() override;
-
- // Returns false if the scan has already been resumed. See the comment above ScanResumerImpl
- // for more details.
- // NOTE(review): num_complete_rows is an int here while the num_complete_rows_ member is int64_t.
- bool Prepare(std::shared_ptr<pb::ScanResponse> resp, int num_complete_rows);
-
- private:
- // Allowed state transitions:
- // INITIALIZED -> SUSPENDED -> RESUMED
- // INITIALIZED -> RESUMED
- ScanResumerState state_ = ScanResumerState::kInitialized;
- // Presumably guards state_ (and the fields below) against concurrent Resume()/Prepare() calls;
- // confirm in the .cc file.
- std::mutex mutex_;
- // Presumably the arguments of the last Prepare() call, kept for use when the scan is resumed --
- // TODO confirm in the .cc file.
- std::shared_ptr<pb::ScanResponse> resp_ = nullptr;
- int64_t num_complete_rows_ = 0;
- // The caller whose scan this resumer acts on.
- std::shared_ptr<AsyncScanRpcRetryingCaller> caller_;
-};
-
-// ScanController implementation bound to one AsyncScanRpcRetryingCaller. It lets a consumer
-// suspend or terminate the scan and records which of those was requested in state_.
-// NOTE(review): this class uses std::thread::id / std::this_thread, but the header's include list
-// has no <thread>; it presumably arrives transitively -- confirm.
-class ScanControllerImpl : public ScanController {
- public:
- virtual ~ScanControllerImpl() = default;
-
- explicit ScanControllerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller);
-
- /**
- * Suspend the scan.
- * <p>
- * This means we will stop fetching data in background, i.e., will not call onNext any more
- * before you resume the scan.
- * @return A resumer used to resume the scan later.
- */
- std::shared_ptr<ScanResumer> Suspend();
-
- /**
- * Terminate the scan.
- * <p>
- * This is useful when you have got enough results and want to stop the scan in the onNext
- * method, or you want to stop the scan in the onHeartbeat method because it has taken too much
- * time.
- */
- void Terminate();
-
- // Returns the current state, and sets the state to DESTROYED.
- ScanControllerState Destroy();
-
- // Returns the resumer held by this controller; it is nullptr by default (see resumer_ below).
- std::shared_ptr<ScanResumerImpl> resumer() { return resumer_; }
-
- private:
- // Presumably validates the calling thread and current state before Suspend()/Terminate();
- // confirm in the .cc file.
- void PreCheck();
-
- std::string DebugString(ScanControllerState state);
-
- std::string DebugString(ScanResumerState state);
-
- private:
- // Make sure the methods are only called in this thread. The id is captured when the member is
- // initialized, i.e. on the thread that constructs the controller.
- std::thread::id caller_thread_id_ = std::this_thread::get_id();
- // Allowed state transitions:
- // INITIALIZED -> SUSPENDED -> DESTROYED
- // INITIALIZED -> TERMINATED -> DESTROYED
- // INITIALIZED -> DESTROYED
- // If the state is incorrect we will throw IllegalStateException.
- ScanControllerState state_ = ScanControllerState::kInitialized;
- std::shared_ptr<ScanResumerImpl> resumer_ = nullptr;
- std::shared_ptr<AsyncScanRpcRetryingCaller> caller_;
-};
-
-// Issues the scan RPCs for one already-opened scanner (scanner_id) at one region location and
-// feeds the results to a RawScanResultConsumer. Instances must be owned by a std::shared_ptr,
-// because the implementation calls shared_from_this().
-class AsyncScanRpcRetryingCaller : public std::enable_shared_from_this<AsyncScanRpcRetryingCaller> {
- public:
- AsyncScanRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
- std::shared_ptr<folly::HHWheelTimer> retry_timer,
- std::shared_ptr<hbase::RpcClient> rpc_client,
- std::shared_ptr<Scan> scan, int64_t scanner_id,
- std::shared_ptr<ScanResultCache> results_cache,
- std::shared_ptr<RawScanResultConsumer> consumer,
- std::shared_ptr<RegionLocation> region_location,
- nanoseconds scanner_lease_timeout_nanos, nanoseconds pause,
- uint32_t max_retries, nanoseconds scan_timeout_nanos,
- nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count);
-
- // Starts processing from the response of the open-scanner call. The meaning of the bool carried
- // by the returned future is not visible in this header -- see the .cc file.
- folly::Future<bool> Start(std::shared_ptr<HBaseRpcController> controller,
- std::shared_ptr<pb::ScanResponse> open_scan_resp,
- const std::shared_ptr<CellScanner> cell_scanner);
-
- private:
- int64_t RemainingTimeNs();
- void OnComplete(std::shared_ptr<HBaseRpcController> controller,
- std::shared_ptr<pb::ScanResponse> resp,
- const std::shared_ptr<CellScanner> cell_scanner);
-
- void CompleteOrNext(std::shared_ptr<pb::ScanResponse> resp);
-
- void CompleteExceptionally(bool close_scanner);
-
- void CompleteNoMoreResults();
-
- void CompleteWhenNoMoreResultsInRegion();
-
- void CompleteWithNextStartRow(std::string row, bool inclusive);
-
- void UpdateNextStartRowWhenError(const Result& result);
-
- void CompleteWhenError(bool close_scanner);
-
- void OnError(const folly::exception_wrapper& e);
-
- bool NoMoreResultsForScan(const Scan& scan, const pb::RegionInfo& info);
-
- void Next();
-
- void Call();
-
- // Sends a fire-and-forget close-scanner RPC for scanner_id_; failures are only logged.
- void CloseScanner();
-
- // Resets the controller and, if timeout_nanos >= 0, applies it as the call timeout
- // (converted to milliseconds and capped at INT_MAX).
- void ResetController(std::shared_ptr<HBaseRpcController> controller,
- const int64_t& timeout_nanos);
-
- private:
- std::shared_ptr<AsyncConnection> conn_;
- std::shared_ptr<folly::HHWheelTimer> retry_timer_;
- std::shared_ptr<hbase::RpcClient> rpc_client_;
- std::shared_ptr<Scan> scan_;
- int64_t scanner_id_;
- std::shared_ptr<ScanResultCache> results_cache_;
- std::shared_ptr<RawScanResultConsumer> consumer_;
- std::shared_ptr<RegionLocation> region_location_;
- nanoseconds scanner_lease_timeout_nanos_;
- nanoseconds pause_;
- uint32_t max_retries_;
- nanoseconds scan_timeout_nanos_;
- nanoseconds rpc_timeout_nanos_;
- uint32_t start_log_errors_count_;
- // Presumably backs the future returned by Start() -- confirm in the .cc file.
- std::shared_ptr<folly::Promise<bool>> promise_;
- std::shared_ptr<HBaseRpcController> controller_;
- // Empty by default; presumably maintained by UpdateNextStartRowWhenError() -- confirm.
- optional<std::string> next_start_row_when_error_ = optional<std::string>();
- bool include_next_start_row_when_error_ = true;
- // NOTE(review): start_ns_, tries_ and max_attempts_ have no in-class initializer; they are
- // presumably set by the constructor -- confirm.
- uint64_t start_ns_;
- uint32_t tries_;
- std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_;
- uint32_t max_attempts_;
- int64_t next_call_seq_ = -1L;
-
- // The resumer and controller call into the private methods above.
- friend class ScanResumerImpl;
- friend class ScanControllerImpl;
-};
-
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-table-result-scanner.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-table-result-scanner.cc b/hbase-native-client/core/async-table-result-scanner.cc
deleted file mode 100644
index b1935ae..0000000
--- a/hbase-native-client/core/async-table-result-scanner.cc
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/async-table-result-scanner.h"
-
-#include <vector>
-
-namespace hbase {
-// Creates an open scanner with an empty cache. max_cache_size is the cache size at or above which
-// OnNext() suspends prefetching.
-AsyncTableResultScanner::AsyncTableResultScanner(int64_t max_cache_size)
-    : cache_size_(0), max_cache_size_(max_cache_size), closed_(false) {}
-
-// Closes the scanner on destruction, so the cache is dropped and any waiter is woken (see Close()).
-AsyncTableResultScanner::~AsyncTableResultScanner() { Close(); }
-
-// Marks the scanner closed, drops every cached result, resumes a suspended scan (if any) and
-// wakes all threads blocked in Next().
-void AsyncTableResultScanner::Close() {
-  std::shared_ptr<ScanResumer> local_resumer = nullptr;
-  {
-    std::unique_lock<std::mutex> mlock(mutex_);
-    closed_ = true;
-    while (!queue_.empty()) {
-      queue_.pop();
-    }
-    cache_size_ = 0;
-    // Take the resumer out while locked; it is invoked only after the lock is released.
-    local_resumer = resumer_;
-    resumer_ = nullptr;
-  }
-
-  // As in Next(), ScanResumer::Resume() must be called without holding mutex_: the resumed scan
-  // may run its continuation on this thread and re-enter OnNext(), which locks the same
-  // non-recursive mutex.
-  if (local_resumer != nullptr) {
-    local_resumer->Resume();
-  }
-  cond_.notify_all();
-}
-
-// Blocks until a cached result is available and returns it. Returns nullptr when the cache is
-// empty and the scanner is closed; throws the stored error when the cache is empty and an error
-// was recorded. Resumes a suspended scan once the cache has drained to half of max_cache_size_.
-std::shared_ptr<Result> AsyncTableResultScanner::Next() {
-  VLOG(5) << "AsyncTableResultScanner: Next()";
-
-  std::shared_ptr<Result> next_result = nullptr;
-  std::shared_ptr<ScanResumer> pending_resumer = nullptr;
-  {
-    std::unique_lock<std::mutex> guard(mutex_);
-    for (;;) {
-      if (!queue_.empty()) {
-        break;
-      }
-      if (closed_) {
-        return nullptr;
-      }
-      if (error_) {
-        throw error_;
-      }
-      cond_.wait(guard);
-    }
-
-    next_result = queue_.front();
-    queue_.pop();
-    cache_size_ -= EstimatedSizeWithSharedPtr(next_result);
-
-    const bool drained_enough = cache_size_ <= max_cache_size_ / 2;
-    if (drained_enough && resumer_ != nullptr) {
-      VLOG(1) << std::this_thread::get_id() << " resume scan prefetching";
-      pending_resumer = resumer_;
-      resumer_ = nullptr;
-    }
-  }
-
-  // ScanResumer::Resume() has to run after mutex_ is released. The folly/wangle event loop may
-  // execute the continuation attached to the scan RPC on this same thread before Resume()
-  // returns, and that continuation can call this->OnNext(), which takes mutex_ again.
-  if (pending_resumer != nullptr) {
-    pending_resumer->Resume();
-  }
-  return next_result;
-}
-
-// Appends every result to the queue and adds its estimated size to cache_size_.
-// This function does not take mutex_ itself; OnNext() calls it with the lock already held.
-void AsyncTableResultScanner::AddToCache(const std::vector<std::shared_ptr<Result>> &results) {
-  VLOG(5) << "AsyncTableResultScanner: AddToCache()";
-  // Bind by const reference: iterating by value would copy each shared_ptr (an atomic reference
-  // count increment and decrement) just to read it.
-  for (const auto &r : results) {
-    queue_.push(r);
-    cache_size_ += EstimatedSizeWithSharedPtr(r);
-  }
-}
-
-// Estimated footprint of one cached entry: the pointee's own EstimatedSize() plus the size of the
-// shared_ptr handle that refers to it.
-template <typename T>
-inline size_t AsyncTableResultScanner::EstimatedSizeWithSharedPtr(std::shared_ptr<T> t) {
-  const auto payload_size = t->EstimatedSize();
-  return payload_size + sizeof(std::shared_ptr<T>);
-}
-
-// Consumer callback for a batch of results. If the scanner is already closed the scan is
-// terminated; otherwise the results are cached, prefetching is suspended once the cache has
-// reached max_cache_size_, and waiting readers are notified.
-void AsyncTableResultScanner::OnNext(const std::vector<std::shared_ptr<Result>> &results,
-                                     std::shared_ptr<ScanController> controller) {
-  VLOG(5) << "AsyncTableResultScanner: OnNext()";
-
-  std::unique_lock<std::mutex> guard(mutex_);
-  if (closed_) {
-    controller->Terminate();
-    return;
-  }
-
-  AddToCache(results);
-  const bool cache_full = cache_size_ >= max_cache_size_;
-  if (cache_full) {
-    StopPrefetch(controller);
-  }
-
-  // Release the lock before waking the readers blocked in Next().
-  guard.unlock();
-  cond_.notify_all();
-}
-
-// Suspends the scan through the controller, keeps the returned resumer so Next() or Close() can
-// resume it later, and counts the suspension (the counter is exposed for tests).
-void AsyncTableResultScanner::StopPrefetch(std::shared_ptr<ScanController> controller) {
-  VLOG(1) << std::this_thread::get_id()
-          << ": stop prefetching when scanning as the cache size "
-          << folly::to<std::string>(cache_size_) << " is greater than the max_cache_size "
-          << folly::to<std::string>(max_cache_size_);
-
-  resumer_ = controller->Suspend();
-  ++num_prefetch_stopped_;
-}
-
-/**
- * Called when a heartbeat message arrives before enough cells have accumulated for an OnNext()
- * call.
- * <p>
- * This gives the consumer a chance to terminate a slow scan; this implementation terminates the
- * scan only when the scanner has already been closed.
- * @param controller used to suspend or terminate the scan. The controller instance is only valid
- * within the scope of OnHeartbeat(): call its methods here, do NOT store it and call it
- * later.
- */
-void AsyncTableResultScanner::OnHeartbeat(std::shared_ptr<ScanController> controller) {
-  std::lock_guard<std::mutex> guard(mutex_);
-  if (!closed_) {
-    return;
-  }
-  controller->Terminate();
-}
-
-/**
- * Indicate that we hit an unrecoverable error and the scan operation is terminated.
- * <p>
- * OnComplete() will not be called after OnError() has been called.
- * <p>
- * The error is stored and all waiters are woken; Next() throws it once the cache is empty.
- * @param error the failure reported by the scan.
- */
-void AsyncTableResultScanner::OnError(const folly::exception_wrapper &error) {
-  // Separate the fixed prefix from the exception text; without the ": " the two run together.
-  LOG(WARNING) << "Scanner received error: " << error.what();
-  std::unique_lock<std::mutex> mlock(mutex_);
-  error_ = error;
-  cond_.notify_all();
-}
-
-/**
- * Called when the scan has finished normally: marks the scanner closed and wakes every thread
- * blocked in Next().
- */
-void AsyncTableResultScanner::OnComplete() {
-  std::lock_guard<std::mutex> guard(mutex_);
-  closed_ = true;
-  cond_.notify_all();
-}
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-table-result-scanner.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-table-result-scanner.h b/hbase-native-client/core/async-table-result-scanner.h
deleted file mode 100644
index dcdf871..0000000
--- a/hbase-native-client/core/async-table-result-scanner.h
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/Conv.h>
-#include <folly/ExceptionWrapper.h>
-#include <folly/Logging.h>
-#include <chrono>
-#include <condition_variable>
-#include <memory>
-#include <mutex>
-#include <queue>
-#include <string>
-#include <vector>
-
-#include "core/raw-scan-result-consumer.h"
-#include "core/result-scanner.h"
-#include "core/result.h"
-#include "if/Client.pb.h"
-#include "if/HBase.pb.h"
-
-namespace hbase {
-
-// A ResultScanner that is also the RawScanResultConsumer of a scan: the On*() callbacks push
-// results into an internal queue, and Next() blocks until a result, completion or an error is
-// available. Prefetching is suspended when the cached size reaches max_cache_size and resumed
-// once it has drained to half of that.
-class AsyncTableResultScanner : public ResultScanner, public RawScanResultConsumer {
- public:
- // max_cache_size: cache size at or above which OnNext() suspends the scan.
- explicit AsyncTableResultScanner(int64_t max_cache_size);
-
- // Calls Close().
- virtual ~AsyncTableResultScanner();
-
- // Marks the scanner closed, drops all cached results, resumes a suspended scan if there is one
- // and wakes every thread blocked in Next().
- void Close() override;
-
- // Blocks until a result is available. Returns nullptr if the cache is empty and the scanner is
- // closed; throws the stored error if the cache is empty and OnError() was called.
- std::shared_ptr<Result> Next() override;
-
- // Caches the given results, or terminates the scan through the controller if the scanner has
- // already been closed.
- void OnNext(const std::vector<std::shared_ptr<Result>> &results,
- std::shared_ptr<ScanController> controller) override;
-
- /**
- * Indicate that there is a heartbeat message but we have not accumulated enough cells to call
- * OnNext.
- * <p>
- * This method gives you a chance to terminate a slow scan operation.
- * @param controller used to suspend or terminate the scan. Notice that the controller
- * instance is only valid within the scope of the OnHeartbeat method. You can only call its
- * methods in OnHeartbeat; do NOT store it and call it later outside OnHeartbeat.
- */
- void OnHeartbeat(std::shared_ptr<ScanController> controller) override;
-
- /**
- * Indicate that we hit an unrecoverable error and the scan operation is terminated.
- * <p>
- * We will not call OnComplete() after calling OnError().
- */
- void OnError(const folly::exception_wrapper &error) override;
-
- /**
- * Indicate that the scan operation is completed normally.
- */
- void OnComplete() override;
-
- // For testing: number of times prefetching has been suspended.
- uint32_t num_prefetch_stopped() { return num_prefetch_stopped_; }
-
- private:
- // Pushes the results onto queue_ and adds their estimated size to cache_size_.
- void AddToCache(const std::vector<std::shared_ptr<Result>> &results);
-
- // t->EstimatedSize() plus the size of the shared_ptr handle itself.
- template <typename T>
- inline size_t EstimatedSizeWithSharedPtr(std::shared_ptr<T> t);
-
- // Suspends the scan via the controller and stores the returned resumer in resumer_.
- void StopPrefetch(std::shared_ptr<ScanController> controller);
-
- private:
- // Cached results, in arrival order.
- std::queue<std::shared_ptr<Result>> queue_;
- // Locked by Close(), Next() and the On*() callbacks around access to the members below.
- std::mutex mutex_;
- // Notified when results arrive, on error, on completion and on close.
- std::condition_variable cond_;
- // Error recorded by OnError(); empty until then.
- folly::exception_wrapper error_;
- // Running total of the estimated size of the entries in queue_.
- int64_t cache_size_;
- int64_t max_cache_size_;
- // Set by Close() and OnComplete().
- bool closed_;
- // Non-null only while prefetching is suspended.
- std::shared_ptr<ScanResumer> resumer_ = nullptr;
- uint32_t num_prefetch_stopped_ = 0;
-};
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/cell-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/cell-test.cc b/hbase-native-client/core/cell-test.cc
deleted file mode 100644
index 4611473..0000000
--- a/hbase-native-client/core/cell-test.cc
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/cell.h"
-
-#include <glog/logging.h>
-#include <gtest/gtest.h>
-#include <memory>
-
-using hbase::Cell;
-using hbase::CellType;
-
-// A cell built from explicit components must report each component back unchanged.
-TEST(CellTest, Constructor) {
-  const std::string row{"row-value"};
-  const std::string family{"family-value"};
-  const std::string column{"column-value"};
-  const std::string value{"value-value"};
-  const int64_t timestamp = std::numeric_limits<int64_t>::max();
-  const CellType cell_type = CellType::PUT;
-
-  Cell cell(row, family, column, timestamp, value, cell_type);
-
-  EXPECT_EQ(row, cell.Row());
-  EXPECT_EQ(family, cell.Family());
-  EXPECT_EQ(column, cell.Qualifier());
-  EXPECT_EQ(value, cell.Value());
-  EXPECT_EQ(timestamp, cell.Timestamp());
-  EXPECT_EQ(cell_type, cell.Type());
-}
-
-// A copy-constructed cell must keep all of its values after the source cell is destroyed.
-TEST(CellTest, CopyConstructor) {
-  const std::string row{"row-value"};
-  const std::string family{"family-value"};
-  const std::string column{"column-value"};
-  const std::string value{"value-value"};
-  const int64_t timestamp = std::numeric_limits<int64_t>::max();
-  const CellType cell_type = CellType::PUT;
-
-  auto source = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type);
-  Cell cell2{*source};
-  // Destroy the source so the checks below cannot be satisfied through shared storage.
-  source.reset();
-
-  EXPECT_EQ(row, cell2.Row());
-  EXPECT_EQ(family, cell2.Family());
-  EXPECT_EQ(column, cell2.Qualifier());
-  EXPECT_EQ(value, cell2.Value());
-  EXPECT_EQ(timestamp, cell2.Timestamp());
-  EXPECT_EQ(cell_type, cell2.Type());
-}
-
-// Copy-assigning onto an existing cell must replace every field, and the result must remain valid
-// after the source cell is destroyed.
-TEST(CellTest, CopyAssignment) {
-  std::string row = "row-value";
-  std::string family = "family-value";
-  std::string column = "column-value";
-  std::string value = "value-value";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-  CellType cell_type = CellType::PUT;
-
-  auto cell = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type);
-  // `Cell cell2 = *cell;` would be copy-initialization and call the copy constructor. Build the
-  // target first, with different contents, so that operator= is what is actually exercised.
-  Cell cell2{"other-row", "other-family", "other-column", 42, "other-value", CellType::DELETE};
-  cell2 = *cell;
-  cell = nullptr;
-
-  EXPECT_EQ(row, cell2.Row());
-  EXPECT_EQ(family, cell2.Family());
-  EXPECT_EQ(column, cell2.Qualifier());
-  EXPECT_EQ(value, cell2.Value());
-  EXPECT_EQ(timestamp, cell2.Timestamp());
-  EXPECT_EQ(cell_type, cell2.Type());
-}
-
-// A cell with a row and a one-character family, but an empty qualifier and an empty value.
-TEST(CellTest, CellRowTest) {
-  const std::string row{"only-row"};
-  const std::string family{"D"};
-  const std::string column{""};
-  const std::string value{""};
-  const int64_t timestamp = std::numeric_limits<int64_t>::max();
-  const CellType cell_type = CellType::PUT;
-
-  Cell cell(row, family, column, timestamp, value, cell_type);
-
-  EXPECT_EQ(row, cell.Row());
-  EXPECT_EQ(family, cell.Family());
-  EXPECT_EQ(column, cell.Qualifier());
-  EXPECT_EQ(value, cell.Value());
-  EXPECT_EQ(timestamp, cell.Timestamp());
-  EXPECT_EQ(cell_type, cell.Type());
-}
-
-// A cell with a row and a family, but an empty qualifier and an empty value.
-TEST(CellTest, CellRowFamilyTest) {
-  const std::string row{"only-row"};
-  const std::string family{"only-family"};
-  const std::string column{""};
-  const std::string value{""};
-  const int64_t timestamp = std::numeric_limits<int64_t>::max();
-  const CellType cell_type = CellType::PUT;
-
-  Cell cell(row, family, column, timestamp, value, cell_type);
-
-  EXPECT_EQ(row, cell.Row());
-  EXPECT_EQ(family, cell.Family());
-  EXPECT_EQ(column, cell.Qualifier());
-  EXPECT_EQ(value, cell.Value());
-  EXPECT_EQ(timestamp, cell.Timestamp());
-  EXPECT_EQ(cell_type, cell.Type());
-}
-
-// A cell with a row, a family and a value, but an empty qualifier.
-TEST(CellTest, CellRowFamilyValueTest) {
-  const std::string row{"only-row"};
-  const std::string family{"only-family"};
-  const std::string column{""};
-  const std::string value{"only-value"};
-  const int64_t timestamp = std::numeric_limits<int64_t>::max();
-  const CellType cell_type = CellType::PUT;
-
-  Cell cell(row, family, column, timestamp, value, cell_type);
-
-  EXPECT_EQ(row, cell.Row());
-  EXPECT_EQ(family, cell.Family());
-  EXPECT_EQ(column, cell.Qualifier());
-  EXPECT_EQ(value, cell.Value());
-  EXPECT_EQ(timestamp, cell.Timestamp());
-  EXPECT_EQ(cell_type, cell.Type());
-}
-
-// A cell with every string component (row, family, qualifier, value) populated.
-TEST(CellTest, CellRowFamilyColumnValueTest) {
-  const std::string row{"only-row"};
-  const std::string family{"only-family"};
-  const std::string column{"only-column"};
-  const std::string value{"only-value"};
-  const int64_t timestamp = std::numeric_limits<int64_t>::max();
-  const CellType cell_type = CellType::PUT;
-
-  Cell cell(row, family, column, timestamp, value, cell_type);
-
-  EXPECT_EQ(row, cell.Row());
-  EXPECT_EQ(family, cell.Family());
-  EXPECT_EQ(column, cell.Qualifier());
-  EXPECT_EQ(value, cell.Value());
-  EXPECT_EQ(timestamp, cell.Timestamp());
-  EXPECT_EQ(cell_type, cell.Type());
-}
-
-// DebugString() must match the expected text for a PUT at the maximum timestamp and for a DELETE
-// that has an empty family and an explicit timestamp.
-TEST(CellTest, CellDebugString) {
-  const std::string row{"row"};
-  const std::string family{"family"};
-  const std::string column{"column"};
-  const std::string value{"value"};
-  const int64_t timestamp = std::numeric_limits<int64_t>::max();
-  const CellType cell_type = CellType::PUT;
-
-  Cell cell(row, family, column, timestamp, value, cell_type);
-  LOG(INFO) << cell.DebugString();
-  EXPECT_EQ("row/family:column/LATEST_TIMESTAMP/PUT/vlen=5/seqid=0", cell.DebugString());
-
-  Cell cell2(row, "", column, 42, value, CellType::DELETE);
-  LOG(INFO) << cell2.DebugString();
-  EXPECT_EQ("row/column/42/DELETE/vlen=5/seqid=0", cell2.DebugString());
-}
-
-// EstimatedSize() must exceed sizeof(Cell), grow with the length of the string components, and
-// not depend on which component carries the extra byte or on the cell type.
-TEST(CellTest, CellEstimatedSize) {
-  CellType cell_type = CellType::PUT;
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-
-  Cell empty{"a", "a", "", timestamp, "", cell_type};
-  Cell cell1{"aa", "a", "", timestamp, "", cell_type};
-  Cell cell2{"a", "aa", "", timestamp, "", cell_type};
-  Cell cell3{"a", "a", "a", timestamp, "", cell_type};
-  Cell cell4{"a", "a", "", timestamp, "a", cell_type};
-  Cell cell5{"a", "a", "", timestamp, "a", CellType::DELETE};
-  Cell cell6{"aaaaaa", "a", "", timestamp, "a", cell_type};
-
-  LOG(INFO) << empty.EstimatedSize();
-  LOG(INFO) << cell1.EstimatedSize();
-
-  // EXPECT_GT prints both operands on failure, which EXPECT_TRUE(a > b) does not.
-  EXPECT_GT(empty.EstimatedSize(), sizeof(Cell));
-  EXPECT_GT(cell1.EstimatedSize(), empty.EstimatedSize());
-  EXPECT_EQ(cell1.EstimatedSize(), cell2.EstimatedSize());
-  EXPECT_EQ(cell2.EstimatedSize(), cell3.EstimatedSize());
-  EXPECT_EQ(cell3.EstimatedSize(), cell4.EstimatedSize());
-  EXPECT_EQ(cell4.EstimatedSize(), cell5.EstimatedSize());
-  EXPECT_GT(cell6.EstimatedSize(), cell1.EstimatedSize());
-}