You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/17 08:30:19 UTC

[3/4] incubator-impala git commit: IMPALA-3286: Prefetching for PHJ probing.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc
index 45b5a0e..d575c01 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -53,13 +53,13 @@ Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
 template<bool AGGREGATED_ROWS>
 Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
     HashTableCtx* __restrict__ ht_ctx) {
-  uint32_t hash = 0;
   if (AGGREGATED_ROWS) {
-    if (!ht_ctx->EvalAndHashBuild(row, &hash)) return Status::OK();
+    if (!ht_ctx->EvalAndHashBuild(row)) return Status::OK();
   } else {
-    if (!ht_ctx->EvalAndHashProbe(row, &hash)) return Status::OK();
+    if (!ht_ctx->EvalAndHashProbe(row)) return Status::OK();
   }
 
+  uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
   // To process this row, we first see if it can be aggregated or inserted into this
   // partition's hash table. If we need to insert it and that fails, due to OOM, we
   // spill the partition. The partition to spill is not necessarily dst_partition,
@@ -76,7 +76,7 @@ Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
   bool found;
   // Find the appropriate bucket in the hash table. There will always be a free
   // bucket because we checked the size above.
-  HashTable::Iterator it = ht->FindBuildRowBucket(ht_ctx, hash, &found);
+  HashTable::Iterator it = ht->FindBuildRowBucket(ht_ctx, &found);
   DCHECK(!it.AtEnd()) << "Hash table had no free buckets";
   if (AGGREGATED_ROWS) {
     // If the row is already an aggregate row, it cannot match anything in the
@@ -149,9 +149,9 @@ Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
 
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
   FOREACH_ROW(in_batch, 0, in_batch_iter) {
-    uint32_t hash;
     TupleRow* in_row = in_batch_iter.Get();
-    if (!ht_ctx->EvalAndHashProbe(in_row, &hash)) continue;
+    if (!ht_ctx->EvalAndHashProbe(in_row)) continue;
+    uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
     const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
 
     if (TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx], in_row, hash,
@@ -192,7 +192,7 @@ bool PartitionedAggregationNode::TryAddToHashTable(
   DCHECK_GE(*remaining_capacity, 0);
   bool found;
   // This is called from ProcessBatchStreaming() so the rows are not aggregated.
-  HashTable::Iterator it = partition->hash_tbl->FindBuildRowBucket(ht_ctx, hash, &found);
+  HashTable::Iterator it = partition->hash_tbl->FindBuildRowBucket(ht_ctx, &found);
   Tuple* intermediate_tuple;
   if (found) {
     intermediate_tuple = it.GetTuple();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index b7dca61..2cde059 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -252,9 +252,9 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
     RETURN_IF_ERROR(state_->GetQueryStatus());
     singleton_output_tuple_returned_ = false;
   } else {
-    ht_ctx_.reset(new HashTableCtx(build_expr_ctxs_, grouping_expr_ctxs_, true,
-        std::vector<bool>(build_expr_ctxs_.size(), true), state->fragment_hash_seed(),
-        MAX_PARTITION_DEPTH, 1));
+    RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, grouping_expr_ctxs_,
+        true, vector<bool>(build_expr_ctxs_.size(), true), state->fragment_hash_seed(),
+        MAX_PARTITION_DEPTH, 1, mem_tracker(), &ht_ctx_));
     RETURN_IF_ERROR(state_->block_mgr()->RegisterClient(
         Substitute("PartitionedAggregationNode id=$0 ptr=$1", id_, this),
         MinRequiredBuffers(), true, mem_tracker(), state, &block_mgr_client_));
@@ -984,9 +984,9 @@ int PartitionedAggregationNode::GroupingExprsVarlenSize() {
   int varlen_size = 0;
   // TODO: The hash table could compute this as it hashes.
   for (int expr_idx: string_grouping_exprs_) {
-    StringValue* sv = reinterpret_cast<StringValue*>(ht_ctx_->last_expr_value(expr_idx));
+    StringValue* sv = reinterpret_cast<StringValue*>(ht_ctx_->ExprValue(expr_idx));
     // Avoid branching by multiplying length by null bit.
-    varlen_size += sv->len * !ht_ctx_->last_expr_value_null(expr_idx);
+    varlen_size += sv->len * !ht_ctx_->ExprValueNull(expr_idx);
   }
   return varlen_size;
 }
@@ -997,17 +997,17 @@ void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple,
   // Copy over all grouping slots (the variable length data is copied below).
   for (int i = 0; i < grouping_expr_ctxs_.size(); ++i) {
     SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[i];
-    if (ht_ctx_->last_expr_value_null(i)) {
+    if (ht_ctx_->ExprValueNull(i)) {
       intermediate_tuple->SetNull(slot_desc->null_indicator_offset());
     } else {
-      void* src = ht_ctx_->last_expr_value(i);
+      void* src = ht_ctx_->ExprValue(i);
       void* dst = intermediate_tuple->GetSlot(slot_desc->tuple_offset());
       memcpy(dst, src, slot_desc->slot_size());
     }
   }
 
   for (int expr_idx: string_grouping_exprs_) {
-    if (ht_ctx_->last_expr_value_null(expr_idx)) continue;
+    if (ht_ctx_->ExprValueNull(expr_idx)) continue;
 
     SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[expr_idx];
     // ptr and len were already copied to the fixed-len part of string value

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index ef4c010..d66eda0 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -219,67 +219,145 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins(
 }
 
 template<int const JoinOp>
+bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRow(
+    ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+    ExprContext* const* conjunct_ctxs, int num_conjuncts,
+    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status) {
+  if (JoinOp == TJoinOp::INNER_JOIN) {
+    return ProcessProbeRowInnerJoin(other_join_conjunct_ctxs, num_other_join_conjuncts,
+        conjunct_ctxs, num_conjuncts, out_batch_iterator, remaining_capacity);
+  } else if (JoinOp == TJoinOp::RIGHT_SEMI_JOIN ||
+             JoinOp == TJoinOp::RIGHT_ANTI_JOIN) {
+    return ProcessProbeRowRightSemiJoins<JoinOp>(other_join_conjunct_ctxs,
+        num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator,
+        remaining_capacity);
+  } else if (JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
+             JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
+             JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+    return ProcessProbeRowLeftSemiJoins<JoinOp>(other_join_conjunct_ctxs,
+        num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator,
+        remaining_capacity, status);
+  } else {
+    DCHECK(JoinOp == TJoinOp::RIGHT_OUTER_JOIN ||
+           JoinOp == TJoinOp::LEFT_OUTER_JOIN || TJoinOp::FULL_OUTER_JOIN);
+    return ProcessProbeRowOuterJoins<JoinOp>(other_join_conjunct_ctxs,
+        num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator,
+        remaining_capacity);
+  }
+}
+
+template<int const JoinOp>
 bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
     HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
-    int* remaining_capacity, int num_other_join_conjuncts, Status* status) {
-  while (!probe_batch_iterator->AtEnd()) {
+    int* remaining_capacity, Status* status) {
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  while (!expr_vals_cache->AtEnd()) {
     // Establish current_probe_row_ and find its corresponding partition.
+    DCHECK(!probe_batch_iterator->AtEnd());
     current_probe_row_ = probe_batch_iterator->Get();
-    probe_batch_iterator->Next();
     matched_probe_ = false;
 
-    uint32_t hash;
-    if (!ht_ctx->EvalAndHashProbe(current_probe_row_, &hash)) {
+    // True if the current row should be skipped for probing.
+    bool skip_row = false;
+
+    // The hash of the expressions results for the current probe row.
+    uint32_t hash = expr_vals_cache->ExprValuesHash();
+    // Hoist the following out of the else statement below to speed up the non-null case.
+    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+    HashTable* hash_tbl = hash_tbls_[partition_idx];
+
+    // Fetch the hash and expr values' nullness for this row.
+    if (expr_vals_cache->IsRowNull()) {
       if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && non_empty_build_) {
+        const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
         // For NAAJ, we need to treat NULLs on the probe carefully. The logic is:
-        // 1. No build rows -> Return this row.
+        // 1. No build rows -> Return this row. The check for 'non_empty_build_'
+        //    is for this case.
         // 2. Has build rows & no other join predicates, skip row.
         // 3. Has build rows & other join predicates, we need to evaluate against all
         // build rows. First evaluate it against this partition, and if there is not
         // a match, save it to evaluate against other partitions later. If there
         // is a match, the row is skipped.
-        if (num_other_join_conjuncts > 0) {
-          if (UNLIKELY(!AppendRow(null_probe_rows_, current_probe_row_, status))) {
+        if (num_other_join_conjuncts == 0) {
+          // Condition 2 above.
+          skip_row = true;
+        } else if (LIKELY(AppendRow(null_probe_rows_, current_probe_row_, status))) {
+          // Condition 3 above.
+          matched_null_probe_.push_back(false);
+          skip_row = true;
+        } else {
+          // Condition 3 above but failed to append to 'null_probe_rows_'. Bail out.
+          DCHECK(!status->ok());
+          return false;
+        }
+      }
+    } else {
+      // The build partition is in memory. Return this row for probing.
+      if (LIKELY(hash_tbl != NULL)) {
+        hash_tbl_iterator_ = hash_tbl->FindProbeRow(ht_ctx);
+      } else {
+        // The build partition is either empty or spilled.
+        Partition* partition = hash_partitions_[partition_idx];
+        // This partition is closed, meaning the build side for this partition was empty.
+        if (UNLIKELY(partition->is_closed())) {
+          DCHECK(state_ == PROCESSING_PROBE || state_ == REPARTITIONING);
+        } else {
+          // This partition is not in memory, spill the probe row and move to the next row.
+          DCHECK(partition->is_spilled());
+          DCHECK(partition->probe_rows() != NULL);
+          // Skip the current row if we manage to append to the spilled partition's BTS.
+          // Otherwise, we need to bail out and report the failure.
+          if (UNLIKELY(!AppendRow(partition->probe_rows(), current_probe_row_, status))) {
             DCHECK(!status->ok());
             return false;
           }
-          matched_null_probe_.push_back(false);
+          skip_row = true;
         }
-        continue;
       }
-      return true;
-    }
-    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
-    // The build partition is in memory. Return this row for probing.
-    if (LIKELY(hash_tbls_[partition_idx] != NULL)) {
-      hash_tbl_iterator_ = hash_tbls_[partition_idx]->FindProbeRow(ht_ctx, hash);
-      return true;
-    }
-    // The build partition is either empty or spilled.
-    Partition* partition = hash_partitions_[partition_idx];
-    // This partition is closed, meaning the build side for this partition was empty.
-    if (UNLIKELY(partition->is_closed())) {
-      DCHECK(state_ == PROCESSING_PROBE || state_ == REPARTITIONING);
-      return true;
-    }
-    // This partition is not in memory, spill the probe row and move to the next row.
-    DCHECK(partition->is_spilled());
-    DCHECK(partition->probe_rows() != NULL);
-    if (UNLIKELY(!AppendRow(partition->probe_rows(), current_probe_row_, status))) {
-      DCHECK(!status->ok());
-      return false;
     }
+    // Move to the next probe row and hash table context's cached value.
+    probe_batch_iterator->Next();
+    expr_vals_cache->NextRow();
+    if (skip_row) continue;
+    DCHECK(status->ok());
+    return true;
+  }
+  if (probe_batch_iterator->AtEnd()) {
+    // No more probe rows.
+    current_probe_row_ = NULL;
   }
-  // Finished this batch.
-  current_probe_row_ = NULL;
   return false;
 }
 
-// CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by
-// codegen.
+// Resets the expr values cache, then runs EvalAndHashProbe() on rows of the probe batch
+// from 'probe_batch_pos_', limited to the cache's capacity(). Rows for which it returns
+// false are flagged via SetRowNull(). Unless 'prefetch_mode' is NONE, the bucket for
+// each hashed row is prefetched when its partition's hash table is non-NULL.
+void IR_ALWAYS_INLINE PartitionedHashJoinNode::EvalAndHashProbePrefetchGroup(
+    TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx) {
+  RowBatch* batch = probe_batch_.get();
+  HashTableCtx::ExprValuesCache* cache = ht_ctx->expr_values_cache();
+  const int group_size = cache->capacity();
+  DCHECK(cache->AtEnd());
+  cache->Reset();
+  FOREACH_ROW_LIMIT(batch, probe_batch_pos_, group_size, row_iter) {
+    if (!ht_ctx->EvalAndHashProbe(row_iter.Get())) {
+      cache->SetRowNull();
+    } else if (prefetch_mode != TPrefetchMode::NONE) {
+      const uint32_t row_hash = cache->ExprValuesHash();
+      HashTable* tbl = hash_tbls_[row_hash >> (32 - NUM_PARTITIONING_BITS)];
+      if (LIKELY(tbl != NULL)) tbl->PrefetchBucket<true>(row_hash);
+    }
+    cache->NextRow();
+  }
+  // Presumably rewinds the cache so the entries written above can be read back.
+  cache->ResetForRead();
+}
+
+// CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by codegen.
 template<int const JoinOp>
-int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch,
-    HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
+int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode,
+    RowBatch* out_batch, HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
   ExprContext* const* other_join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
   const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
   ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
@@ -292,48 +370,51 @@ int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch,
   // Note that 'probe_batch_pos_' is the row no. of the row after 'current_probe_row_'.
   RowBatch::Iterator probe_batch_iterator(probe_batch_.get(), probe_batch_pos_);
   int remaining_capacity = max_rows;
+  bool has_probe_rows = current_probe_row_ != NULL || !probe_batch_iterator.AtEnd();
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
 
-  do {
-    if (current_probe_row_ != NULL) {
-      if (JoinOp == TJoinOp::INNER_JOIN) {
-        if (!ProcessProbeRowInnerJoin(other_join_conjunct_ctxs, num_other_join_conjuncts,
-            conjunct_ctxs, num_conjuncts, &out_batch_iterator, &remaining_capacity)) {
-          break;
-        }
-      } else if (JoinOp == TJoinOp::RIGHT_SEMI_JOIN ||
-                 JoinOp == TJoinOp::RIGHT_ANTI_JOIN) {
-        if (!ProcessProbeRowRightSemiJoins<JoinOp>(other_join_conjunct_ctxs,
-            num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, &out_batch_iterator,
-            &remaining_capacity)) {
-          break;
-        }
-      } else if (JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
-                 JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
-                 JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-        if (!ProcessProbeRowLeftSemiJoins<JoinOp>(other_join_conjunct_ctxs,
-            num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, &out_batch_iterator,
-            &remaining_capacity, status)) {
-          break;
-        }
-      } else {
-        DCHECK(JoinOp == TJoinOp::RIGHT_OUTER_JOIN ||
-            JoinOp == TJoinOp::LEFT_OUTER_JOIN || TJoinOp::FULL_OUTER_JOIN);
-        if (!ProcessProbeRowOuterJoins<JoinOp>(other_join_conjunct_ctxs,
-            num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, &out_batch_iterator,
-            &remaining_capacity)) {
+  // Keep processing more probe rows if there are more to process and the output batch
+  // has room and we haven't hit any error yet.
+  while (has_probe_rows && remaining_capacity > 0 && status->ok()) {
+    // Prefetch for the current hash_tbl_iterator_.
+    if (prefetch_mode != TPrefetchMode::NONE) {
+      hash_tbl_iterator_.PrefetchBucket<true>();
+    }
+    // Evaluate and hash more rows if prefetch group is empty. A prefetch group is a cache
+    // of probe expressions results, nullness of the expression values and hash values
+    // against some consecutive number of rows in the probe batch. Prefetching, if
+    // enabled, is interleaved with the rows' evaluation and hashing. If the prefetch
+    // group is partially full (e.g. we returned before the current prefetch group was
+    // exhausted in the previous iteration), we will proceed with the remaining items in
+    // the values cache.
+    if (expr_vals_cache->AtEnd()) {
+      EvalAndHashProbePrefetchGroup(prefetch_mode, ht_ctx);
+    }
+    // Process the prefetch group.
+    do {
+      // 'current_probe_row_' can be NULL on the first iteration through this loop.
+      if (current_probe_row_ != NULL) {
+        if (!ProcessProbeRow<JoinOp>(other_join_conjunct_ctxs, num_other_join_conjuncts,
+            conjunct_ctxs, num_conjuncts, &out_batch_iterator, &remaining_capacity,
+            status)) {
+          if (status->ok()) DCHECK_EQ(remaining_capacity, 0);
           break;
         }
       }
-    }
-    // Must have reached the end of the hash table iterator for the current row before
-    // moving to the next row.
-    DCHECK(hash_tbl_iterator_.AtEnd());
-    DCHECK(status->ok());
-  } while (NextProbeRow<JoinOp>(ht_ctx, &probe_batch_iterator, &remaining_capacity,
-      num_other_join_conjuncts, status));
-  // Update where we are in the probe batch.
-  probe_batch_pos_ = (probe_batch_iterator.Get() - probe_batch_->GetRow(0)) /
-      probe_batch_->num_tuples_per_row();
+      // Must have reached the end of the hash table iterator for the current row before
+      // moving to the next row.
+      DCHECK(hash_tbl_iterator_.AtEnd());
+      DCHECK(status->ok());
+    } while (NextProbeRow<JoinOp>(ht_ctx, &probe_batch_iterator, &remaining_capacity,
+        status));
+    // Update whether there are more probe rows to process in the current batch.
+    has_probe_rows = current_probe_row_ != NULL;
+    if (!has_probe_rows) DCHECK(probe_batch_iterator.AtEnd());
+    // Update where we are in the probe batch.
+    probe_batch_pos_ = (probe_batch_iterator.Get() - probe_batch_->GetRow(0)) /
+        probe_batch_->num_tuples_per_row();
+  }
+
   int num_rows_added;
   if (LIKELY(status->ok())) {
     num_rows_added = max_rows - remaining_capacity;
@@ -346,42 +427,14 @@ int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch,
   return num_rows_added;
 }
 
-int PartitionedHashJoinNode::ProcessProbeBatch(
-    const TJoinOp::type join_op, RowBatch* out_batch,
-    HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
-  switch (join_op) {
-    case TJoinOp::INNER_JOIN:
-      return ProcessProbeBatch<TJoinOp::INNER_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::LEFT_OUTER_JOIN:
-      return ProcessProbeBatch<TJoinOp::LEFT_OUTER_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::LEFT_SEMI_JOIN:
-      return ProcessProbeBatch<TJoinOp::LEFT_SEMI_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::LEFT_ANTI_JOIN:
-      return ProcessProbeBatch<TJoinOp::LEFT_ANTI_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
-      return ProcessProbeBatch<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>(out_batch, ht_ctx,
-          status);
-    case TJoinOp::RIGHT_OUTER_JOIN:
-      return ProcessProbeBatch<TJoinOp::RIGHT_OUTER_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::RIGHT_SEMI_JOIN:
-      return ProcessProbeBatch<TJoinOp::RIGHT_SEMI_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::RIGHT_ANTI_JOIN:
-      return ProcessProbeBatch<TJoinOp::RIGHT_ANTI_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::FULL_OUTER_JOIN:
-      return ProcessProbeBatch<TJoinOp::FULL_OUTER_JOIN>(out_batch, ht_ctx, status);
-    default:
-      DCHECK(false) << "Unknown join type";
-      return -1;
-  }
-}
-
 Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
     bool build_filters) {
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx_->expr_values_cache();
+  expr_vals_cache->Reset();
   FOREACH_ROW(build_batch, 0, build_batch_iter) {
     DCHECK(build_status_.ok());
-    uint32_t hash;
     TupleRow* build_row = build_batch_iter.Get();
-    if (!ht_ctx_->EvalAndHashBuild(build_row, &hash)) {
+    if (!ht_ctx_->EvalAndHashBuild(build_row)) {
       if (null_aware_partition_ != NULL) {
         // TODO: remove with codegen/template
         // If we are NULL aware and this build row has NULL in the eq join slot,
@@ -405,6 +458,7 @@ Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
         ctx.local_bloom_filter->Insert(filter_hash);
       }
     }
+    const uint32_t hash = expr_vals_cache->ExprValuesHash();
     const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
     Partition* partition = hash_partitions_[partition_idx];
     const bool result = AppendRow(partition->build_rows(), build_row, &build_status_);
@@ -413,37 +467,69 @@ Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
   return Status::OK();
 }
 
-bool PartitionedHashJoinNode::Partition::InsertBatch(HashTableCtx* ht_ctx,
-    RowBatch* batch, const vector<BufferedTupleStream::RowIdx>& indices) {
-  DCHECK_LE(batch->num_rows(), hash_values_.size());
-  DCHECK_LE(batch->num_rows(), null_bitmap_.num_bits());
+bool PartitionedHashJoinNode::Partition::InsertBatch(
+    TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx, RowBatch* batch,
+    const vector<BufferedTupleStream::RowIdx>& indices) {
   // Compute the hash values and prefetch the hash table buckets.
-  int i = 0;
-  uint32_t* hash_values = hash_values_.data();
-  null_bitmap_.SetAllBits(false);
-  FOREACH_ROW(batch, 0, batch_iter) {
-    if (ht_ctx->EvalAndHashBuild(batch_iter.Get(), &hash_values[i])) {
-      // TODO: Find the optimal prefetch batch size. This may be something
-      // processor dependent so we may need calibration at Impala startup time.
-      hash_tbl_->PrefetchBucket<false>(hash_values[i]);
-    } else {
-      null_bitmap_.Set<false>(i, true);
+  const int num_rows = batch->num_rows();
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int prefetch_size = expr_vals_cache->capacity();
+  const BufferedTupleStream::RowIdx* row_indices = indices.data();
+  for (int prefetch_group_row = 0; prefetch_group_row < num_rows;
+       prefetch_group_row += prefetch_size) {
+    int cur_row = prefetch_group_row;
+    expr_vals_cache->Reset();
+    FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
+      if (ht_ctx->EvalAndHashBuild(batch_iter.Get())) {
+        if (prefetch_mode != TPrefetchMode::NONE) {
+          hash_tbl_->PrefetchBucket<false>(expr_vals_cache->ExprValuesHash());
+        }
+      } else {
+        expr_vals_cache->SetRowNull();
+      }
+      expr_vals_cache->NextRow();
     }
-    ++i;
-  }
-  // Do the insertion.
-  i = 0;
-  const BufferedTupleStream::RowIdx* row_idx = indices.data();
-  FOREACH_ROW(batch, 0, batch_iter) {
-    if (LIKELY(!null_bitmap_.Get<false>(i))) {
+    // Do the insertion.
+    expr_vals_cache->ResetForRead();
+    FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
       TupleRow* row = batch_iter.Get();
-      if (UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx[i], row, hash_values[i]))) {
+      BufferedTupleStream::RowIdx row_idx = row_indices[cur_row];
+      if (!expr_vals_cache->IsRowNull() &&
+          UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx, row))) {
         return false;
       }
+      expr_vals_cache->NextRow();
+      ++cur_row;
     }
-    ++i;
   }
   return true;
 }
 
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::INNER_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::LEFT_OUTER_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::LEFT_SEMI_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::LEFT_ANTI_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::RIGHT_OUTER_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::RIGHT_SEMI_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::RIGHT_ANTI_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::FULL_OUTER_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 47c118a..1888e06 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -143,7 +143,9 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
   // Although ConstructBuildSide() maybe be run in a separate thread, it is safe to free
   // local allocations in QueryMaintenance() since the build thread is not run
   // concurrently with other expr evaluation in this join node.
-  AddExprCtxsToFree(probe_expr_ctxs_);
+  // Probe side expr is not included in QueryMaintenance(). We cache the probe expression
+  // values in ExprValuesCache. Local allocations need to survive until the cache is reset
+  // so we need to manually free probe expr local allocations.
   AddExprCtxsToFree(build_expr_ctxs_);
 
   // other_join_conjunct_ctxs_ are evaluated in the context of rows assembled from all
@@ -162,10 +164,10 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
       join_op_ == TJoinOp::RIGHT_ANTI_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN ||
       std::accumulate(is_not_distinct_from_.begin(), is_not_distinct_from_.end(), false,
                       std::logical_or<bool>());
-  ht_ctx_.reset(new HashTableCtx(build_expr_ctxs_, probe_expr_ctxs_, should_store_nulls,
-      is_not_distinct_from_, state->fragment_hash_seed(), MAX_PARTITION_DEPTH,
-      child(1)->row_desc().tuple_descriptors().size()));
-
+  RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, probe_expr_ctxs_,
+      should_store_nulls, is_not_distinct_from_, state->fragment_hash_seed(),
+      MAX_PARTITION_DEPTH, child(1)->row_desc().tuple_descriptors().size(), mem_tracker(),
+      &ht_ctx_));
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     null_aware_eval_timer_ = ADD_TIMER(runtime_profile(), "NullAwareAntiJoinEvalTime");
   }
@@ -206,20 +208,20 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
         ht_ctx_->CodegenHashCurrentRow(state, true, &murmur_hash_fn));
 
     // Codegen for evaluating build rows
-    Function* eval_row_fn;
-    codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(state, true, &eval_row_fn));
+    Function* eval_build_row_fn;
+    codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(state, true, &eval_build_row_fn));
 
     if (codegen_status.ok()) {
       // Codegen for build path
       build_codegen_status =
-          CodegenProcessBuildBatch(state, hash_fn, murmur_hash_fn, eval_row_fn);
+          CodegenProcessBuildBatch(state, hash_fn, murmur_hash_fn, eval_build_row_fn);
       if (build_codegen_status.ok()) build_codegen_enabled = true;
       // Codegen for probe path
       probe_codegen_status = CodegenProcessProbeBatch(state, hash_fn, murmur_hash_fn);
       if (probe_codegen_status.ok()) probe_codegen_enabled = true;
       // Codegen for InsertBatch()
       insert_codegen_status = CodegenInsertBatch(state, hash_fn, murmur_hash_fn,
-          eval_row_fn);
+          eval_build_row_fn);
       if (insert_codegen_status.ok()) ht_construction_codegen_enabled = true;
     } else {
       build_codegen_status = codegen_status;
@@ -324,8 +326,7 @@ PartitionedHashJoinNode::Partition::Partition(RuntimeState* state,
   : parent_(parent),
     is_closed_(false),
     is_spilled_(false),
-    level_(level),
-    null_bitmap_(state->batch_size()) {
+    level_(level) {
   build_rows_ = new BufferedTupleStream(state, parent_->child(1)->row_desc(),
       state->block_mgr(), parent_->block_mgr_client_,
       true /* use_initial_small_buffers */, false /* read_write */);
@@ -334,8 +335,6 @@ PartitionedHashJoinNode::Partition::Partition(RuntimeState* state,
       state->block_mgr(), parent_->block_mgr_client_,
       true /* use_initial_small_buffers */, false /* read_write */ );
   DCHECK(probe_rows_ != NULL);
-  hash_values_.resize(state->batch_size());
-  null_bitmap_.SetAllBits(false);
 }
 
 PartitionedHashJoinNode::Partition::~Partition() {
@@ -465,6 +464,7 @@ Status PartitionedHashJoinNode::Partition::BuildHashTable(RuntimeState* state,
     DCHECK_EQ(batch.num_rows(), indices.size());
     DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets())
         << build_rows()->RowConsumesMemory();
+    TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
     SCOPED_TIMER(parent_->build_timer_);
     if (parent_->insert_batch_fn_ != NULL) {
       InsertBatchFn insert_batch_fn;
@@ -474,9 +474,13 @@ Status PartitionedHashJoinNode::Partition::BuildHashTable(RuntimeState* state,
         insert_batch_fn = parent_->insert_batch_fn_;
       }
       DCHECK(insert_batch_fn != NULL);
-      if (UNLIKELY(!insert_batch_fn(this, ctx, &batch, indices))) goto not_built;
+      if (UNLIKELY(!insert_batch_fn(this, prefetch_mode, ctx, &batch, indices))) {
+        goto not_built;
+      }
     } else {
-      if (UNLIKELY(!InsertBatch(ctx, &batch, indices))) goto not_built;
+      if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, indices))) {
+        goto not_built;
+      }
     }
     RETURN_IF_ERROR(state->GetQueryStatus());
     parent_->FreeLocalAllocations();
@@ -652,7 +656,6 @@ Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level
     DCHECK(new_partition != NULL);
     hash_partitions_.push_back(partition_pool_->Add(new_partition));
     RETURN_IF_ERROR(new_partition->build_rows()->Init(id(), runtime_profile(), true));
-
     // Initialize a buffer for the probe here to make sure why have it if we need it.
     // While this is not strictly necessary (there are some cases where we won't need this
     // buffer), the benefit is low. Not grabbing this buffer means there is an additional
@@ -671,6 +674,8 @@ Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level
   while (!eos) {
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(QueryMaintenance(state));
+    // 'probe_expr_ctxs_' should have made no local allocations in this function.
+    DCHECK(!ExprContext::HasLocalAllocations(probe_expr_ctxs_));
     if (input_partition_ == NULL) {
       // If we are still consuming batches from the build side.
       {
@@ -887,6 +892,43 @@ int64_t PartitionedHashJoinNode::LargestSpilledPartition() const {
   return max_rows;
 }
 
+int PartitionedHashJoinNode::ProcessProbeBatch(
+    const TJoinOp::type join_op, TPrefetchMode::type prefetch_mode,
+    RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status) {
+  switch (join_op) {
+    case TJoinOp::INNER_JOIN:
+      return ProcessProbeBatch<TJoinOp::INNER_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    case TJoinOp::LEFT_OUTER_JOIN:
+      return ProcessProbeBatch<TJoinOp::LEFT_OUTER_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    case TJoinOp::LEFT_SEMI_JOIN:
+      return ProcessProbeBatch<TJoinOp::LEFT_SEMI_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    case TJoinOp::LEFT_ANTI_JOIN:
+      return ProcessProbeBatch<TJoinOp::LEFT_ANTI_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
+      return ProcessProbeBatch<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>(prefetch_mode,
+          out_batch, ht_ctx, status);
+    case TJoinOp::RIGHT_OUTER_JOIN:
+      return ProcessProbeBatch<TJoinOp::RIGHT_OUTER_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    case TJoinOp::RIGHT_SEMI_JOIN:
+      return ProcessProbeBatch<TJoinOp::RIGHT_SEMI_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    case TJoinOp::RIGHT_ANTI_JOIN:
+      return ProcessProbeBatch<TJoinOp::RIGHT_ANTI_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    case TJoinOp::FULL_OUTER_JOIN:
+      return ProcessProbeBatch<TJoinOp::FULL_OUTER_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    default:
+      DCHECK(false) << "Unknown join type";
+      return -1;
+  }
+}
+
 Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch,
     bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
@@ -951,16 +993,19 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
       // Putting SCOPED_TIMER in ProcessProbeBatch() causes weird exception handling IR
       // in the xcompiled function, so call it here instead.
       int rows_added = 0;
+      TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
       SCOPED_TIMER(probe_timer_);
       if (process_probe_batch_fn_ == NULL) {
-        rows_added = ProcessProbeBatch(join_op_, out_batch, ht_ctx_.get(), &status);
+        rows_added = ProcessProbeBatch(join_op_, prefetch_mode, out_batch, ht_ctx_.get(),
+            &status);
       } else {
         DCHECK(process_probe_batch_fn_level0_ != NULL);
         if (ht_ctx_->level() == 0) {
-          rows_added = process_probe_batch_fn_level0_(this, out_batch, ht_ctx_.get(),
-              &status);
+          rows_added = process_probe_batch_fn_level0_(this, prefetch_mode, out_batch,
+              ht_ctx_.get(), &status);
         } else {
-          rows_added = process_probe_batch_fn_(this, out_batch, ht_ctx_.get(), &status);
+          rows_added = process_probe_batch_fn_(this, prefetch_mode, out_batch,
+              ht_ctx_.get(), &status);
         }
       }
       if (UNLIKELY(rows_added < 0)) {
@@ -982,6 +1027,10 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
     } else {
       RETURN_IF_ERROR(NextSpilledProbeRowBatch(state, out_batch));
     }
+    // Free local allocations of the probe side expressions only after ExprValuesCache
+    // has been reset.
+    DCHECK(ht_ctx_->expr_values_cache()->AtEnd());
+    ExprContext::FreeLocalAllocations(probe_expr_ctxs_);
 
     // We want to return as soon as we have attached a tuple stream to the out_batch
     // (before preparing a new partition). The attached tuple stream will be recycled
@@ -1615,6 +1664,7 @@ Status PartitionedHashJoinNode::CodegenProcessBuildBatch(RuntimeState* state,
       codegen->CloneFunction(process_build_batch_fn);
 
   // Always build runtime filters at level0 (if there are any).
+  // Note that the first argument of this function is the return value.
   Value* build_filters_l0_arg = codegen->GetArgument(process_build_batch_fn_level0, 3);
   build_filters_l0_arg->replaceAllUsesWith(
       ConstantInt::get(Type::getInt1Ty(codegen->context()), filters_.size() > 0));
@@ -1630,7 +1680,8 @@ Status PartitionedHashJoinNode::CodegenProcessBuildBatch(RuntimeState* state,
   DCHECK_EQ(replaced, 1);
 
   // Never build filters after repartitioning, as all rows have already been added to the
-  // filters during the level0 build.
+  // filters during the level0 build. Note that the first argument of this function is the
+  // return value.
   Value* build_filters_arg = codegen->GetArgument(process_build_batch_fn, 3);
   build_filters_arg->replaceAllUsesWith(
       ConstantInt::get(Type::getInt1Ty(codegen->context()), false));
@@ -1698,21 +1749,26 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
   DCHECK(process_probe_batch_fn != NULL);
   process_probe_batch_fn->setName("ProcessProbeBatch");
 
-  // Since ProcessProbeBatch() is a templated function, it has linkonce_odr linkage, which
-  // means the function can be removed if it's not referenced. Change to weak_odr, which
-  // has the same semantics except it can't be removed.
-  // See http://llvm.org/docs/LangRef.html#linkage-types
-  DCHECK(process_probe_batch_fn->getLinkage() == GlobalValue::LinkOnceODRLinkage)
+  // Verifies that ProcessProbeBatch() has weak_odr linkage so it's not discarded even
+  // if it's not referenced. See http://llvm.org/docs/LangRef.html#linkage-types
+  DCHECK(process_probe_batch_fn->getLinkage() == GlobalValue::WeakODRLinkage)
       << LlvmCodeGen::Print(process_probe_batch_fn);
-  process_probe_batch_fn->setLinkage(GlobalValue::WeakODRLinkage);
 
   // Bake in %this pointer argument to process_probe_batch_fn.
   Value* this_arg = codegen->GetArgument(process_probe_batch_fn, 0);
   Value* this_loc = codegen->CastPtrToLlvmPtr(this_arg->getType(), this);
   this_arg->replaceAllUsesWith(this_loc);
 
+  // Replace the parameter 'prefetch_mode' with constant.
+  Value* prefetch_mode_arg = codegen->GetArgument(process_probe_batch_fn, 1);
+  TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+  DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
+  DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
+  prefetch_mode_arg->replaceAllUsesWith(
+      ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
+
   // Bake in %ht_ctx pointer argument to process_probe_batch_fn
-  Value* ht_ctx_arg = codegen->GetArgument(process_probe_batch_fn, 2);
+  Value* ht_ctx_arg = codegen->GetArgument(process_probe_batch_fn, 3);
   Value* ht_ctx_loc = codegen->CastPtrToLlvmPtr(ht_ctx_arg->getType(), ht_ctx_.get());
   ht_ctx_arg->replaceAllUsesWith(ht_ctx_loc);
 
@@ -1823,9 +1879,17 @@ Status PartitionedHashJoinNode::CodegenInsertBatch(RuntimeState* state,
   Function* build_equals_fn;
   RETURN_IF_ERROR(ht_ctx_->CodegenEquals(state, true, &build_equals_fn));
 
+  // Replace the parameter 'prefetch_mode' with constant.
+  Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1);
+  TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+  DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
+  DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
+  prefetch_mode_arg->replaceAllUsesWith(
+      ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
+
   // Use codegen'd EvalBuildRow() function
   int replaced = codegen->ReplaceCallSites(insert_batch_fn, eval_row_fn, "EvalBuildRow");
-  DCHECK_EQ(replaced, 2);
+  DCHECK_EQ(replaced, 1);
 
   // Use codegen'd Equals() function
   replaced = codegen->ReplaceCallSites(insert_batch_fn, build_equals_fn, "Equals");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 37c2c4f..a76dced 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -189,7 +189,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
   /// Probes and updates the hash table for the current probe row for either
   /// RIGHT_SEMI_JOIN or RIGHT_ANTI_JOIN. For RIGHT_SEMI_JOIN, all matching build
-  /// rows will be appended to the 'out_batch'; For RIGHT_ANTI_JOIN, update the
+  /// rows will be appended to the output batch; For RIGHT_ANTI_JOIN, update the
   /// hash table only if matches are found. The actual output happens in
   /// OutputUnmatchedBuild(). Returns true if probing is done for the current
   /// probe row and should continue to next row.
@@ -207,8 +207,9 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
   /// Probes the hash table for the current probe row for LEFT_SEMI_JOIN,
   /// LEFT_ANTI_JOIN or NULL_AWARE_LEFT_ANTI_JOIN. The probe row will be appended
-  /// to 'out_batch' if it's part of the output. Returns true if probing
-  /// is done for the current probe row and should continue to next row.
+  /// to output batch if there is a match (for LEFT_SEMI_JOIN) or if there is no
+  /// match (for LEFT_ANTI_JOIN). Returns true if probing is done for the current
+  /// probe row and should continue to next row.
   ///
   /// 'out_batch_iterator' is the iterator for the output batch.
   /// 'remaining_capacity' tracks the number of additional rows that can be added to
@@ -223,7 +224,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
   /// Probes the hash table for the current probe row for LEFT_OUTER_JOIN,
   /// RIGHT_OUTER_JOIN or FULL_OUTER_JOIN. The matching build and/or probe row
-  /// will appended to 'out_batch'. For RIGHT/FULL_OUTER_JOIN, some of the outputs
+  /// will be appended to output batch. For RIGHT/FULL_OUTER_JOIN, some of the outputs
   /// are added in OutputUnmatchedBuild(). Returns true if probing is done for the
   /// current probe row and should continue to next row.
   ///
@@ -239,6 +240,25 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
       ExprContext* const* conjunct_ctxs, int num_conjuncts,
       RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);
 
+  /// Probes 'current_probe_row_' against the hash tables and appends outputs
+  /// to output batch. Wrapper around the join-type specific probe row functions
+  /// declared above.
+  template<int const JoinOp>
+  bool inline ProcessProbeRow(
+      ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+      ExprContext* const* conjunct_ctxs, int num_conjuncts,
+      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status);
+
+  /// Evaluates some number of rows in 'probe_batch_' against the probe expressions
+  /// and hashes the results to 32-bit hash values. The evaluation results and the hash
+  /// values are stored in the expression values cache in 'ht_ctx'. The number of rows
+  /// processed depends on the capacity available in 'ht_ctx->expr_values_cache_'.
+  /// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE,
+  /// hash table buckets will be prefetched based on the hash values computed. Note
+  /// that 'prefetch_mode' will be substituted with constants during codegen time.
+  void EvalAndHashProbePrefetchGroup(TPrefetchMode::type prefetch_mode,
+      HashTableCtx* ctx);
+
   /// Find the next probe row. Returns true if a probe row is found. In which case,
   /// 'current_probe_row_' and 'hash_tbl_iterator_' have been set up to point to the
   /// next probe row and its corresponding partition. 'status' may be updated if
@@ -246,7 +266,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   template<int const JoinOp>
   bool inline NextProbeRow(
       HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
-      int* remaining_capacity, int num_other_join_conjuncts, Status* status);
+      int* remaining_capacity, Status* status);
 
   /// Process probe rows from probe_batch_. Returns either if out_batch is full or
   /// probe_batch_ is entirely consumed.
@@ -256,11 +276,12 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// set). This function doesn't commit rows to the output batch so it's the caller's
   /// responsibility to do so.
   template<int const JoinOp>
-  int ProcessProbeBatch(RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status);
+  int ProcessProbeBatch(TPrefetchMode::type, RowBatch* out_batch, HashTableCtx* ht_ctx,
+      Status* status);
 
   /// Wrapper that calls the templated version of ProcessProbeBatch() based on 'join_op'.
-  int ProcessProbeBatch(const TJoinOp::type join_op, RowBatch* out_batch,
-      HashTableCtx* ht_ctx, Status* status);
+  int ProcessProbeBatch(const TJoinOp::type join_op, TPrefetchMode::type,
+      RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status);
 
   /// Sweep the hash_tbl_ of the partition that is at the front of
   /// output_build_partitions_, using hash_tbl_iterator_ and output any unmatched build
@@ -553,9 +574,13 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
     /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' is an array
     /// containing the index of each row's index into the hash table's tuple stream.
-    /// This function is replaced with a codegen'd version.
-    bool InsertBatch(HashTableCtx* ctx, RowBatch* batch,
-        const std::vector<BufferedTupleStream::RowIdx>& indices);
+    /// 'prefetch_mode' is the prefetching mode in use. If it's not PREFETCH_NONE, hash
+    /// table buckets which the rows hash to will be prefetched. This parameter is
+    /// replaced with a constant during codegen time. This function may be replaced with
+    /// a codegen'd version. Returns true if all rows in 'batch' are successfully
+    /// inserted.
+    bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx,
+        RowBatch* batch, const std::vector<BufferedTupleStream::RowIdx>& indices);
 
     PartitionedHashJoinNode* parent_;
 
@@ -581,13 +606,6 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
     /// If NULL, ownership has been transfered.
     BufferedTupleStream* build_rows_;
     BufferedTupleStream* probe_rows_;
-
-    /// Store hash values of each row for the current batch computed during prefetching.
-    std::vector<uint32_t> hash_values_;
-
-    /// Bitmap to indicate rows evaluated to NULL for the current batch when building
-    /// hash tables.
-    Bitmap null_bitmap_;
   };
 
   /// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available
@@ -600,14 +618,14 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   ProcessBuildBatchFn process_build_batch_fn_;
   ProcessBuildBatchFn process_build_batch_fn_level0_;
 
-  typedef int (*ProcessProbeBatchFn)(
-      PartitionedHashJoinNode*, RowBatch*, HashTableCtx*, Status*);
+  typedef int (*ProcessProbeBatchFn)(PartitionedHashJoinNode*,
+      TPrefetchMode::type, RowBatch*, HashTableCtx*, Status*);
   /// Jitted ProcessProbeBatch function pointers.  NULL if codegen is disabled.
   ProcessProbeBatchFn process_probe_batch_fn_;
   ProcessProbeBatchFn process_probe_batch_fn_level0_;
 
-  typedef bool (*InsertBatchFn)(Partition*, HashTableCtx*, RowBatch*,
-      const std::vector<BufferedTupleStream::RowIdx>&);
+  typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*,
+      RowBatch*, const std::vector<BufferedTupleStream::RowIdx>&);
   /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
   InsertBatchFn insert_batch_fn_;
   InsertBatchFn insert_batch_fn_level0_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.inline.h b/be/src/exec/partitioned-hash-join-node.inline.h
index d63df1e..8ebeab3 100644
--- a/be/src/exec/partitioned-hash-join-node.inline.h
+++ b/be/src/exec/partitioned-hash-join-node.inline.h
@@ -26,6 +26,7 @@ inline void PartitionedHashJoinNode::ResetForProbe() {
   probe_batch_pos_ = 0;
   matched_probe_ = true;
   hash_tbl_iterator_.SetAtEnd();
+  ht_ctx_->expr_values_cache()->Reset();
 }
 
 inline bool PartitionedHashJoinNode::AppendRow(BufferedTupleStream* stream,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exprs/expr-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.cc b/be/src/exprs/expr-context.cc
index 7231c5e..2a01bfa 100644
--- a/be/src/exprs/expr-context.cc
+++ b/be/src/exprs/expr-context.cc
@@ -110,8 +110,23 @@ Status ExprContext::Clone(RuntimeState* state, ExprContext** new_ctx) {
   return root_->Open(state, *new_ctx, FunctionContext::THREAD_LOCAL);
 }
 
-void ExprContext::FreeLocalAllocations() {
-  FreeLocalAllocations(fn_contexts_);
+bool ExprContext::HasLocalAllocations(const vector<ExprContext*>& ctxs) {
+  for (int i = 0; i < ctxs.size(); ++i) {
+    if (ctxs[i]->HasLocalAllocations()) return true;
+  }
+  return false;
+}
+
+bool ExprContext::HasLocalAllocations() {
+  return HasLocalAllocations(fn_contexts_);
+}
+
+bool ExprContext::HasLocalAllocations(const std::vector<FunctionContext*>& fn_ctxs) {
+  for (int i = 0; i < fn_ctxs.size(); ++i) {
+    if (fn_ctxs[i]->impl()->closed()) continue;
+    if (fn_ctxs[i]->impl()->HasLocalAllocations()) return true;
+  }
+  return false;
 }
 
 void ExprContext::FreeLocalAllocations(const vector<ExprContext*>& ctxs) {
@@ -120,6 +135,10 @@ void ExprContext::FreeLocalAllocations(const vector<ExprContext*>& ctxs) {
   }
 }
 
+void ExprContext::FreeLocalAllocations() {
+  FreeLocalAllocations(fn_contexts_);
+}
+
 void ExprContext::FreeLocalAllocations(const vector<FunctionContext*>& fn_ctxs) {
   for (int i = 0; i < fn_ctxs.size(); ++i) {
     if (fn_ctxs[i]->impl()->closed()) continue;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exprs/expr-context.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.h b/be/src/exprs/expr-context.h
index 0816774..9078ce6 100644
--- a/be/src/exprs/expr-context.h
+++ b/be/src/exprs/expr-context.h
@@ -123,10 +123,16 @@ class ExprContext {
   TimestampVal GetTimestampVal(TupleRow* row);
   DecimalVal GetDecimalVal(TupleRow* row);
 
+  /// Returns true if any of the expression contexts in the array has local allocations.
+  /// The last two are helper functions.
+  static bool HasLocalAllocations(const std::vector<ExprContext*>& ctxs);
+  bool HasLocalAllocations();
+  static bool HasLocalAllocations(const std::vector<FunctionContext*>& fn_ctxs);
+
   /// Frees all local allocations made by fn_contexts_. This can be called when result
-  /// data from this context is no longer needed.
-  void FreeLocalAllocations();
+  /// data from this context is no longer needed. The last two are helper functions.
   static void FreeLocalAllocations(const std::vector<ExprContext*>& ctxs);
+  void FreeLocalAllocations();
   static void FreeLocalAllocations(const std::vector<FunctionContext*>& ctxs);
 
   static const char* LLVM_CLASS_NAME;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index aac0043..3f2ba29 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -143,16 +143,21 @@ class RowBatch {
   }
 
   /// An iterator for going through a row batch, starting at 'row_idx'.
-  /// This is more efficient than using GetRow() as it avoids loading the
-  /// row batch state and doing multiplication on each loop with GetRow().
+  /// If 'limit' is specified, it will iterate up to row number 'row_idx + limit'
+  /// or the last row, whichever comes first. Otherwise, it will iterate till the last
+  /// row in the batch. This is more efficient than using GetRow() as it avoids loading
+  /// the row batch state and doing multiplication on each loop with GetRow().
   class Iterator {
    public:
-    IR_ALWAYS_INLINE Iterator(RowBatch* parent, int row_idx) :
+    Iterator(RowBatch* parent, int row_idx, int limit = -1) :
         num_tuples_per_row_(parent->num_tuples_per_row_),
-        row_(parent->tuple_ptrs_ + row_idx * num_tuples_per_row_),
-        row_batch_end_(parent->tuple_ptrs_ + parent->num_rows_ * num_tuples_per_row_),
+        row_(parent->tuple_ptrs_ + num_tuples_per_row_ * row_idx),
+        row_batch_end_(parent->tuple_ptrs_ + num_tuples_per_row_ *
+            (limit == -1 ? parent->num_rows_ :
+                           std::min<int>(row_idx + limit, parent->num_rows_))),
         parent_(parent) {
       DCHECK_GE(row_idx, 0);
+      DCHECK_GT(num_tuples_per_row_, 0);
       /// We allow empty row batches with num_rows_ == capacity_ == 0.
       /// That's why we cannot call GetRow() above to initialize 'row_'.
       DCHECK_LE(row_idx, parent->capacity_);
@@ -397,12 +402,17 @@ class RowBatch {
 
 }
 
-/// Macro for iterating through '_row_batch', starting at '_start_row_idx'.
+/// Macros for iterating through '_row_batch', starting at '_start_row_idx'.
 /// '_row_batch' is the row batch to iterate through.
 /// '_start_row_idx' is the starting row index.
 /// '_iter' is the iterator.
+/// '_limit' is the max number of rows to iterate over.
 #define FOREACH_ROW(_row_batch, _start_row_idx, _iter)                  \
     for (RowBatch::Iterator _iter(_row_batch, _start_row_idx);          \
          !_iter.AtEnd(); _iter.Next())
 
+#define FOREACH_ROW_LIMIT(_row_batch, _start_row_idx, _limit, _iter)    \
+    for (RowBatch::Iterator _iter(_row_batch, _start_row_idx, _limit);  \
+         !_iter.AtEnd(); _iter.Next())
+
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 99b227c..39fc7cd 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -78,6 +78,7 @@ Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_si
       block_mgr_parent_tracker_.get(), (*runtime_state)->runtime_profile(),
       tmp_file_mgr_.get(), CalculateMemLimit(max_buffers, block_size), block_size, &mgr));
   (*runtime_state)->set_block_mgr(mgr);
+  (*runtime_state)->InitMemTrackers(TUniqueId(), NULL, -1);
 
   query_states_.push_back(shared_ptr<RuntimeState>(*runtime_state));
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 274776c..d12ce1f 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -390,6 +390,17 @@ Status impala::SetQueryOption(const string& key, const string& value,
             iequals(value, "true") || iequals(value, "1"));
         break;
       }
+      case TImpalaQueryOptions::PREFETCH_MODE: {
+        if (iequals(value, "NONE") || iequals(value, "0")) {
+          query_options->__set_prefetch_mode(TPrefetchMode::NONE);
+        } else if (iequals(value, "HT_BUCKET") || iequals(value, "1")) {
+          query_options->__set_prefetch_mode(TPrefetchMode::HT_BUCKET);
+        } else {
+          return Status(Substitute("Invalid prefetch mode '$0'. Valid modes are "
+              "NONE(0) or HT_BUCKET(1)", value));
+        }
+        break;
+      }
       default:
         // We hit this DCHECK(false) if we forgot to add the corresponding entry here
         // when we add a new query option.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index fb24530..60c122d 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -32,7 +32,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE + 1);\
+      TImpalaQueryOptions::PREFETCH_MODE + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -78,7 +78,8 @@ class TQueryOptions;
   QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES)\
   QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING)\
   QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE)\
-  QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE);
+  QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE)\
+  QUERY_OPT_FN(prefetch_mode, PREFETCH_MODE);
 
 /// Converts a TQueryOptions struct into a map of key, value pairs.
 void TQueryOptionsToMap(const TQueryOptions& query_options,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/udf/udf-internal.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h
index 3e5993b..10ce43b 100644
--- a/be/src/udf/udf-internal.h
+++ b/be/src/udf/udf-internal.h
@@ -84,6 +84,9 @@ class FunctionContextImpl {
   /// Frees all allocations returned by AllocateLocal().
   void FreeLocalAllocations();
 
+  /// Returns true if there are any allocations returned by AllocateLocal().
+  bool HasLocalAllocations() const { return !local_allocations_.empty(); }
+
   /// Sets constant_args_. The AnyVal* values are owned by the caller.
   void SetConstantArgs(const std::vector<impala_udf::AnyVal*>& constant_args);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 74bef28..ada1a20 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -193,6 +193,9 @@ struct TQueryOptions {
 
   // Maximum runtime filter size, in bytes
   47: optional i32 runtime_filter_max_size = 16777216
+
+  // Prefetching behavior during hash tables' building and probing.
+  48: optional Types.TPrefetchMode prefetch_mode = Types.TPrefetchMode.HT_BUCKET
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 9421ec4..29ae4cc 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -219,7 +219,10 @@ enum TImpalaQueryOptions {
   RUNTIME_FILTER_MAX_SIZE,
 
   // Minimum runtime filter size, in bytes.
-  RUNTIME_FILTER_MIN_SIZE
+  RUNTIME_FILTER_MIN_SIZE,
+
+  // Prefetching behavior during hash tables' building and probing.
+  PREFETCH_MODE
 }
 
 // The summary of an insert.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/common/thrift/Types.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index 85a14dc..24f4524 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -120,6 +120,14 @@ enum TRuntimeFilterMode {
   GLOBAL
 }
 
+enum TPrefetchMode {
+  // No prefetching at all.
+  NONE,
+
+  // Prefetch the hash table buckets.
+  HT_BUCKET
+}
+
 // A TNetworkAddress is the standard host, port representation of a
 // network address. The hostname field must be resolvable to an IPv4
 // address.