You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/13 06:37:21 UTC
[impala] 07/07: IMPALA-8143: Enhance DoRpcWithRetry().
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 94652d74521e95e8606ea2d22aabcaddde6fc471
Author: Andrew Sherman <as...@cloudera.com>
AuthorDate: Wed May 22 17:25:04 2019 -0700
IMPALA-8143: Enhance DoRpcWithRetry().
Allow callers of RpcMgr::DoRpcWithRetry to specify a time to sleep if
the remote service is busy. DoRpcWithRetry now only retries, and only
sleeps, when the remote service is busy; any other failure is returned
to the caller immediately.
TESTING:
Ran all end-to-end tests.
Added two new tests to rpc-mgr-test.cc which test RpcMgr::DoRpcWithRetry.
One test uses a fake Proxy class, the other uses a new DebugAction to
cause Krpc calls to be rejected as if the service were too busy.
Change-Id: Ia9693151c35e02235665b3c285a48c585973d390
Reviewed-on: http://gerrit.cloudera.org:8080/12672
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/rpc/impala-service-pool.cc | 13 ++++-
be/src/rpc/impala-service-pool.h | 3 +
be/src/rpc/rpc-mgr-test.cc | 87 ++++++++++++++++++++++++++++-
be/src/rpc/rpc-mgr-test.h | 27 +++++++--
be/src/rpc/rpc-mgr.h | 15 +++++
be/src/rpc/rpc-mgr.inline.h | 36 ++++++++++++
be/src/runtime/coordinator-backend-state.cc | 14 +++--
be/src/runtime/query-state.h | 2 +-
be/src/service/client-request-state.cc | 13 +++--
be/src/service/control-service.h | 23 --------
10 files changed, 189 insertions(+), 44 deletions(-)
diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
index 44a0c64..9db137a 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -50,9 +50,11 @@ METRIC_DEFINE_histogram(server, impala_incoming_queue_time,
using namespace rapidjson;
+DECLARE_string(debug_actions);
+
namespace impala {
-// Metric key format for rpc call duration metrics.
-const string RPC_QUEUE_OVERFLOW_METRIC_KEY = "rpc.$0.rpcs_queue_overflow";
+const char * ImpalaServicePool::RPC_QUEUE_OVERFLOW_METRIC_KEY =
+ "rpc.$0.rpcs_queue_overflow";
ImpalaServicePool::ImpalaServicePool(const scoped_refptr<kudu::MetricEntity>& entity,
int service_queue_length, kudu::rpc::GeneratedServiceIf* service,
@@ -186,6 +188,13 @@ kudu::Status ImpalaServicePool::QueueInboundCall(
service_mem_tracker_->Consume(transfer_size);
}
+ Status debug_status = DebugAction(FLAGS_debug_actions, "SERVICE_POOL_SERVER_BUSY");
+ if (UNLIKELY(!debug_status.ok())) {
+ // Simulate the service being too busy.
+ RejectTooBusy(c);
+ return kudu::Status::OK();
+ }
+
boost::optional<kudu::rpc::InboundCall*> evicted;
auto queue_status = service_queue_.Put(c, &evicted);
if (UNLIKELY(queue_status == kudu::rpc::QueueStatus::QUEUE_FULL)) {
diff --git a/be/src/rpc/impala-service-pool.h b/be/src/rpc/impala-service-pool.h
index b4313bf..dd56436 100644
--- a/be/src/rpc/impala-service-pool.h
+++ b/be/src/rpc/impala-service-pool.h
@@ -68,6 +68,9 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
/// Expose the service pool metrics by storing them as JSON in 'value'.
void ToJson(rapidjson::Value* value, rapidjson::Document* document);
+ /// Metric key format ($0 = service name) for the rpc service queue overflow counter.
+ static const char* RPC_QUEUE_OVERFLOW_METRIC_KEY;
+
private:
void RunThread();
void RejectTooBusy(kudu::rpc::InboundCall* c);
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index 07dbfe7..c0d4fbd 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -31,12 +31,14 @@ DECLARE_int32(num_reactor_threads);
DECLARE_int32(num_acceptor_threads);
DECLARE_int32(rpc_negotiation_timeout_ms);
DECLARE_string(hostname);
+DECLARE_string(debug_actions);
// For tests that do not require kerberized testing, we use RpcTest.
namespace impala {
+// Test multiple services managed by an Rpc Manager using TLS.
TEST_F(RpcMgrTest, MultipleServicesTls) {
- // TODO: We're starting a seperate RpcMgr here instead of configuring
+ // TODO: We're starting a separate RpcMgr here instead of configuring
// RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need to introduce
// new gtest params to turn on TLS which needs to be a coordinated change across
// rpc-mgr-test and thrift-server-test.
@@ -55,6 +57,7 @@ TEST_F(RpcMgrTest, MultipleServicesTls) {
tls_rpc_mgr.Shutdown();
}
+// Test multiple services managed by an Rpc Manager.
TEST_F(RpcMgrTest, MultipleServices) {
ASSERT_OK(RunMultipleServicesTest(&rpc_mgr_, krpc_address_));
}
@@ -163,6 +166,7 @@ TEST_F(RpcMgrTest, ValidMultiCiphersTls) {
tls_rpc_mgr.Shutdown();
}
+// Test behavior with a slow service.
TEST_F(RpcMgrTest, SlowCallback) {
// Use a callback which is slow to respond.
auto slow_cb = [](RpcContext* ctx) {
@@ -199,6 +203,7 @@ TEST_F(RpcMgrTest, SlowCallback) {
}
}
+// Test async calls.
TEST_F(RpcMgrTest, AsyncCall) {
GeneratedServiceIf* scan_mem_impl =
TakeOverService(make_unique<ScanMemServiceImpl>(&rpc_mgr_));
@@ -251,6 +256,86 @@ TEST_F(RpcMgrTest, NegotiationTimeout) {
secondary_rpc_mgr.Shutdown();
}
+// Test RpcMgr::DoRpcWithRetry using a fake proxy.
+TEST_F(RpcMgrTest, DoRpcWithRetry) {
+ TQueryCtx query_ctx;
+ const int num_retries = 10;
+ const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
+
+ // Check that a non-busy failure is not retried, using a proxy that always fails.
+ unique_ptr<FailingPingServiceProxy> failing_proxy =
+ make_unique<FailingPingServiceProxy>();
+ // A call that fails is not retried as the server is not busy.
+ PingRequestPB request1;
+ PingResponsePB response1;
+ Status rpc_status_fail =
+ RpcMgr::DoRpcWithRetry(failing_proxy, &FailingPingServiceProxy::Ping, request1,
+ &response1, query_ctx, "ping failed", num_retries, timeout_ms);
+ ASSERT_FALSE(rpc_status_fail.ok());
+ // Check that proxy was only called once.
+ ASSERT_EQ(1, failing_proxy->GetNumberOfCalls());
+
+ // Test injection of DebugAction: each try fails before the proxy is called.
+ query_ctx.client_request.query_options.__set_debug_action("DoRpcWithRetry:FAIL");
+ PingRequestPB request2;
+ PingResponsePB response2;
+ Status inject_status = RpcMgr::DoRpcWithRetry(failing_proxy,
+ &FailingPingServiceProxy::Ping, request2, &response2, query_ctx, "ping failed",
+ num_retries, timeout_ms, 0, "DoRpcWithRetry");
+ ASSERT_FALSE(inject_status.ok());
+ EXPECT_ERROR(inject_status, TErrorCode::INTERNAL_ERROR);
+ ASSERT_EQ("Debug Action: DoRpcWithRetry:FAIL", inject_status.msg().msg());
+}
+
+// Test RpcMgr::DoRpcWithRetry by injecting service-too-busy failures.
+TEST_F(RpcMgrTest, BusyService) {
+ TQueryCtx query_ctx;
+ auto cb = [](RpcContext* ctx) { ctx->RespondSuccess(); };
+ GeneratedServiceIf* ping_impl =
+ TakeOverService(make_unique<PingServiceImpl>(&rpc_mgr_, cb));
+ const int num_service_threads = 4;
+ const int queue_size = 25;
+ ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, ping_impl,
+ static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
+ FLAGS_num_acceptor_threads = 2;
+ FLAGS_num_reactor_threads = 10;
+ ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+
+ // Find the counter which tracks the number of times the service queue is too full.
+ const string& overflow_count = Substitute(
+ ImpalaServicePool::RPC_QUEUE_OVERFLOW_METRIC_KEY, ping_impl->service_name());
+ IntCounter* overflow_metric =
+ ExecEnv::GetInstance()->rpc_metrics()->FindMetricForTesting<IntCounter>(
+ overflow_count);
+ ASSERT_TRUE(overflow_metric != nullptr);
+
+ unique_ptr<PingServiceProxy> proxy;
+ ASSERT_OK(static_cast<PingServiceImpl*>(ping_impl)->GetProxy(
+ krpc_address_, FLAGS_hostname, &proxy));
+
+ // There have been no overflows yet.
+ EXPECT_EQ(overflow_metric->GetValue(), 0L);
+
+ // Use DebugAction to make the Impala Service Pool reject 50% of Krpc calls as if the
+ // service is too busy.
+ auto s = ScopedFlagSetter<string>::Make(
+ &FLAGS_debug_actions, "SERVICE_POOL_SERVER_BUSY:FAIL@0.5");
+ PingRequestPB request;
+ PingResponsePB response;
+ const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
+ int num_retries = 40; // Max number of tries per DoRpcWithRetry call.
+ int num_rpc_retry_calls = 40; // How many times to call DoRpcWithRetry.
+ for (int i = 0; i < num_rpc_retry_calls; ++i) {
+ Status status = RpcMgr::DoRpcWithRetry(proxy, &PingServiceProxy::Ping, request,
+ &response, query_ctx, "ping failed", num_retries, timeout_ms);
+ // Each DoRpcWithRetry call fails with probability (1/2)^num_retries.
+ ASSERT_TRUE(status.ok());
+ }
+ // There will be no overflows (i.e. service too busy) with probability
+ // (1/2)^num_rpc_retry_calls (every call's first try would have to succeed).
+ ASSERT_GT(overflow_metric->GetValue(), 0);
+}
+
} // namespace impala
int main(int argc, char** argv) {
diff --git a/be/src/rpc/rpc-mgr-test.h b/be/src/rpc/rpc-mgr-test.h
index 8b749b5..ad428a4 100644
--- a/be/src/rpc/rpc-mgr-test.h
+++ b/be/src/rpc/rpc-mgr-test.h
@@ -18,8 +18,6 @@
#ifndef IMPALA_RPC_RPC_MGR_TEST_H
#define IMPALA_RPC_RPC_MGR_TEST_H
-#include "rpc/rpc-mgr.inline.h"
-
#include "common/init.h"
#include "exec/kudu-util.h"
#include "kudu/rpc/remote_user.h"
@@ -268,6 +266,25 @@ class ScanMemServiceImpl : public ScanMemServiceIf {
};
+/// A class that behaves like a ::kudu::rpc::Proxy and keeps a count of the number of
+/// times Ping() is called. Ping() always fails by returning an IOError.
+class FailingPingServiceProxy {
+ public:
+ kudu::Status Ping(const class PingRequestPB& req, class PingResponsePB* resp,
+ ::kudu::rpc::RpcController* controller) {
+ ++number_of_calls_;
+ return kudu::Status::IOError(
+ Substitute("ping failing, number of calls=$0.", number_of_calls_));
+ }
+
+ /// Return the number of times Ping has been called.
+ int GetNumberOfCalls() const { return number_of_calls_; }
+
+ private:
+ /// Number of times Ping has been called.
+ int number_of_calls_ = 0;
+};
+
Status RpcMgrTest::RunMultipleServicesTest(
RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
// Test that a service can be started, and will respond to requests.
@@ -308,9 +325,9 @@ Status RpcMgrTest::RunMultipleServicesTest(
KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
"unable to execute Ping() RPC.");
if (response.int_response() != 42) {
- return Status(Substitute(
- "Ping() failed. Incorrect response. Expected: 42; Got: $0",
- response.int_response()));
+ return Status(
+ Substitute("Ping() failed. Incorrect response. Expected: 42; Got: $0",
+ response.int_response()));
}
} else {
ScanMemRequestPB request;
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index 7829897..eb45203 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -175,6 +175,21 @@ class RpcMgr {
/// in flight, and metrics and histograms for each service and their methods.
void ToJson(rapidjson::Document* document);
+ /// Call 'rpc_call' on the 'proxy' object, trying up to 'times_to_try' times. Only
+ /// server-too-busy and injected errors are retried; 'rpc_call' must be idempotent.
+ /// Each Rpc has a timeout of 'timeout_ms' milliseconds.
+ /// If the service is busy, sleep 'server_busy_backoff_ms' milliseconds before retrying
+ /// (no sleep after the last try). Returns the status of the last try.
+ /// Pass 'debug_action' (if non-null) to DebugAction() to potentially inject errors.
+ /// TODO: Clean up this interface. Replace the debug action with fault injection in RPC
+ /// callbacks or other places.
+ template <typename Proxy, typename ProxyMethod, typename Request, typename Response>
+ static Status DoRpcWithRetry(const std::unique_ptr<Proxy>& proxy,
+ const ProxyMethod& rpc_call, const Request& request, Response* response,
+ const TQueryCtx& query_ctx, const char* error_msg, const int times_to_try,
+ const int64_t timeout_ms, const int64_t server_busy_backoff_ms = 0,
+ const char* debug_action = nullptr);
+
~RpcMgr() {
DCHECK_EQ(service_pools_.size(), 0)
<< "Must call Shutdown() before destroying RpcMgr";
diff --git a/be/src/rpc/rpc-mgr.inline.h b/be/src/rpc/rpc-mgr.inline.h
index 9a1fc7e..76ee34d 100644
--- a/be/src/rpc/rpc-mgr.inline.h
+++ b/be/src/rpc/rpc-mgr.inline.h
@@ -49,6 +49,42 @@ Status RpcMgr::GetProxy(const TNetworkAddress& address, const std::string& hostn
return Status::OK();
}
+template <typename Proxy, typename ProxyMethod, typename Request, typename Response>
+Status RpcMgr::DoRpcWithRetry(const std::unique_ptr<Proxy>& proxy,
+ const ProxyMethod& rpc_call, const Request& request, Response* response,
+ const TQueryCtx& query_ctx, const char* error_msg, const int times_to_try,
+ const int64_t timeout_ms, const int64_t server_busy_backoff_ms,
+ const char* debug_action) {
+ DCHECK_GT(times_to_try, 0);
+ Status rpc_status;
+
+ for (int i = 0; i < times_to_try; ++i) {
+ kudu::rpc::RpcController rpc_controller;
+ rpc_controller.set_timeout(kudu::MonoDelta::FromMilliseconds(timeout_ms));
+
+ if (debug_action != nullptr) {
+ // Check for injected failures; an injected failure uses up this try, with no sleep.
+ rpc_status = DebugAction(query_ctx.client_request.query_options, debug_action);
+ if (!rpc_status.ok()) continue;
+ }
+
+ const kudu::Status& k_status =
+ (proxy.get()->*rpc_call)(request, response, &rpc_controller);
+ rpc_status = FromKuduStatus(k_status, error_msg);
+
+ // Return on success, or on any failure other than 'server too busy' (never retried).
+ if (rpc_status.ok() || !RpcMgr::IsServerTooBusy(rpc_controller)) {
+ return rpc_status;
+ }
+
+ // Server is busy: sleep if the caller asked for a backoff and this is not the last try.
+ if (server_busy_backoff_ms != 0 && i != times_to_try - 1) {
+ SleepForMs(server_busy_backoff_ms);
+ }
+ }
+ return rpc_status;
+}
+
} // namespace impala
#endif
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 4935a7a..bd97712 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -26,6 +26,7 @@
#include "kudu/rpc/rpc_controller.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
+#include "rpc/rpc-mgr.inline.h"
#include "runtime/backend-client.h"
#include "runtime/client-cache.h"
#include "runtime/coordinator-filter-state.h"
@@ -465,12 +466,13 @@ bool Coordinator::BackendState::Cancel() {
TUniqueIdToUniqueIdPB(query_id(), request.mutable_query_id());
CancelQueryFInstancesResponsePB response;
- auto cancel_rpc = [&](RpcController* rpc_controller) -> kudu::Status {
- return proxy->CancelQueryFInstances(request, &response, rpc_controller);
- };
-
- Status rpc_status = ControlService::DoRpcWithRetry(cancel_rpc, query_ctx(),
- "COORD_CANCEL_QUERY_FINSTANCES_RPC", "Cancel() RPC failed", 3, 10);
+ const int num_retries = 3;
+ const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
+ const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC;
+ Status rpc_status =
+ RpcMgr::DoRpcWithRetry(proxy, &ControlServiceProxy::CancelQueryFInstances, request,
+ &response, query_ctx(), "Cancel() RPC failed", num_retries, timeout_ms,
+ backoff_time_ms, "COORD_CANCEL_QUERY_FINSTANCES_RPC");
if (!rpc_status.ok()) {
status_.MergeStatus(rpc_status);
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 1f8cc18..408ba0a 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -417,7 +417,7 @@ class QueryState {
void ConstructReport(bool instances_started, ReportExecStatusRequestPB* report,
TRuntimeProfileForest* profiles_forest);
- /// Gather statues and profiles of all fragment instances belonging to this query state
+ /// Gather statuses and profiles of all fragment instances belonging to this query state
/// and send it to the coordinator via ReportExecStatus() RPC. Returns true if the
/// report rpc was successful or if it was unsuccessful and we've reached the maximum
/// number of allowed failures and cancelled.
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 1427fbb..6b2ab39 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -25,6 +25,7 @@
#include "common/status.h"
#include "exec/kudu-util.h"
#include "kudu/rpc/rpc_controller.h"
+#include "rpc/rpc-mgr.inline.h"
#include "runtime/backend-client.h"
#include "runtime/coordinator.h"
#include "runtime/exec-env.h"
@@ -658,12 +659,12 @@ Status ClientRequestState::ExecShutdownRequest() {
RemoteShutdownResultPB resp;
VLOG_QUERY << "Sending Shutdown RPC to " << TNetworkAddressToString(addr);
- auto shutdown_rpc = [&](RpcController* rpc_controller) -> kudu::Status {
- return proxy->RemoteShutdown(params, &resp, rpc_controller);
- };
-
- Status rpc_status = ControlService::DoRpcWithRetry(
- shutdown_rpc, query_ctx_, "CRS_SHUTDOWN_RPC", "RemoteShutdown() RPC failed", 3, 10);
+ const int num_retries = 3;
+ const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
+ const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC;
+ Status rpc_status = RpcMgr::DoRpcWithRetry(proxy, &ControlServiceProxy::RemoteShutdown,
+ params, &resp, query_ctx_, "RemoteShutdown() RPC failed", num_retries, timeout_ms,
+ backoff_time_ms, "CRS_SHUTDOWN_RPC");
if (!rpc_status.ok()) {
const string& msg = rpc_status.msg().msg();
diff --git a/be/src/service/control-service.h b/be/src/service/control-service.h
index 6a0267f..4a869e0 100644
--- a/be/src/service/control-service.h
+++ b/be/src/service/control-service.h
@@ -76,29 +76,6 @@ class ControlService : public ControlServiceIf {
static Status GetProxy(const TNetworkAddress& address, const std::string& hostname,
std::unique_ptr<ControlServiceProxy>* proxy);
- /// Retry the Rpc 'rpc_call' up to 'times_to_try' times.
- /// Each Rpc has a timeout of 'timeout_s' seconds.
- /// There is no sleeping between retries.
- /// Pass 'debug_action' to DebugAction() to potentially inject errors.
- template <typename F>
- static Status DoRpcWithRetry(F&& rpc_call, const TQueryCtx& query_ctx,
- const char* debug_action, const char* error_msg, int times_to_try, int timeout_s) {
- DCHECK_GT(times_to_try, 0);
- Status rpc_status;
- for (int i = 0; i < times_to_try; i++) {
- RpcController rpc_controller;
- rpc_controller.set_timeout(MonoDelta::FromSeconds(timeout_s));
- // Check for injected failures.
- rpc_status = DebugAction(query_ctx.client_request.query_options, debug_action);
- if (!rpc_status.ok()) continue;
-
- rpc_status = FromKuduStatus(rpc_call(&rpc_controller), error_msg);
- if (rpc_status.ok()) break;
- // TODO(IMPALA-8143) Add a sleep if RpcMgr::IsServerTooBusy().
- }
- return rpc_status;
- }
-
private:
/// Tracks the memory usage of payload in the service queue.
std::unique_ptr<MemTracker> mem_tracker_;