You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by he...@apache.org on 2017/07/25 18:21:35 UTC
[3/3] incubator-impala git commit: IMPALA-5586: Null-aware anti-join can take a long time to cancel
IMPALA-5586: Null-aware anti-join can take a long time to cancel
Queries with a null-aware anti-join that joins on a large number of NULLs
can take a long time to cancel if threads are stuck in
PartitionedHashJoinNode::EvaluateNullProbe(). This change passes the
RuntimeState into that function and adds RETURN_IF_CANCELLED checks to its
nested loops: once per probe row, and once every 1024 build rows.
Testing:
Added logging to PartitionedHashJoinNode::EvaluateNullProbe() and verified
that the function returns promptly on cancellation.
Change-Id: I0800754d4ad31cbadbdfadc630c640963f3f6053
Reviewed-on: http://gerrit.cloudera.org:8080/7393
Tested-by: Impala Public Jenkins
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7ccbfe47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7ccbfe47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7ccbfe47
Branch: refs/heads/master
Commit: 7ccbfe47fe1a0b693f6d94ae4b1062a0e3b66e88
Parents: 2c0fc30
Author: aphadke <ap...@cloudera.com>
Authored: Mon Jul 10 17:37:31 2017 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue Jul 25 03:57:14 2017 +0000
----------------------------------------------------------------------
be/src/exec/partitioned-hash-join-node.cc | 19 +++++++++++++------
be/src/exec/partitioned-hash-join-node.h | 5 +++--
2 files changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7ccbfe47/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 927bf1a..0f731d3 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -608,7 +608,7 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
// Finished up all probe rows for 'hash_partitions_'. We may have already cleaned up
// the hash partitions, e.g. if we had to output some unmatched build rows below.
if (builder_->num_hash_partitions() != 0) {
- RETURN_IF_ERROR(CleanUpHashPartitions(out_batch));
+ RETURN_IF_ERROR(CleanUpHashPartitions(state, out_batch));
if (out_batch->AtCapacity()) break;
}
@@ -914,7 +914,8 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
RETURN_IF_ERROR(probe_stream->GetNext(probe_batch_.get(), &eos));
if (probe_batch_->num_rows() == 0) {
- RETURN_IF_ERROR(EvaluateNullProbe(builder_->null_aware_partition()->build_rows()));
+ RETURN_IF_ERROR(EvaluateNullProbe(
+ state, builder_->null_aware_partition()->build_rows()));
nulls_build_batch_.reset();
RETURN_IF_ERROR(PrepareNullAwareNullProbe());
return Status::OK();
@@ -994,7 +995,8 @@ void PartitionedHashJoinNode::CreateProbePartition(
this, builder_->hash_partition(partition_idx), std::move(probe_rows));
}
-Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) {
+Status PartitionedHashJoinNode::EvaluateNullProbe(
+ RuntimeState* state, BufferedTupleStream* build) {
if (null_probe_rows_ == NULL || null_probe_rows_->num_rows() == 0) {
return Status::OK();
}
@@ -1011,13 +1013,17 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) {
ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data();
int num_join_conjuncts = other_join_conjuncts_.size();
-
DCHECK_LE(probe_rows->num_rows(), matched_null_probe_.size());
// For each row, iterate over all rows in the build table.
SCOPED_TIMER(null_aware_eval_timer_);
for (int i = 0; i < probe_rows->num_rows(); ++i) {
+ // This loop may run for a long time. Check for cancellation.
+ RETURN_IF_CANCELLED(state);
if (matched_null_probe_[i]) continue;
for (int j = 0; j < build_rows->num_rows(); ++j) {
+ // This loop may run for a long time if the number of build_rows is large.
+ // Periodically check for cancellation.
+ if (j % 1024 == 0) RETURN_IF_CANCELLED(state);
CreateOutputRow(semi_join_staging_row_, probe_rows->GetRow(i),
build_rows->GetRow(j));
if (ExecNode::EvalConjuncts(
@@ -1031,7 +1037,8 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) {
return Status::OK();
}
-Status PartitionedHashJoinNode::CleanUpHashPartitions(RowBatch* batch) {
+Status PartitionedHashJoinNode::CleanUpHashPartitions(
+ RuntimeState* state, RowBatch* batch) {
DCHECK_EQ(probe_batch_pos_, -1);
// At this point all the rows have been read from the probe side for all partitions in
// hash_partitions_.
@@ -1090,7 +1097,7 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(RowBatch* batch) {
// For NAAJ, we need to try to match all the NULL probe rows with this partition
// before closing it. The NULL probe rows could have come from any partition
// so we collect them all and match them at the end.
- RETURN_IF_ERROR(EvaluateNullProbe(build_partition->build_rows()));
+ RETURN_IF_ERROR(EvaluateNullProbe(state, build_partition->build_rows()));
build_partition->Close(batch);
} else {
build_partition->Close(batch);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7ccbfe47/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index bf90ae4..41493d0 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -331,7 +331,8 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// rows in build. This updates matched_null_probe_, short-circuiting if one of the
/// conjuncts pass (i.e. there is a match).
/// This is used for NAAJ, when there are NULL probe rows.
- Status EvaluateNullProbe(BufferedTupleStream* build) WARN_UNUSED_RESULT;
+ Status EvaluateNullProbe(
+ RuntimeState* state, BufferedTupleStream* build) WARN_UNUSED_RESULT;
/// Prepares to output NULLs on the probe side for NAAJ. Before calling this,
/// matched_null_probe_ should have been fully evaluated.
@@ -351,7 +352,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// unmatched rows.
/// - If the build partition did not have a hash table, meaning both build and probe
/// rows were spilled, move the partition to 'spilled_partitions_'.
- Status CleanUpHashPartitions(RowBatch* batch) WARN_UNUSED_RESULT;
+ Status CleanUpHashPartitions(RuntimeState* state, RowBatch* batch) WARN_UNUSED_RESULT;
/// Get the next row batch from the probe (left) side (child(0)). If we are done
/// consuming the input, sets 'probe_batch_pos_' to -1, otherwise, sets it to 0.