You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/09/16 21:41:03 UTC
[4/5] incubator-impala git commit: IMPALA-5199: prevent hang on empty
row batch exchange
IMPALA-5199: prevent hang on empty row batch exchange
The error path where delivery of the end-of-stream ("eos") notification fails
now behaves the same as the path where delivery of a row batch fails.
Testing:
Added a timeout test in which the leaf fragments return 0 rows. Before
this change, the test reproduced the hang.
Change-Id: Ib370ebe44e3bb34d3f0fb9f05aa6386eb91c8645
Reviewed-on: http://gerrit.cloudera.org:8080/8005
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5119ced5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5119ced5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5119ced5
Branch: refs/heads/master
Commit: 5119ced50c0e0c4001621c9d4da598c187bdb580
Parents: 491822f
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Sep 7 16:28:46 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Sep 16 00:50:07 2017 +0000
----------------------------------------------------------------------
be/src/runtime/data-stream-mgr.cc | 17 +++++++++++++----
.../queries/QueryTest/exchange-delays.test | 10 ++++++++++
2 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5119ced5/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc b/be/src/runtime/data-stream-mgr.cc
index a161ad3..9af8384 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -197,10 +197,19 @@ Status DataStreamMgr::CloseSender(const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int sender_id) {
VLOG_FILE << "CloseSender(): fragment_instance_id=" << fragment_instance_id
<< ", node=" << dest_node_id;
- bool unused;
+ Status status;
+ bool already_unregistered;
shared_ptr<DataStreamRecvr> recvr = FindRecvrOrWait(fragment_instance_id, dest_node_id,
- &unused);
- if (recvr.get() != NULL) recvr->RemoveSender(sender_id);
+ &already_unregistered);
+ if (recvr == nullptr) {
+ // Was not able to notify the receiver that this was the end of stream. Notify the
+ // sender that this failed so that they can take appropriate action (i.e. failing
+ // the query).
+ status = already_unregistered ? Status::OK() :
+ Status(TErrorCode::DATASTREAM_SENDER_TIMEOUT, PrintId(fragment_instance_id));
+ } else {
+ recvr->RemoveSender(sender_id);
+ }
{
// Remove any closed streams that have been in the cache for more than
@@ -221,7 +230,7 @@ Status DataStreamMgr::CloseSender(const TUniqueId& fragment_instance_id,
<< PrettyPrinter::Print(MonotonicMillis() - now, TUnit::TIME_MS);
}
}
- return Status::OK();
+ return status;
}
Status DataStreamMgr::DeregisterRecvr(
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5119ced5/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test b/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
index 0dac1d9..b1f6f75 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
@@ -8,3 +8,13 @@ from tpch.lineitem
---- CATCH
Sender timed out waiting for receiver fragment instance
====
+---- QUERY
+# IMPALA-5199: Query with zero rows sent over exchange.
+select l_orderkey, count(*)
+from tpch.lineitem
+where l_linenumber = -1
+group by l_orderkey
+---- RESULTS
+---- CATCH
+Sender timed out waiting for receiver fragment instance
+====