You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/12 22:09:39 UTC

[04/50] [abbrv] incubator-impala git commit: IMPALA-1583: Simplify PartitionedHashJoinNode::ProcessProbeBatch()

IMPALA-1583: Simplify PartitionedHashJoinNode::ProcessProbeBatch()

PartitionedHashJoinNode::ProcessProbeBatch() is templatized
to deal with various join types. It's rather hard to follow
because the probing logic for the different join types is mixed
together, with goto statements interleaved in between.

This change mechanically refactors the probing logic into four
different functions: one for inner joins, one for outer joins,
one for left semi joins and one for right semi joins. The code
is still templatized so with inlining, the generated code should
be about the same as before.

Change-Id: Ie2091bdf97ab34c5cdc84e84394c579a5b36afc0
Reviewed-on: http://gerrit.cloudera.org:8080/2893
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9a376385
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9a376385
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9a376385

Branch: refs/heads/master
Commit: 9a376385421b378fd71206317c86ab412f0b3a0d
Parents: 68e8262
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Apr 27 17:09:50 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 14:17:50 2016 -0700

----------------------------------------------------------------------
 be/src/exec/partitioned-hash-join-node-ir.cc | 428 ++++++++++++++--------
 be/src/exec/partitioned-hash-join-node.h     |  90 ++++-
 be/src/runtime/row-batch.h                   |   3 +
 3 files changed, 354 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a376385/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index 9c9923e..354003e 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -37,158 +37,200 @@ bool IR_NO_INLINE EvalOtherJoinConjuncts(
   return ExecNode::EvalConjuncts(ctxs, num_ctxs, row);
 }
 
-// CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by
-// codegen.
-template<int const JoinOp>
-int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch,
-    const HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
-  ExprContext* const* other_join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
-  const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
-  ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
-  const int num_conjuncts = conjunct_ctxs_.size();
-
-  DCHECK(!out_batch->AtCapacity());
-  DCHECK_GE(probe_batch_pos_, 0);
-  RowBatch::Iterator out_batch_iterator(out_batch, out_batch->AddRow());
-  TupleRow* out_row = out_batch_iterator.Get();
-  const int max_rows = out_batch->capacity() - out_batch->num_rows();
-  // Note that 'probe_batch_pos_' is the row no. of the row after 'current_probe_row_'.
-  RowBatch::Iterator probe_batch_iterator(probe_batch_.get(), probe_batch_pos_);
-  int num_rows_added = 0;
-
-  while (true) {
-    if (current_probe_row_ != NULL) {
-      while (!hash_tbl_iterator_.AtEnd()) {
-        TupleRow* matched_build_row = hash_tbl_iterator_.GetRow();
-        DCHECK(matched_build_row != NULL);
-
-        if ((JoinOp == TJoinOp::RIGHT_SEMI_JOIN || JoinOp == TJoinOp::RIGHT_ANTI_JOIN) &&
-            hash_tbl_iterator_.IsMatched()) {
-          hash_tbl_iterator_.NextDuplicate();
-          continue;
-        }
-
-        if (JoinOp == TJoinOp::LEFT_ANTI_JOIN || JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
-            JoinOp == TJoinOp::RIGHT_ANTI_JOIN || JoinOp == TJoinOp::RIGHT_SEMI_JOIN ||
-            JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-          // Evaluate the non-equi-join conjuncts against a temp row assembled from all
-          // build and probe tuples.
-          if (num_other_join_conjuncts > 0) {
-            CreateOutputRow(semi_join_staging_row_, current_probe_row_, matched_build_row);
-            if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs,
-                    num_other_join_conjuncts, semi_join_staging_row_)) {
-              hash_tbl_iterator_.NextDuplicate();
-              continue;
-            }
-          }
-
-          // Create output row assembled from build xor probe tuples.
-          if (JoinOp == TJoinOp::LEFT_ANTI_JOIN || JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
-              JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-            out_batch->CopyRow(current_probe_row_, out_row);
-          } else {
-            out_batch->CopyRow(matched_build_row, out_row);
-          }
-        } else {
-          // Not a semi join; create an output row with all probe/build tuples and
-          // evaluate the non-equi-join conjuncts.
-          CreateOutputRow(out_row, current_probe_row_, matched_build_row);
-          if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs, num_other_join_conjuncts,
-                  out_row)) {
-            hash_tbl_iterator_.NextDuplicate();
-            continue;
-          }
-        }
+bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowInnerJoin(
+    ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+    ExprContext* const* conjunct_ctxs, int num_conjuncts,
+    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) {
+  DCHECK(current_probe_row_ != NULL);
+  TupleRow* out_row = out_batch_iterator->Get();
+  for (; !hash_tbl_iterator_.AtEnd(); hash_tbl_iterator_.NextDuplicate()) {
+    TupleRow* matched_build_row = hash_tbl_iterator_.GetRow();
+    DCHECK(matched_build_row != NULL);
 
-        // At this point the probe is considered matched.
-        matched_probe_ = true;
-        if (JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
-            JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-          // We can safely ignore this probe row for left anti joins.
-          hash_tbl_iterator_.SetAtEnd();
-          goto next_row;
-        }
+    // Create an output row with all probe/build tuples and evaluate the
+    // non-equi-join conjuncts.
+    CreateOutputRow(out_row, current_probe_row_, matched_build_row);
+    if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs, num_other_join_conjuncts,
+        out_row)) {
+      continue;
+    }
+    if (ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) {
+      --(*remaining_capacity);
+      if (*remaining_capacity == 0) {
+        hash_tbl_iterator_.NextDuplicate();
+        return false;
+      }
+      out_row = out_batch_iterator->Next();
+    }
+  }
+  return true;
+}
 
-        // Update hash_tbl_iterator.
-        if (JoinOp == TJoinOp::LEFT_SEMI_JOIN) {
-          hash_tbl_iterator_.SetAtEnd();
-        } else {
-          if (JoinOp == TJoinOp::RIGHT_OUTER_JOIN || JoinOp == TJoinOp::RIGHT_ANTI_JOIN ||
-              JoinOp == TJoinOp::FULL_OUTER_JOIN || JoinOp == TJoinOp::RIGHT_SEMI_JOIN) {
-            // There is a match for this build row. Mark the Bucket or the DuplicateNode
-            // as matched for right/full joins.
-            hash_tbl_iterator_.SetMatched();
-          }
-          hash_tbl_iterator_.NextDuplicate();
-        }
+template<int const JoinOp>
+bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowRightSemiJoins(
+    ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+    ExprContext* const* conjunct_ctxs, int num_conjuncts,
+    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) {
+  DCHECK(current_probe_row_ != NULL);
+  DCHECK(JoinOp == TJoinOp::RIGHT_SEMI_JOIN || JoinOp == TJoinOp::RIGHT_ANTI_JOIN);
+  TupleRow* out_row = out_batch_iterator->Get();
+  for (; !hash_tbl_iterator_.AtEnd(); hash_tbl_iterator_.NextDuplicate()) {
+    TupleRow* matched_build_row = hash_tbl_iterator_.GetRow();
+    DCHECK(matched_build_row != NULL);
 
-        if (JoinOp != TJoinOp::RIGHT_ANTI_JOIN &&
-            ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) {
-          ++num_rows_added;
-          if (num_rows_added == max_rows) goto update_probe_batch_pos;
-          out_row = out_batch_iterator.Next();
-        }
+    if (hash_tbl_iterator_.IsMatched()) continue;
+    // Evaluate the non-equi-join conjuncts against a temp row assembled from all
+    // build and probe tuples.
+    if (num_other_join_conjuncts > 0) {
+      CreateOutputRow(semi_join_staging_row_, current_probe_row_, matched_build_row);
+      if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs,
+          num_other_join_conjuncts, semi_join_staging_row_)) {
+        continue;
+      }
+    }
+    // Create output row assembled from build tuples.
+    out_batch_iterator->parent()->CopyRow(matched_build_row, out_row);
+    // Update the hash table to indicate that this entry has been matched.
+    hash_tbl_iterator_.SetMatched();
+    if (JoinOp == TJoinOp::RIGHT_SEMI_JOIN &&
+        ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) {
+      --(*remaining_capacity);
+      if (*remaining_capacity == 0) {
+        hash_tbl_iterator_.NextDuplicate();
+        return false;
       }
+      out_row = out_batch_iterator->Next();
+    }
+  }
+  return true;
+}
 
-      if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !matched_probe_) {
-        // Null aware behavior. The probe row did not match in the hash table so we
-        // should interpret the hash table probe as "unknown" if there are nulls on the
-        // build size. For those rows, we need to process the remaining join
-        // predicates later.
-        if (null_aware_partition_->build_rows()->num_rows() != 0) {
-          if (num_other_join_conjuncts == 0) goto next_row;
-          if (UNLIKELY(!AppendRow(null_aware_partition_->probe_rows(),
-                           current_probe_row_, status))) {
-            num_rows_added = -1;
-            goto update_probe_batch_pos;
-          }
-          goto next_row;
-        }
+template<int const JoinOp>
+bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins(
+    ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+    ExprContext* const* conjunct_ctxs, int num_conjuncts,
+    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status) {
+  DCHECK(current_probe_row_ != NULL);
+  DCHECK(JoinOp == TJoinOp::LEFT_ANTI_JOIN || JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
+      JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
+  TupleRow* out_row = out_batch_iterator->Get();
+  for (; !hash_tbl_iterator_.AtEnd(); hash_tbl_iterator_.NextDuplicate()) {
+    TupleRow* matched_build_row = hash_tbl_iterator_.GetRow();
+    DCHECK(matched_build_row != NULL);
+    // Evaluate the non-equi-join conjuncts against a temp row assembled from all
+    // build and probe tuples.
+    if (num_other_join_conjuncts > 0) {
+      CreateOutputRow(semi_join_staging_row_, current_probe_row_, matched_build_row);
+      if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs,
+          num_other_join_conjuncts, semi_join_staging_row_)) {
+        continue;
       }
+    }
+    // Create output row assembled from probe tuples.
+    out_batch_iterator->parent()->CopyRow(current_probe_row_, out_row);
+    // A match is found in the hash table. The search is over for this probe row.
+    matched_probe_ = true;
+    hash_tbl_iterator_.SetAtEnd();
+    // Append to output batch for left semi joins if the conjuncts are satisfied.
+    if (JoinOp == TJoinOp::LEFT_SEMI_JOIN &&
+        ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) {
+      --(*remaining_capacity);
+      if (*remaining_capacity == 0) return false;
+      out_row = out_batch_iterator->Next();
+    }
+    // Done with this probe row.
+    return true;
+  }
 
-      if ((JoinOp == TJoinOp::LEFT_OUTER_JOIN || JoinOp == TJoinOp::FULL_OUTER_JOIN) &&
-          !matched_probe_) {
-        // No match for this row, we need to output it.
-        CreateOutputRow(out_row, current_probe_row_, NULL);
-        if (ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) {
-          ++num_rows_added;
-          matched_probe_ = true;
-          if (num_rows_added == max_rows) goto update_probe_batch_pos;
-          out_row = out_batch_iterator.Next();
+  if (JoinOp != TJoinOp::LEFT_SEMI_JOIN && !matched_probe_) {
+    if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+      // Null aware behavior. The probe row did not match in the hash table so we
+      // should interpret the hash table probe as "unknown" if there are nulls on the
+      // build side. For those rows, we need to process the remaining join
+      // predicates later.
+      if (null_aware_partition_->build_rows()->num_rows() != 0) {
+        if (num_other_join_conjuncts > 0 &&
+            UNLIKELY(!AppendRow(null_aware_partition_->probe_rows(),
+                         current_probe_row_, status))) {
+          DCHECK(!status->ok());
+          return false;
         }
-      }
-      if ((JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
-          JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) &&
-          !matched_probe_) {
-        // No match for this current_probe_row_, we need to output it. No need to
-        // evaluate the conjunct_ctxs since semi joins cannot have any.
-        out_batch->CopyRow(current_probe_row_, out_row);
-        ++num_rows_added;
-        matched_probe_ = true;
-        if (num_rows_added == max_rows) goto update_probe_batch_pos;
-        out_row = out_batch_iterator.Next();
+        return true;
       }
     }
+    // No match for this current_probe_row_, we need to output it. No need to
+    // evaluate the conjunct_ctxs since anti joins cannot have any.
+    out_batch_iterator->parent()->CopyRow(current_probe_row_, out_row);
+    matched_probe_ = true;
+    --(*remaining_capacity);
+    if (*remaining_capacity == 0) return false;
+    out_row = out_batch_iterator->Next();
+  }
+  return true;
+}
 
-next_row:
-    // Must have reached the end of the hash table iterator for the current row before
-    // moving to the next row.
-    DCHECK(hash_tbl_iterator_.AtEnd());
+template<int const JoinOp>
+bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins(
+    ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+    ExprContext* const* conjunct_ctxs, int num_conjuncts,
+    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) {
+  DCHECK(JoinOp == TJoinOp::LEFT_OUTER_JOIN || JoinOp == TJoinOp::RIGHT_OUTER_JOIN ||
+      JoinOp == TJoinOp::FULL_OUTER_JOIN);
+  DCHECK(current_probe_row_ != NULL);
+  TupleRow* out_row = out_batch_iterator->Get();
+  for (; !hash_tbl_iterator_.AtEnd(); hash_tbl_iterator_.NextDuplicate()) {
+    TupleRow* matched_build_row = hash_tbl_iterator_.GetRow();
+    DCHECK(matched_build_row != NULL);
+    // Create an output row with all probe/build tuples and evaluate the
+    // non-equi-join conjuncts.
+    CreateOutputRow(out_row, current_probe_row_, matched_build_row);
+    if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs, num_other_join_conjuncts,
+        out_row)) {
+      continue;
+    }
+    // At this point the probe is considered matched.
+    matched_probe_ = true;
+    if (JoinOp == TJoinOp::RIGHT_OUTER_JOIN || JoinOp == TJoinOp::FULL_OUTER_JOIN) {
+      // There is a match for this build row. Mark the Bucket or the DuplicateNode
+      // as matched for right/full outer joins.
+      hash_tbl_iterator_.SetMatched();
+    }
+    if (ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) {
+      --(*remaining_capacity);
+      if (*remaining_capacity == 0) {
+        hash_tbl_iterator_.NextDuplicate();
+        return false;
+      }
+      out_row = out_batch_iterator->Next();
+    }
+  }
 
-    if (UNLIKELY(probe_batch_iterator.AtEnd())) {
-      // Finished this batch.
-      probe_batch_pos_ = probe_batch_->num_rows();
-      current_probe_row_ = NULL;
-      goto done;
+  if (JoinOp != TJoinOp::RIGHT_OUTER_JOIN && !matched_probe_) {
+    // No match for this row, we need to output it if it's a left/full outer join.
+    CreateOutputRow(out_row, current_probe_row_, NULL);
+    if (ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) {
+      matched_probe_ = true;
+      --(*remaining_capacity);
+      if (*remaining_capacity == 0) return false;
+      out_row = out_batch_iterator->Next();
     }
+  }
+  return true;
+}
 
+template<int const JoinOp>
+bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
+    const HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
+    int* remaining_capacity, int num_other_join_conjuncts, Status* status) {
+  while (!probe_batch_iterator->AtEnd()) {
     // Establish current_probe_row_ and find its corresponding partition.
-    current_probe_row_ = probe_batch_iterator.Get();
-    probe_batch_iterator.Next();
+    current_probe_row_ = probe_batch_iterator->Get();
+    probe_batch_iterator->Next();
     matched_probe_ = false;
+
     uint32_t hash;
     if (!ht_ctx->EvalAndHashProbe(current_probe_row_, &hash)) {
-      if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+      if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && non_empty_build_) {
         // For NAAJ, we need to treat NULLs on the probe carefully. The logic is:
         // 1. No build rows -> Return this row.
         // 2. Has build rows & no other join predicates, skip row.
@@ -196,42 +238,108 @@ next_row:
         // build rows. First evaluate it against this partition, and if there is not
         // a match, save it to evaluate against other partitions later. If there
         // is a match, the row is skipped.
-        if (!non_empty_build_) continue;
-        if (num_other_join_conjuncts == 0) goto next_row;
-        if (UNLIKELY(!AppendRow(null_probe_rows_, current_probe_row_, status))) {
-          num_rows_added = -1;
-          goto update_probe_batch_pos;
+        if (num_other_join_conjuncts > 0) {
+          if (UNLIKELY(!AppendRow(null_probe_rows_, current_probe_row_, status))) {
+            DCHECK(!status->ok());
+            return false;
+          }
+          matched_null_probe_.push_back(false);
         }
-        matched_null_probe_.push_back(false);
-        goto next_row;
+        continue;
       }
-      continue;
+      return true;
     }
     const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+    // The build partition is in memory. Return this row for probing.
     if (LIKELY(hash_tbls_[partition_idx] != NULL)) {
       hash_tbl_iterator_ = hash_tbls_[partition_idx]->FindProbeRow(ht_ctx, hash);
-    } else {
-      Partition* partition = hash_partitions_[partition_idx];
-      if (UNLIKELY(partition->is_closed())) {
-        // This partition is closed, meaning the build side for this partition was empty.
-        DCHECK(state_ == PROCESSING_PROBE || state_ == REPARTITIONING);
+      return true;
+    }
+    // The build partition is either empty or spilled.
+    Partition* partition = hash_partitions_[partition_idx];
+    // This partition is closed, meaning the build side for this partition was empty.
+    if (UNLIKELY(partition->is_closed())) {
+      DCHECK(state_ == PROCESSING_PROBE || state_ == REPARTITIONING);
+      return true;
+    }
+    // This partition is not in memory, spill the probe row and move to the next row.
+    DCHECK(partition->is_spilled());
+    DCHECK(partition->probe_rows() != NULL);
+    if (UNLIKELY(!AppendRow(partition->probe_rows(), current_probe_row_, status))) {
+      DCHECK(!status->ok());
+      return false;
+    }
+  }
+  // Finished this batch.
+  current_probe_row_ = NULL;
+  return false;
+}
+
+// CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by
+// codegen.
+template<int const JoinOp>
+int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch,
+    const HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
+  ExprContext* const* other_join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
+  const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
+  ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
+  const int num_conjuncts = conjunct_ctxs_.size();
+
+  DCHECK(!out_batch->AtCapacity());
+  DCHECK_GE(probe_batch_pos_, 0);
+  RowBatch::Iterator out_batch_iterator(out_batch, out_batch->AddRow());
+  const int max_rows = out_batch->capacity() - out_batch->num_rows();
+  // Note that 'probe_batch_pos_' is the row no. of the row after 'current_probe_row_'.
+  RowBatch::Iterator probe_batch_iterator(probe_batch_.get(), probe_batch_pos_);
+  int remaining_capacity = max_rows;
+
+  do {
+    if (current_probe_row_ != NULL) {
+      if (JoinOp == TJoinOp::INNER_JOIN) {
+        if (!ProcessProbeRowInnerJoin(other_join_conjunct_ctxs, num_other_join_conjuncts,
+            conjunct_ctxs, num_conjuncts, &out_batch_iterator, &remaining_capacity)) {
+          break;
+        }
+      } else if (JoinOp == TJoinOp::RIGHT_SEMI_JOIN ||
+                 JoinOp == TJoinOp::RIGHT_ANTI_JOIN) {
+        if (!ProcessProbeRowRightSemiJoins<JoinOp>(other_join_conjunct_ctxs,
+            num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, &out_batch_iterator,
+            &remaining_capacity)) {
+          break;
+        }
+      } else if (JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
+                 JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
+                 JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+        if (!ProcessProbeRowLeftSemiJoins<JoinOp>(other_join_conjunct_ctxs,
+            num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, &out_batch_iterator,
+            &remaining_capacity, status)) {
+          break;
+        }
       } else {
-        // This partition is not in memory, spill the probe row and move to the next row.
-        DCHECK(partition->is_spilled());
-        DCHECK(partition->probe_rows() != NULL);
-        if (UNLIKELY(!AppendRow(partition->probe_rows(), current_probe_row_, status))) {
-          num_rows_added = -1;
-          goto update_probe_batch_pos;
+        DCHECK(JoinOp == TJoinOp::RIGHT_OUTER_JOIN ||
+            JoinOp == TJoinOp::LEFT_OUTER_JOIN || TJoinOp::FULL_OUTER_JOIN);
+        if (!ProcessProbeRowOuterJoins<JoinOp>(other_join_conjunct_ctxs,
+            num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, &out_batch_iterator,
+            &remaining_capacity)) {
+          break;
         }
-        goto next_row;
       }
     }
+    // Must have reached the end of the hash table iterator for the current row before
+    // moving to the next row.
+    DCHECK(hash_tbl_iterator_.AtEnd());
+    DCHECK(status->ok());
+  } while (NextProbeRow<JoinOp>(ht_ctx, &probe_batch_iterator, &remaining_capacity,
+      num_other_join_conjuncts, status));
+  // Update where we are in the probe batch.
+  probe_batch_pos_ = (probe_batch_iterator.Get() - probe_batch_->GetRow(0)) /
+      probe_batch_->num_tuples_per_row();
+  int num_rows_added;
+  if (LIKELY(status->ok())) {
+    num_rows_added = max_rows - remaining_capacity;
+  } else {
+    num_rows_added = -1;
   }
-
-update_probe_batch_pos:
-  probe_batch_pos_ =
-      (probe_batch_iterator.Get() - probe_batch_->GetRow(0)) / probe_batch_->num_tuples_per_row();
-done:
   DCHECK_GE(probe_batch_pos_, 0);
   DCHECK_LE(probe_batch_pos_, probe_batch_->capacity());
   DCHECK_LE(num_rows_added, max_rows);
@@ -241,7 +349,7 @@ done:
 int PartitionedHashJoinNode::ProcessProbeBatch(
     const TJoinOp::type join_op, RowBatch* out_batch,
     const HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
- switch (join_op) {
+  switch (join_op) {
     case TJoinOp::INNER_JOIN:
       return ProcessProbeBatch<TJoinOp::INNER_JOIN>(out_batch, ht_ctx, status);
     case TJoinOp::LEFT_OUTER_JOIN:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a376385/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 5312de4..a51a15a 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -173,12 +173,88 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// structures.
   Status BuildHashTables(RuntimeState* state);
 
+  /// Probes the hash table for rows matching the current probe row and appends
+  /// all the matching build rows (with probe row) to output batch. Returns true
+  /// if probing is done for the current probe row and should continue to next row.
+  ///
+  /// 'out_batch_iterator' is the iterator for the output batch.
+  /// 'remaining_capacity' tracks the number of additional rows that can be added to
+  /// the output batch. It's updated as rows are added to the output batch.
+  /// Using a separate variable is probably faster than calling
+  /// 'out_batch_iterator->parent()->AtCapacity()' as it avoids unnecessary memory load.
+  bool inline ProcessProbeRowInnerJoin(
+      ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+      ExprContext* const* conjunct_ctxs, int num_conjuncts,
+      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);
+
+  /// Probes and updates the hash table for the current probe row for either
+  /// RIGHT_SEMI_JOIN or RIGHT_ANTI_JOIN. For RIGHT_SEMI_JOIN, all matching build
+  /// rows will be appended to the 'out_batch'; For RIGHT_ANTI_JOIN, update the
+  /// hash table only if matches are found. The actual output happens in
+  /// OutputUnmatchedBuild(). Returns true if probing is done for the current
+  /// probe row and should continue to next row.
+  ///
+  /// 'out_batch_iterator' is the iterator for the output batch.
+  /// 'remaining_capacity' tracks the number of additional rows that can be added to
+  /// the output batch. It's updated as rows are added to the output batch.
+  /// Using a separate variable is probably faster than calling
+  /// 'out_batch_iterator->parent()->AtCapacity()' as it avoids unnecessary memory load.
+  template<int const JoinOp>
+  bool inline ProcessProbeRowRightSemiJoins(
+      ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+      ExprContext* const* conjunct_ctxs, int num_conjuncts,
+      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);
+
+  /// Probes the hash table for the current probe row for LEFT_SEMI_JOIN,
+  /// LEFT_ANTI_JOIN or NULL_AWARE_LEFT_ANTI_JOIN. The probe row will be appended
+  /// to 'out_batch' if it's part of the output. Returns true if probing
+  /// is done for the current probe row and should continue to next row.
+  ///
+  /// 'out_batch_iterator' is the iterator for the output batch.
+  /// 'remaining_capacity' tracks the number of additional rows that can be added to
+  /// the output batch. It's updated as rows are added to the output batch.
+  /// Using a separate variable is probably faster than calling
+  /// 'out_batch_iterator->parent()->AtCapacity()' as it avoids unnecessary memory load.
+  template<int const JoinOp>
+  bool inline ProcessProbeRowLeftSemiJoins(
+      ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+      ExprContext* const* conjunct_ctxs, int num_conjuncts,
+      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status);
+
+  /// Probes the hash table for the current probe row for LEFT_OUTER_JOIN,
+  /// RIGHT_OUTER_JOIN or FULL_OUTER_JOIN. The matching build and/or probe row
+  /// will be appended to 'out_batch'. For RIGHT/FULL_OUTER_JOIN, some of the outputs
+  /// are added in OutputUnmatchedBuild(). Returns true if probing is done for the
+  /// current probe row and should continue to next row.
+  ///
+  /// 'out_batch_iterator' is the iterator for the output batch.
+  /// 'remaining_capacity' tracks the number of additional rows that can be added to
+  /// the output batch. It's updated as rows are added to the output batch.
+  /// Using a separate variable is probably faster than calling
+  /// 'out_batch_iterator->parent()->AtCapacity()' as it avoids unnecessary memory load.
+  /// Unlike ProcessProbeRowLeftSemiJoins(), this function takes no 'status' argument.
+  template<int const JoinOp>
+  bool inline ProcessProbeRowOuterJoins(
+      ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+      ExprContext* const* conjunct_ctxs, int num_conjuncts,
+      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);
+
+  /// Find the next probe row. Returns true if a probe row is found, in which case
+  /// 'current_probe_row_' and 'hash_tbl_iterator_' have been set up to point to the
+  /// next probe row and its corresponding partition. 'status' may be updated if
+  /// appending to the spilled partitions' BTS or the null probe rows' BTS fails.
+  template<int const JoinOp>
+  bool inline NextProbeRow(
+      const HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
+      int* remaining_capacity, int num_other_join_conjuncts, Status* status);
+
   /// Process probe rows from probe_batch_. Returns either if out_batch is full or
   /// probe_batch_ is entirely consumed.
   /// For RIGHT_ANTI_JOIN, all this function does is to mark whether each build row
   /// had a match.
   /// Returns the number of rows added to out_batch; -1 on error (and *status will be
-  /// set).
+  /// set). This function doesn't commit rows to the output batch so it's the caller's
+  /// responsibility to do so.
   template<int const JoinOp>
   int ProcessProbeBatch(RowBatch* out_batch, const HashTableCtx* ht_ctx, Status* status);
 
@@ -475,6 +551,12 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
    private:
     friend class PartitionedHashJoinNode;
 
+    /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' contains the
+    /// index of each row's index into the hash table's tuple stream. This function is
+    /// replaced with a codegen'd version.
+    bool InsertBatch(HashTableCtx* ctx, RowBatch* batch,
+        const std::vector<BufferedTupleStream::RowIdx>& indices);
+
     PartitionedHashJoinNode* parent_;
 
     /// This partition is completely processed and nothing needs to be done for it again.
@@ -499,12 +581,6 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
     /// If NULL, ownership has been transfered.
     BufferedTupleStream* build_rows_;
     BufferedTupleStream* probe_rows_;
-
-    /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' contains the
-    /// index of each row's index into the hash table's tuple stream. This function is
-    /// replaced with a codegen'd version.
-    bool InsertBatch(HashTableCtx* ctx, RowBatch* batch,
-        const std::vector<BufferedTupleStream::RowIdx>& indices);
   };
 
   /// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a376385/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 41d2e99..33fa482 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -173,6 +173,9 @@ class RowBatch {
     /// RowBatch::AtCapacity() instead.
     bool IR_ALWAYS_INLINE AtEnd() { return row_ >= row_batch_end_; }
 
+    /// Returns the row batch which this iterator is iterating through.
+    RowBatch* parent() { return parent_; }
+
    private:
     /// Number of tuples per row.
     const int num_tuples_per_row_;