You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/07/10 17:21:51 UTC

[impala] 01/02: IMPALA-3380: Add a timeout for statestore RPCs

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 17b083d371c8e62c30a5cc7fe6c4aa474866f289
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Thu Jun 11 17:41:20 2020 -0700

    IMPALA-3380: Add a timeout for statestore RPCs
    
    Adds the following startup flag for statestore subscribers:
    'statestore_client_rpc_timeout_ms'. The timeout is set to 5 minutes by
    default.
    
    Testing:
    * Adds some tests for catalog_client_rpc_timeout_ms that validate the
      timeout is used correctly, and that retries are triggered
    
    Change-Id: If49892ff1950cf474f951aabf4c952dbc44189e2
    Reviewed-on: http://gerrit.cloudera.org:8080/16150
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/catalog/catalog-server.cc           |  2 ++
 be/src/runtime/exec-env.cc                 |  5 ----
 be/src/runtime/exec-env.h                  |  4 ---
 be/src/statestore/statestore-subscriber.cc | 25 +++++++++--------
 be/src/statestore/statestore-subscriber.h  |  4 +++
 tests/custom_cluster/test_rpc_timeout.py   | 45 ++++++++++++++++++++++++++++++
 6 files changed, 65 insertions(+), 20 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 9dbe35c..d38f241 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -73,6 +73,7 @@ DEFINE_int64_hidden(catalog_partial_fetch_rpc_queue_timeout_s, LLONG_MAX, "Maxim
     "(in seconds) a partial catalog object fetch RPC spends in the queue waiting "
     "to run. Must be set to a value greater than zero.");
 
+DECLARE_string(debug_actions);
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_subscriber_port);
 DECLARE_int32(state_store_port);
@@ -126,6 +127,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
   void ResetMetadata(TResetMetadataResponse& resp, const TResetMetadataRequest& req)
       override {
     VLOG_RPC << "ResetMetadata(): request=" << ThriftDebugString(req);
+    DebugActionNoFail(FLAGS_debug_actions, "RESET_METADATA_DELAY");
     Status status = catalog_server_->catalog()->ResetMetadata(req, &resp);
     if (!status.ok()) LOG(ERROR) << status.GetDetail();
     TStatus thrift_status;
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index a25f1ca..5850ccd 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -229,10 +229,6 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
     int statestore_port)
   : obj_pool_(new ObjectPool),
     metrics_(new MetricGroup("impala-metrics")),
-    impalad_client_cache_(
-        new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, 0,
-            FLAGS_backend_client_rpc_timeout_ms, FLAGS_backend_client_rpc_timeout_ms, "",
-            !FLAGS_ssl_client_ca_certificate.empty())),
     // Create the CatalogServiceClientCache with num_retries = 1 and wait_ms = 0.
     // Connections are still retried, but the retry mechanism is driven by
     // DoRpcWithRetry. Clients should always use DoRpcWithRetry rather than DoRpc to
@@ -369,7 +365,6 @@ Status ExecEnv::Init() {
   InitSystemStateInfo();
 
   RETURN_IF_ERROR(metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr));
-  impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
   catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
   RETURN_IF_ERROR(RegisterMemoryMetrics(
       metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get()));
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 0902ada..0299c89 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -118,9 +118,6 @@ class ExecEnv {
 
   KrpcDataStreamMgr* stream_mgr() { return stream_mgr_.get(); }
 
-  ImpalaBackendClientCache* impalad_client_cache() {
-    return impalad_client_cache_.get();
-  }
   CatalogServiceClientCache* catalogd_client_cache() {
     return catalogd_client_cache_.get();
   }
@@ -187,7 +184,6 @@ class ExecEnv {
   boost::scoped_ptr<Scheduler> scheduler_;
   boost::scoped_ptr<AdmissionController> admission_controller_;
   boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_;
-  boost::scoped_ptr<ImpalaBackendClientCache> impalad_client_cache_;
   boost::scoped_ptr<CatalogServiceClientCache> catalogd_client_cache_;
   boost::scoped_ptr<HBaseTableFactory> htable_factory_;
   boost::scoped_ptr<io::DiskIoMgr> disk_io_mgr_;
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 1d931bd..1c10e60 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -66,6 +66,8 @@ DEFINE_int64_hidden(statestore_subscriber_recovery_grace_period_ms, 30000L, "Per
     "after the last successful subscription attempt until the subscriber will be "
     "considered fully recovered. After a successful reconnect attempt, updates to the "
     "cluster membership will only become effective after this period has elapsed.");
+DEFINE_int32(statestore_client_rpc_timeout_ms, 300000, "(Advanced) The underlying "
+    "TSocket send/recv timeout in milliseconds for a statestore client RPC.");
 
 DECLARE_string(debug_actions);
 DECLARE_string(ssl_client_ca_certificate);
@@ -122,17 +124,18 @@ class StatestoreSubscriberThriftIf : public StatestoreSubscriberIf {
 StatestoreSubscriber::StatestoreSubscriber(const string& subscriber_id,
     const TNetworkAddress& heartbeat_address, const TNetworkAddress& statestore_address,
     MetricGroup* metrics)
-    : subscriber_id_(subscriber_id),
-      statestore_address_(statestore_address),
-      thrift_iface_(new StatestoreSubscriberThriftIf(this)),
-      failure_detector_(new TimeoutFailureDetector(
-          seconds(FLAGS_statestore_subscriber_timeout_seconds),
-          seconds(FLAGS_statestore_subscriber_timeout_seconds / 2))),
-      client_cache_(new StatestoreClientCache(1, 0, 0, 0, "",
-                !FLAGS_ssl_client_ca_certificate.empty())),
-      metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")),
-      heartbeat_address_(heartbeat_address),
-      is_registered_(false) {
+  : subscriber_id_(subscriber_id),
+    statestore_address_(statestore_address),
+    thrift_iface_(new StatestoreSubscriberThriftIf(this)),
+    failure_detector_(
+        new TimeoutFailureDetector(seconds(FLAGS_statestore_subscriber_timeout_seconds),
+            seconds(FLAGS_statestore_subscriber_timeout_seconds / 2))),
+    client_cache_(new StatestoreClientCache(1, 0, FLAGS_statestore_client_rpc_timeout_ms,
+        FLAGS_statestore_client_rpc_timeout_ms, "",
+        !FLAGS_ssl_client_ca_certificate.empty())),
+    metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")),
+    heartbeat_address_(heartbeat_address),
+    is_registered_(false) {
   connected_to_statestore_metric_ =
       metrics_->AddProperty("statestore-subscriber.connected", false);
   connection_failure_metric_ =
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index 406022b..f8015cf 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -151,6 +151,10 @@ class StatestoreSubscriber {
   std::unique_ptr<Thread> recovery_mode_thread_;
 
   /// statestore client cache - only one client is ever used. Initialized in constructor.
+  /// The StatestoreClientCache is created with num_retries = 1 and wait_ms = 0.
+  /// Connections are still retried, but the retry mechanism is driven by DoRpcWithRetry.
+  /// Clients should always use DoRpcWithRetry rather than DoRpc to ensure that both RPCs
+  /// and connections are retried.
   boost::scoped_ptr<StatestoreClientCache> client_cache_;
 
   /// MetricGroup instance that all metrics are registered in. Not owned by this class.
diff --git a/tests/custom_cluster/test_rpc_timeout.py b/tests/custom_cluster/test_rpc_timeout.py
index 9b2a3b2..59a809a 100644
--- a/tests/custom_cluster/test_rpc_timeout.py
+++ b/tests/custom_cluster/test_rpc_timeout.py
@@ -191,3 +191,48 @@ class TestRPCTimeout(CustomClusterTestSuite):
     conclude that the backend is unresponsive."""
     self.execute_query_verify_metrics(self.SLOW_TEST_QUERY,
         expected_exception="cancelled due to unresponsive backend")
+
+
+class TestCatalogRPCTimeout(CustomClusterTestSuite):
+  """Tests RPC timeout and retry handling for catalogd operations."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestCatalogRPCTimeout, cls).setup_class()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--catalog_client_rpc_timeout_ms=10 "
+                 "--catalog_client_rpc_retry_interval_ms=1 "
+                 "--catalog_client_connection_num_retries=1",
+    catalogd_args="--debug_actions=RESET_METADATA_DELAY:SLEEP@1000")
+  def test_catalog_rpc_timeout(self):
+    """Tests that catalog_client_rpc_timeout_ms enforces a timeout on catalogd
+    operations. The debug action causes a delay of 1 second for refresh table
+    commands. The RPC timeout is 10 ms, so all refresh table commands should
+    fail with an RPC timeout exception."""
+    # The refresh must fail; a silent success would mean the timeout was not enforced.
+    with pytest.raises(ImpalaBeeswaxException) as exc_info:
+      self.execute_query("refresh functional.alltypes")
+    assert "RPC recv timed out" in str(exc_info.value)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--catalog_client_rpc_timeout_ms=5000 "
+                 "--catalog_client_rpc_retry_interval_ms=0 "
+                 "--catalog_client_connection_num_retries=10",
+    catalogd_args="--debug_actions=RESET_METADATA_DELAY:JITTER@10000@0.75")
+  def test_catalog_rpc_retries(self):
+    """Tests that catalogd operations are retried. The debug action should randomly
+    cause refresh table commands to fail. However, the catalogd client will retry
+    the command 10 times, so eventually the refresh attempt should succeed. The debug
+    action will add 10 seconds of delay to refresh table operations. The delay will only
+    be triggered 75% of the time.
+    """
+    self.execute_query("refresh functional.alltypes")