You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/09/16 21:41:03 UTC

[4/5] incubator-impala git commit: IMPALA-5199: prevent hang on empty row batch exchange

IMPALA-5199: prevent hang on empty row batch exchange

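The error path taken when delivery of "eos" (end of stream) fails now
behaves the same as the one taken when delivery of a row batch fails:
if the receiver cannot be found and was not already unregistered, the
sender is returned a timeout error instead of OK.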

Testing:
Added a timeout test in which the leaf fragments return 0 rows. Before
this change, the test reproduced the hang.

Change-Id: Ib370ebe44e3bb34d3f0fb9f05aa6386eb91c8645
Reviewed-on: http://gerrit.cloudera.org:8080/8005
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5119ced5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5119ced5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5119ced5

Branch: refs/heads/master
Commit: 5119ced50c0e0c4001621c9d4da598c187bdb580
Parents: 491822f
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Sep 7 16:28:46 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Sep 16 00:50:07 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/data-stream-mgr.cc                  | 17 +++++++++++++----
 .../queries/QueryTest/exchange-delays.test         | 10 ++++++++++
 2 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5119ced5/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc b/be/src/runtime/data-stream-mgr.cc
index a161ad3..9af8384 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -197,10 +197,19 @@ Status DataStreamMgr::CloseSender(const TUniqueId& fragment_instance_id,
     PlanNodeId dest_node_id, int sender_id) {
   VLOG_FILE << "CloseSender(): fragment_instance_id=" << fragment_instance_id
             << ", node=" << dest_node_id;
-  bool unused;
+  Status status;
+  bool already_unregistered;
   shared_ptr<DataStreamRecvr> recvr = FindRecvrOrWait(fragment_instance_id, dest_node_id,
-      &unused);
-  if (recvr.get() != NULL) recvr->RemoveSender(sender_id);
+      &already_unregistered);
+  if (recvr == nullptr) {
+    // Was not able to notify the receiver that this was the end of stream. Notify the
+    // sender that this failed so that they can take appropriate action (i.e. failing
+    // the query).
+    status = already_unregistered ? Status::OK() :
+        Status(TErrorCode::DATASTREAM_SENDER_TIMEOUT, PrintId(fragment_instance_id));
+  } else {
+    recvr->RemoveSender(sender_id);
+  }
 
   {
     // Remove any closed streams that have been in the cache for more than
@@ -221,7 +230,7 @@ Status DataStreamMgr::CloseSender(const TUniqueId& fragment_instance_id,
                  << PrettyPrinter::Print(MonotonicMillis() - now, TUnit::TIME_MS);
     }
   }
-  return Status::OK();
+  return status;
 }
 
 Status DataStreamMgr::DeregisterRecvr(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5119ced5/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test b/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
index 0dac1d9..b1f6f75 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
@@ -8,3 +8,13 @@ from tpch.lineitem
 ---- CATCH
 Sender timed out waiting for receiver fragment instance
 ====
+---- QUERY
+# IMPALA-5199: Query with zero rows sent over exchange.
+select l_orderkey, count(*)
+from tpch.lineitem
+where l_linenumber = -1
+group by l_orderkey
+---- RESULTS
+---- CATCH
+Sender timed out waiting for receiver fragment instance
+====