You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/09/10 06:00:22 UTC

[impala] 01/03: IMPALA-8904: retry statestore RegisterSubscriber() RPC

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 19cb8dc1c1c2247e91adc4bf62cab27a7c1e4381
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Wed Aug 28 12:05:35 2019 -0700

    IMPALA-8904: retry statestore RegisterSubscriber() RPC
    
    Previously, failures to connect to the statestore triggered a retry,
    but failures of the RegisterSubscriber() RPC itself did not. This
    change moves the retry loop into DoRpcWithRetry(), instead of
    relying on the ClientCache to retry the connection.
    
    Note that the Thrift DoRpcWithRetry() had been dead code since
    most backend RPCs were ported to KRPC, but it should still work.
    
    Testing:
    Added a targeted test that uses a debug action to inject an error
    on the first RegisterSubscriber() RPC attempt.
    
    Change-Id: I5d4e6283b5ec83170a1d1d03075b3384a9f108b5
    Reviewed-on: http://gerrit.cloudera.org:8080/14198
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/client-cache.h                      | 27 ++++++++-----
 be/src/statestore/statestore-subscriber.cc         | 22 +++++++----
 tests/custom_cluster/test_statestore_rpc_errors.py | 44 ++++++++++++++++++++++
 3 files changed, 77 insertions(+), 16 deletions(-)

diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h
index c06b979..b41c9b1 100644
--- a/be/src/runtime/client-cache.h
+++ b/be/src/runtime/client-cache.h
@@ -33,6 +33,7 @@
 #include "util/debug-util.h"
 #include "util/metrics-fwd.h"
 #include "util/network-util.h"
+#include "util/time.h"
 
 #include "common/status.h"
 
@@ -274,26 +275,34 @@ class ClientConnection {
     static RpcStatus OK() { return {Status::OK(), false}; }
   };
 
-  /// Helper that retries constructing a client and calling DoRpc() up the three times
-  /// and handles both RPC failures and failures to get a client from 'client_cache'.
-  /// 'debug_fn' is a Status-returning function that can be used to inject errors into
-  /// the RPC.
+  /// Helper that retries constructing a client and calling DoRpc() up to 'retries' times
+  /// with 'delay_ms' delay between retries. This handles both RPC failures and failures
+  /// to get a client from 'client_cache'.  'debug_fn' is a Status-returning function that
+  /// can be used to inject errors into the RPC.
   template <class F, class DebugF, class Request, class Response>
   static RpcStatus DoRpcWithRetry(ClientCache<T>* client_cache, TNetworkAddress address,
-      const F& f, const Request& request, const DebugF& debug_fn, Response* response) {
+      const F& f, const Request& request, int retries, int64_t delay_ms,
+      const DebugF& debug_fn, Response* response) {
     Status rpc_status;
     Status client_status;
 
-    // Try to send the RPC 3 times before failing.
-    for (int i = 0; i < 3; ++i) {
-      ImpalaBackendConnection client(client_cache, address, &client_status);
+    // Try to send the RPC as many times as requested before failing.
+    for (int i = 0; i < retries; ++i) {
+      if (i > 0) SleepForMs(delay_ms); // Delay before retrying.
+      ClientConnection<T> client(client_cache, address, &client_status);
       if (!client_status.ok()) continue;
 
       rpc_status = debug_fn();
-      if (!rpc_status.ok()) continue;
+      if (!rpc_status.ok()) {
+        LOG(INFO) << "Injected RPC error to " << TNetworkAddressToString(address) << ": "
+                  << rpc_status.GetDetail();
+        continue;
+      }
 
       rpc_status = client.DoRpc(f, request, response);
       if (rpc_status.ok()) break;
+      LOG(INFO) << "RPC to " << TNetworkAddressToString(address) << " failed "
+                << rpc_status.GetDetail();
     }
     if (!client_status.ok()) return {client_status, true};
     if (!rpc_status.ok()) return {rpc_status, false};
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 9ed64e2..6952fe5 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -64,6 +64,7 @@ DEFINE_int64_hidden(statestore_subscriber_recovery_grace_period_ms, 30000L, "Per
     "considered fully recovered. After a successful reconnect attempt, updates to the "
     "cluster membership will only become effective after this period has elapsed.");
 
+DECLARE_string(debug_actions);
 DECLARE_string(ssl_client_ca_certificate);
 DECLARE_string(ssl_server_certificate);
 DECLARE_string(ssl_private_key);
@@ -124,8 +125,7 @@ StatestoreSubscriber::StatestoreSubscriber(const string& subscriber_id,
       failure_detector_(new TimeoutFailureDetector(
           seconds(FLAGS_statestore_subscriber_timeout_seconds),
           seconds(FLAGS_statestore_subscriber_timeout_seconds / 2))),
-      client_cache_(new StatestoreClientCache(FLAGS_statestore_subscriber_cnxn_attempts,
-                FLAGS_statestore_subscriber_cnxn_retry_interval_ms, 0, 0, "",
+      client_cache_(new StatestoreClientCache(1, 0, 0, 0, "",
                 !FLAGS_ssl_client_ca_certificate.empty())),
       metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")),
       heartbeat_address_(heartbeat_address),
@@ -172,9 +172,6 @@ Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
 
 Status StatestoreSubscriber::Register() {
   Status client_status;
-  StatestoreServiceConn client(client_cache_.get(), statestore_address_, &client_status);
-  RETURN_IF_ERROR(client_status);
-
   TRegisterSubscriberRequest request;
   for (const auto& registration : topic_registrations_) {
     TTopicRegistration thrift_topic;
@@ -189,8 +186,19 @@ Status StatestoreSubscriber::Register() {
   request.subscriber_location = heartbeat_address_;
   request.subscriber_id = subscriber_id_;
   TRegisterSubscriberResponse response;
-  RETURN_IF_ERROR(client.DoRpc(&StatestoreServiceClientWrapper::RegisterSubscriber,
-      request, &response));
+  int attempt = 0; // Used for debug action only.
+  StatestoreServiceConn::RpcStatus rpc_status =
+      StatestoreServiceConn::DoRpcWithRetry(client_cache_.get(), statestore_address_,
+          &StatestoreServiceClientWrapper::RegisterSubscriber, request,
+          FLAGS_statestore_subscriber_cnxn_attempts,
+          FLAGS_statestore_subscriber_cnxn_retry_interval_ms,
+          [&attempt]() {
+            return attempt++ == 0 ?
+                DebugAction(FLAGS_debug_actions, "REGISTER_SUBSCRIBER_FIRST_ATTEMPT") :
+                Status::OK();
+          },
+          &response);
+  RETURN_IF_ERROR(rpc_status.status);
   Status status = Status(response.status);
   if (status.ok()) {
     connected_to_statestore_metric_->SetValue(true);
diff --git a/tests/custom_cluster/test_statestore_rpc_errors.py b/tests/custom_cluster/test_statestore_rpc_errors.py
new file mode 100644
index 0000000..da6817d
--- /dev/null
+++ b/tests/custom_cluster/test_statestore_rpc_errors.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+
+class TestStatestoreRpcErrors(CustomClusterTestSuite):
+  """Tests for statestore RPC handling."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestStatestoreRpcErrors, cls).setup_class()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      " --debug_actions=REGISTER_SUBSCRIBER_FIRST_ATTEMPT:FAIL@1.0")
+  def test_register_subscriber_rpc_error(self, vector):
+    self.assert_impalad_log_contains("INFO",
+        "Injected RPC error.*Debug Action: REGISTER_SUBSCRIBER_FIRST_ATTEMPT")
+
+    # Ensure cluster has started up by running a query.
+    result = self.execute_query("select count(*) from functional_parquet.alltypes")
+    assert result.success, str(result)