You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by he...@apache.org on 2017/07/25 18:21:35 UTC

[3/3] incubator-impala git commit: IMPALA-5586: Null-aware anti-join can take a long time to cancel

IMPALA-5586: Null-aware anti-join can take a long time to cancel

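Queries with a null-aware anti-join joining on a large number of NULLs
can take a long time to cancel if threads are stuck in
PartitionedHashJoinNode::EvaluateNullProbe(). This change passes the
RuntimeState into EvaluateNullProbe() (and into its caller
CleanUpHashPartitions()) and adds RETURN_IF_CANCELLED checks to the
function's loops: once per probe row in the outer loop, and once every
1024 build rows in the inner loop.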

Testing:
Manually tested by temporarily adding logs (not part of this change) to
PartitionedHashJoinNode::EvaluateNullProbe() and making sure that the
function returns right away on cancellation.

Change-Id: I0800754d4ad31cbadbdfadc630c640963f3f6053
Reviewed-on: http://gerrit.cloudera.org:8080/7393
Tested-by: Impala Public Jenkins
Reviewed-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7ccbfe47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7ccbfe47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7ccbfe47

Branch: refs/heads/master
Commit: 7ccbfe47fe1a0b693f6d94ae4b1062a0e3b66e88
Parents: 2c0fc30
Author: aphadke <ap...@cloudera.com>
Authored: Mon Jul 10 17:37:31 2017 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue Jul 25 03:57:14 2017 +0000

----------------------------------------------------------------------
 be/src/exec/partitioned-hash-join-node.cc | 19 +++++++++++++------
 be/src/exec/partitioned-hash-join-node.h  |  5 +++--
 2 files changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7ccbfe47/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 927bf1a..0f731d3 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -608,7 +608,7 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
     // Finished up all probe rows for 'hash_partitions_'. We may have already cleaned up
     // the hash partitions, e.g. if we had to output some unmatched build rows below.
     if (builder_->num_hash_partitions() != 0) {
-      RETURN_IF_ERROR(CleanUpHashPartitions(out_batch));
+      RETURN_IF_ERROR(CleanUpHashPartitions(state, out_batch));
       if (out_batch->AtCapacity()) break;
     }
 
@@ -914,7 +914,8 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
     RETURN_IF_ERROR(probe_stream->GetNext(probe_batch_.get(), &eos));
 
     if (probe_batch_->num_rows() == 0) {
-      RETURN_IF_ERROR(EvaluateNullProbe(builder_->null_aware_partition()->build_rows()));
+      RETURN_IF_ERROR(EvaluateNullProbe(
+            state, builder_->null_aware_partition()->build_rows()));
       nulls_build_batch_.reset();
       RETURN_IF_ERROR(PrepareNullAwareNullProbe());
       return Status::OK();
@@ -994,7 +995,8 @@ void PartitionedHashJoinNode::CreateProbePartition(
       this, builder_->hash_partition(partition_idx), std::move(probe_rows));
 }
 
-Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) {
+Status PartitionedHashJoinNode::EvaluateNullProbe(
+    RuntimeState* state, BufferedTupleStream* build) {
   if (null_probe_rows_ == NULL || null_probe_rows_->num_rows() == 0) {
     return Status::OK();
   }
@@ -1011,13 +1013,17 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) {
 
   ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data();
   int num_join_conjuncts = other_join_conjuncts_.size();
-
   DCHECK_LE(probe_rows->num_rows(), matched_null_probe_.size());
   // For each row, iterate over all rows in the build table.
   SCOPED_TIMER(null_aware_eval_timer_);
   for (int i = 0; i < probe_rows->num_rows(); ++i) {
+    // This loop may run for a long time. Check for cancellation.
+    RETURN_IF_CANCELLED(state);
     if (matched_null_probe_[i]) continue;
     for (int j = 0; j < build_rows->num_rows(); ++j) {
+      // This loop may run for a long time if the number of build_rows is large.
+      // Periodically check for cancellation.
+      if (j % 1024 == 0) RETURN_IF_CANCELLED(state);
       CreateOutputRow(semi_join_staging_row_, probe_rows->GetRow(i),
           build_rows->GetRow(j));
       if (ExecNode::EvalConjuncts(
@@ -1031,7 +1037,8 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) {
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::CleanUpHashPartitions(RowBatch* batch) {
+Status PartitionedHashJoinNode::CleanUpHashPartitions(
+    RuntimeState* state, RowBatch* batch) {
   DCHECK_EQ(probe_batch_pos_, -1);
   // At this point all the rows have been read from the probe side for all partitions in
   // hash_partitions_.
@@ -1090,7 +1097,7 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(RowBatch* batch) {
         // For NAAJ, we need to try to match all the NULL probe rows with this partition
         // before closing it. The NULL probe rows could have come from any partition
         // so we collect them all and match them at the end.
-        RETURN_IF_ERROR(EvaluateNullProbe(build_partition->build_rows()));
+        RETURN_IF_ERROR(EvaluateNullProbe(state, build_partition->build_rows()));
         build_partition->Close(batch);
       } else {
         build_partition->Close(batch);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7ccbfe47/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index bf90ae4..41493d0 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -331,7 +331,8 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// rows in build. This updates matched_null_probe_, short-circuiting if one of the
   /// conjuncts pass (i.e. there is a match).
   /// This is used for NAAJ, when there are NULL probe rows.
-  Status EvaluateNullProbe(BufferedTupleStream* build) WARN_UNUSED_RESULT;
+  Status EvaluateNullProbe(
+      RuntimeState* state, BufferedTupleStream* build) WARN_UNUSED_RESULT;
 
   /// Prepares to output NULLs on the probe side for NAAJ. Before calling this,
   /// matched_null_probe_ should have been fully evaluated.
@@ -351,7 +352,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   ///    unmatched rows.
   ///  - If the build partition did not have a hash table, meaning both build and probe
   ///    rows were spilled, move the partition to 'spilled_partitions_'.
-  Status CleanUpHashPartitions(RowBatch* batch) WARN_UNUSED_RESULT;
+  Status CleanUpHashPartitions(RuntimeState* state, RowBatch* batch) WARN_UNUSED_RESULT;
 
   /// Get the next row batch from the probe (left) side (child(0)). If we are done
   /// consuming the input, sets 'probe_batch_pos_' to -1, otherwise, sets it to 0.