You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/04/12 05:28:21 UTC
[08/28] impala git commit: IMPALA-6792: Fail status reporting if
coordinator refuses connections
IMPALA-6792: Fail status reporting if coordinator refuses connections
The ReportExecStatusAux() function is run on a dedicated thread per
fragment instance. This thread will run until the fragment instance
completes executing.
On every attempt to send a report to the coordinator, it will attempt
to send up to 3 RPCs. If all 3 of them fail, then the fragment instance
will cancel itself.
However, there is one case where a failure to send the RPC will not
be considered a failed RPC. If, when we attempt to obtain a
connection, we end up creating a new one
(via ClientCache::CreateClient()) instead of getting a previously
cached connection, and this new connection fails to even Open(),
it will not be counted as an RPC failure.
This patch counts such an error as a failed RPC too.
This patch also clarifies some of the error log messages and introduces
a flag to control the sleep interval between failed ReportExecStatus RPC
retries.
Change-Id: If668838f99f78b5ffa713488178b2eb5799ba220
Reviewed-on: http://gerrit.cloudera.org:8080/9916
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/da3437a3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/da3437a3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/da3437a3
Branch: refs/heads/2.x
Commit: da3437a31b28c7fe598baf0f81e780e7f1dc82d5
Parents: bd63208
Author: Sailesh Mukil <sa...@cloudera.com>
Authored: Tue Apr 3 14:24:21 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Apr 11 22:56:00 2018 +0000
----------------------------------------------------------------------
be/src/runtime/query-state.cc | 34 +++++++++++++++++++++++++---------
1 file changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/da3437a3/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index ad5748f..04a4283 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -38,6 +38,10 @@
#include "common/names.h"
+DEFINE_int32(report_status_retry_interval_ms, 100,
+ "The interval in milliseconds to wait before retrying a failed status report RPC to "
+ "the coordinator.");
+
using namespace impala;
QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
@@ -249,28 +253,40 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
DCHECK_EQ(res.status.status_code, TErrorCode::OK);
// Try to send the RPC 3 times before failing. Sleep for 100ms between retries.
// It's safe to retry the RPC as the coordinator handles duplicate RPC messages.
+ Status client_status;
for (int i = 0; i < 3; ++i) {
- Status client_status;
ImpalaBackendConnection client(ExecEnv::GetInstance()->impalad_client_cache(),
query_ctx().coord_address, &client_status);
if (client_status.ok()) {
rpc_status = client.DoRpc(&ImpalaBackendClient::ReportExecStatus, params, &res);
if (rpc_status.ok()) break;
}
- if (i < 2) SleepForMs(100);
+ if (i < 2) SleepForMs(FLAGS_report_status_retry_interval_ms);
}
Status result_status(res.status);
- if ((!rpc_status.ok() || !result_status.ok()) && instances_started) {
+ if ((!client_status.ok() || !rpc_status.ok() || !result_status.ok()) &&
+ instances_started) {
// TODO: should we try to keep rpc_status for the final report? (but the final
// report, following this Cancel(), may not succeed anyway.)
// TODO: not keeping an error status here means that all instances might
// abort with CANCELLED status, despite there being an error
- if (!rpc_status.ok()) {
- // TODO: Fix IMPALA-2990. Cancelling fragment instances here may cause query to
- // hang as the coordinator may not be aware of the cancellation. Remove the log
- // statement once IMPALA-2990 is fixed.
- LOG(ERROR) << "Cancelling fragment instances due to failure to report status. "
- << "Query " << PrintId(query_id()) << " may hang. See IMPALA-2990.";
+ // TODO: Fix IMPALA-2990. Cancelling fragment instances without sending the
+ // ReportExecStatus RPC may cause query to hang as the coordinator may not be aware
+ // of the cancellation. Remove the log statements once IMPALA-2990 is fixed.
+ if (!client_status.ok()) {
+ LOG(ERROR) << "Cancelling fragment instances due to failure to obtain a connection "
+ << "to the coordinator. (" << client_status.GetDetail()
+ << "). Query " << PrintId(query_id()) << " may hang. See IMPALA-2990.";
+ } else if (!rpc_status.ok()) {
+ LOG(ERROR) << "Cancelling fragment instances due to failure to reach the "
+ << "coordinator. (" << rpc_status.GetDetail()
+ << "). Query " << PrintId(query_id()) << " may hang. See IMPALA-2990.";
+ } else if (!result_status.ok()) {
+ // If the ReportExecStatus RPC succeeded in reaching the coordinator and we get
+ // back a non-OK status, it means that the coordinator expects us to cancel the
+ // fragment instances for this query.
+ LOG(INFO) << "Cancelling fragment instances as directed by the coordinator. "
+ << "Returned status: " << result_status.GetDetail();
}
Cancel();
}