You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/07/10 17:21:50 UTC

[impala] branch master updated (65722d3 -> 70c2073)

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 65722d3  IMPALA-9855, IMPALA-9854: Fix query retry TSAN errors
     new 17b083d  IMPALA-3380: Add a timeout for statestore RPCs
     new 70c2073  IMPALA-9834: De-flake TestQueryRetries on EC builds

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/catalog/catalog-server.cc           |  2 ++
 be/src/runtime/exec-env.cc                 |  5 ----
 be/src/runtime/exec-env.h                  |  4 ---
 be/src/statestore/statestore-subscriber.cc | 25 +++++++++--------
 be/src/statestore/statestore-subscriber.h  |  4 +++
 tests/custom_cluster/test_query_retries.py | 18 ++++++++++--
 tests/custom_cluster/test_rpc_timeout.py   | 45 ++++++++++++++++++++++++++++++
 7 files changed, 81 insertions(+), 22 deletions(-)


[impala] 02/02: IMPALA-9834: De-flake TestQueryRetries on EC builds

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 70c2073d02675ffc64b09335e6c3a2744bc6d961
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Tue Jun 23 17:22:11 2020 -0700

    IMPALA-9834: De-flake TestQueryRetries on EC builds
    
    This patch skips all tests in TestQueryRetries on EC builds.
    
    The tests in TestQueryRetries run queries that run on three instances
    during regular builds (HDFS, S3, etc.), but only two instances on EC
    builds. This causes some non-determinism during the test because
    killing an impalad in the mini-cluster won't necessarily cause a retry
    to be triggered.
    
    It bumps up the timeout used when waiting for a query to be retried.
    
    It improves the assertion in __get_query_id_from_profile so that it
    dumps the full profile when the assertion fails. This should make any
    test failures in this assertion easier to debug.
    
    Testing:
    * Ran TestQueryRetries locally
    
    Change-Id: Id5c73c2cbd0ef369175856c41f36d4b0de4b8d71
    Reviewed-on: http://gerrit.cloudera.org:8080/16149
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/custom_cluster/test_query_retries.py | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index e17b807..54f2334 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -32,8 +32,16 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.errors import Timeout
+from tests.common.skip import SkipIfEC
 
 
+# All tests in this class have SkipIfEC because all tests run a query and expect
+# the query to be retried when killing a random impalad. On EC this does not always work
+# because many queries that might run on three impalads for HDFS / S3 builds, might only
+# run on two instances on EC builds. The difference is that EC creates smaller tables
+# compared to data stored on HDFS / S3. If the query is only run on two instances, then
+# randomly killing one impalad won't necessarily trigger a retry of the query.
+@SkipIfEC.fix_later
 class TestQueryRetries(CustomClusterTestSuite):
 
   # A query that shuffles a lot of data. Useful when testing query retries since it
@@ -499,7 +507,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     if not retried_query_id_search: return None
     return retried_query_id_search.group(1)
 
-  def __wait_until_retried(self, handle, timeout=60):
+  def __wait_until_retried(self, handle, timeout=300):
     """Wait until the given query handle has been retried. This is achieved by polling the
     runtime profile of the query and checking the 'Retry Status' field."""
     retried_state = "RETRIED"
@@ -529,7 +537,8 @@ class TestQueryRetries(CustomClusterTestSuite):
   def __get_query_id_from_profile(self, profile):
     """Extracts and returns the query id of the given profile."""
     query_id_search = re.search("Query \(id=(.*)\)", profile)
-    assert query_id_search, "Invalid query profile, has no query id"
+    assert query_id_search, "Invalid query profile, has no query id:\n{0}".format(
+        profile)
     return query_id_search.group(1)
 
   def __get_original_query_profile(self, original_query_id):
@@ -603,6 +612,11 @@ class TestQueryRetries(CustomClusterTestSuite):
     assert "Retried Query Id: {0}".format(retried_query_id) \
         in original_runtime_profile, original_runtime_profile
 
+    # Assert that the original query ran on all three nodes. All queries scan tables
+    # large enough such that scan fragments are scheduled on all impalads.
+    assert re.search("PLAN FRAGMENT.*instances=3", original_runtime_profile), \
+        original_runtime_profile
+
   def __validate_web_ui_state(self):
     """Validate the state of the web ui after a query (or queries) have been retried.
     The web ui should list 0 queries as in flight, running, or queued."""


[impala] 01/02: IMPALA-3380: Add a timeout for statestore RPCs

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 17b083d371c8e62c30a5cc7fe6c4aa474866f289
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Thu Jun 11 17:41:20 2020 -0700

    IMPALA-3380: Add a timeout for statestore RPCs
    
    Adds the following startup flag for statestore subscribers:
    'statestore_client_rpc_timeout_ms'. The timeout is set to 5 minutes by
    default.
    
    Testing:
    * Adds some tests for catalog_client_rpc_timeout_ms that validate the
      timeout is used correctly, and that retries are triggered
    
    Change-Id: If49892ff1950cf474f951aabf4c952dbc44189e2
    Reviewed-on: http://gerrit.cloudera.org:8080/16150
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/catalog/catalog-server.cc           |  2 ++
 be/src/runtime/exec-env.cc                 |  5 ----
 be/src/runtime/exec-env.h                  |  4 ---
 be/src/statestore/statestore-subscriber.cc | 25 +++++++++--------
 be/src/statestore/statestore-subscriber.h  |  4 +++
 tests/custom_cluster/test_rpc_timeout.py   | 45 ++++++++++++++++++++++++++++++
 6 files changed, 65 insertions(+), 20 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 9dbe35c..d38f241 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -73,6 +73,7 @@ DEFINE_int64_hidden(catalog_partial_fetch_rpc_queue_timeout_s, LLONG_MAX, "Maxim
     "(in seconds) a partial catalog object fetch RPC spends in the queue waiting "
     "to run. Must be set to a value greater than zero.");
 
+DECLARE_string(debug_actions);
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_subscriber_port);
 DECLARE_int32(state_store_port);
@@ -126,6 +127,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
   void ResetMetadata(TResetMetadataResponse& resp, const TResetMetadataRequest& req)
       override {
     VLOG_RPC << "ResetMetadata(): request=" << ThriftDebugString(req);
+    DebugActionNoFail(FLAGS_debug_actions, "RESET_METADATA_DELAY");
     Status status = catalog_server_->catalog()->ResetMetadata(req, &resp);
     if (!status.ok()) LOG(ERROR) << status.GetDetail();
     TStatus thrift_status;
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index a25f1ca..5850ccd 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -229,10 +229,6 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
     int statestore_port)
   : obj_pool_(new ObjectPool),
     metrics_(new MetricGroup("impala-metrics")),
-    impalad_client_cache_(
-        new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, 0,
-            FLAGS_backend_client_rpc_timeout_ms, FLAGS_backend_client_rpc_timeout_ms, "",
-            !FLAGS_ssl_client_ca_certificate.empty())),
     // Create the CatalogServiceClientCache with num_retries = 1 and wait_ms = 0.
     // Connections are still retried, but the retry mechanism is driven by
     // DoRpcWithRetry. Clients should always use DoRpcWithRetry rather than DoRpc to
@@ -369,7 +365,6 @@ Status ExecEnv::Init() {
   InitSystemStateInfo();
 
   RETURN_IF_ERROR(metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr));
-  impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
   catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
   RETURN_IF_ERROR(RegisterMemoryMetrics(
       metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get()));
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 0902ada..0299c89 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -118,9 +118,6 @@ class ExecEnv {
 
   KrpcDataStreamMgr* stream_mgr() { return stream_mgr_.get(); }
 
-  ImpalaBackendClientCache* impalad_client_cache() {
-    return impalad_client_cache_.get();
-  }
   CatalogServiceClientCache* catalogd_client_cache() {
     return catalogd_client_cache_.get();
   }
@@ -187,7 +184,6 @@ class ExecEnv {
   boost::scoped_ptr<Scheduler> scheduler_;
   boost::scoped_ptr<AdmissionController> admission_controller_;
   boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_;
-  boost::scoped_ptr<ImpalaBackendClientCache> impalad_client_cache_;
   boost::scoped_ptr<CatalogServiceClientCache> catalogd_client_cache_;
   boost::scoped_ptr<HBaseTableFactory> htable_factory_;
   boost::scoped_ptr<io::DiskIoMgr> disk_io_mgr_;
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 1d931bd..1c10e60 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -66,6 +66,8 @@ DEFINE_int64_hidden(statestore_subscriber_recovery_grace_period_ms, 30000L, "Per
     "after the last successful subscription attempt until the subscriber will be "
     "considered fully recovered. After a successful reconnect attempt, updates to the "
     "cluster membership will only become effective after this period has elapsed.");
+DEFINE_int32(statestore_client_rpc_timeout_ms, 300000, "(Advanced) The underlying "
+    "TSocket send/recv timeout in milliseconds for a catalog client RPC.");
 
 DECLARE_string(debug_actions);
 DECLARE_string(ssl_client_ca_certificate);
@@ -122,17 +124,18 @@ class StatestoreSubscriberThriftIf : public StatestoreSubscriberIf {
 StatestoreSubscriber::StatestoreSubscriber(const string& subscriber_id,
     const TNetworkAddress& heartbeat_address, const TNetworkAddress& statestore_address,
     MetricGroup* metrics)
-    : subscriber_id_(subscriber_id),
-      statestore_address_(statestore_address),
-      thrift_iface_(new StatestoreSubscriberThriftIf(this)),
-      failure_detector_(new TimeoutFailureDetector(
-          seconds(FLAGS_statestore_subscriber_timeout_seconds),
-          seconds(FLAGS_statestore_subscriber_timeout_seconds / 2))),
-      client_cache_(new StatestoreClientCache(1, 0, 0, 0, "",
-                !FLAGS_ssl_client_ca_certificate.empty())),
-      metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")),
-      heartbeat_address_(heartbeat_address),
-      is_registered_(false) {
+  : subscriber_id_(subscriber_id),
+    statestore_address_(statestore_address),
+    thrift_iface_(new StatestoreSubscriberThriftIf(this)),
+    failure_detector_(
+        new TimeoutFailureDetector(seconds(FLAGS_statestore_subscriber_timeout_seconds),
+            seconds(FLAGS_statestore_subscriber_timeout_seconds / 2))),
+    client_cache_(new StatestoreClientCache(1, 0, FLAGS_statestore_client_rpc_timeout_ms,
+        FLAGS_statestore_client_rpc_timeout_ms, "",
+        !FLAGS_ssl_client_ca_certificate.empty())),
+    metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")),
+    heartbeat_address_(heartbeat_address),
+    is_registered_(false) {
   connected_to_statestore_metric_ =
       metrics_->AddProperty("statestore-subscriber.connected", false);
   connection_failure_metric_ =
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index 406022b..f8015cf 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -151,6 +151,10 @@ class StatestoreSubscriber {
   std::unique_ptr<Thread> recovery_mode_thread_;
 
   /// statestore client cache - only one client is ever used. Initialized in constructor.
+  /// The StatestoreClientCache is created with num_retries = 1 and wait_ms = 0.
+  /// Connections are still retried, but the retry mechanism is driven by DoRpcWithRetry.
+  /// Clients should always use DoRpcWithRetry rather than DoRpc to ensure that both RPCs
+  /// and connections are retried.
   boost::scoped_ptr<StatestoreClientCache> client_cache_;
 
   /// MetricGroup instance that all metrics are registered in. Not owned by this class.
diff --git a/tests/custom_cluster/test_rpc_timeout.py b/tests/custom_cluster/test_rpc_timeout.py
index 9b2a3b2..59a809a 100644
--- a/tests/custom_cluster/test_rpc_timeout.py
+++ b/tests/custom_cluster/test_rpc_timeout.py
@@ -191,3 +191,48 @@ class TestRPCTimeout(CustomClusterTestSuite):
     conclude that the backend is unresponsive."""
     self.execute_query_verify_metrics(self.SLOW_TEST_QUERY,
         expected_exception="cancelled due to unresponsive backend")
+
+
+class TestCatalogRPCTimeout(CustomClusterTestSuite):
+  """"Tests RPC timeout and retry handling for catalogd operations."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestCatalogRPCTimeout, cls).setup_class()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--catalog_client_rpc_timeout_ms=10 "
+                 "--catalog_client_rpc_retry_interval_ms=1 "
+                 "--catalog_client_connection_num_retries=1",
+    catalogd_args="--debug_actions=RESET_METADATA_DELAY:SLEEP@1000")
+  def test_catalog_rpc_timeout(self):
+    """Tests that catalog_client_rpc_timeout_ms enforces a timeout on catalogd
+    operations. The debug action causes a delay of 1 second for refresh table
+    commands. The RPC timeout is 10 ms, so all refresh table commands should
+    fail with an RPC timeout exception."""
+    try:
+      self.execute_query("refresh functional.alltypes")
+    except ImpalaBeeswaxException as e:
+      assert "RPC recv timed out" in str(e)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--catalog_client_rpc_timeout_ms=5000 "
+                 "--catalog_client_rpc_retry_interval_ms=0 "
+                 "--catalog_client_connection_num_retries=10",
+    catalogd_args="--debug_actions=RESET_METADATA_DELAY:JITTER@10000@0.75")
+  def test_catalog_rpc_retries(self):
+    """Tests that catalogd operations are retried. The debug action should randomly
+    cause refresh table commands to fail. However, the catalogd client will retry
+    the command 10 times, so eventually the refresh attempt should succeed. The debug
+    action will add 10 seconds of delay to refresh table operations. The delay will only
+    be triggered 75% of the time.
+    """
+    self.execute_query("refresh functional.alltypes")