You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/04/11 13:15:36 UTC
hbase git commit: HBASE-17726 [C++] Move implementation from header
to cc for request retry
Repository: hbase
Updated Branches:
refs/heads/HBASE-14850 66f8f36ec -> 8aa8a9251
HBASE-17726 [C++] Move implementation from header to cc for request retry
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8aa8a925
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8aa8a925
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8aa8a925
Branch: refs/heads/HBASE-14850
Commit: 8aa8a92519bd93e403ea45863490eafbf25e7eb9
Parents: 66f8f36
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Apr 11 06:14:41 2017 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Apr 11 06:14:41 2017 -0700
----------------------------------------------------------------------
hbase-native-client/core/BUCK | 7 +-
hbase-native-client/core/async-connection.h | 10 +-
hbase-native-client/core/async-region-locator.h | 1 +
.../core/async-rpc-retrying-caller.cc | 197 ++++++++++++++++++-
.../core/async-rpc-retrying-caller.h | 168 +---------------
.../core/async-rpc-retrying-test.cc | 30 ++-
hbase-native-client/core/get-test.cc | 2 +-
hbase-native-client/core/mutation.cc | 4 +-
hbase-native-client/core/mutation.h | 2 +-
hbase-native-client/core/region-request.h | 4 +-
hbase-native-client/core/request-converter.h | 4 +-
hbase-native-client/core/response-converter.cc | 42 ++--
12 files changed, 265 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 30c3390..412ee3b 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -21,6 +21,8 @@ cxx_library(
exported_headers=[
"async-connection.h",
"async-region-locator.h",
+ "async-rpc-retrying-caller-factory.h",
+ "async-rpc-retrying-caller.h",
"client.h",
"cell.h",
"hbase-macros.h",
@@ -42,8 +44,6 @@ cxx_library(
"response-converter.h",
"table.h",
"raw-async-table.h",
- "async-rpc-retrying-caller-factory.h",
- "async-rpc-retrying-caller.h",
"hbase-rpc-controller.h",
"time-range.h",
"zk-util.h",
@@ -56,8 +56,11 @@ cxx_library(
],
srcs=[
"async-connection.cc",
+ "async-rpc-retrying-caller-factory.cc",
+ "async-rpc-retrying-caller.cc",
"cell.cc",
"client.cc",
+ "hbase-rpc-controller.cc",
"keyvalue-codec.cc",
"location-cache.cc",
"meta-utils.cc",
http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/async-connection.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-connection.h b/hbase-native-client/core/async-connection.h
index 6a61124..ff11577 100644
--- a/hbase-native-client/core/async-connection.h
+++ b/hbase-native-client/core/async-connection.h
@@ -26,12 +26,12 @@
#include <memory>
#include <string>
+#include <utility>
#include "connection/rpc-client.h"
#include "core/async-region-locator.h"
#include "core/configuration.h"
#include "core/connection-configuration.h"
-#include "core/connection-configuration.h"
#include "core/hbase-configuration-loader.h"
#include "core/hbase-rpc-controller.h"
#include "core/keyvalue-codec.h"
@@ -45,8 +45,8 @@ class AsyncRpcRetryingCallerFactory;
class AsyncConnection {
public:
- AsyncConnection(){};
- virtual ~AsyncConnection(){};
+ AsyncConnection() {}
+ virtual ~AsyncConnection() {}
virtual std::shared_ptr<Configuration> conf() = 0;
virtual std::shared_ptr<ConnectionConfiguration> connection_conf() = 0;
virtual std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() = 0;
@@ -82,7 +82,7 @@ class AsyncConnectionImpl : public AsyncConnection,
return std::make_shared<HBaseRpcController>();
}
- virtual void Close() override;
+ void Close() override;
protected:
AsyncConnectionImpl() {}
@@ -105,7 +105,7 @@ class AsyncConnectionImpl : public AsyncConnection,
bool is_closed_ = false;
private:
- AsyncConnectionImpl(std::shared_ptr<Configuration> conf) : conf_(conf) {}
+ explicit AsyncConnectionImpl(std::shared_ptr<Configuration> conf) : conf_(conf) {}
void Init();
};
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/async-region-locator.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-region-locator.h b/hbase-native-client/core/async-region-locator.h
index b0019e0..c606dcb 100644
--- a/hbase-native-client/core/async-region-locator.h
+++ b/hbase-native-client/core/async-region-locator.h
@@ -20,6 +20,7 @@
#pragma once
#include <folly/futures/Future.h>
+#include <memory>
#include <string>
#include "core/region-location.h"
http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/async-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc
index 743b6bb..965a44b 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.cc
+++ b/hbase-native-client/core/async-rpc-retrying-caller.cc
@@ -19,4 +19,199 @@
#include "core/async-rpc-retrying-caller.h"
-namespace hbase {} /* namespace hbase */
+#include <folly/Format.h>
+#include <folly/Logging.h>
+#include <folly/futures/Unit.h>
+
+#include "connection/rpc-client.h"
+#include "core/async-connection.h"
+#include "core/hbase-rpc-controller.h"
+#include "core/region-location.h"
+#include "core/result.h"
+#include "exceptions/exception.h"
+#include "if/HBase.pb.h"
+#include "utils/connection-util.h"
+#include "utils/sys-util.h"
+#include "utils/time-util.h"
+
+namespace hbase {
+
+template <typename RESP>
+AsyncSingleRequestRpcRetryingCaller<RESP>::AsyncSingleRequestRpcRetryingCaller(
+ std::shared_ptr<AsyncConnection> conn, std::shared_ptr<hbase::pb::TableName> table_name,
+ const std::string& row, RegionLocateType locate_type, Callable<RESP> callable,
+ nanoseconds pause, uint32_t max_retries, nanoseconds operation_timeout_nanos,
+ nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
+ : conn_(conn),
+ table_name_(table_name),
+ row_(row),
+ locate_type_(locate_type),
+ callable_(callable),
+ pause_(pause),
+ max_retries_(max_retries),
+ operation_timeout_nanos_(operation_timeout_nanos),
+ rpc_timeout_nanos_(rpc_timeout_nanos),
+ start_log_errors_count_(start_log_errors_count),
+ promise_(std::make_shared<folly::Promise<RESP>>()),
+ tries_(1) {
+ controller_ = conn_->CreateRpcController();
+ start_ns_ = TimeUtil::GetNowNanos();
+ max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
+ exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>();
+ retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_);
+}
+
+template <typename RESP>
+AsyncSingleRequestRpcRetryingCaller<RESP>::~AsyncSingleRequestRpcRetryingCaller() {}
+
+template <typename RESP>
+folly::Future<RESP> AsyncSingleRequestRpcRetryingCaller<RESP>::Call() {
+ auto f = promise_->getFuture();
+ LocateThenCall();
+ return f;
+}
+
+template <typename RESP>
+void AsyncSingleRequestRpcRetryingCaller<RESP>::LocateThenCall() {
+ int64_t locate_timeout_ns;
+ if (operation_timeout_nanos_.count() > 0) {
+ locate_timeout_ns = RemainingTimeNs();
+ if (locate_timeout_ns <= 0) {
+ CompleteExceptionally();
+ return;
+ }
+ } else {
+ locate_timeout_ns = -1L;
+ }
+
+ conn_->region_locator()
+ ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns)
+ .then([this](std::shared_ptr<RegionLocation> loc) { Call(*loc); })
+ .onError([this](const std::exception& e) {
+ OnError(e,
+ [this]() -> std::string {
+ return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" +
+ table_name_->qualifier() + " failed, tries = " + std::to_string(tries_) +
+ ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " +
+ TimeUtil::ToMillisStr(operation_timeout_nanos_) + " ms, time elapsed = " +
+ TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
+ },
+ [](const std::exception& error) {});
+ });
+}
+
+template <typename RESP>
+void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
+ const std::exception& error, Supplier<std::string> err_msg,
+ Consumer<std::exception> update_cached_location) {
+ ThrowableWithExtraContext twec(std::make_shared<std::exception>(error), TimeUtil::GetNowNanos());
+ exceptions_->push_back(twec);
+ if (SysUtil::InstanceOf<DoNotRetryIOException, std::exception>(error) || tries_ >= max_retries_) {
+ CompleteExceptionally();
+ return;
+ }
+
+ int64_t delay_ns;
+ if (operation_timeout_nanos_.count() > 0) {
+ int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
+ if (max_delay_ns <= 0) {
+ CompleteExceptionally();
+ return;
+ }
+ delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1));
+ } else {
+ delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1);
+ }
+ update_cached_location(error);
+ tries_++;
+ retry_timer_->scheduleTimeoutFn([this]() { LocateThenCall(); },
+ milliseconds(TimeUtil::ToMillis(delay_ns)));
+}
+
+template <typename RESP>
+void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc) {
+ int64_t call_timeout_ns;
+ if (operation_timeout_nanos_.count() > 0) {
+ call_timeout_ns = this->RemainingTimeNs();
+ if (call_timeout_ns <= 0) {
+ this->CompleteExceptionally();
+ return;
+ }
+ call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_.count());
+ } else {
+ call_timeout_ns = rpc_timeout_nanos_.count();
+ }
+
+ std::shared_ptr<RpcClient> rpc_client;
+ try {
+ // TODO: There is no connection attempt happening here, no need to try-catch.
+ rpc_client = conn_->rpc_client();
+ } catch (const IOException& e) {
+ OnError(e,
+ [&, this]() -> std::string {
+ return "Get async rpc_client to " +
+ folly::sformat("{0}:{1}", loc.server_name().host_name(),
+ loc.server_name().port()) +
+ " for '" + row_ + "' in " + loc.DebugString() + " of " +
+ table_name_->namespace_() + "::" + table_name_->qualifier() +
+ " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " +
+ std::to_string(max_attempts_) + ", timeout = " +
+ TimeUtil::ToMillisStr(this->operation_timeout_nanos_) +
+ " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
+ },
+ [&, this](const std::exception& error) {
+ conn_->region_locator()->UpdateCachedLocation(loc, error);
+ });
+ return;
+ }
+
+ ResetController(controller_, call_timeout_ns);
+
+ callable_(controller_, std::make_shared<RegionLocation>(loc), rpc_client)
+ .then([this](const RESP& resp) { this->promise_->setValue(std::move(resp)); })
+ .onError([&, this](const std::exception& e) {
+ OnError(e,
+ [&, this]() -> std::string {
+ return "Call to " + folly::sformat("{0}:{1}", loc.server_name().host_name(),
+ loc.server_name().port()) +
+ " for '" + row_ + "' in " + loc.DebugString() + " of " +
+ table_name_->namespace_() + "::" + table_name_->qualifier() +
+ " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " +
+ std::to_string(max_attempts_) + ", timeout = " +
+ TimeUtil::ToMillisStr(this->operation_timeout_nanos_) +
+ " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) +
+ " ms";
+ },
+ [&, this](const std::exception& error) {
+ conn_->region_locator()->UpdateCachedLocation(loc, error);
+ });
+ return;
+ });
+}
+
+template <typename RESP>
+void AsyncSingleRequestRpcRetryingCaller<RESP>::CompleteExceptionally() {
+ this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_));
+}
+
+template <typename RESP>
+int64_t AsyncSingleRequestRpcRetryingCaller<RESP>::RemainingTimeNs() {
+ return operation_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_);
+}
+
+template <typename RESP>
+void AsyncSingleRequestRpcRetryingCaller<RESP>::ResetController(
+ std::shared_ptr<HBaseRpcController> controller, const int64_t& timeout_ns) {
+ controller->Reset();
+ if (timeout_ns >= 0) {
+ controller->set_call_timeout(
+ milliseconds(std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_ns))));
+ }
+}
+
+// explicit instantiations for the linker. Otherwise, you have to #include the .cc file for the
+// templatized
+// class definitions.
+template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<hbase::Result>>;
+template class AsyncSingleRequestRpcRetryingCaller<folly::Unit>;
+} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/async-rpc-retrying-caller.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h
index 6503301..6006388 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller.h
@@ -18,11 +18,10 @@
*/
#pragma once
-#include <folly/Format.h>
-#include <folly/Logging.h>
#include <folly/futures/Future.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/HHWheelTimer.h>
+
#include <algorithm>
#include <chrono>
#include <functional>
@@ -31,15 +30,11 @@
#include <type_traits>
#include <utility>
#include <vector>
-#include "connection/rpc-client.h"
#include "core/async-connection.h"
#include "core/hbase-rpc-controller.h"
#include "core/region-location.h"
#include "exceptions/exception.h"
#include "if/HBase.pb.h"
-#include "utils/connection-util.h"
-#include "utils/sys-util.h"
-#include "utils/time-util.h"
using std::chrono::nanoseconds;
using std::chrono::milliseconds;
@@ -80,168 +75,26 @@ class AsyncSingleRequestRpcRetryingCaller {
Callable<RESP> callable, nanoseconds pause,
uint32_t max_retries, nanoseconds operation_timeout_nanos,
nanoseconds rpc_timeout_nanos,
- uint32_t start_log_errors_count)
- : conn_(conn),
- table_name_(table_name),
- row_(row),
- locate_type_(locate_type),
- callable_(callable),
- pause_(pause),
- max_retries_(max_retries),
- operation_timeout_nanos_(operation_timeout_nanos),
- rpc_timeout_nanos_(rpc_timeout_nanos),
- start_log_errors_count_(start_log_errors_count),
- promise_(std::make_shared<folly::Promise<RESP>>()),
- tries_(1) {
- controller_ = conn_->CreateRpcController();
- start_ns_ = TimeUtil::GetNowNanos();
- max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
- exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>();
- retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_);
- }
+ uint32_t start_log_errors_count);
- virtual ~AsyncSingleRequestRpcRetryingCaller() {}
+ virtual ~AsyncSingleRequestRpcRetryingCaller();
- folly::Future<RESP> Call() {
- auto f = promise_->getFuture();
- LocateThenCall();
- return f;
- }
+ folly::Future<RESP> Call();
private:
- void LocateThenCall() {
- int64_t locate_timeout_ns;
- if (operation_timeout_nanos_.count() > 0) {
- locate_timeout_ns = RemainingTimeNs();
- if (locate_timeout_ns <= 0) {
- CompleteExceptionally();
- return;
- }
- } else {
- locate_timeout_ns = -1L;
- }
-
- conn_->region_locator()
- ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns)
- .then([this](std::shared_ptr<RegionLocation> loc) { Call(*loc); })
- .onError([this](const std::exception& e) {
- OnError(e,
- [this]() -> std::string {
- return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" +
- table_name_->qualifier() + " failed, tries = " + std::to_string(tries_) +
- ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " +
- TimeUtil::ToMillisStr(operation_timeout_nanos_) +
- " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) +
- " ms";
- },
- [](const std::exception& error) {});
- });
- }
+ void LocateThenCall();
void OnError(const std::exception& error, Supplier<std::string> err_msg,
- Consumer<std::exception> update_cached_location) {
- ThrowableWithExtraContext twec(std::make_shared<std::exception>(error),
- TimeUtil::GetNowNanos());
- exceptions_->push_back(twec);
- if (SysUtil::InstanceOf<DoNotRetryIOException, std::exception>(error) ||
- tries_ >= max_retries_) {
- CompleteExceptionally();
- return;
- }
-
- int64_t delay_ns;
- if (operation_timeout_nanos_.count() > 0) {
- int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
- if (max_delay_ns <= 0) {
- CompleteExceptionally();
- return;
- }
- delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1));
- } else {
- delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1);
- }
- update_cached_location(error);
- tries_++;
- retry_timer_->scheduleTimeoutFn([this]() { LocateThenCall(); },
- milliseconds(TimeUtil::ToMillis(delay_ns)));
- }
-
- void Call(const RegionLocation& loc) {
- int64_t call_timeout_ns;
- if (operation_timeout_nanos_.count() > 0) {
- call_timeout_ns = this->RemainingTimeNs();
- if (call_timeout_ns <= 0) {
- this->CompleteExceptionally();
- return;
- }
- call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_.count());
- } else {
- call_timeout_ns = rpc_timeout_nanos_.count();
- }
+ Consumer<std::exception> update_cached_location);
- std::shared_ptr<RpcClient> rpc_client;
- try {
- // TODO: There is no connection attempt happening here, no need to try-catch.
- rpc_client = conn_->rpc_client();
- } catch (const IOException& e) {
- OnError(e,
- [&, this]() -> std::string {
- return "Get async rpc_client to " +
- folly::sformat("{0}:{1}", loc.server_name().host_name(),
- loc.server_name().port()) +
- " for '" + row_ + "' in " + loc.DebugString() + " of " +
- table_name_->namespace_() + "::" + table_name_->qualifier() +
- " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " +
- std::to_string(max_attempts_) + ", timeout = " +
- TimeUtil::ToMillisStr(this->operation_timeout_nanos_) +
- " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
- },
- [&, this](const std::exception& error) {
- conn_->region_locator()->UpdateCachedLocation(loc, error);
- });
- return;
- }
+ void Call(const RegionLocation& loc);
- ResetController(controller_, call_timeout_ns);
+ void CompleteExceptionally();
- callable_(controller_, std::make_shared<RegionLocation>(loc), rpc_client)
- .then([this](const RESP& resp) { this->promise_->setValue(std::move(resp)); })
- .onError([&, this](const std::exception& e) {
- OnError(e,
- [&, this]() -> std::string {
- return "Call to " + folly::sformat("{0}:{1}", loc.server_name().host_name(),
- loc.server_name().port()) +
- " for '" + row_ + "' in " + loc.DebugString() + " of " +
- table_name_->namespace_() + "::" + table_name_->qualifier() +
- " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " +
- std::to_string(max_attempts_) + ", timeout = " +
- TimeUtil::ToMillisStr(this->operation_timeout_nanos_) +
- " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) +
- " ms";
- },
- [&, this](const std::exception& error) {
- conn_->region_locator()->UpdateCachedLocation(loc, error);
- });
- return;
- });
- }
-
- void CompleteExceptionally() {
- this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_));
- }
-
- int64_t RemainingTimeNs() {
- return operation_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_);
- }
+ int64_t RemainingTimeNs();
static void ResetController(std::shared_ptr<HBaseRpcController> controller,
- const int64_t& timeout_ns) {
- controller->Reset();
- if (timeout_ns >= 0) {
- controller->set_call_timeout(
- milliseconds(std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_ns))));
- }
- }
+ const int64_t& timeout_ns);
private:
folly::HHWheelTimer::UniquePtr retry_timer_;
@@ -263,5 +116,4 @@ class AsyncSingleRequestRpcRetryingCaller {
uint32_t max_attempts_;
folly::EventBase event_base_;
};
-
} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/async-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc
index 3ed6866..4956972 100644
--- a/hbase-native-client/core/async-rpc-retrying-test.cc
+++ b/hbase-native-client/core/async-rpc-retrying-test.cc
@@ -47,11 +47,23 @@
#include "test-util/test-util.h"
#include "utils/time-util.h"
-using namespace google::protobuf;
-using namespace hbase;
-using namespace hbase::pb;
-using namespace std::placeholders;
-using namespace testing;
+using hbase::AsyncRpcRetryingCallerFactory;
+using hbase::AsyncConnection;
+using hbase::AsyncRegionLocator;
+using hbase::ConnectionConfiguration;
+using hbase::Configuration;
+using hbase::HBaseRpcController;
+using hbase::RegionLocation;
+using hbase::RegionLocateType;
+using hbase::RpcClient;
+using hbase::RequestConverter;
+using hbase::ResponseConverter;
+using hbase::ReqConverter;
+using hbase::RespConverter;
+using hbase::Put;
+using hbase::TimeUtil;
+using hbase::Client;
+
using ::testing::Return;
using ::testing::_;
using std::chrono::nanoseconds;
@@ -62,10 +74,10 @@ class MockAsyncRegionLocator : public AsyncRegionLocator {
: region_location_(region_location) {}
~MockAsyncRegionLocator() = default;
- folly::Future<std::shared_ptr<RegionLocation>> LocateRegion(const hbase::pb::TableName&,
- const std::string&,
- const RegionLocateType,
- const int64_t) override {
+ folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName&,
+ const std::string&,
+ const RegionLocateType,
+ const int64_t) override {
folly::Promise<std::shared_ptr<RegionLocation>> promise;
promise.setValue(region_location_);
return promise.getFuture();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/get-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/get-test.cc b/hbase-native-client/core/get-test.cc
index 6127e23..4a44a26 100644
--- a/hbase-native-client/core/get-test.cc
+++ b/hbase-native-client/core/get-test.cc
@@ -17,8 +17,8 @@
*
*/
-#include "core/cell.h"
#include "core/get.h"
+#include "core/cell.h"
#include <glog/logging.h>
#include <gtest/gtest.h>
http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/mutation.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/mutation.cc b/hbase-native-client/core/mutation.cc
index ab33105..7182202 100644
--- a/hbase-native-client/core/mutation.cc
+++ b/hbase-native-client/core/mutation.cc
@@ -26,8 +26,8 @@
namespace hbase {
-Mutation::Mutation(const std::string &row) : Row(row) { }
-Mutation::Mutation(const std::string &row, int64_t timestamp) : Row(row), timestamp_(timestamp) { }
+Mutation::Mutation(const std::string &row) : Row(row) {}
+Mutation::Mutation(const std::string &row, int64_t timestamp) : Row(row), timestamp_(timestamp) {}
Mutation::Mutation(const Mutation &mutation) {
row_ = mutation.row_;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/mutation.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/mutation.h b/hbase-native-client/core/mutation.h
index 5e0381b..496891e 100644
--- a/hbase-native-client/core/mutation.h
+++ b/hbase-native-client/core/mutation.h
@@ -31,7 +31,7 @@
namespace hbase {
-class Mutation: public Row {
+class Mutation : public Row {
public:
/**
* Constructors
http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/region-request.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/region-request.h b/hbase-native-client/core/region-request.h
index 6f29d44..7ce7c96 100644
--- a/hbase-native-client/core/region-request.h
+++ b/hbase-native-client/core/region-request.h
@@ -34,9 +34,7 @@ class RegionRequest {
explicit RegionRequest(const std::shared_ptr<hbase::RegionLocation> &region_loc)
: region_loc_(region_loc) {}
~RegionRequest() {}
- void AddAction(std::shared_ptr<Action> action) {
- actions_.push_back(action);
- }
+ void AddAction(std::shared_ptr<Action> action) { actions_.push_back(action); }
std::shared_ptr<hbase::RegionLocation> region_location() const { return region_loc_; }
const ActionList &actions() const { return actions_; }
http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/request-converter.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h
index ff6b290..6861604 100644
--- a/hbase-native-client/core/request-converter.h
+++ b/hbase-native-client/core/request-converter.h
@@ -26,11 +26,11 @@
#include "core/action.h"
#include "core/cell.h"
#include "core/get.h"
+#include "core/mutation.h"
+#include "core/put.h"
#include "core/region-request.h"
#include "core/scan.h"
#include "core/server-request.h"
-#include "core/mutation.h"
-#include "core/put.h"
#include "if/Client.pb.h"
#include "if/HBase.pb.h"
http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/response-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index 7729257..b29a819 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -61,11 +61,11 @@ std::shared_ptr<Result> ResponseConverter::ToResult(
int cells_read = 0;
while (cells_read != result.associated_cell_count()) {
if (cell_scanner->Advance()) {
- vcells.push_back(cell_scanner->Current());
+ vcells.push_back(cell_scanner->Current());
cells_read += 1;
} else {
- LOG(ERROR)<< "CellScanner::Advance() returned false unexpectedly. Cells Read:- "
- << cells_read << "; Expected Cell Count:- " << result.associated_cell_count();
+ LOG(ERROR) << "CellScanner::Advance() returned false unexpectedly. Cells Read:- "
+ << cells_read << "; Expected Cell Count:- " << result.associated_cell_count();
std::runtime_error("CellScanner::Advance() returned false unexpectedly");
}
}
@@ -111,16 +111,15 @@ std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const R
std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_ptr<Request> req,
const Response& resp) {
- auto multi_req = std::static_pointer_cast < hbase::pb::MultiRequest > (req->req_msg());
- auto multi_resp = std::static_pointer_cast < hbase::pb::MultiResponse > (resp.resp_msg());
+ auto multi_req = std::static_pointer_cast<hbase::pb::MultiRequest>(req->req_msg());
+ auto multi_resp = std::static_pointer_cast<hbase::pb::MultiResponse>(resp.resp_msg());
VLOG(3) << "GetResults:" << multi_resp->ShortDebugString();
int req_region_action_count = multi_req->regionaction_size();
int res_region_action_count = multi_resp->regionactionresult_size();
if (req_region_action_count != res_region_action_count) {
- throw std::runtime_error(
- "Request mutation count=" + std::to_string(req_region_action_count)
- + " does not match response mutation result count="
- + std::to_string(res_region_action_count));
+ throw std::runtime_error("Request mutation count=" + std::to_string(req_region_action_count) +
+ " does not match response mutation result count=" +
+ std::to_string(res_region_action_count));
}
auto multi_response = std::make_unique<hbase::MultiResponse>();
for (int32_t num = 0; num < res_region_action_count; num++) {
@@ -134,7 +133,7 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_
auto region_name = rs.value();
if (action_result.has_exception()) {
if (action_result.exception().has_value()) {
- auto exc = std::make_shared < hbase::IOException > (action_result.exception().value());
+ auto exc = std::make_shared<hbase::IOException>(action_result.exception().value());
VLOG(8) << "Store Region Exception:- " << exc->what();
multi_response->AddRegionException(region_name, exc);
}
@@ -142,19 +141,18 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_
}
if (actions.action_size() != action_result.resultorexception_size()) {
- throw std::runtime_error(
- "actions.action_size=" + std::to_string(actions.action_size())
- + ", action_result.resultorexception_size="
- + std::to_string(action_result.resultorexception_size()) + " for region "
- + actions.region().value());
+ throw std::runtime_error("actions.action_size=" + std::to_string(actions.action_size()) +
+ ", action_result.resultorexception_size=" +
+ std::to_string(action_result.resultorexception_size()) +
+ " for region " + actions.region().value());
}
for (hbase::pb::ResultOrException roe : action_result.resultorexception()) {
- std::shared_ptr < Result > result;
- std::shared_ptr < std::exception > exc;
+ std::shared_ptr<Result> result;
+ std::shared_ptr<std::exception> exc;
if (roe.has_exception()) {
if (roe.exception().has_value()) {
- exc = std::make_shared < hbase::IOException > (roe.exception().value());
+ exc = std::make_shared<hbase::IOException>(roe.exception().value());
VLOG(8) << "Store ResultOrException:- " << exc->what();
}
} else if (roe.has_result()) {
@@ -165,9 +163,9 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_
// Sometimes, the response is just "it was processed". Generally, this occurs for things
// like mutateRows where either we get back 'processed' (or not) and optionally some
// statistics about the regions we touched.
- std::vector < std::shared_ptr < Cell >> empty_cells;
- result = std::make_shared < Result
- > (empty_cells, multi_resp->processed() ? true : false, false, false);
+ std::vector<std::shared_ptr<Cell>> empty_cells;
+ result = std::make_shared<Result>(empty_cells, multi_resp->processed() ? true : false,
+ false, false);
}
multi_response->AddRegionResult(region_name, roe.index(), std::move(result), exc);
}
@@ -177,7 +175,7 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_
hbase::pb::MultiRegionLoadStats stats = multi_resp->regionstatistics();
for (int i = 0; i < stats.region_size(); i++) {
multi_response->AddStatistic(stats.region(i).value(),
- std::make_shared < RegionLoadStats > (stats.stat(i)));
+ std::make_shared<RegionLoadStats>(stats.stat(i)));
}
}
return multi_response;