You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:45:53 UTC
[hbase] 65/133: HBASE-17629 [C++] Timeouts and retry configuration for connections
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 61d2c21df5e6f463772d5286cc5893f0c52da167
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Fri Feb 24 16:30:52 2017 -0800
HBASE-17629 [C++] Timeouts and retry configuration for connections
---
hbase-native-client/bin/cpplint.sh | 3 +-
.../connection/connection-factory.cc | 19 +-
.../connection/connection-factory.h | 10 +-
hbase-native-client/connection/connection-pool.cc | 5 +-
hbase-native-client/connection/connection-pool.h | 19 +-
hbase-native-client/connection/rpc-client.cc | 4 +-
hbase-native-client/connection/rpc-client.h | 5 +-
hbase-native-client/core/BUCK | 1 +
hbase-native-client/core/client.cc | 12 +-
hbase-native-client/core/client.h | 24 ++-
.../core/connection-configuration.h | 195 +++++++++++++++++++++
11 files changed, 268 insertions(+), 29 deletions(-)
diff --git a/hbase-native-client/bin/cpplint.sh b/hbase-native-client/bin/cpplint.sh
index 78a00bd..81795fd 100755
--- a/hbase-native-client/bin/cpplint.sh
+++ b/hbase-native-client/bin/cpplint.sh
@@ -26,4 +26,5 @@ wget -nc $CPPLINT_LOC -O $OUTPUT
# Execute the script
# Exclude the following rules: build/header_guard (We use #pragma once instead)
# readability/todo (TODOs are generic)
-find core connection serde utils test-util security -name "*.h" -or -name "*.cc" | xargs -P8 python $OUTPUT --filter=-build/header_guard,-readability/todo --linelength=100
+# build/c++11 (We are building with c++14)
+find core connection serde utils test-util security -name "*.h" -or -name "*.cc" | xargs -P8 python $OUTPUT --filter=-build/header_guard,-readability/todo,-build/c++11 --linelength=100
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index 6aba351..2f7e75c 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -19,22 +19,32 @@
#include "connection/connection-factory.h"
+#include <chrono>
+
#include "connection/client-dispatcher.h"
#include "connection/pipeline.h"
#include "connection/service.h"
using namespace folly;
using namespace hbase;
+using std::chrono::milliseconds;
+using std::chrono::nanoseconds;
ConnectionFactory::ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool,
- std::shared_ptr<Codec> codec)
- : io_pool_(io_pool), pipeline_factory_(std::make_shared<RpcPipelineFactory>(codec)) {}
+ std::shared_ptr<Codec> codec,
+ nanoseconds connect_timeout)
+ : connect_timeout_(connect_timeout),
+ io_pool_(io_pool),
+ pipeline_factory_(std::make_shared<RpcPipelineFactory>(codec)) {}
std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> ConnectionFactory::MakeBootstrap() {
auto client = std::make_shared<wangle::ClientBootstrap<SerializePipeline>>();
client->group(io_pool_);
client->pipelineFactory(pipeline_factory_);
+ // TODO: Opened https://github.com/facebook/wangle/issues/85 in wangle so that we can set socket
+ // options like TCP_NODELAY, SO_KEEPALIVE, CONNECT_TIMEOUT_MILLIS, etc.
+
return client;
}
std::shared_ptr<HBaseService> ConnectionFactory::Connect(
@@ -43,7 +53,10 @@ std::shared_ptr<HBaseService> ConnectionFactory::Connect(
// Yes this will block however it makes dealing with connection pool soooooo
// much nicer.
// TODO see about using shared promise for this.
- auto pipeline = client->connect(SocketAddress(hostname, port, true)).get();
+ auto pipeline = client
+ ->connect(SocketAddress(hostname, port, true),
+ std::chrono::duration_cast<milliseconds>(connect_timeout_))
+ .get();
auto dispatcher = std::make_shared<ClientDispatcher>();
dispatcher->setPipeline(pipeline);
return dispatcher;
diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h
index 0d1e0d0..fbcb6ef 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -20,6 +20,7 @@
#include <wangle/service/Service.h>
+#include <chrono>
#include <memory>
#include <string>
@@ -28,6 +29,8 @@
#include "connection/response.h"
#include "connection/service.h"
+using std::chrono::nanoseconds;
+
namespace hbase {
/**
@@ -41,8 +44,10 @@ class ConnectionFactory {
* There should only be one ConnectionFactory per client.
*/
ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool,
- std::shared_ptr<Codec> codec);
- /** Default Desctructor */
+ std::shared_ptr<Codec> codec,
+ nanoseconds connect_timeout = nanoseconds(0));
+
+ /** Default Destructor */
virtual ~ConnectionFactory() = default;
/**
@@ -60,6 +65,7 @@ class ConnectionFactory {
const std::string &hostname, int port);
private:
+ nanoseconds connect_timeout_;
std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool_;
std::shared_ptr<RpcPipelineFactory> pipeline_factory_;
};
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index 6635a6d..b18ee89 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -34,8 +34,9 @@ using folly::SharedMutexWritePriority;
using folly::SocketAddress;
ConnectionPool::ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
- std::shared_ptr<Codec> codec)
- : cf_(std::make_shared<ConnectionFactory>(io_executor, codec)),
+ std::shared_ptr<Codec> codec,
+ nanoseconds connect_timeout)
+ : cf_(std::make_shared<ConnectionFactory>(io_executor, codec, connect_timeout)),
clients_(),
connections_(),
map_mutex_() {}
diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h
index 23e5e9a..2a8f195 100644
--- a/hbase-native-client/connection/connection-pool.h
+++ b/hbase-native-client/connection/connection-pool.h
@@ -20,6 +20,7 @@
#include <folly/SharedMutex.h>
#include <boost/functional/hash.hpp>
+#include <chrono>
#include <memory>
#include <mutex>
#include <unordered_map>
@@ -35,6 +36,8 @@ using hbase::ConnectionIdEquals;
using hbase::ConnectionIdHash;
using hbase::RpcConnection;
+using std::chrono::nanoseconds;
+
namespace hbase {
/**
@@ -47,14 +50,7 @@ class ConnectionPool {
public:
/** Create connection pool wit default connection factory */
ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
- std::shared_ptr<Codec> codec);
-
- /**
- * Desctructor.
- * All connections will be close.
- * All connections will be released
- */
- ~ConnectionPool();
+ std::shared_ptr<Codec> codec, nanoseconds connect_timeout = nanoseconds(0));
/**
* Constructor that allows specifiying the connetion factory.
@@ -63,6 +59,13 @@ class ConnectionPool {
explicit ConnectionPool(std::shared_ptr<ConnectionFactory> cf);
/**
+ * Destructor.
+ * All connections will be close.
+ * All connections will be released
+ */
+ ~ConnectionPool();
+
+ /**
* Get a connection to the server name. Start time is ignored.
* This can be a blocking operation for a short time.
*/
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
index cfbda3a..c61a73e 100644
--- a/hbase-native-client/connection/rpc-client.cc
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -40,9 +40,9 @@ class RpcChannelImplementation : public AbstractRpcChannel {
} // namespace hbase
RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
- std::shared_ptr<Codec> codec)
+ std::shared_ptr<Codec> codec, nanoseconds connect_timeout)
: io_executor_(io_executor) {
- cp_ = std::make_shared<ConnectionPool>(io_executor_, codec);
+ cp_ = std::make_shared<ConnectionPool>(io_executor_, codec, connect_timeout);
}
void RpcClient::Close() { io_executor_->stop(); }
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
index f4645a0..5c11ab5 100644
--- a/hbase-native-client/connection/rpc-client.h
+++ b/hbase-native-client/connection/rpc-client.h
@@ -26,6 +26,7 @@
#include <google/protobuf/service.h>
+#include <chrono>
#include <utility>
using hbase::security::User;
@@ -43,6 +44,8 @@ using google::protobuf::Message;
using google::protobuf::RpcController;
using google::protobuf::Closure;
+using std::chrono::nanoseconds;
+
class RpcChannelImplementation;
namespace hbase {
@@ -52,7 +55,7 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> {
public:
RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
- std::shared_ptr<Codec> codec);
+ std::shared_ptr<Codec> codec, nanoseconds connect_timeout);
virtual ~RpcClient() { Close(); }
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index ce0c733..e541d8f 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -27,6 +27,7 @@ cxx_library(
"keyvalue-codec.h",
"region-location.h",
"location-cache.h",
+ "connection-configuration.h",
# TODO: move this out of exported
# Once meta lookup works
"meta-utils.h",
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index 685524f..240da72 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -20,6 +20,7 @@
#include "core/client.h"
#include <glog/logging.h>
+#include <chrono>
#include <exception>
#include <utility>
@@ -42,9 +43,12 @@ void Client::init(const hbase::Configuration &conf) {
conf_ = std::make_shared<hbase::Configuration>(conf);
auto zk_quorum = conf_->Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_);
- cpu_executor_ =
- std::make_shared<wangle::CPUThreadPoolExecutor>(4); // TODO: read num threads from conf
- io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(sysconf(_SC_NPROCESSORS_ONLN));
+ conn_conf_ = std::make_shared<hbase::ConnectionConfiguration>(*conf_);
+ // start thread pools
+ auto io_threads = conf_->GetInt(kClientIoThreadPoolSize, sysconf(_SC_NPROCESSORS_ONLN));
+ auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * sysconf(_SC_NPROCESSORS_ONLN));
+ cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(cpu_threads);
+ io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(io_threads);
std::shared_ptr<Codec> codec = nullptr;
if (conf.Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) ==
@@ -53,7 +57,7 @@ void Client::init(const hbase::Configuration &conf) {
} else {
LOG(WARNING) << "Not using RPC Cell Codec";
}
- rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, codec);
+ rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, codec, conn_conf_->connect_timeout());
location_cache_ =
std::make_shared<hbase::LocationCache>(conf_, cpu_executor_, rpc_client_->connection_pool());
}
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index 0e436ba..a96d6f3 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -29,6 +29,7 @@
#include "connection/rpc-client.h"
#include "core/configuration.h"
+#include "core/connection-configuration.h"
#include "core/hbase_configuration_loader.h"
#include "core/keyvalue-codec.h"
#include "core/location-cache.h"
@@ -54,13 +55,13 @@ class Client {
* @param quorum_spec Where to connect to get Zookeeper bootstrap information.
*/
Client();
- explicit Client(const hbase::Configuration &conf);
+ explicit Client(const hbase::Configuration& conf);
~Client();
/**
* @brief Retrieve a Table implementation for accessing a table.
* @param - table_name
*/
- std::unique_ptr<hbase::Table> Table(const TableName &table_name);
+ std::unique_ptr<hbase::Table> Table(const TableName& table_name);
/**
* @brief Close the Client connection.
@@ -68,16 +69,27 @@ class Client {
void Close();
private:
- void init(const hbase::Configuration &conf);
- const std::string kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum";
- const std::string kDefHBaseZookeeperQuorum_ = "localhost:2181";
- const std::string kRpcCodec = "hbase.client.rpc.codec";
+ /** Constants */
+ static constexpr const char* kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum";
+ static constexpr const char* kDefHBaseZookeeperQuorum_ = "localhost:2181";
+ /** Parameter name for HBase client IO thread pool size. Defaults to num cpus */
+ static constexpr const char* kClientIoThreadPoolSize = "hbase.client.io.thread.pool.size";
+ /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */
+ static constexpr const char* kClientCpuThreadPoolSize = "hbase.client.cpu.thread.pool.size";
+ /** The RPC codec to encode cells. For now it is KeyValueCodec */
+ static constexpr const char* kRpcCodec = "hbase.client.rpc.codec";
+
+ /** Data */
std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
std::shared_ptr<hbase::LocationCache> location_cache_;
std::shared_ptr<hbase::RpcClient> rpc_client_;
std::shared_ptr<hbase::Configuration> conf_;
+ std::shared_ptr<hbase::ConnectionConfiguration> conn_conf_;
bool is_closed_ = false;
+
+ /** Methods */
+ void init(const hbase::Configuration &conf);
};
} // namespace hbase
diff --git a/hbase-native-client/core/connection-configuration.h b/hbase-native-client/core/connection-configuration.h
new file mode 100644
index 0000000..e1e9f87
--- /dev/null
+++ b/hbase-native-client/core/connection-configuration.h
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <chrono>
+#include <climits>
+#include <cstdint>
+#include <string>
+
+#include "core/configuration.h"
+
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+namespace hbase {
+
+/**
+ * Timeout configs.
+ */
+class ConnectionConfiguration {
+ public:
+  /**
+   * Reads all timeout / retry / scanner settings from `conf`. Any key that is not set falls
+   * back to the default constant declared in the private section below. Every timeout key is
+   * interpreted as milliseconds and stored as std::chrono::nanoseconds.
+   */
+  explicit ConnectionConfiguration(const Configuration& conf) {
+    connect_timeout_ =
+        ToNanos(conf.GetInt(kClientSocketConnectTimeout, kDefaultClientSocketConnectTimeout));
+    // Meta operations share the default of regular operations unless configured separately.
+    meta_operation_timeout_ =
+        ToNanos(conf.GetLong(kClientMetaOperationTimeout, kDefaultClientOperationTimeout));
+    operation_timeout_ =
+        ToNanos(conf.GetLong(kClientOperationTimeout, kDefaultClientOperationTimeout));
+    rpc_timeout_ = ToNanos(conf.GetLong(kRpcTimeout, kDefaultRpcTimeout));
+    // The read/write specific RPC timeouts default to the generic RPC timeout read just above,
+    // so rpc_timeout_ must be initialized before these two lines.
+    read_rpc_timeout_ = ToNanos(conf.GetLong(kRpcReadTimeout, ToMillis(rpc_timeout_)));
+    write_rpc_timeout_ = ToNanos(conf.GetLong(kRpcWriteTimeout, ToMillis(rpc_timeout_)));
+    pause_ = ToNanos(conf.GetLong(kClientPause, kDefaultClientPause));
+    max_retries_ = conf.GetInt(kClientRetriesNumber, kDefaultClientRetriesNumber);
+    start_log_errors_count_ =
+        conf.GetInt(kStartLogErrorsAfterCount, kDefaultStartLogErrorsAfterCount);
+    scan_timeout_ =
+        ToNanos(conf.GetLong(kClientScannerTimeoutPeriod, kDefaultClientScannerTimeoutPeriod));
+    scanner_caching_ = conf.GetInt(kClientScannerCaching, kDefaultClientScannerCaching);
+    scanner_max_result_size_ =
+        conf.GetLong(kClientScannerMaxResultsSize, kDefaultClientScannerMaxResultsSize);
+  }
+
+  /** Timeout for establishing a socket connection (hbase.ipc.client.socket.timeout.connect). */
+  nanoseconds connect_timeout() const { return connect_timeout_; }
+
+  /** Operation timeout for meta operations (hbase.client.meta.operation.timeout). */
+  nanoseconds meta_operation_timeout() const { return meta_operation_timeout_; }
+
+  // Timeout for a whole operation such as get, put or delete. Notice that scans are not affected
+  // by this value, see scan_timeout().
+  nanoseconds operation_timeout() const { return operation_timeout_; }
+
+  // Timeout for each rpc request. Can be overridden by a more specific config, such as
+  // read_rpc_timeout() or write_rpc_timeout().
+  nanoseconds rpc_timeout() const { return rpc_timeout_; }
+
+  // Timeout for each read rpc request. Defaults to rpc_timeout() when not configured.
+  nanoseconds read_rpc_timeout() const { return read_rpc_timeout_; }
+
+  // Timeout for each write rpc request. Defaults to rpc_timeout() when not configured.
+  nanoseconds write_rpc_timeout() const { return write_rpc_timeout_; }
+
+  /** Pause value (hbase.client.pause), used mostly as the wait before retrying an operation. */
+  nanoseconds pause_nanos() const { return pause_; }
+
+  /** Maximum number of retries for retryable operations (hbase.client.retries.number). */
+  uint32_t max_retries() const { return max_retries_; }
+
+  /** How many retries are allowed before we start to log */
+  uint32_t start_log_errors_count() const { return start_log_errors_count_; }
+
+  // The scan timeout is used as operation timeout for every
+  // operation in a scan, such as openScanner or next.
+  nanoseconds scan_timeout() const { return scan_timeout_; }
+
+  /** Default scanner caching for all clients (hbase.client.scanner.caching). */
+  uint32_t scanner_caching() const { return scanner_caching_; }
+
+  /** Maximum number of bytes returned by a scanner's next call, in bytes. */
+  uint64_t scanner_max_result_size() const { return scanner_max_result_size_; }
+
+ private:
+  /** Parameter name for the client socket connect timeout, in milliseconds. */
+  static constexpr const char* kClientSocketConnectTimeout =
+      "hbase.ipc.client.socket.timeout.connect";
+  /** Default client socket connect timeout, in milliseconds. */
+  static constexpr const uint32_t kDefaultClientSocketConnectTimeout = 10000;  // 10 secs
+
+  /** Parameter name for HBase client operation timeout, in milliseconds. */
+  static constexpr const char* kClientOperationTimeout = "hbase.client.operation.timeout";
+
+  /** Parameter name for HBase client meta operation timeout, in milliseconds. */
+  static constexpr const char* kClientMetaOperationTimeout = "hbase.client.meta.operation.timeout";
+
+  /** Default HBase client operation timeout, which is tantamount to a blocking call */
+  static constexpr const uint32_t kDefaultClientOperationTimeout = 1200000;  // 20 mins
+
+  /** timeout for each RPC */
+  static constexpr const char* kRpcTimeout = "hbase.rpc.timeout";
+
+  /** timeout for each read RPC */
+  static constexpr const char* kRpcReadTimeout = "hbase.rpc.read.timeout";
+
+  /** timeout for each write RPC */
+  static constexpr const char* kRpcWriteTimeout = "hbase.rpc.write.timeout";
+
+  /** Default timeout for each RPC, in milliseconds. */
+  static constexpr const uint32_t kDefaultRpcTimeout = 60000;  // 60 secs
+
+  /**
+   * Parameter name for client pause value, used mostly as value to wait
+   * before running a retry of a failed get, region lookup, etc.
+   */
+  static constexpr const char* kClientPause = "hbase.client.pause";
+
+  /** Default client pause, in milliseconds. */
+  static constexpr const uint64_t kDefaultClientPause = 100;
+
+  /**
+   * Parameter name for maximum retries, used as maximum for all retryable
+   * operations such as fetching of the root region from root region server,
+   * getting a cell's value, starting a row update, etc.
+   */
+  static constexpr const char* kClientRetriesNumber = "hbase.client.retries.number";
+
+  static constexpr const uint32_t kDefaultClientRetriesNumber = 31;
+
+  /**
+   * Configure the number of failures after which the client will start logging. A few failures
+   * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable
+   * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at
+   * this stage.
+   */
+  static constexpr const char* kStartLogErrorsAfterCount = "hbase.client.start.log.errors.counter";
+  static constexpr const uint32_t kDefaultStartLogErrorsAfterCount = 9;
+
+  /** The client scanner timeout period in milliseconds. */
+  static constexpr const char* kClientScannerTimeoutPeriod = "hbase.client.scanner.timeout.period";
+
+  /** Default client scanner timeout period, in milliseconds. */
+  static constexpr const uint32_t kDefaultClientScannerTimeoutPeriod = 60000;  // 60 secs
+
+  /**
+   * Parameter name to set the default scanner caching for all clients.
+   */
+  static constexpr const char* kClientScannerCaching = "hbase.client.scanner.caching";
+
+  static constexpr const uint32_t kDefaultClientScannerCaching = INT_MAX;
+
+  /**
+   * Parameter name for maximum number of bytes returned when calling a scanner's next method.
+   * Controlled by the client.
+   */
+  static constexpr const char* kClientScannerMaxResultsSize =
+      "hbase.client.scanner.max.result.size";
+
+  /**
+   * Maximum number of bytes returned when calling a scanner's next method.
+   * Note that when a single row is larger than this limit the row is still
+   * returned completely.
+   *
+   * The default value is 2MB.
+   */
+  static constexpr const uint64_t kDefaultClientScannerMaxResultsSize = 2 * 1024 * 1024;
+
+  nanoseconds connect_timeout_;
+  nanoseconds meta_operation_timeout_;
+  nanoseconds operation_timeout_;
+  nanoseconds rpc_timeout_;
+  nanoseconds read_rpc_timeout_;
+  nanoseconds write_rpc_timeout_;
+  nanoseconds pause_;
+  uint32_t max_retries_;
+  uint32_t start_log_errors_count_;
+  nanoseconds scan_timeout_;
+  uint32_t scanner_caching_;
+  uint64_t scanner_max_result_size_;
+
+  /**
+   * Converts a millisecond count read from the configuration into nanoseconds.
+   * Taken by value as a signed integer: the configuration getters return signed values, and
+   * binding them to a `const uint64_t&` forced a needless sign conversion into a temporary.
+   */
+  static nanoseconds ToNanos(int64_t millis) {
+    return std::chrono::duration_cast<nanoseconds>(milliseconds(millis));
+  }
+
+  /** Converts nanoseconds to whole milliseconds (truncating), e.g. to reuse as a config default. */
+  static int64_t ToMillis(nanoseconds nanos) {
+    return std::chrono::duration_cast<milliseconds>(nanos).count();
+  }
+};
+
+} // namespace hbase