Posted to commits@impala.apache.org by ta...@apache.org on 2018/04/12 05:28:21 UTC

[08/28] impala git commit: IMPALA-6792: Fail status reporting if coordinator refuses connections

IMPALA-6792: Fail status reporting if coordinator refuses connections

The ReportExecStatusAux() function runs on a dedicated thread per
fragment instance. This thread runs until the fragment instance
finishes executing.

Each time it sends a status report to the coordinator, it attempts
the RPC up to 3 times. If all 3 attempts fail, the fragment instance
cancels itself.
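
For reference, the retry loop in QueryState::ReportExecStatusAux()
before this patch looked like the condensed excerpt below (taken from
the diff further down; declarations outside the hunk are omitted).
Note that client_status is scoped inside the loop, which is what the
problem described next hinges on:

    for (int i = 0; i < 3; ++i) {
      // A failure recorded here (e.g. a connection that could not be
      // created or opened) was never examined after the loop.
      Status client_status;
      ImpalaBackendConnection client(ExecEnv::GetInstance()->impalad_client_cache(),
          query_ctx().coord_address, &client_status);
      if (client_status.ok()) {
        rpc_status = client.DoRpc(&ImpalaBackendClient::ReportExecStatus, params, &res);
        if (rpc_status.ok()) break;
      }
      if (i < 2) SleepForMs(100);
    }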

However, there is one case where a failure to send the RPC is not
counted as a failed RPC: if, while obtaining a connection, we end up
creating a brand-new connection (via ClientCache::CreateClient())
instead of reusing a previously cached one, and that new connection
fails to even Open(), the failure is not counted as an RPC failure.

This patch counts such an error as a failed RPC too.
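
To illustrate the intended behaviour in isolation, the following is a
minimal, self-contained C++ sketch of the pattern after the fix. It is
not Impala code: TryCreateConnection(), TrySendReport() and
CancelFragmentInstances() are hypothetical stand-ins for creating and
opening the backend connection, the ReportExecStatus RPC, and
QueryState::Cancel().

    #include <chrono>
    #include <iostream>
    #include <thread>

    enum class Status { OK, CONNECTION_FAILED, RPC_FAILED };

    // Stubs standing in for the real connection/RPC machinery.
    Status TryCreateConnection() { return Status::CONNECTION_FAILED; }
    Status TrySendReport() { return Status::OK; }
    void CancelFragmentInstances() { std::cout << "cancelling fragment instances\n"; }

    int main() {
      const int kRetryIntervalMs = 100;   // like --report_status_retry_interval_ms
      Status client_status = Status::OK;  // tracked across attempts, as in the patch
      Status rpc_status = Status::RPC_FAILED;
      for (int i = 0; i < 3; ++i) {
        client_status = TryCreateConnection();
        if (client_status == Status::OK) {
          rpc_status = TrySendReport();
          if (rpc_status == Status::OK) break;
        }
        if (i < 2) {
          std::this_thread::sleep_for(std::chrono::milliseconds(kRetryIntervalMs));
        }
      }
      // A connection that never opened is now treated like a failed RPC.
      if (client_status != Status::OK || rpc_status != Status::OK) {
        CancelFragmentInstances();
      }
      return 0;
    }

The key point is that client_status survives the loop, so a connection
that never opened triggers cancellation just like a failed RPC does.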

This patch also clarifies some of the error log messages and introduces
a flag to control the sleep interval between failed ReportExecStatus RPC
retries.
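
Assuming the standard gflags handling used by the Impala backend
daemon, the new interval can be adjusted at start-up. The default is
100ms (see the DEFINE_int32 in the diff below); the value here is only
an example:

    impalad --report_status_retry_interval_ms=500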

Change-Id: If668838f99f78b5ffa713488178b2eb5799ba220
Reviewed-on: http://gerrit.cloudera.org:8080/9916
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/da3437a3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/da3437a3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/da3437a3

Branch: refs/heads/2.x
Commit: da3437a31b28c7fe598baf0f81e780e7f1dc82d5
Parents: bd63208
Author: Sailesh Mukil <sa...@cloudera.com>
Authored: Tue Apr 3 14:24:21 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Apr 11 22:56:00 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/query-state.cc | 34 +++++++++++++++++++++++++---------
 1 file changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/da3437a3/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index ad5748f..04a4283 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -38,6 +38,10 @@
 
 #include "common/names.h"
 
+DEFINE_int32(report_status_retry_interval_ms, 100,
+    "The interval in milliseconds to wait before retrying a failed status report RPC to "
+    "the coordinator.");
+
 using namespace impala;
 
 QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
@@ -249,28 +253,40 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
   DCHECK_EQ(res.status.status_code, TErrorCode::OK);
   // Try to send the RPC 3 times before failing. Sleep for 100ms between retries.
   // It's safe to retry the RPC as the coordinator handles duplicate RPC messages.
+  Status client_status;
   for (int i = 0; i < 3; ++i) {
-    Status client_status;
     ImpalaBackendConnection client(ExecEnv::GetInstance()->impalad_client_cache(),
         query_ctx().coord_address, &client_status);
     if (client_status.ok()) {
       rpc_status = client.DoRpc(&ImpalaBackendClient::ReportExecStatus, params, &res);
       if (rpc_status.ok()) break;
     }
-    if (i < 2) SleepForMs(100);
+    if (i < 2) SleepForMs(FLAGS_report_status_retry_interval_ms);
   }
   Status result_status(res.status);
-  if ((!rpc_status.ok() || !result_status.ok()) && instances_started) {
+  if ((!client_status.ok() || !rpc_status.ok() || !result_status.ok()) &&
+      instances_started) {
     // TODO: should we try to keep rpc_status for the final report? (but the final
     // report, following this Cancel(), may not succeed anyway.)
     // TODO: not keeping an error status here means that all instances might
     // abort with CANCELLED status, despite there being an error
-    if (!rpc_status.ok()) {
-      // TODO: Fix IMPALA-2990. Cancelling fragment instances here may cause query to
-      // hang as the coordinator may not be aware of the cancellation. Remove the log
-      // statement once IMPALA-2990 is fixed.
-      LOG(ERROR) << "Cancelling fragment instances due to failure to report status. "
-                 << "Query " << PrintId(query_id()) << " may hang. See IMPALA-2990.";
+    // TODO: Fix IMPALA-2990. Cancelling fragment instances without sending the
+    // ReportExecStatus RPC may cause query to hang as the coordinator may not be aware
+    // of the cancellation. Remove the log statements once IMPALA-2990 is fixed.
+    if (!client_status.ok()) {
+      LOG(ERROR) << "Cancelling fragment instances due to failure to obtain a connection "
+                 << "to the coordinator. (" << client_status.GetDetail()
+                 << "). Query " << PrintId(query_id()) << " may hang. See IMPALA-2990.";
+    } else if (!rpc_status.ok()) {
+      LOG(ERROR) << "Cancelling fragment instances due to failure to reach the "
+                 << "coordinator. (" << rpc_status.GetDetail()
+                 << "). Query " << PrintId(query_id()) << " may hang. See IMPALA-2990.";
+    } else if (!result_status.ok()) {
+      // If the ReportExecStatus RPC succeeded in reaching the coordinator and we get
+      // back a non-OK status, it means that the coordinator expects us to cancel the
+      // fragment instances for this query.
+      LOG(INFO) << "Cancelling fragment instances as directed by the coordinator. "
+                << "Returned status: " << result_status.GetDetail();
     }
     Cancel();
   }