Posted to commits@impala.apache.org by ta...@apache.org on 2017/02/06 21:11:09 UTC

[7/7] incubator-impala git commit: IMPALA-3524: Don't process spilled partitions with 0 probe rows

IMPALA-3524: Don't process spilled partitions with 0 probe rows

In the partitioned hash join node, if a spilled partition has no probe
rows, building the hash table is unnecessary.

For some join types (right outer, right anti, and full outer), we still
need to process the build side in order to output unmatched build rows (in
this case, all of them, since there were no probe rows to match against).

Testing: Added some cases to spilling.test. Manually tested these cases
for performance, and they all show around a 6% improvement.
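
The heart of the change is a short-circuit in PrepareSpilledPartitionForProbe():
if a spilled partition's probe stream is empty, the hash table build is skipped,
and for join types that must emit unmatched build rows the build side is simply
streamed back out. A minimal, self-contained sketch of that decision follows; the
types and names are illustrative stand-ins, not Impala's actual classes.

// Sketch of the IMPALA-3524 idea: when a spilled hash-join partition has no probe
// rows, skip building the hash table. For join types that must emit unmatched build
// rows (RIGHT OUTER, RIGHT ANTI, FULL OUTER), every build row is unmatched by
// definition, so the build side is streamed straight out; for all other join types
// the partition produces nothing and can be dropped.
#include <cstdint>
#include <iostream>
#include <vector>

enum class JoinOp { INNER, LEFT_OUTER, RIGHT_OUTER, RIGHT_ANTI, FULL_OUTER };

// A spilled partition reduced to the two facts the optimization cares about.
struct SpilledPartition {
  std::vector<int64_t> build_rows;  // build-side keys read back from disk
  std::vector<int64_t> probe_rows;  // probe-side keys read back from disk
};

// True for join types that must output build rows with no probe match.
bool NeedsUnmatchedBuildRows(JoinOp op) {
  return op == JoinOp::RIGHT_OUTER || op == JoinOp::RIGHT_ANTI ||
         op == JoinOp::FULL_OUTER;
}

// Processes one spilled partition. Returns the number of hash table builds skipped.
int ProcessSpilledPartition(const SpilledPartition& p, JoinOp op,
                            std::vector<int64_t>* out) {
  if (p.probe_rows.empty()) {
    // No probe rows: building a hash table over the build side would be wasted work.
    if (NeedsUnmatchedBuildRows(op)) {
      // Every build row is unmatched; stream them straight to the output.
      out->insert(out->end(), p.build_rows.begin(), p.build_rows.end());
    }
    // Otherwise the partition contributes nothing and is simply closed.
    return 1;  // one hash table build skipped (cf. NumHashTableBuildsSkipped)
  }
  // Normal path (not shown): build the hash table over p.build_rows, probe it with
  // p.probe_rows, and for NeedsUnmatchedBuildRows(op) sweep it for unmatched entries.
  return 0;
}

int main() {
  SpilledPartition p{{1, 2, 3}, {}};  // spilled partition with zero probe rows
  std::vector<int64_t> out;
  int skipped = ProcessSpilledPartition(p, JoinOp::RIGHT_OUTER, &out);
  std::cout << "rows out: " << out.size() << ", builds skipped: " << skipped << "\n";
  return 0;
}

In the real patch the NumHashTableBuildsSkipped counter is bumped on both skip
paths (PrepareSpilledPartitionForProbe() and CleanUpHashPartitions()), and the new
spilling.test cases assert that it is non-zero.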

Change-Id: I175b32dd9031e51218b38c37693ac3e31dfab47b
Reviewed-on: http://gerrit.cloudera.org:8080/5389
Reviewed-by: Jim Apple <jb...@apache.org>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6a9df540
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6a9df540
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6a9df540

Branch: refs/heads/master
Commit: 6a9df540967e07b09524268d0cc52b7d10835676
Parents: bdd39f6
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Mon Dec 5 15:37:06 2016 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Feb 6 20:22:33 2017 +0000

----------------------------------------------------------------------
 be/src/exec/partitioned-hash-join-node.cc       | 156 +++++++++++++++----
 be/src/exec/partitioned-hash-join-node.h        |  52 +++++--
 .../queries/QueryTest/spilling.test             |  62 ++++++++
 .../tpch/queries/tpch-outer-joins.test          |   3 -
 4 files changed, 233 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a9df540/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 6073486..0c91b47 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -135,6 +135,8 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
 
   num_probe_rows_partitioned_ =
       ADD_COUNTER(runtime_profile(), "ProbeRowsPartitioned", TUnit::UNIT);
+  num_hash_table_builds_skipped_ =
+      ADD_COUNTER(runtime_profile(), "NumHashTableBuildsSkipped", TUnit::UNIT);
   AddCodegenDisabledMessage(state);
   return Status::OK();
 }
@@ -201,6 +203,8 @@ Status PartitionedHashJoinNode::Reset(RuntimeState* state) {
   CloseAndDeletePartitions();
   builder_->Reset();
   memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
+  output_unmatched_batch_.reset();
+  output_unmatched_batch_iter_.reset();
   return ExecNode::Reset(state);
 }
 
@@ -238,6 +242,8 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   if (ht_ctx_ != NULL) ht_ctx_->Close();
   nulls_build_batch_.reset();
+  output_unmatched_batch_.reset();
+  output_unmatched_batch_iter_.reset();
   CloseAndDeletePartitions();
   if (builder_ != NULL) builder_->Close(state);
   Expr::Close(build_expr_ctxs_, state);
@@ -330,13 +336,17 @@ Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
       // In case of right-outer, right-anti and full-outer joins, we move this partition
       // to the list of partitions that we need to output their unmatched build rows.
       DCHECK(output_build_partitions_.empty());
-      DCHECK(input_partition_->build_partition()->hash_tbl() != NULL)
-          << " id: " << id_
-          << " Build: " << input_partition_->build_partition()->build_rows()->num_rows()
-          << " Probe: " << probe_rows->num_rows() << endl
-          << GetStackTrace();
-      hash_tbl_iterator_ =
-          input_partition_->build_partition()->hash_tbl()->FirstUnmatched(ht_ctx_.get());
+      DCHECK(output_unmatched_batch_iter_.get() == NULL);
+      if (input_partition_->build_partition()->hash_tbl() != NULL) {
+        hash_tbl_iterator_ =
+            input_partition_->build_partition()->hash_tbl()->FirstUnmatched(
+                ht_ctx_.get());
+      } else {
+        output_unmatched_batch_.reset(new RowBatch(
+            child(1)->row_desc(), runtime_state_->batch_size(), builder_->mem_tracker()));
+        output_unmatched_batch_iter_.reset(
+            new RowBatch::Iterator(output_unmatched_batch_.get(), 0));
+      }
       output_build_partitions_.push_back(input_partition_->build_partition());
     } else {
       // In any other case, just close the input build partition.
@@ -365,6 +375,24 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
   spilled_partitions_.pop_front();
   PhjBuilder::Partition* build_partition = input_partition_->build_partition();
   DCHECK(build_partition->is_spilled());
+  if (input_partition_->probe_rows()->num_rows() == 0) {
+    // If there are no probe rows, there's no need to build the hash table, and
+    // only partitions with NeedToProcessUnmatchedBuildRows() will have been added
+    // to 'spilled_partitions_' in CleanUpHashPartitions().
+    DCHECK(NeedToProcessUnmatchedBuildRows());
+    bool got_read_buffer = false;
+    RETURN_IF_ERROR(input_partition_->build_partition()->build_rows()->PrepareForRead(
+        false, &got_read_buffer));
+    if (!got_read_buffer) {
+      return mem_tracker()->MemLimitExceeded(
+          runtime_state_, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
+    }
+
+    *got_partition = true;
+    UpdateState(PROBING_SPILLED_PARTITION);
+    COUNTER_ADD(num_hash_table_builds_skipped_, 1);
+    return Status::OK();
+  }
 
   // Make sure we have a buffer to read the probe rows before we build the hash table.
   RETURN_IF_ERROR(input_partition_->PrepareForRead());
@@ -614,27 +642,82 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
   return Status::OK();
 }
 
-void PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
+Status PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
   SCOPED_TIMER(probe_timer_);
   DCHECK(NeedToProcessUnmatchedBuildRows());
   DCHECK(!output_build_partitions_.empty());
+  const int start_num_rows = out_batch->num_rows();
+
+  if (output_unmatched_batch_iter_.get() != NULL) {
+    // There were no probe rows so we skipped building the hash table. In this case, all
+    // build rows of the partition are unmatched.
+    RETURN_IF_ERROR(OutputAllBuild(out_batch));
+  } else {
+    // We built and processed the hash table, so sweep over it and output unmatched rows.
+    RETURN_IF_ERROR(OutputUnmatchedBuildFromHashTable(out_batch));
+  }
+
+  num_rows_returned_ += out_batch->num_rows() - start_num_rows;
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  return Status::OK();
+}
+
+Status PartitionedHashJoinNode::OutputAllBuild(RowBatch* out_batch) {
+  // This will only be called for partitions that are added to 'output_build_partitions_'
+  // in NextSpilledProbeRowBatch(), which adds one partition that is then processed until
+  // it is done by the loop in GetNext(). So, there must be exactly one partition in
+  // 'output_build_partitions_' here.
+  DCHECK_EQ(output_build_partitions_.size(), 1);
+  ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
+  const int num_conjuncts = conjunct_ctxs_.size();
+  RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
+
+  bool eos = false;
+  while (!eos && !out_batch->AtCapacity()) {
+    if (output_unmatched_batch_iter_->AtEnd()) {
+      output_unmatched_batch_->TransferResourceOwnership(out_batch);
+      output_unmatched_batch_->Reset();
+      RETURN_IF_ERROR(output_build_partitions_.front()->build_rows()->GetNext(
+          output_unmatched_batch_.get(), &eos));
+      output_unmatched_batch_iter_.reset(
+          new RowBatch::Iterator(output_unmatched_batch_.get(), 0));
+    }
+
+    for (; !output_unmatched_batch_iter_->AtEnd() && !out_batch->AtCapacity();
+         output_unmatched_batch_iter_->Next()) {
+      OutputBuildRow(out_batch, output_unmatched_batch_iter_->Get(), &out_batch_iterator);
+      if (ExecNode::EvalConjuncts(
+              conjunct_ctxs, num_conjuncts, out_batch_iterator.Get())) {
+        out_batch->CommitLastRow();
+        out_batch_iterator.Next();
+      }
+    }
+  }
+
+  // If we reached eos and finished the last batch, then there are no other unmatched
+  // build rows for this partition. In that case we need to close the partition.
+  // Otherwise, we reached out_batch capacity and we need to continue to output
+  // unmatched build rows, without closing the partition.
+  if (eos && output_unmatched_batch_iter_->AtEnd()) {
+    output_build_partitions_.front()->Close(out_batch);
+    output_build_partitions_.pop_front();
+    DCHECK(output_build_partitions_.empty());
+    output_unmatched_batch_iter_.reset();
+  }
+  return Status::OK();
+}
+
+Status PartitionedHashJoinNode::OutputUnmatchedBuildFromHashTable(RowBatch* out_batch) {
   ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
   const int num_conjuncts = conjunct_ctxs_.size();
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
-  const int start_num_rows = out_batch->num_rows();
 
   while (!out_batch->AtCapacity() && !hash_tbl_iterator_.AtEnd()) {
     // Output remaining unmatched build rows.
     if (!hash_tbl_iterator_.IsMatched()) {
-      TupleRow* build_row = hash_tbl_iterator_.GetRow();
-      DCHECK(build_row != NULL);
-      if (join_op_ == TJoinOp::RIGHT_ANTI_JOIN) {
-        out_batch->CopyRow(build_row, out_batch_iterator.Get());
-      } else {
-        CreateOutputRow(out_batch_iterator.Get(), NULL, build_row);
-      }
-      if (ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts,
-          out_batch_iterator.Get())) {
+      OutputBuildRow(out_batch, hash_tbl_iterator_.GetRow(), &out_batch_iterator);
+      if (ExecNode::EvalConjuncts(
+              conjunct_ctxs, num_conjuncts, out_batch_iterator.Get())) {
         out_batch->CommitLastRow();
         out_batch_iterator.Next();
       }
@@ -646,9 +729,9 @@ void PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
 
   // If we reached the end of the hash table, then there are no other unmatched build
   // rows for this partition. In that case we need to close the partition, and move to
-  // the next. If we have not reached the end of the hash table, it means that we reached
-  // out_batch capacity and we need to continue to output unmatched build rows, without
-  // closing the partition.
+  // the next. If we have not reached the end of the hash table, it means that we
+  // reached out_batch capacity and we need to continue to output unmatched build rows,
+  // without closing the partition.
   if (hash_tbl_iterator_.AtEnd()) {
     output_build_partitions_.front()->Close(out_batch);
     output_build_partitions_.pop_front();
@@ -658,9 +741,17 @@ void PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
           output_build_partitions_.front()->hash_tbl()->FirstUnmatched(ht_ctx_.get());
     }
   }
+  return Status::OK();
+}
 
-  num_rows_returned_ += out_batch->num_rows() - start_num_rows;
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+void PartitionedHashJoinNode::OutputBuildRow(
+    RowBatch* out_batch, TupleRow* build_row, RowBatch::Iterator* out_batch_iterator) {
+  DCHECK(build_row != NULL);
+  if (join_op_ == TJoinOp::RIGHT_ANTI_JOIN) {
+    out_batch->CopyRow(build_row, out_batch_iterator->Get());
+  } else {
+    CreateOutputRow(out_batch_iterator->Get(), NULL, build_row);
+  }
 }
 
 Status PartitionedHashJoinNode::PrepareNullAwareNullProbe() {
@@ -970,13 +1061,24 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(RowBatch* batch) {
       RETURN_IF_ERROR(
           probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
 
-      // Push newly created partitions at the front. This means a depth first walk
-      // (more finely partitioned partitions are processed first). This allows us
-      // to delete blocks earlier and bottom out the recursion earlier.
-      spilled_partitions_.push_front(std::move(probe_hash_partitions_[i]));
+      if (probe_partition->probe_rows()->num_rows() != 0
+          || NeedToProcessUnmatchedBuildRows()) {
+        // Push newly created partitions at the front. This means a depth first walk
+        // (more finely partitioned partitions are processed first). This allows us
+        // to delete blocks earlier and bottom out the recursion earlier.
+        spilled_partitions_.push_front(std::move(probe_hash_partitions_[i]));
+      } else {
+        // There's no more processing to do for this partition, and since there were no
+        // probe rows we didn't return any rows that reference memory from these
+        // partitions, so just free the resources.
+        build_partition->Close(NULL);
+        probe_partition->Close(NULL);
+        COUNTER_ADD(num_hash_table_builds_skipped_, 1);
+      }
     } else {
       DCHECK(probe_partition == NULL);
       if (NeedToProcessUnmatchedBuildRows()) {
+        DCHECK(output_unmatched_batch_iter_.get() == NULL);
         if (output_build_partitions_.empty()) {
           hash_tbl_iterator_ = build_partition->hash_tbl()->FirstUnmatched(ht_ctx_.get());
         }
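
When the no-probe-rows path is taken for a right outer, right anti, or full outer
join, OutputAllBuild() replaces the usual hash table sweep: it reads the spilled
build stream back one batch at a time and emits every row, resuming from a saved
iterator position on the next GetNext() call once the output batch reaches
capacity. Below is a stripped-down sketch of that resumable, batch-at-a-time
pattern using simplified stand-in types (conjunct evaluation and resource transfer
are omitted); it is not the actual Impala code.

#include <cstddef>
#include <deque>
#include <iostream>
#include <vector>

struct Row { long key; };

// Stand-in for a spilled build-side stream that hands rows back in chunks.
struct SpilledStream {
  std::deque<Row> rows;
  // Fills 'batch' with up to 'capacity' rows; sets *eos once the stream is exhausted.
  void GetNext(std::vector<Row>* batch, size_t capacity, bool* eos) {
    batch->clear();
    while (!rows.empty() && batch->size() < capacity) {
      batch->push_back(rows.front());
      rows.pop_front();
    }
    *eos = rows.empty();
  }
};

// State that must survive across calls, mirroring 'output_unmatched_batch_' and
// 'output_unmatched_batch_iter_' in the patch.
struct OutputAllBuildState {
  std::vector<Row> scratch;  // current chunk read from the stream
  size_t pos = 0;            // resume position within 'scratch'
  bool eos = false;          // true once the stream has been fully read
};

// Copies unmatched build rows into 'out' until it holds 'out_capacity' rows or the
// stream ends. Returns true when the partition is fully drained and can be closed.
bool OutputAllBuild(SpilledStream* stream, OutputAllBuildState* st,
                    std::vector<Row>* out, size_t out_capacity, size_t batch_size) {
  while (out->size() < out_capacity) {
    if (st->pos == st->scratch.size()) {
      if (st->eos) return true;  // nothing left to read: caller closes the partition
      stream->GetNext(&st->scratch, batch_size, &st->eos);
      st->pos = 0;
    }
    for (; st->pos < st->scratch.size() && out->size() < out_capacity; ++st->pos) {
      out->push_back(st->scratch[st->pos]);  // conjunct evaluation omitted here
    }
  }
  return st->eos && st->pos == st->scratch.size();
}

int main() {
  SpilledStream stream;
  for (long k = 1; k <= 5; ++k) stream.rows.push_back(Row{k});
  OutputAllBuildState st;
  std::vector<Row> out;
  while (!OutputAllBuild(&stream, &st, &out, /*out_capacity=*/2, /*batch_size=*/3)) {
    std::cout << "emitted " << out.size() << " rows so far\n";
    out.clear();  // pretend the consumer took the batch
  }
  std::cout << "emitted final batch of " << out.size() << " rows\n";
  return 0;
}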

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a9df540/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 504dc7b..6d4e7f4 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -282,12 +282,29 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   int ProcessProbeBatch(const TJoinOp::type join_op, TPrefetchMode::type,
       RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status);
 
-  /// Sweep the hash_tbl_ of the partition that is at the front of
-  /// output_build_partitions_, using hash_tbl_iterator_ and output any unmatched build
-  /// rows. If reaches the end of the hash table it closes that partition, removes it from
-  /// output_build_partitions_ and moves hash_tbl_iterator_ to the beginning of the
-  /// new partition at the front of output_build_partitions_.
-  void OutputUnmatchedBuild(RowBatch* out_batch);
+  /// Used when NeedToProcessUnmatchedBuildRows() is true. Writes all unmatched rows from
+  /// 'output_build_partitions_' to 'out_batch', up to 'out_batch' capacity.
+  Status OutputUnmatchedBuild(RowBatch* out_batch);
+
+  /// Called by OutputUnmatchedBuild() when there isn't a hash table built, which happens
+  /// when a spilled partition had 0 probe rows. In this case, all of the build rows are
+  /// unmatched and we can iterate over the entire build side of the partition, which will
+  /// be the only partition in 'output_build_partitions_'. If it reaches the end of the
+  /// partition, it closes that partition and removes it from 'output_build_partitions_'.
+  Status OutputAllBuild(RowBatch* out_batch);
+
+  /// Called by OutputUnmatchedBuild when there is a hash table built. Sweeps the
+  /// 'hash_tbl_' of the partition that is at the front of 'output_build_partitions_',
+  /// using 'hash_tbl_iterator_' and outputs any unmatched build rows. If it reaches the
+  /// end of the hash table it closes that partition, removes it from
+  /// 'output_build_partitions_' and moves 'hash_tbl_iterator_' to the beginning of the
+  /// new partition at the front of 'output_build_partitions_'.
+  Status OutputUnmatchedBuildFromHashTable(RowBatch* out_batch);
+
+  /// Writes 'build_row' to 'out_batch' at the position of 'out_batch_iterator' in a
+  /// 'join_op_' specific way.
+  void OutputBuildRow(
+      RowBatch* out_batch, TupleRow* build_row, RowBatch::Iterator* out_batch_iterator);
 
   /// Initializes 'null_aware_probe_partition_' and prepares its probe stream for writing.
   Status InitNullAwareProbePartition();
@@ -338,10 +355,12 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// Moves onto the next spilled partition and initializes 'input_partition_'. This
   /// function processes the entire build side of 'input_partition_' and when this
   /// function returns, we are ready to consume the probe side of 'input_partition_'.
-  /// If the build side's hash table fits in memory, we will construct input_partition_'s
-  /// hash table. If it does not, meaning we need to repartition, this function will
-  /// repartition the build rows into 'builder->hash_partitions_' and prepare for
-  /// repartitioning the partition's probe rows.
+  /// If the build side's hash table fits in memory and there are probe rows, we will
+  /// construct input_partition_'s hash table. If it does not fit, meaning we need to
+  /// repartition, this function will repartition the build rows into
+  /// 'builder->hash_partitions_' and prepare for repartitioning the partition's probe
+  /// rows. If there are no probe rows, we just prepare the build side to be read by
+  /// OutputUnmatchedBuild().
   Status PrepareSpilledPartitionForProbe(RuntimeState* state, bool* got_partition);
 
   /// Calls Close() on every probe partition, destroys the partitions and cleans up any
@@ -389,6 +408,10 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// Time spent evaluating other_join_conjuncts for NAAJ.
   RuntimeProfile::Counter* null_aware_eval_timer_;
 
+  /// Number of partitions which had zero probe rows and we therefore didn't build the
+  /// hash table.
+  RuntimeProfile::Counter* num_hash_table_builds_skipped_;
+
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
@@ -451,6 +474,15 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// outputting.
   int64_t null_probe_output_idx_;
 
+  /// Used by OutputAllBuild() to iterate over the entire build side tuple stream of the
+  /// current partition.
+  std::unique_ptr<RowBatch> output_unmatched_batch_;
+
+  /// Stores an iterator into 'output_unmatched_batch_' to start from on the next call to
+  /// OutputAllBuild(), or NULL if there are no partitions without hash tables needing to
+  /// be processed by OutputUnmatchedBuild().
+  std::unique_ptr<RowBatch::Iterator> output_unmatched_batch_iter_;
+
   /// END: Members that must be Reset()
   /////////////////////////////////////////
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a9df540/testdata/workloads/functional-query/queries/QueryTest/spilling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index 91b425e..89668e8 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -628,3 +628,65 @@ BIGINT,BIGINT,BIGINT,INT,DECIMAL,DECIMAL,DECIMAL,DECIMAL,STRING,STRING,STRING,ST
 row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
 row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\)
 ====
+---- QUERY
+# Tests for the case where a spilled partition has 0 probe rows and so we don't build the
+# hash table in a partitioned hash join.
+# INNER JOIN
+set max_block_mgr_memory=10m;
+select straight_join count(*)
+from
+lineitem a, lineitem b
+where
+a.l_partkey = 1 and
+a.l_orderkey = b.l_orderkey;
+---- TYPES
+BIGINT
+---- RESULTS
+173
+---- RUNTIME_PROFILE
+row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
+====
+---- QUERY
+# spilled partition with 0 probe rows, NULL AWARE LEFT ANTI JOIN
+set max_block_mgr_memory=10m;
+select straight_join count(*)
+from
+lineitem a
+where
+a.l_partkey not in (select l_partkey from lineitem where l_partkey > 10)
+and a.l_partkey < 1000;
+---- TYPES
+BIGINT
+---- RESULTS
+287
+---- RUNTIME_PROFILE
+row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
+====
+---- QUERY
+# spilled partition with 0 probe rows, RIGHT OUTER JOIN
+set max_block_mgr_memory=100m;
+select straight_join count(*)
+from
+supplier right outer join lineitem on s_suppkey = l_suppkey
+where s_acctbal > 0 and s_acctbal < 10;
+---- TYPES
+BIGINT
+---- RESULTS
+12138
+---- RUNTIME_PROFILE
+row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
+====
+---- QUERY
+# spilled partition with 0 probe rows, RIGHT ANTI JOIN
+set max_block_mgr_memory=30m;
+with x as (select * from supplier limit 10)
+select straight_join count(*)
+from
+x right anti join lineitem on s_suppkey + 100 = l_suppkey;
+---- TYPES
+BIGINT
+---- RESULTS
+5995258
+---- RUNTIME_PROFILE
+row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
+====
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a9df540/testdata/workloads/tpch/queries/tpch-outer-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpch/queries/tpch-outer-joins.test b/testdata/workloads/tpch/queries/tpch-outer-joins.test
index 0da2850..a189a7a 100644
--- a/testdata/workloads/tpch/queries/tpch-outer-joins.test
+++ b/testdata/workloads/tpch/queries/tpch-outer-joins.test
@@ -29,9 +29,6 @@ SELECT straight_join * FROM orders o
 RIGHT OUTER JOIN lineitem l ON o.o_orderkey =  if(l.l_orderkey % 2 = 0, 0, l.l_orderkey)
 ORDER BY l_receiptdate, l_orderkey, l_shipdate
 limit 10
----- CATCH: ANY_OF
-Repartitioning did not reduce the size of a spilled partition
-Memory limit exceeded
 ====
 ---- QUERY
 # Regression test for IMPALA-2612. The following query will cause CastToChar