Posted to commits@impala.apache.org by st...@apache.org on 2020/07/10 17:21:51 UTC
[impala] 01/02: IMPALA-3380: Add a timeout for statestore RPCs
This is an automated email from the ASF dual-hosted git repository.
stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 17b083d371c8e62c30a5cc7fe6c4aa474866f289
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Thu Jun 11 17:41:20 2020 -0700
IMPALA-3380: Add a timeout for statestore RPCs
Adds the following startup flag for statestore subscribers:
'statestore_client_rpc_timeout_ms'. The timeout is set to 5 minutes by
default.
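For example, starting a subscriber daemon (impalad or catalogd) with the
illustrative value

    --statestore_client_rpc_timeout_ms=60000

bounds each statestore RPC's TSocket send/recv at 60 seconds instead of the
5 minute default.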
Testing:
* Adds tests for catalog_client_rpc_timeout_ms that validate that the
  timeout is enforced and that retries are triggered
Change-Id: If49892ff1950cf474f951aabf4c952dbc44189e2
Reviewed-on: http://gerrit.cloudera.org:8080/16150
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/catalog/catalog-server.cc | 2 ++
be/src/runtime/exec-env.cc | 5 ----
be/src/runtime/exec-env.h | 4 ---
be/src/statestore/statestore-subscriber.cc | 25 +++++++++--------
be/src/statestore/statestore-subscriber.h | 4 +++
tests/custom_cluster/test_rpc_timeout.py | 45 ++++++++++++++++++++++++++++++
6 files changed, 65 insertions(+), 20 deletions(-)
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 9dbe35c..d38f241 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -73,6 +73,7 @@ DEFINE_int64_hidden(catalog_partial_fetch_rpc_queue_timeout_s, LLONG_MAX, "Maxim
"(in seconds) a partial catalog object fetch RPC spends in the queue waiting "
"to run. Must be set to a value greater than zero.");
+DECLARE_string(debug_actions);
DECLARE_string(state_store_host);
DECLARE_int32(state_store_subscriber_port);
DECLARE_int32(state_store_port);
@@ -126,6 +127,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
void ResetMetadata(TResetMetadataResponse& resp, const TResetMetadataRequest& req)
override {
VLOG_RPC << "ResetMetadata(): request=" << ThriftDebugString(req);
+ DebugActionNoFail(FLAGS_debug_actions, "RESET_METADATA_DELAY");
Status status = catalog_server_->catalog()->ResetMetadata(req, &resp);
if (!status.ok()) LOG(ERROR) << status.GetDetail();
TStatus thrift_status;
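The DebugActionNoFail() hook added above is what lets the new tests inject
delays into ResetMetadata(): starting catalogd with

    --debug_actions=RESET_METADATA_DELAY:SLEEP@1000

(as test_catalog_rpc_timeout below does) makes every refresh table command
sleep for 1000 ms inside the catalog server before the reset runs.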
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index a25f1ca..5850ccd 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -229,10 +229,6 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
int statestore_port)
: obj_pool_(new ObjectPool),
metrics_(new MetricGroup("impala-metrics")),
- impalad_client_cache_(
- new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, 0,
- FLAGS_backend_client_rpc_timeout_ms, FLAGS_backend_client_rpc_timeout_ms, "",
- !FLAGS_ssl_client_ca_certificate.empty())),
// Create the CatalogServiceClientCache with num_retries = 1 and wait_ms = 0.
// Connections are still retried, but the retry mechanism is driven by
// DoRpcWithRetry. Clients should always use DoRpcWithRetry rather than DoRpc to
@@ -369,7 +365,6 @@ Status ExecEnv::Init() {
InitSystemStateInfo();
RETURN_IF_ERROR(metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr));
- impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
RETURN_IF_ERROR(RegisterMemoryMetrics(
metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get()));
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 0902ada..0299c89 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -118,9 +118,6 @@ class ExecEnv {
KrpcDataStreamMgr* stream_mgr() { return stream_mgr_.get(); }
- ImpalaBackendClientCache* impalad_client_cache() {
- return impalad_client_cache_.get();
- }
CatalogServiceClientCache* catalogd_client_cache() {
return catalogd_client_cache_.get();
}
@@ -187,7 +184,6 @@ class ExecEnv {
boost::scoped_ptr<Scheduler> scheduler_;
boost::scoped_ptr<AdmissionController> admission_controller_;
boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_;
- boost::scoped_ptr<ImpalaBackendClientCache> impalad_client_cache_;
boost::scoped_ptr<CatalogServiceClientCache> catalogd_client_cache_;
boost::scoped_ptr<HBaseTableFactory> htable_factory_;
boost::scoped_ptr<io::DiskIoMgr> disk_io_mgr_;
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 1d931bd..1c10e60 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -66,6 +66,8 @@ DEFINE_int64_hidden(statestore_subscriber_recovery_grace_period_ms, 30000L, "Per
"after the last successful subscription attempt until the subscriber will be "
"considered fully recovered. After a successful reconnect attempt, updates to the "
"cluster membership will only become effective after this period has elapsed.");
+DEFINE_int32(statestore_client_rpc_timeout_ms, 300000, "(Advanced) The underlying "
+ "TSocket send/recv timeout in milliseconds for a catalog client RPC.");
DECLARE_string(debug_actions);
DECLARE_string(ssl_client_ca_certificate);
@@ -122,17 +124,18 @@ class StatestoreSubscriberThriftIf : public StatestoreSubscriberIf {
StatestoreSubscriber::StatestoreSubscriber(const string& subscriber_id,
const TNetworkAddress& heartbeat_address, const TNetworkAddress& statestore_address,
MetricGroup* metrics)
- : subscriber_id_(subscriber_id),
- statestore_address_(statestore_address),
- thrift_iface_(new StatestoreSubscriberThriftIf(this)),
- failure_detector_(new TimeoutFailureDetector(
- seconds(FLAGS_statestore_subscriber_timeout_seconds),
- seconds(FLAGS_statestore_subscriber_timeout_seconds / 2))),
- client_cache_(new StatestoreClientCache(1, 0, 0, 0, "",
- !FLAGS_ssl_client_ca_certificate.empty())),
- metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")),
- heartbeat_address_(heartbeat_address),
- is_registered_(false) {
+ : subscriber_id_(subscriber_id),
+ statestore_address_(statestore_address),
+ thrift_iface_(new StatestoreSubscriberThriftIf(this)),
+ failure_detector_(
+ new TimeoutFailureDetector(seconds(FLAGS_statestore_subscriber_timeout_seconds),
+ seconds(FLAGS_statestore_subscriber_timeout_seconds / 2))),
+ client_cache_(new StatestoreClientCache(1, 0, FLAGS_statestore_client_rpc_timeout_ms,
+ FLAGS_statestore_client_rpc_timeout_ms, "",
+ !FLAGS_ssl_client_ca_certificate.empty())),
+ metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")),
+ heartbeat_address_(heartbeat_address),
+ is_registered_(false) {
connected_to_statestore_metric_ =
metrics_->AddProperty("statestore-subscriber.connected", false);
connection_failure_metric_ =
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index 406022b..f8015cf 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -151,6 +151,10 @@ class StatestoreSubscriber {
std::unique_ptr<Thread> recovery_mode_thread_;
/// statestore client cache - only one client is ever used. Initialized in constructor.
+ /// The StatestoreClientCache is created with num_retries = 1 and wait_ms = 0.
+ /// Connections are still retried, but the retry mechanism is driven by DoRpcWithRetry.
+ /// Clients should always use DoRpcWithRetry rather than DoRpc to ensure that both RPCs
+ /// and connections are retried.
boost::scoped_ptr<StatestoreClientCache> client_cache_;
/// MetricGroup instance that all metrics are registered in. Not owned by this class.
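The retry contract documented above can be sketched as follows (schematic
Python only; do_rpc_with_retry, RpcError, get_client() and reopen_client()
are illustrative stand-ins for the real C++ helpers, whose actual entry
point is DoRpcWithRetry):

    import time

    class RpcError(Exception):
      pass  # stand-in for a Thrift transport/timeout error

    def do_rpc_with_retry(client_cache, rpc, num_retries, wait_ms):
      # The cache is created with num_retries = 1 and wait_ms = 0, and with
      # statestore_client_rpc_timeout_ms as both the send and recv timeout,
      # so one attempt can no longer block forever; retrying the RPC and
      # its connection happens at this level instead.
      last_error = None
      for _ in range(num_retries):
        try:
          return rpc(client_cache.get_client())  # bounded by the RPC timeout
        except RpcError as e:
          last_error = e
          client_cache.reopen_client()  # connections are retried too
          time.sleep(wait_ms / 1000.0)
      raise last_error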
diff --git a/tests/custom_cluster/test_rpc_timeout.py b/tests/custom_cluster/test_rpc_timeout.py
index 9b2a3b2..59a809a 100644
--- a/tests/custom_cluster/test_rpc_timeout.py
+++ b/tests/custom_cluster/test_rpc_timeout.py
@@ -191,3 +191,48 @@ class TestRPCTimeout(CustomClusterTestSuite):
conclude that the backend is unresponsive."""
self.execute_query_verify_metrics(self.SLOW_TEST_QUERY,
expected_exception="cancelled due to unresponsive backend")
+
+
+class TestCatalogRPCTimeout(CustomClusterTestSuite):
+  """Tests RPC timeout and retry handling for catalogd operations."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestCatalogRPCTimeout, cls).setup_class()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--catalog_client_rpc_timeout_ms=10 "
+                 "--catalog_client_rpc_retry_interval_ms=1 "
+                 "--catalog_client_connection_num_retries=1",
+    catalogd_args="--debug_actions=RESET_METADATA_DELAY:SLEEP@1000")
+  def test_catalog_rpc_timeout(self):
+    """Tests that catalog_client_rpc_timeout_ms enforces a timeout on catalogd
+    operations. The debug action delays refresh table commands by 1 second,
+    while the RPC timeout is 10 ms, so every refresh table command should
+    fail with an RPC timeout error."""
+    # A refresh that unexpectedly succeeds now fails the test outright.
+    result = self.execute_query_expect_failure(self.client,
+        "refresh functional.alltypes")
+    assert "RPC recv timed out" in str(result)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--catalog_client_rpc_timeout_ms=5000 "
+                 "--catalog_client_rpc_retry_interval_ms=0 "
+                 "--catalog_client_connection_num_retries=10",
+    catalogd_args="--debug_actions=RESET_METADATA_DELAY:JITTER@10000@0.75")
+  def test_catalog_rpc_retries(self):
+    """Tests that catalogd operations are retried. The debug action randomly
+    delays refresh table commands by up to 10 seconds, 75% of the time. Since
+    the RPC timeout is 5 seconds, delayed attempts time out, but the catalogd
+    client retries the command up to 10 times, so the refresh should
+    eventually succeed.
+    """
+    self.execute_query("refresh functional.alltypes")
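For reference, the two debug action strings exercised by these tests share
the LABEL:ACTION@arg[@arg] shape:

    # Both forms appear in the catalogd_args above.
    fixed_delay = "RESET_METADATA_DELAY:SLEEP@1000"          # always sleep 1000 ms
    random_delay = "RESET_METADATA_DELAY:JITTER@10000@0.75"  # sleep up to 10 s, ~75% of calls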