You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/05/15 17:08:53 UTC
[impala] 05/06: IMPALA-8138: Remove FAULT_INJECTION_RPC_DELAY
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit efae8dcf3b70ec1e0ccb7bdd45084b03c0be4354
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Fri Mar 29 11:44:22 2019 -0700
IMPALA-8138: Remove FAULT_INJECTION_RPC_DELAY
This patch removes the FAULT_INJECTION_RPC_DELAY macro and replaces
its uses with DebugAction, which is more flexible. For example, it
supports JITTER, which injects random delays.
Every backend rpc has a debug action of the form RPC_NAME_DELAY.
DebugAction has previously always been used via query options.
However, for the rpcs considered here there is not always a query with
an accessible TQueryOptions available (for example, we do not send any
query info with the RemoteShutdown rpc), so this patch introduces a
flag, '--debug_actions', which is used to control these rpc delay
debug actions.
Testing:
- Updated existing tests to use the new mechanism.
Change-Id: I712b188e0cdf91f431c9b94052501e5411af407b
Reviewed-on: http://gerrit.cloudera.org:8080/13060
Reviewed-by: Thomas Marshall <tm...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/common/global-flags.cc | 9 ++++-----
be/src/service/control-service.cc | 15 ++++++++-------
be/src/service/data-stream-service.cc | 4 +++-
be/src/service/impala-internal-service.cc | 8 +++++---
be/src/testutil/fault-injection-util.cc | 16 ----------------
be/src/testutil/fault-injection-util.h | 24 ------------------------
be/src/util/debug-util.cc | 6 ++----
be/src/util/debug-util.h | 27 +++++++++++++++++----------
tests/custom_cluster/test_rpc_timeout.py | 28 +++++++++++++++-------------
9 files changed, 54 insertions(+), 83 deletions(-)
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index ca1261a..427e777 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -136,11 +136,6 @@ DEFINE_int32(stress_datastream_recvr_delay_ms, 0, "A stress option that causes d
"stream receiver registration to be delayed. Effective in debug builds only.");
DEFINE_bool(skip_file_runtime_filtering, false, "Skips file-based runtime filtering for"
"testing purposes. Effective in debug builds only.");
-DEFINE_int32(fault_injection_rpc_delay_ms, 0, "A fault injection option that causes "
- "rpc server handling to be delayed to trigger an RPC timeout on the caller side. "
- "Effective in debug builds only.");
-DEFINE_int32(fault_injection_rpc_type, 0, "A fault injection option that specifies "
- "which rpc call will be injected with the delay. Effective in debug builds only.");
DEFINE_int32(fault_injection_rpc_exception_type, 0, "A fault injection option that "
"specifies the exception to be thrown in the caller side of an RPC call. Effective "
"in debug builds only");
@@ -156,6 +151,10 @@ DEFINE_int32(stress_disk_read_delay_ms, 0, "A stress option that injects extra d
" in milliseconds when the I/O manager is reading from disk.");
#endif
+DEFINE_string(debug_actions, "", "For testing only. Uses the same format as the debug "
+ "action query options, but allows for injection of debug actions in code paths where "
+ "query options are not available.");
+
// Used for testing the path where the Kudu client is stubbed.
DEFINE_bool(disable_kudu, false, "If true, Kudu features will be disabled.");
diff --git a/be/src/service/control-service.cc b/be/src/service/control-service.cc
index d48b873..2c903d1 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -49,6 +49,7 @@ DEFINE_string(control_service_queue_mem_limit, "50MB", QUEUE_LIMIT_MSG.c_str());
DEFINE_int32(control_service_num_svc_threads, 0, "Number of threads for processing "
"control service's RPCs. if left at default value 0, it will be set to number of "
"CPU cores. Set it to a positive value to change from the default.");
+DECLARE_string(debug_actions);
namespace impala {
@@ -116,6 +117,9 @@ void ControlService::ReportExecStatus(const ReportExecStatusRequestPB* request,
shared_ptr<ClientRequestState> request_state =
ExecEnv::GetInstance()->impala_server()->GetClientRequestState(query_id);
+ // This failpoint is to allow jitter to be injected.
+ DebugActionNoFail(FLAGS_debug_actions, "REPORT_EXEC_STATUS_DELAY");
+
if (request_state.get() == nullptr) {
// This is expected occasionally (since a report RPC might be in flight while
// cancellation is happening). Return an error to the caller to get it to stop.
@@ -128,9 +132,6 @@ void ControlService::ReportExecStatus(const ReportExecStatusRequestPB* request,
return;
}
- // This failpoint is to allow jitter to be injected.
- DebugActionNoFail(request_state->query_options(), "REPORT_EXEC_STATUS_DELAY");
-
// The runtime profile is sent as a Thrift serialized buffer via sidecar. Get the
// sidecar and deserialize the thrift profile if there is any. The sender may have
// failed to serialize the Thrift profile so an empty thrift profile is valid.
@@ -167,8 +168,8 @@ void ControlService::CancelQueryFInstances(const CancelQueryFInstancesRequestPB*
DCHECK(request->has_query_id());
const TUniqueId& query_id = ProtoToQueryId(request->query_id());
VLOG_QUERY << "CancelQueryFInstances(): query_id=" << PrintId(query_id);
- // TODO(IMPALA-8143) Use DebugAction for fault injection.
- FAULT_INJECTION_RPC_DELAY(RPC_CANCELQUERYFINSTANCES);
+ // This failpoint is to allow jitter to be injected.
+ DebugActionNoFail(FLAGS_debug_actions, "CANCEL_QUERY_FINSTANCES_DELAY");
QueryState::ScopedRef qs(query_id);
if (qs.get() == nullptr) {
Status status(ErrorMsg(TErrorCode::INTERNAL_ERROR,
@@ -182,8 +183,8 @@ void ControlService::CancelQueryFInstances(const CancelQueryFInstancesRequestPB*
void ControlService::RemoteShutdown(const RemoteShutdownParamsPB* req,
RemoteShutdownResultPB* response, RpcContext* rpc_context) {
- // TODO(IMPALA-8143) Use DebugAction for fault injection.
- FAULT_INJECTION_RPC_DELAY(RPC_REMOTESHUTDOWN);
+ // This failpoint is to allow jitter to be injected.
+ DebugActionNoFail(FLAGS_debug_actions, "REMOTE_SHUTDOWN_DELAY");
Status status = ExecEnv::GetInstance()->impala_server()->StartShutdown(
req->has_deadline_s() ? req->deadline_s() : -1,
response->mutable_shutdown_status());
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index 1859139..890ceec 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -49,6 +49,7 @@ DEFINE_string(datastream_service_queue_mem_limit, "5%", QUEUE_LIMIT_MSG.c_str())
DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of threads for processing "
"datastream services' RPCs. If left at default value 0, it will be set to number of "
"CPU cores. Set it to a positive value to change from the default.");
+DECLARE_string(debug_actions);
namespace impala {
@@ -94,13 +95,14 @@ bool DataStreamService::Authorize(const google::protobuf::Message* req,
void DataStreamService::EndDataStream(const EndDataStreamRequestPB* request,
EndDataStreamResponsePB* response, RpcContext* rpc_context) {
+ DebugActionNoFail(FLAGS_debug_actions, "END_DATA_STREAM_DELAY");
// CloseSender() is guaranteed to eventually respond to this RPC so we don't do it here.
ExecEnv::GetInstance()->stream_mgr()->CloseSender(request, response, rpc_context);
}
void DataStreamService::TransmitData(const TransmitDataRequestPB* request,
TransmitDataResponsePB* response, RpcContext* rpc_context) {
- FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA);
+ DebugActionNoFail(FLAGS_debug_actions, "TRANSMIT_DATA_DELAY");
// AddData() is guaranteed to eventually respond to this RPC so we don't do it here.
ExecEnv::GetInstance()->stream_mgr()->AddData(request, response, rpc_context);
}
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index d4ed14d..5260253 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -32,6 +32,8 @@
using namespace impala;
+DECLARE_string(debug_actions);
+
ImpalaInternalService::ImpalaInternalService() {
impala_server_ = ExecEnv::GetInstance()->impala_server();
DCHECK(impala_server_ != nullptr);
@@ -41,7 +43,7 @@ ImpalaInternalService::ImpalaInternalService() {
void ImpalaInternalService::ExecQueryFInstances(TExecQueryFInstancesResult& return_val,
const TExecQueryFInstancesParams& params) {
- FAULT_INJECTION_RPC_DELAY(RPC_EXECQUERYFINSTANCES);
+ DebugActionNoFail(FLAGS_debug_actions, "EXEC_QUERY_FINSTANCES_DELAY");
DCHECK(params.__isset.coord_state_idx);
DCHECK(params.__isset.query_ctx);
DCHECK(params.__isset.fragment_ctxs);
@@ -68,7 +70,7 @@ template <typename T> void SetUnknownIdError(
void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
const TUpdateFilterParams& params) {
- FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER);
+ DebugActionNoFail(FLAGS_debug_actions, "UPDATE_FILTER_DELAY");
DCHECK(params.__isset.filter_id);
DCHECK(params.__isset.query_id);
DCHECK(params.__isset.bloom_filter || params.__isset.min_max_filter);
@@ -77,7 +79,7 @@ void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val,
const TPublishFilterParams& params) {
- FAULT_INJECTION_RPC_DELAY(RPC_PUBLISHFILTER);
+ DebugActionNoFail(FLAGS_debug_actions, "PUBLISH_FILTER_DELAY");
DCHECK(params.__isset.filter_id);
DCHECK(params.__isset.dst_query_id);
DCHECK(params.__isset.dst_fragment_idx);
diff --git a/be/src/testutil/fault-injection-util.cc b/be/src/testutil/fault-injection-util.cc
index 14e00ef..48d3a9e 100644
--- a/be/src/testutil/fault-injection-util.cc
+++ b/be/src/testutil/fault-injection-util.cc
@@ -28,8 +28,6 @@
#include "common/names.h"
-DECLARE_int32(fault_injection_rpc_delay_ms);
-DECLARE_int32(fault_injection_rpc_type);
DECLARE_int32(fault_injection_rpc_exception_type);
namespace impala {
@@ -37,20 +35,6 @@ namespace impala {
using apache::thrift::transport::TTransportException;
using apache::thrift::transport::TSSLException;
-int32_t FaultInjectionUtil::GetTargetRPCType() {
- int32_t target_rpc_type = FLAGS_fault_injection_rpc_type;
- if (target_rpc_type == RPC_RANDOM) target_rpc_type = rand() % RPC_RANDOM;
- DCHECK_LT(target_rpc_type, RPC_RANDOM);
- return target_rpc_type;
-}
-
-void FaultInjectionUtil::InjectRpcDelay(RpcCallType my_type) {
- int32_t delay_ms = FLAGS_fault_injection_rpc_delay_ms;
- if (delay_ms == 0) return;
- int32_t target_rpc_type = GetTargetRPCType();
- if (target_rpc_type == my_type) SleepForMs(delay_ms);
-}
-
void FaultInjectionUtil::InjectRpcException(bool is_send, int freq) {
static AtomicInt32 send_count(-1);
static AtomicInt32 recv_count(-1);
diff --git a/be/src/testutil/fault-injection-util.h b/be/src/testutil/fault-injection-util.h
index f545e1f..0ab77d0 100644
--- a/be/src/testutil/fault-injection-util.h
+++ b/be/src/testutil/fault-injection-util.h
@@ -26,17 +26,6 @@ namespace impala {
class FaultInjectionUtil {
public:
- enum RpcCallType {
- RPC_NULL = 0,
- RPC_EXECQUERYFINSTANCES,
- RPC_CANCELQUERYFINSTANCES,
- RPC_PUBLISHFILTER,
- RPC_UPDATEFILTER,
- RPC_TRANSMITDATA,
- RPC_REPORTEXECSTATUS,
- RPC_REMOTESHUTDOWN,
- RPC_RANDOM // This must be last.
- };
enum RpcExceptionType {
RPC_EXCEPTION_NONE = 0,
@@ -52,26 +41,14 @@ class FaultInjectionUtil {
RPC_EXCEPTION_SSL_RECV_TIMEDOUT,
};
- /// Test util function that injects delays to specified RPC server handling function
- /// so that RPC caller could hit the RPC recv timeout condition.
- /// 'my_type' specifies which RPC type of the current function.
- /// FLAGS_fault_injection_rpc_type specifies which RPC function the delay should
- /// be enabled. FLAGS_fault_injection_rpc_delay_ms specifies the delay in ms.
- static void InjectRpcDelay(RpcCallType my_type);
-
/// Test util function that injects exceptions to RPC client functions.
/// 'is_send' indicates whether injected fault is at the send() or recv() of an RPC.
/// The exception specified in 'FLAGS_fault_injection_rpc_exception_type' is injected
/// on every 'freq' invocations of this function.
static void InjectRpcException(bool is_send, int freq);
- private:
- static int32_t GetTargetRPCType();
-
};
-#define FAULT_INJECTION_RPC_DELAY(type) \
- FaultInjectionUtil::InjectRpcDelay(FaultInjectionUtil::type)
#define FAULT_INJECTION_SEND_RPC_EXCEPTION(freq) \
FaultInjectionUtil::InjectRpcException(true, freq)
#define FAULT_INJECTION_RECV_RPC_EXCEPTION(freq) \
@@ -79,7 +56,6 @@ class FaultInjectionUtil {
#else // NDEBUG
-#define FAULT_INJECTION_RPC_DELAY(type)
#define FAULT_INJECTION_SEND_RPC_EXCEPTION(freq)
#define FAULT_INJECTION_RECV_RPC_EXCEPTION(freq)
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index 552c16c..7029a71 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -328,10 +328,8 @@ static bool ParseProbability(const string& prob_str, bool* should_execute) {
return true;
}
-Status DebugActionImpl(
- const TQueryOptions& query_options, const char* label) {
- const DebugActionTokens& action_list = TokenizeDebugActions(
- query_options.debug_action);
+Status DebugActionImpl(const string& debug_action, const char* label) {
+ const DebugActionTokens& action_list = TokenizeDebugActions(debug_action);
static const char ERROR_MSG[] = "Invalid debug_action $0:$1 ($2)";
for (const vector<string>& components : action_list) {
// size() != 2 check filters out ExecNode debug actions.
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 1ca53b5..b2235ac 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -140,19 +140,29 @@ DebugActionTokens TokenizeDebugActions(const string& debug_actions);
/// becomes {"x", "y"} and "x" becomes {"x"}.
std::vector<std::string> TokenizeDebugActionParams(const string& action);
-/// Slow path implementing DebugAction() for the case where
-/// 'query_options.debug_action' is non-empty.
-Status DebugActionImpl(
- const TQueryOptions& query_options, const char* label) WARN_UNUSED_RESULT;
+/// Slow path implementing DebugAction() for the case where 'debug_action' is non-empty.
+Status DebugActionImpl(const string& debug_action, const char* label) WARN_UNUSED_RESULT;
/// If debug_action query option has a "global action" (i.e. not exec-node specific)
/// and matches the given 'label', apply the the action. See ImpalaService.thrift for
/// details of the format and available global actions. For ExecNode code, use
/// ExecNode::ExecDebugAction() instead.
WARN_UNUSED_RESULT static inline Status DebugAction(
+ const string& debug_action, const char* label) {
+ if (LIKELY(debug_action.empty())) return Status::OK();
+ return DebugActionImpl(debug_action, label);
+}
+
+WARN_UNUSED_RESULT static inline Status DebugAction(
const TQueryOptions& query_options, const char* label) {
- if (LIKELY(query_options.debug_action.empty())) return Status::OK();
- return DebugActionImpl(query_options, label);
+ return DebugAction(query_options.debug_action, label);
+}
+
+static inline void DebugActionNoFail(const string& debug_action, const char* label) {
+ Status status = DebugAction(debug_action, label);
+ if (!status.ok()) {
+ LOG(ERROR) << "Ignoring debug action failure: " << status.GetDetail();
+ }
}
/// Like DebugAction() but for use in contexts that can't safely propagate an error
@@ -160,10 +170,7 @@ WARN_UNUSED_RESULT static inline Status DebugAction(
/// and ignored.
static inline void DebugActionNoFail(
const TQueryOptions& query_options, const char* label) {
- Status status = DebugAction(query_options, label);
- if (!status.ok()) {
- LOG(ERROR) << "Ignoring debug action failure: " << status.GetDetail();
- }
+ DebugActionNoFail(query_options.debug_action, label);
}
// FILE_CHECKs are conditions that we expect to be true but could fail due to a malformed
diff --git a/tests/custom_cluster/test_rpc_timeout.py b/tests/custom_cluster/test_rpc_timeout.py
index 40b02fc..166b385 100644
--- a/tests/custom_cluster/test_rpc_timeout.py
+++ b/tests/custom_cluster/test_rpc_timeout.py
@@ -85,7 +85,7 @@ class TestRPCTimeout(CustomClusterTestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
- " --fault_injection_rpc_delay_ms=1000 --fault_injection_rpc_type=1"
+ " --debug_actions=EXEC_QUERY_FINSTANCES_DELAY:SLEEP@1000"
" --datastream_sender_timeout_ms=30000")
def test_execqueryfinstances_race(self, vector):
""" Test for IMPALA-7464, where the rpc times out while the rpc handler continues to
@@ -94,7 +94,7 @@ class TestRPCTimeout(CustomClusterTestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
- " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=1"
+ " --debug_actions=EXEC_QUERY_FINSTANCES_DELAY:SLEEP@3000"
" --datastream_sender_timeout_ms=30000")
def test_execqueryfinstances_timeout(self, vector):
for i in range(3):
@@ -109,7 +109,7 @@ class TestRPCTimeout(CustomClusterTestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
- " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=2"
+ " --debug_actions=CANCEL_QUERY_FINSTANCES_DELAY:SLEEP@3000"
" --datastream_sender_timeout_ms=30000")
def test_cancelplanfragment_timeout(self, vector):
query = "select * from tpch.lineitem limit 5000"
@@ -117,20 +117,22 @@ class TestRPCTimeout(CustomClusterTestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
- " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=3")
+ " --debug_actions=PUBLISH_FILTER_DELAY:SLEEP@3000")
def test_publishfilter_timeout(self, vector):
self.execute_runtime_filter_query()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
- " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=4")
+ " --debug_actions=UPDATE_FILTER_DELAY:SLEEP@3000")
def test_updatefilter_timeout(self, vector):
self.execute_runtime_filter_query()
+ all_rpcs = ["EXEC_QUERY_FINSTANCES", "CANCEL_QUERY_FINSTANCES", "PUBLISH_FILTER",
+ "UPDATE_FILTER", "TRANSMIT_DATA", "END_DATA_STREAM", "REMOTE_SHUTDOWN"]
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
- " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=7"
- " --datastream_sender_timeout_ms=30000")
+ " --datastream_sender_timeout_ms=30000 --debug_actions=%s" %
+ "|".join(map(lambda rpc: "%s_DELAY:JITTER@3000@0.1" % rpc, all_rpcs)))
def test_random_rpc_timeout(self, vector):
self.execute_query_verify_metrics(self.TEST_QUERY, None, 10)
@@ -138,15 +140,15 @@ class TestRPCTimeout(CustomClusterTestSuite):
# Useful for triggering IMPALA-8274.
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--status_report_interval_ms=100"
- " --backend_client_rpc_timeout_ms=100")
+ " --backend_client_rpc_timeout_ms=100"
+ " --debug_actions=REPORT_EXEC_STATUS_DELAY:JITTER@110@0.7")
def test_reportexecstatus_jitter(self, vector):
LONG_RUNNING_QUERY = "with v as (select t1.ss_hdemo_sk as xk " +\
"from tpcds_parquet.store_sales t1, tpcds_parquet.store_sales t2 " +\
"where t1.ss_hdemo_sk = t2.ss_hdemo_sk) " +\
"select count(*) from v, tpcds_parquet.household_demographics t3 " +\
"where v.xk = t3.hd_demo_sk"
- query_options = {'debug_action': 'REPORT_EXEC_STATUS_DELAY:JITTER@110@0.7'}
- self.execute_query_verify_metrics(LONG_RUNNING_QUERY, query_options, 1)
+ self.execute_query_verify_metrics(LONG_RUNNING_QUERY, None, 1)
# Use a small service queue memory limit and a single service thread to exercise
# the retry paths in the ReportExecStatus() RPC
@@ -165,16 +167,16 @@ class TestRPCTimeout(CustomClusterTestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=100"
- " --status_report_interval_ms=1000 --status_report_max_retry_s=1000")
+ " --status_report_interval_ms=1000 --status_report_max_retry_s=1000"
+ " --debug_actions=REPORT_EXEC_STATUS_DELAY:SLEEP@1000")
def test_reportexecstatus_retries(self, unique_database):
tbl = "%s.kudu_test" % unique_database
self.execute_query("create table %s (a int primary key) stored as kudu" % tbl)
# Since the sleep time (1000ms) is much longer than the rpc timeout (100ms), all
# reports will appear to fail. The query is designed to result in many intermediate
# status reports but fewer than the max allowed failures, so the query should succeed.
- query_options = {'debug_action': 'REPORT_EXEC_STATUS_DELAY:SLEEP@1000'}
result = self.execute_query(
- "insert into %s select 0 from tpch.lineitem limit 100000" % tbl, query_options)
+ "insert into %s select 0 from tpch.lineitem limit 100000" % tbl)
assert result.success, str(result)
# Ensure that the error log was tracked correctly - all but the first row inserted
# should result in a 'key already present' insert error.