You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/13 06:37:21 UTC

[impala] 07/07: IMPALA-8143: Enhance DoRpcWithRetry().

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 94652d74521e95e8606ea2d22aabcaddde6fc471
Author: Andrew Sherman <as...@cloudera.com>
AuthorDate: Wed May 22 17:25:04 2019 -0700

    IMPALA-8143: Enhance DoRpcWithRetry().
    
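    Move DoRpcWithRetry from ControlService to RpcMgr and allow callers of
    RpcMgr::DoRpcWithRetry to specify a time to sleep if the remote service
    is busy. DoRpcWithRetry now retries only when the remote service rejects
    the call as too busy (sleeping for the specified time before each retry);
    any other RPC failure is returned to the caller immediately.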
    
    TESTING:
    
    Ran all end-to-end tests.
    
    Added two new tests to rpc-mgr-test.cc which test RpcMgr::DoRpcWithRetry.
    One test uses a fake Proxy class, the other uses a new DebugAction to
    cause Krpc calls to be rejected as if the service was too busy.
    
    Change-Id: Ia9693151c35e02235665b3c285a48c585973d390
    Reviewed-on: http://gerrit.cloudera.org:8080/12672
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/impala-service-pool.cc           | 13 ++++-
 be/src/rpc/impala-service-pool.h            |  3 +
 be/src/rpc/rpc-mgr-test.cc                  | 87 ++++++++++++++++++++++++++++-
 be/src/rpc/rpc-mgr-test.h                   | 27 +++++++--
 be/src/rpc/rpc-mgr.h                        | 15 +++++
 be/src/rpc/rpc-mgr.inline.h                 | 36 ++++++++++++
 be/src/runtime/coordinator-backend-state.cc | 14 +++--
 be/src/runtime/query-state.h                |  2 +-
 be/src/service/client-request-state.cc      | 13 +++--
 be/src/service/control-service.h            | 23 --------
 10 files changed, 189 insertions(+), 44 deletions(-)

diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
index 44a0c64..9db137a 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -50,9 +50,11 @@ METRIC_DEFINE_histogram(server, impala_incoming_queue_time,
 
 using namespace rapidjson;
 
+DECLARE_string(debug_actions);
+
 namespace impala {
-// Metric key format for rpc call duration metrics.
-const string RPC_QUEUE_OVERFLOW_METRIC_KEY = "rpc.$0.rpcs_queue_overflow";
+const char * ImpalaServicePool::RPC_QUEUE_OVERFLOW_METRIC_KEY =
+    "rpc.$0.rpcs_queue_overflow";
 
 ImpalaServicePool::ImpalaServicePool(const scoped_refptr<kudu::MetricEntity>& entity,
     int service_queue_length, kudu::rpc::GeneratedServiceIf* service,
@@ -186,6 +188,13 @@ kudu::Status ImpalaServicePool::QueueInboundCall(
     service_mem_tracker_->Consume(transfer_size);
   }
 
+  Status debug_status = DebugAction(FLAGS_debug_actions, "SERVICE_POOL_SERVER_BUSY");
+  if (UNLIKELY(!debug_status.ok())) {
+    // Simulate the service being too busy.
+    RejectTooBusy(c);
+    return kudu::Status::OK();
+  }
+
   boost::optional<kudu::rpc::InboundCall*> evicted;
   auto queue_status = service_queue_.Put(c, &evicted);
   if (UNLIKELY(queue_status == kudu::rpc::QueueStatus::QUEUE_FULL)) {
diff --git a/be/src/rpc/impala-service-pool.h b/be/src/rpc/impala-service-pool.h
index b4313bf..dd56436 100644
--- a/be/src/rpc/impala-service-pool.h
+++ b/be/src/rpc/impala-service-pool.h
@@ -68,6 +68,9 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   /// Expose the service pool metrics by storing them as JSON in 'value'.
   void ToJson(rapidjson::Value* value, rapidjson::Document* document);
 
+  /// Metric key format ("$0" = service name) for the rpc queue overflow counter.
+  static const char* RPC_QUEUE_OVERFLOW_METRIC_KEY;
+
  private:
   void RunThread();
   void RejectTooBusy(kudu::rpc::InboundCall* c);
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index 07dbfe7..c0d4fbd 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -31,12 +31,14 @@ DECLARE_int32(num_reactor_threads);
 DECLARE_int32(num_acceptor_threads);
 DECLARE_int32(rpc_negotiation_timeout_ms);
 DECLARE_string(hostname);
+DECLARE_string(debug_actions);
 
 // For tests that do not require kerberized testing, we use RpcTest.
 namespace impala {
 
+// Test multiple services managed by an Rpc Manager using TLS.
 TEST_F(RpcMgrTest, MultipleServicesTls) {
-  // TODO: We're starting a seperate RpcMgr here instead of configuring
+  // TODO: We're starting a separate RpcMgr here instead of configuring
   // RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need to introduce
   // new gtest params to turn on TLS which needs to be a coordinated change across
   // rpc-mgr-test and thrift-server-test.
@@ -55,6 +57,7 @@ TEST_F(RpcMgrTest, MultipleServicesTls) {
   tls_rpc_mgr.Shutdown();
 }
 
+// Test multiple services managed by an Rpc Manager.
 TEST_F(RpcMgrTest, MultipleServices) {
   ASSERT_OK(RunMultipleServicesTest(&rpc_mgr_, krpc_address_));
 }
@@ -163,6 +166,7 @@ TEST_F(RpcMgrTest, ValidMultiCiphersTls) {
   tls_rpc_mgr.Shutdown();
 }
 
+// Test behavior with a slow service.
 TEST_F(RpcMgrTest, SlowCallback) {
   // Use a callback which is slow to respond.
   auto slow_cb = [](RpcContext* ctx) {
@@ -199,6 +203,7 @@ TEST_F(RpcMgrTest, SlowCallback) {
   }
 }
 
+// Test async calls.
 TEST_F(RpcMgrTest, AsyncCall) {
   GeneratedServiceIf* scan_mem_impl =
       TakeOverService(make_unique<ScanMemServiceImpl>(&rpc_mgr_));
@@ -251,6 +256,86 @@ TEST_F(RpcMgrTest, NegotiationTimeout) {
   secondary_rpc_mgr.Shutdown();
 }
 
+// Test RpcMgr::DoRpcWithRetry using a fake proxy.
+TEST_F(RpcMgrTest, DoRpcWithRetry) {
+  TQueryCtx query_ctx;
+  const int num_retries = 10;
+  const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
+
+  // Test how DoRpcWithRetry retries by using a proxy that always fails.
+  unique_ptr<FailingPingServiceProxy> failing_proxy =
+      make_unique<FailingPingServiceProxy>();
+  // A call that fails is not retried as the server is not busy.
+  PingRequestPB request1;
+  PingResponsePB response1;
+  Status rpc_status_fail =
+      RpcMgr::DoRpcWithRetry(failing_proxy, &FailingPingServiceProxy::Ping, request1,
+          &response1, query_ctx, "ping failed", num_retries, timeout_ms);
+  ASSERT_FALSE(rpc_status_fail.ok());
+  // Check that proxy was only called once.
+  ASSERT_EQ(1, failing_proxy->GetNumberOfCalls());
+
+  // Test injection of DebugAction into DoRpcWithRetry.
+  query_ctx.client_request.query_options.__set_debug_action("DoRpcWithRetry:FAIL");
+  PingRequestPB request2;
+  PingResponsePB response2;
+  Status inject_status = RpcMgr::DoRpcWithRetry(failing_proxy,
+      &FailingPingServiceProxy::Ping, request2, &response2, query_ctx, "ping failed",
+      num_retries, timeout_ms, 0, "DoRpcWithRetry");
+  ASSERT_FALSE(inject_status.ok());
+  EXPECT_ERROR(inject_status, TErrorCode::INTERNAL_ERROR);
+  ASSERT_EQ("Debug Action: DoRpcWithRetry:FAIL", inject_status.msg().msg());
+}
+
+// Test RpcMgr::DoRpcWithRetry by injecting service-too-busy failures.
+TEST_F(RpcMgrTest, BusyService) {
+  TQueryCtx query_ctx;
+  auto cb = [](RpcContext* ctx) { ctx->RespondSuccess(); };
+  GeneratedServiceIf* ping_impl =
+      TakeOverService(make_unique<PingServiceImpl>(&rpc_mgr_, cb));
+  const int num_service_threads = 4;
+  const int queue_size = 25;
+  ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, ping_impl,
+      static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
+  FLAGS_num_acceptor_threads = 2;
+  FLAGS_num_reactor_threads = 10;
+  ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+
+  // Find the counter which tracks the number of times the service queue is too full.
+  const string& overflow_count = Substitute(
+      ImpalaServicePool::RPC_QUEUE_OVERFLOW_METRIC_KEY, ping_impl->service_name());
+  IntCounter* overflow_metric =
+      ExecEnv::GetInstance()->rpc_metrics()->FindMetricForTesting<IntCounter>(
+          overflow_count);
+  ASSERT_TRUE(overflow_metric != nullptr);
+
+  unique_ptr<PingServiceProxy> proxy;
+  ASSERT_OK(static_cast<PingServiceImpl*>(ping_impl)->GetProxy(
+      krpc_address_, FLAGS_hostname, &proxy));
+
+  // There have been no overflows yet.
+  EXPECT_EQ(overflow_metric->GetValue(), 0L);
+
+  // Use DebugAction to make the Impala Service Pool reject 50% of Krpc calls as if the
+  // service is too busy.
+  auto s = ScopedFlagSetter<string>::Make(
+      &FLAGS_debug_actions, "SERVICE_POOL_SERVER_BUSY:FAIL@0.5");
+  PingRequestPB request;
+  PingResponsePB response;
+  const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
+  int num_retries = 40; // Max attempts each DoRpcWithRetry call may make.
+  int num_rpc_retry_calls = 40; // How many times to call DoRpcWithRetry.
+  for (int i = 0; i < num_rpc_retry_calls; ++i) {
+    Status status = RpcMgr::DoRpcWithRetry(proxy, &PingServiceProxy::Ping, request,
+        &response, query_ctx, "ping failed", num_retries, timeout_ms);
+    // Each call fails only if all num_retries attempts are rejected: (1/2)^num_retries.
+    ASSERT_TRUE(status.ok());
+  }
+  // There will be no overflows (i.e. service too busy) only if every call succeeds
+  // on its first attempt, which happens with probability (1/2)^num_rpc_retry_calls.
+  ASSERT_GT(overflow_metric->GetValue(), 0);
+}
+
 } // namespace impala
 
 int main(int argc, char** argv) {
diff --git a/be/src/rpc/rpc-mgr-test.h b/be/src/rpc/rpc-mgr-test.h
index 8b749b5..ad428a4 100644
--- a/be/src/rpc/rpc-mgr-test.h
+++ b/be/src/rpc/rpc-mgr-test.h
@@ -18,8 +18,6 @@
 #ifndef IMPALA_RPC_RPC_MGR_TEST_H
 #define IMPALA_RPC_RPC_MGR_TEST_H
 
-#include "rpc/rpc-mgr.inline.h"
-
 #include "common/init.h"
 #include "exec/kudu-util.h"
 #include "kudu/rpc/remote_user.h"
@@ -268,6 +266,25 @@ class ScanMemServiceImpl : public ScanMemServiceIf {
 
 };
 
+/// A class that behaves like a ::kudu::rpc::Proxy and keeps a count of the number of
+/// times it is called. It always fails by returning an IOError.
+class FailingPingServiceProxy {
+ public:
+  kudu::Status Ping(const class PingRequestPB& req, class PingResponsePB* resp,
+      ::kudu::rpc::RpcController* controller) {
+    ++number_of_calls_;
+    return kudu::Status::IOError(
+        Substitute("ping failing, number of calls=$0.", number_of_calls_));
+  }
+
+  /// Return the number of times Ping has been called.
+  int GetNumberOfCalls() const { return number_of_calls_; }
+
+ private:
+  /// Number of times Ping has been called.
+  int number_of_calls_ = 0;
+};
+
 Status RpcMgrTest::RunMultipleServicesTest(
     RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
   // Test that a service can be started, and will respond to requests.
@@ -308,9 +325,9 @@ Status RpcMgrTest::RunMultipleServicesTest(
       KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
           "unable to execute Ping() RPC.");
       if (response.int_response() != 42) {
-          return Status(Substitute(
-              "Ping() failed. Incorrect response. Expected: 42; Got: $0",
-                  response.int_response()));
+        return Status(
+            Substitute("Ping() failed. Incorrect response. Expected: 42; Got: $0",
+                response.int_response()));
       }
     } else {
       ScanMemRequestPB request;
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index 7829897..eb45203 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -175,6 +175,21 @@ class RpcMgr {
   /// in flight, and metrics and histograms for each service and their methods.
   void ToJson(rapidjson::Document* document);
 
+  /// Call 'rpc_call' on the 'proxy' object, making at most 'times_to_try' attempts.
+  /// Only a "server too busy" rejection, or an error injected via 'debug_action', is
+  /// retried; any other result is returned at once. 'rpc_call' must be idempotent.
+  /// Each Rpc has a timeout of 'timeout_ms' milliseconds. If the service is busy and
+  /// 'server_busy_backoff_ms' is non-zero, sleep that long before the next attempt.
+  /// Pass 'debug_action' to DebugAction() to potentially inject errors.
+  /// TODO: Clean up this interface. Replace the debug action with fault injection in RPC
+  /// callbacks or other places.
+  template <typename Proxy, typename ProxyMethod, typename Request, typename Response>
+  static Status DoRpcWithRetry(const std::unique_ptr<Proxy>& proxy,
+      const ProxyMethod& rpc_call, const Request& request, Response* response,
+      const TQueryCtx& query_ctx, const char* error_msg, const int times_to_try,
+      const int64_t timeout_ms, const int64_t server_busy_backoff_ms = 0,
+      const char* debug_action = nullptr);
+
   ~RpcMgr() {
     DCHECK_EQ(service_pools_.size(), 0)
         << "Must call Shutdown() before destroying RpcMgr";
diff --git a/be/src/rpc/rpc-mgr.inline.h b/be/src/rpc/rpc-mgr.inline.h
index 9a1fc7e..76ee34d 100644
--- a/be/src/rpc/rpc-mgr.inline.h
+++ b/be/src/rpc/rpc-mgr.inline.h
@@ -49,6 +49,42 @@ Status RpcMgr::GetProxy(const TNetworkAddress& address, const std::string& hostn
   return Status::OK();
 }
 
+template <typename Proxy, typename ProxyMethod, typename Request, typename Response>
+Status RpcMgr::DoRpcWithRetry(const std::unique_ptr<Proxy>& proxy,
+    const ProxyMethod& rpc_call, const Request& request, Response* response,
+    const TQueryCtx& query_ctx, const char* error_msg, const int times_to_try,
+    const int64_t timeout_ms, const int64_t server_busy_backoff_ms,
+    const char* debug_action) {
+  DCHECK_GT(times_to_try, 0);
+  Status rpc_status;
+
+  for (int i = 0; i < times_to_try; ++i) {
+    kudu::rpc::RpcController rpc_controller;
+    rpc_controller.set_timeout(kudu::MonoDelta::FromMilliseconds(timeout_ms));
+
+    if (debug_action != nullptr) {
+      // Check for injected failures; these use up an attempt and skip the backoff.
+      rpc_status = DebugAction(query_ctx.client_request.query_options, debug_action);
+      if (!rpc_status.ok()) continue;
+    }
+
+    const kudu::Status& k_status =
+        (proxy.get()->*rpc_call)(request, response, &rpc_controller);
+    rpc_status = FromKuduStatus(k_status, error_msg);
+
+    // If the call succeeded, or the server is not busy, then return result to caller.
+    if (rpc_status.ok() || !RpcMgr::IsServerTooBusy(rpc_controller)) {
+      return rpc_status;
+    }
+
+    // Server is busy, sleep if caller requested it and this is not the last time to try.
+    if (server_busy_backoff_ms != 0 && i != times_to_try - 1) {
+      SleepForMs(server_busy_backoff_ms);
+    }
+  }
+  return rpc_status;
+}
+
 } // namespace impala
 
 #endif
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 4935a7a..bd97712 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -26,6 +26,7 @@
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
+#include "rpc/rpc-mgr.inline.h"
 #include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
 #include "runtime/coordinator-filter-state.h"
@@ -465,12 +466,13 @@ bool Coordinator::BackendState::Cancel() {
   TUniqueIdToUniqueIdPB(query_id(), request.mutable_query_id());
   CancelQueryFInstancesResponsePB response;
 
-  auto cancel_rpc = [&](RpcController* rpc_controller) -> kudu::Status {
-    return proxy->CancelQueryFInstances(request, &response, rpc_controller);
-  };
-
-  Status rpc_status = ControlService::DoRpcWithRetry(cancel_rpc, query_ctx(),
-      "COORD_CANCEL_QUERY_FINSTANCES_RPC", "Cancel() RPC failed", 3, 10);
+  const int num_retries = 3;
+  const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
+  const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC;
+  Status rpc_status =
+      RpcMgr::DoRpcWithRetry(proxy, &ControlServiceProxy::CancelQueryFInstances, request,
+          &response, query_ctx(), "Cancel() RPC failed", num_retries, timeout_ms,
+          backoff_time_ms, "COORD_CANCEL_QUERY_FINSTANCES_RPC");
 
   if (!rpc_status.ok()) {
     status_.MergeStatus(rpc_status);
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 1f8cc18..408ba0a 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -417,7 +417,7 @@ class QueryState {
   void ConstructReport(bool instances_started, ReportExecStatusRequestPB* report,
       TRuntimeProfileForest* profiles_forest);
 
-  /// Gather statues and profiles of all fragment instances belonging to this query state
+  /// Gather statuses and profiles of all fragment instances belonging to this query state
   /// and send it to the coordinator via ReportExecStatus() RPC. Returns true if the
   /// report rpc was successful or if it was unsuccessful and we've reached the maximum
   /// number of allowed failures and cancelled.
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 1427fbb..6b2ab39 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -25,6 +25,7 @@
 #include "common/status.h"
 #include "exec/kudu-util.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "rpc/rpc-mgr.inline.h"
 #include "runtime/backend-client.h"
 #include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
@@ -658,12 +659,12 @@ Status ClientRequestState::ExecShutdownRequest() {
   RemoteShutdownResultPB resp;
   VLOG_QUERY << "Sending Shutdown RPC to " << TNetworkAddressToString(addr);
 
-  auto shutdown_rpc = [&](RpcController* rpc_controller) -> kudu::Status {
-    return proxy->RemoteShutdown(params, &resp, rpc_controller);
-  };
-
-  Status rpc_status = ControlService::DoRpcWithRetry(
-      shutdown_rpc, query_ctx_, "CRS_SHUTDOWN_RPC", "RemoteShutdown() RPC failed", 3, 10);
+  const int num_retries = 3;
+  const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
+  const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC;
+  Status rpc_status = RpcMgr::DoRpcWithRetry(proxy, &ControlServiceProxy::RemoteShutdown,
+      params, &resp, query_ctx_, "RemoteShutdown() RPC failed", num_retries, timeout_ms,
+      backoff_time_ms, "CRS_SHUTDOWN_RPC");
 
   if (!rpc_status.ok()) {
     const string& msg = rpc_status.msg().msg();
diff --git a/be/src/service/control-service.h b/be/src/service/control-service.h
index 6a0267f..4a869e0 100644
--- a/be/src/service/control-service.h
+++ b/be/src/service/control-service.h
@@ -76,29 +76,6 @@ class ControlService : public ControlServiceIf {
   static Status GetProxy(const TNetworkAddress& address, const std::string& hostname,
       std::unique_ptr<ControlServiceProxy>* proxy);
 
-  /// Retry the Rpc 'rpc_call' up to 'times_to_try' times.
-  /// Each Rpc has a timeout of 'timeout_s' seconds.
-  /// There is no sleeping between retries.
-  /// Pass 'debug_action' to DebugAction() to potentially inject errors.
-  template <typename F>
-  static Status DoRpcWithRetry(F&& rpc_call, const TQueryCtx& query_ctx,
-      const char* debug_action, const char* error_msg, int times_to_try, int timeout_s) {
-    DCHECK_GT(times_to_try, 0);
-    Status rpc_status;
-    for (int i = 0; i < times_to_try; i++) {
-      RpcController rpc_controller;
-      rpc_controller.set_timeout(MonoDelta::FromSeconds(timeout_s));
-      // Check for injected failures.
-      rpc_status = DebugAction(query_ctx.client_request.query_options, debug_action);
-      if (!rpc_status.ok()) continue;
-
-      rpc_status = FromKuduStatus(rpc_call(&rpc_controller), error_msg);
-      if (rpc_status.ok()) break;
-      // TODO(IMPALA-8143) Add a sleep if RpcMgr::IsServerTooBusy().
-    }
-    return rpc_status;
-  }
-
  private:
   /// Tracks the memory usage of payload in the service queue.
   std::unique_ptr<MemTracker> mem_tracker_;