You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/17 17:09:13 UTC

[4/4] incubator-impala git commit: IMPALA-3286: prefetching for PartitionedAggregationNode

IMPALA-3286: prefetching for PartitionedAggregationNode

This patch builds on top of the prefetching infrastructure to add
prefetching to PartitionedAggregationNode. Input batches are evaluated
in prefetch groups and hash table buckets are prefetched if the
prefetch_mode query option is set to HT_BUCKET.

We avoid some pointer indirections on the critical path by caching hash
tables in a 'hash_tbls_' array.

There is also a bit of cleanup to directly instantiate the templated
ProcessBatch() method to remove the ProcessBatch_true() and
ProcessBatch_false() hack, and also to separate out
ProcessBatchNoGrouping() so that it doesn't have to have the same
argument list as ProcessBatch().

Co-author: Michael Ho <kw...@cloudera.com>

Change-Id: I7726454efb416d61080c4e11db0ee7ada18c149b
Reviewed-on: http://gerrit.cloudera.org:8080/3070
Reviewed-by: Michael Ho <kw...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9f4276ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9f4276ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9f4276ee

Branch: refs/heads/master
Commit: 9f4276eea80a2e162a3045cefcc38f936df271d4
Parents: 9172f4b
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon May 16 13:58:13 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue May 17 10:09:06 2016 -0700

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py          |  12 +-
 be/src/exec/hash-table.cc                      |   9 +-
 be/src/exec/partitioned-aggregation-node-ir.cc | 152 +++++++++++++-------
 be/src/exec/partitioned-aggregation-node.cc    |  99 ++++++++-----
 be/src/exec/partitioned-aggregation-node.h     |  92 +++++++-----
 5 files changed, 230 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f4276ee/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index 71fb1a7..5cc992c 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -40,10 +40,14 @@ options, args = parser.parse_args()
 ir_functions = [
   ["AGG_NODE_PROCESS_ROW_BATCH_WITH_GROUPING", "ProcessRowBatchWithGrouping"],
   ["AGG_NODE_PROCESS_ROW_BATCH_NO_GROUPING", "ProcessRowBatchNoGrouping"],
-  ["PART_AGG_NODE_PROCESS_BATCH_TRUE", "ProcessBatch_true"],
-  ["PART_AGG_NODE_PROCESS_BATCH_FALSE", "ProcessBatch_false"],
-  ["PART_AGG_NODE_PROCESS_BATCH_NO_GROUPING", "ProcessBatchNoGrouping"],
-  ["PART_AGG_NODE_PROCESS_BATCH_STREAMING", "ProcessBatchStreaming"],
+  ["PART_AGG_NODE_PROCESS_BATCH_UNAGGREGATED",
+      "PartitionedAggregationNode12ProcessBatchILb0"],
+  ["PART_AGG_NODE_PROCESS_BATCH_AGGREGATED",
+      "PartitionedAggregationNode12ProcessBatchILb1"],
+  ["PART_AGG_NODE_PROCESS_BATCH_NO_GROUPING",
+      "PartitionedAggregationNode22ProcessBatchNoGrouping"],
+  ["PART_AGG_NODE_PROCESS_BATCH_STREAMING",
+      "PartitionedAggregationNode21ProcessBatchStreaming"],
   ["AVG_UPDATE_BIGINT", "9AvgUpdateIN10impala_udf9BigIntVal"],
   ["AVG_UPDATE_DOUBLE", "9AvgUpdateIN10impala_udf9DoubleVal"],
   ["AVG_UPDATE_TIMESTAMP", "TimestampAvgUpdate"],

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f4276ee/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index 953ddce..719b600 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -854,9 +854,12 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
   Value* cur_expr_values = builder.CreateLoad(cur_expr_values_ptr);
 
   // Load cur_expr_values_null_ into a LLVM pointer.
-  Value* cur_expr_values_null_ptr =
-      codegen->CastPtrToLlvmPtr(buffer_ptr_type, &expr_values_cache_.cur_expr_values_null_);
-  Value* cur_expr_values_null = builder.CreateLoad(cur_expr_values_null_ptr);
+  Value* cur_expr_values_null = NULL;
+  if (stores_nulls_) {
+    Value* cur_expr_values_null_ptr = codegen->CastPtrToLlvmPtr(
+        buffer_ptr_type, &expr_values_cache_.cur_expr_values_null_);
+    cur_expr_values_null = builder.CreateLoad(cur_expr_values_null_ptr);
+  }
 
   // Call GetHashSeed() to get seeds_[level_]
   Function* get_hash_seed_fn =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f4276ee/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc
index d575c01..d479189 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -23,8 +23,7 @@
 
 using namespace impala;
 
-Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch,
-    const HashTableCtx* ht_ctx) { // 'ht_ctx' is unused
+Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) {
   Tuple* output_tuple = singleton_output_tuple_;
   FOREACH_ROW(batch, 0, batch_iter) {
     UpdateTuple(&agg_fn_ctxs_[0], output_tuple, batch_iter.Get());
@@ -34,7 +33,7 @@ Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch,
 
 template<bool AGGREGATED_ROWS>
 Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
-    HashTableCtx* __restrict__ ht_ctx) {
+    TPrefetchMode::type prefetch_mode, HashTableCtx* __restrict__ ht_ctx) {
   DCHECK(!hash_partitions_.empty());
   DCHECK(!is_streaming_preagg_);
 
@@ -44,39 +43,78 @@ Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
   // TODO: Once we have a histogram with the number of rows per partition, we will have
   // accurate resize calls.
   RETURN_IF_ERROR(CheckAndResizeHashPartitions(batch->num_rows(), ht_ctx));
-  FOREACH_ROW(batch, 0, batch_iter) {
-    RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(batch_iter.Get(), ht_ctx));
+
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int cache_size = expr_vals_cache->capacity();
+  const int num_rows = batch->num_rows();
+  for (int group_start = 0; group_start < num_rows; group_start += cache_size) {
+    EvalAndHashPrefetchGroup<AGGREGATED_ROWS>(batch, group_start, prefetch_mode, ht_ctx);
+
+    FOREACH_ROW_LIMIT(batch, group_start, cache_size, batch_iter) {
+      RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(batch_iter.Get(), ht_ctx));
+      expr_vals_cache->NextRow();
+    }
+    DCHECK(expr_vals_cache->AtEnd());
   }
   return Status::OK();
 }
 
 template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
-    HashTableCtx* __restrict__ ht_ctx) {
-  if (AGGREGATED_ROWS) {
-    if (!ht_ctx->EvalAndHashBuild(row)) return Status::OK();
-  } else {
-    if (!ht_ctx->EvalAndHashProbe(row)) return Status::OK();
+void IR_ALWAYS_INLINE PartitionedAggregationNode::EvalAndHashPrefetchGroup(
+    RowBatch* batch, int start_row_idx, TPrefetchMode::type prefetch_mode,
+    HashTableCtx* ht_ctx) {
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int cache_size = expr_vals_cache->capacity();
+
+  expr_vals_cache->Reset();
+  FOREACH_ROW_LIMIT(batch, start_row_idx, cache_size, batch_iter) {
+    TupleRow* row = batch_iter.Get();
+    bool is_null;
+    if (AGGREGATED_ROWS) {
+      is_null = !ht_ctx->EvalAndHashBuild(row);
+    } else {
+      is_null = !ht_ctx->EvalAndHashProbe(row);
+    }
+    // Hoist lookups out of non-null branch to speed up non-null case.
+    const uint32_t hash = expr_vals_cache->ExprValuesHash();
+    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+    HashTable* hash_tbl = GetHashTable(partition_idx);
+    if (is_null) {
+      expr_vals_cache->SetRowNull();
+    } else if (prefetch_mode != TPrefetchMode::NONE) {
+      if (LIKELY(hash_tbl != NULL)) hash_tbl->PrefetchBucket<false>(hash);
+    }
+    expr_vals_cache->NextRow();
   }
 
-  uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
+  expr_vals_cache->ResetForRead();
+}
+
+template<bool AGGREGATED_ROWS>
+Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
+    HashTableCtx* __restrict__ ht_ctx) {
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  // Hoist lookups out of non-null branch to speed up non-null case.
+  const uint32_t hash = expr_vals_cache->ExprValuesHash();
+  const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+  if (expr_vals_cache->IsRowNull()) return Status::OK();
   // To process this row, we first see if it can be aggregated or inserted into this
   // partition's hash table. If we need to insert it and that fails, due to OOM, we
   // spill the partition. The partition to spill is not necessarily dst_partition,
   // so we can try again to insert the row.
-  Partition* dst_partition = hash_partitions_[hash >> (32 - NUM_PARTITIONING_BITS)];
-  if (dst_partition->is_spilled()) {
+  HashTable* hash_tbl = GetHashTable(partition_idx);
+  Partition* dst_partition = hash_partitions_[partition_idx];
+  DCHECK_EQ(dst_partition->is_spilled(), hash_tbl == NULL);
+  if (hash_tbl == NULL) {
     // This partition is already spilled, just append the row.
     return AppendSpilledRow<AGGREGATED_ROWS>(dst_partition, row);
   }
 
-  HashTable* ht = dst_partition->hash_tbl.get();
-  DCHECK(ht != NULL);
   DCHECK(dst_partition->aggregated_row_stream->is_pinned());
   bool found;
   // Find the appropriate bucket in the hash table. There will always be a free
   // bucket because we checked the size above.
-  HashTable::Iterator it = ht->FindBuildRowBucket(ht_ctx, &found);
+  HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
   DCHECK(!it.AtEnd()) << "Hash table had no free buckets";
   if (AGGREGATED_ROWS) {
     // If the row is already an aggregate row, it cannot match anything in the
@@ -130,50 +168,47 @@ Status PartitionedAggregationNode::AppendSpilledRow(Partition* __restrict__ part
   return AppendSpilledRow(stream, row);
 }
 
-Status PartitionedAggregationNode::ProcessBatch_false(
-    RowBatch* batch, HashTableCtx* __restrict__ ht_ctx) {
-  return ProcessBatch<false>(batch, ht_ctx);
-}
-
-Status PartitionedAggregationNode::ProcessBatch_true(
-    RowBatch* batch, HashTableCtx* __restrict__ ht_ctx) {
-  return ProcessBatch<true>(batch, ht_ctx);
-}
-
 Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
-    RowBatch* in_batch, RowBatch* out_batch, HashTableCtx* __restrict__ ht_ctx,
-    int remaining_capacity[PARTITION_FANOUT]) {
+    TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* out_batch,
+    HashTableCtx* __restrict__ ht_ctx, int remaining_capacity[PARTITION_FANOUT]) {
   DCHECK(is_streaming_preagg_);
   DCHECK_EQ(out_batch->num_rows(), 0);
   DCHECK_LE(in_batch->num_rows(), out_batch->capacity());
 
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
-  FOREACH_ROW(in_batch, 0, in_batch_iter) {
-    TupleRow* in_row = in_batch_iter.Get();
-    if (!ht_ctx->EvalAndHashProbe(in_row)) continue;
-    uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
-    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
-
-    if (TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx], in_row, hash,
-            &remaining_capacity[partition_idx], &process_batch_status_)) {
-      continue;
-    }
-    RETURN_IF_ERROR(process_batch_status_);
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int num_rows = in_batch->num_rows();
+  const int cache_size = expr_vals_cache->capacity();
+  for (int group_start = 0; group_start < num_rows; group_start += cache_size) {
+    EvalAndHashPrefetchGroup<false>(in_batch, group_start, prefetch_mode, ht_ctx);
 
-    // Tuple is not going into hash table, add it to the output batch.
-    Tuple* intermediate_tuple = ConstructIntermediateTuple(agg_fn_ctxs_,
-        out_batch->tuple_data_pool(), &process_batch_status_);
-    if (UNLIKELY(intermediate_tuple == NULL)) {
-      DCHECK(!process_batch_status_.ok());
-      return process_batch_status_;
+    FOREACH_ROW_LIMIT(in_batch, group_start, cache_size, in_batch_iter) {
+      // Hoist lookups out of non-null branch to speed up non-null case.
+      TupleRow* in_row = in_batch_iter.Get();
+      const uint32_t hash = expr_vals_cache->ExprValuesHash();
+      const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+      if (!expr_vals_cache->IsRowNull() &&
+          !TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx],
+            GetHashTable(partition_idx), in_row, hash, &remaining_capacity[partition_idx],
+            &process_batch_status_)) {
+        RETURN_IF_ERROR(process_batch_status_);
+        // Tuple is not going into hash table, add it to the output batch.
+        Tuple* intermediate_tuple = ConstructIntermediateTuple(agg_fn_ctxs_,
+            out_batch->tuple_data_pool(), &process_batch_status_);
+        if (UNLIKELY(intermediate_tuple == NULL)) {
+          DCHECK(!process_batch_status_.ok());
+          return process_batch_status_;
+        }
+        UpdateTuple(&agg_fn_ctxs_[0], intermediate_tuple, in_row, false);
+        out_batch_iterator.Get()->SetTuple(0, intermediate_tuple);
+        out_batch_iterator.Next();
+        out_batch->CommitLastRow();
+      }
+      DCHECK(process_batch_status_.ok());
+      expr_vals_cache->NextRow();
     }
-    UpdateTuple(&agg_fn_ctxs_[0], intermediate_tuple, in_row, false);
-
-    out_batch_iterator.Get()->SetTuple(0, intermediate_tuple);
-    out_batch_iterator.Next();
-    out_batch->CommitLastRow();
+    DCHECK(expr_vals_cache->AtEnd());
   }
-
   if (needs_serialize) {
     FOREACH_ROW(out_batch, 0, out_batch_iter) {
       AggFnEvaluator::Serialize(aggregate_evaluators_, agg_fn_ctxs_,
@@ -186,13 +221,14 @@ Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
 
 bool PartitionedAggregationNode::TryAddToHashTable(
     HashTableCtx* __restrict__ ht_ctx, Partition* __restrict__ partition,
-    TupleRow* __restrict__ in_row, uint32_t hash, int* __restrict__ remaining_capacity,
-    Status* status) {
+    HashTable* __restrict__ hash_tbl, TupleRow* __restrict__ in_row,
+    uint32_t hash, int* __restrict__ remaining_capacity, Status* status) {
   DCHECK(remaining_capacity != NULL);
+  DCHECK_EQ(hash_tbl, partition->hash_tbl.get());
   DCHECK_GE(*remaining_capacity, 0);
   bool found;
   // This is called from ProcessBatchStreaming() so the rows are not aggregated.
-  HashTable::Iterator it = partition->hash_tbl->FindBuildRowBucket(ht_ctx, &found);
+  HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
   Tuple* intermediate_tuple;
   if (found) {
     intermediate_tuple = it.GetTuple();
@@ -214,3 +250,9 @@ bool PartitionedAggregationNode::TryAddToHashTable(
   UpdateTuple(&partition->agg_fn_ctxs[0], intermediate_tuple, in_row, false);
   return true;
 }
+
+// Instantiate required templates.
+template Status PartitionedAggregationNode::ProcessBatch<false>(RowBatch*,
+    TPrefetchMode::type, HashTableCtx*);
+template Status PartitionedAggregationNode::ProcessBatch<true>(RowBatch*,
+    TPrefetchMode::type, HashTableCtx*);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f4276ee/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 2cde059..0a6a811 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -109,7 +109,8 @@ PartitionedAggregationNode::PartitionedAggregationNode(
     needs_serialize_(false),
     block_mgr_client_(NULL),
     output_partition_(NULL),
-    process_row_batch_fn_(NULL),
+    process_batch_no_grouping_fn_(NULL),
+    process_batch_fn_(NULL),
     process_batch_streaming_fn_(NULL),
     build_timer_(NULL),
     ht_resize_timer_(NULL),
@@ -274,25 +275,9 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
   bool codegen_enabled = false;
   Status codegen_status;
   if (state->codegen_enabled()) {
-    LlvmCodeGen* codegen;
-    RETURN_IF_ERROR(state->GetCodegen(&codegen));
-    if (is_streaming_preagg_) {
-      Function* codegen_process_batch_streaming_fn;
-      codegen_status = CodegenProcessBatchStreaming(&codegen_process_batch_streaming_fn);
-      if (codegen_status.ok()) {
-        codegen->AddFunctionToJit(codegen_process_batch_streaming_fn,
-            reinterpret_cast<void**>(&process_batch_streaming_fn_));
-        codegen_enabled = true;
-      }
-    } else {
-      Function* codegen_process_row_batch_fn;
-      codegen_status = CodegenProcessBatch(&codegen_process_row_batch_fn);
-      if (codegen_status.ok()) {
-        codegen->AddFunctionToJit(codegen_process_row_batch_fn,
-            reinterpret_cast<void**>(&process_row_batch_fn_));
-        codegen_enabled = true;
-      }
-    }
+    codegen_status = is_streaming_preagg_ ? CodegenProcessBatchStreaming()
+                                          : CodegenProcessBatch();
+    codegen_enabled = codegen_status.ok();
   }
   AddCodegenExecOption(codegen_enabled, codegen_status);
   return Status::OK();
@@ -330,14 +315,21 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
       }
     }
 
+    TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
     SCOPED_TIMER(build_timer_);
-    if (process_row_batch_fn_ != NULL) {
-      RETURN_IF_ERROR(process_row_batch_fn_(this, &batch, ht_ctx_.get()));
-    } else if (grouping_expr_ctxs_.empty()) {
-      RETURN_IF_ERROR(ProcessBatchNoGrouping(&batch));
+    if (grouping_expr_ctxs_.empty()) {
+      if (process_batch_no_grouping_fn_ != NULL) {
+        RETURN_IF_ERROR(process_batch_no_grouping_fn_(this, &batch));
+      } else {
+        RETURN_IF_ERROR(ProcessBatchNoGrouping(&batch));
+      }
     } else {
       // There is grouping, so we will do partitioned aggregation.
-      RETURN_IF_ERROR(ProcessBatch<false>(&batch, ht_ctx_.get()));
+      if (process_batch_fn_ != NULL) {
+        RETURN_IF_ERROR(process_batch_fn_(this, &batch, prefetch_mode, ht_ctx_.get()));
+      } else {
+        RETURN_IF_ERROR(ProcessBatch<false>(&batch, prefetch_mode, ht_ctx_.get()));
+      }
     }
     batch.Reset();
   } while (!eos);
@@ -542,8 +534,9 @@ Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
     int remaining_capacity[PARTITION_FANOUT];
     bool ht_needs_expansion = false;
     for (int i = 0; i < PARTITION_FANOUT; ++i) {
-      DCHECK(hash_partitions_[i]->hash_tbl != NULL);
-      remaining_capacity[i] = hash_partitions_[i]->hash_tbl->NumInsertsBeforeResize();
+      HashTable* hash_tbl = GetHashTable(i);
+      DCHECK(hash_tbl != NULL);
+      remaining_capacity[i] = hash_tbl->NumInsertsBeforeResize();
       ht_needs_expansion |= remaining_capacity[i] < child_batch_->num_rows();
     }
 
@@ -554,7 +547,7 @@ Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
     // should always use the remaining space in the hash table to avoid wasting memory.
     if (ht_needs_expansion && ShouldExpandPreaggHashTables()) {
       for (int i = 0; i < PARTITION_FANOUT; ++i) {
-        HashTable* ht = hash_partitions_[i]->hash_tbl.get();
+        HashTable* ht = GetHashTable(i);
         if (remaining_capacity[i] < child_batch_->num_rows()) {
           SCOPED_TIMER(ht_resize_timer_);
           if (ht->CheckAndResize(child_batch_->num_rows(), ht_ctx_.get())) {
@@ -564,12 +557,13 @@ Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
       }
     }
 
+    TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
     if (process_batch_streaming_fn_ != NULL) {
-      RETURN_IF_ERROR(process_batch_streaming_fn_(this, needs_serialize_,
+      RETURN_IF_ERROR(process_batch_streaming_fn_(this, needs_serialize_, prefetch_mode,
           child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity));
     } else {
-      RETURN_IF_ERROR(ProcessBatchStreaming(needs_serialize_, child_batch_.get(),
-            out_batch, ht_ctx_.get(), remaining_capacity ));
+      RETURN_IF_ERROR(ProcessBatchStreaming(needs_serialize_, prefetch_mode,
+          child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity ));
     }
 
     child_batch_->Reset(); // All rows from child_batch_ were processed.
@@ -1145,6 +1139,7 @@ Status PartitionedAggregationNode::CreateHashPartitions(int level) {
     DCHECK(new_partition != NULL);
     hash_partitions_.push_back(partition_pool_->Add(new_partition));
     RETURN_IF_ERROR(new_partition->InitStreams());
+    hash_tbls_[i] = NULL;
   }
   if (!is_streaming_preagg_) {
     DCHECK_GT(state_->block_mgr()->num_reserved_buffers_remaining(block_mgr_client_), 0);
@@ -1166,6 +1161,7 @@ Status PartitionedAggregationNode::CreateHashPartitions(int level) {
       }
       RETURN_IF_ERROR(hash_partitions_[i]->Spill());
     }
+    hash_tbls_[i] = hash_partitions_[i]->hash_tbl.get();
   }
 
   COUNTER_ADD(partitions_created_, hash_partitions_.size());
@@ -1293,12 +1289,14 @@ Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stre
       RETURN_IF_ERROR(SpillPartition());
     }
 
+    TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode;
     bool eos = false;
     RowBatch batch(AGGREGATED_ROWS ? *intermediate_row_desc_ : children_[0]->row_desc(),
                    state_->batch_size(), mem_tracker());
     do {
       RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos));
-      RETURN_IF_ERROR(ProcessBatch<AGGREGATED_ROWS>(&batch, ht_ctx_.get()));
+      RETURN_IF_ERROR(
+          ProcessBatch<AGGREGATED_ROWS>(&batch, prefetch_mode, ht_ctx_.get()));
       RETURN_IF_ERROR(state_->GetQueryStatus());
       FreeLocalAllocations();
       batch.Reset();
@@ -1331,6 +1329,7 @@ Status PartitionedAggregationNode::SpillPartition() {
     return state_->block_mgr()->MemLimitTooLowError(block_mgr_client_, id());
   }
 
+  hash_tbls_[partition_idx] = NULL;
   return hash_partitions_[partition_idx]->Spill();
 }
 
@@ -1398,6 +1397,7 @@ void PartitionedAggregationNode::ClosePartitions() {
   aggregated_partitions_.clear();
   spilled_partitions_.clear();
   hash_partitions_.clear();
+  memset(hash_tbls_, 0, sizeof(hash_tbls_));
   partition_pool_->Clear();
 }
 
@@ -1792,7 +1792,7 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(Function** fn) {
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::CodegenProcessBatch(Function** fn) {
+Status PartitionedAggregationNode::CodegenProcessBatch() {
   LlvmCodeGen* codegen;
   RETURN_IF_ERROR(state_->GetCodegen(&codegen));
   SCOPED_TIMER(codegen->codegen_timer());
@@ -1802,7 +1802,7 @@ Status PartitionedAggregationNode::CodegenProcessBatch(Function** fn) {
 
   // Get the cross compiled update row batch function
   IRFunction::Type ir_fn = (!grouping_expr_ctxs_.empty() ?
-      IRFunction::PART_AGG_NODE_PROCESS_BATCH_FALSE :
+      IRFunction::PART_AGG_NODE_PROCESS_BATCH_UNAGGREGATED :
       IRFunction::PART_AGG_NODE_PROCESS_BATCH_NO_GROUPING);
   Function* process_batch_fn = codegen->GetFunction(ir_fn, true);
   DCHECK(process_batch_fn != NULL);
@@ -1810,6 +1810,13 @@ Status PartitionedAggregationNode::CodegenProcessBatch(Function** fn) {
   int replaced;
   if (!grouping_expr_ctxs_.empty()) {
     // Codegen for grouping using hash table
+
+    // Replace prefetch_mode with constant so branches can be optimised out.
+    TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode;
+    Value* prefetch_mode_arg = codegen->GetArgument(process_batch_fn, 3);
+    prefetch_mode_arg->replaceAllUsesWith(
+        ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
+
     // The codegen'd ProcessBatch function is only used in Open() with level_ = 0,
     // so don't use murmur hash
     Function* hash_fn;
@@ -1838,15 +1845,20 @@ Status PartitionedAggregationNode::CodegenProcessBatch(Function** fn) {
 
   replaced = codegen->ReplaceCallSites(process_batch_fn, update_tuple_fn, "UpdateTuple");
   DCHECK_GE(replaced, 1);
-  *fn = codegen->FinalizeFunction(process_batch_fn);
-  if (*fn == NULL) {
+  process_batch_fn = codegen->FinalizeFunction(process_batch_fn);
+  if (process_batch_fn == NULL) {
     return Status("PartitionedAggregationNode::CodegenProcessBatch(): codegen'd "
         "ProcessBatch() function failed verification, see log");
   }
+
+  void **codegened_fn_ptr = grouping_expr_ctxs_.empty() ?
+      reinterpret_cast<void**>(&process_batch_no_grouping_fn_) :
+      reinterpret_cast<void**>(&process_batch_fn_);
+  codegen->AddFunctionToJit(process_batch_fn, codegened_fn_ptr);
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::CodegenProcessBatchStreaming(Function** fn) {
+Status PartitionedAggregationNode::CodegenProcessBatchStreaming() {
   DCHECK(is_streaming_preagg_);
   LlvmCodeGen* codegen;
   RETURN_IF_ERROR(state_->GetCodegen(&codegen));
@@ -1861,6 +1873,12 @@ Status PartitionedAggregationNode::CodegenProcessBatchStreaming(Function** fn) {
   needs_serialize_arg->replaceAllUsesWith(
       ConstantInt::get(Type::getInt1Ty(codegen->context()), needs_serialize_));
 
+  // Replace prefetch_mode with constant so branches can be optimised out.
+  TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode;
+  Value* prefetch_mode_arg = codegen->GetArgument(process_batch_streaming_fn, 3);
+  prefetch_mode_arg->replaceAllUsesWith(
+      ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
+
   Function* update_tuple_fn;
   RETURN_IF_ERROR(CodegenUpdateTuple(&update_tuple_fn));
 
@@ -1893,11 +1911,14 @@ Status PartitionedAggregationNode::CodegenProcessBatchStreaming(Function** fn) {
   DCHECK_EQ(replaced, 1);
 
   DCHECK(process_batch_streaming_fn != NULL);
-  *fn = codegen->FinalizeFunction(process_batch_streaming_fn);
-  if (*fn == NULL) {
+  process_batch_streaming_fn = codegen->FinalizeFunction(process_batch_streaming_fn);
+  if (process_batch_streaming_fn == NULL) {
     return Status("PartitionedAggregationNode::CodegenProcessBatchStreaming(): codegen'd "
         "ProcessBatchStreaming() function failed verification, see log");
   }
+
+  codegen->AddFunctionToJit(process_batch_streaming_fn,
+      reinterpret_cast<void**>(&process_batch_streaming_fn_));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f4276ee/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index ab560c5..73976c2 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -228,13 +228,17 @@ class PartitionedAggregationNode : public ExecNode {
   Partition* output_partition_;
   HashTable::Iterator output_iterator_;
 
-  typedef Status (*ProcessRowBatchFn)(
-      PartitionedAggregationNode*, RowBatch*, HashTableCtx*);
-  /// Jitted ProcessRowBatch function pointer.  Null if codegen is disabled.
-  ProcessRowBatchFn process_row_batch_fn_;
+  typedef Status (*ProcessBatchNoGroupingFn)(PartitionedAggregationNode*, RowBatch*);
+  /// Jitted ProcessBatchNoGrouping function pointer. Null if codegen is disabled.
+  ProcessBatchNoGroupingFn process_batch_no_grouping_fn_;
 
-  typedef Status (*ProcessBatchStreamingFn)(PartitionedAggregationNode*, bool, RowBatch*,
-      RowBatch*, HashTableCtx*, int[PARTITION_FANOUT]);
+  typedef Status (*ProcessBatchFn)(
+      PartitionedAggregationNode*, RowBatch*, TPrefetchMode::type, HashTableCtx*);
+  /// Jitted ProcessBatch function pointer. Null if codegen is disabled.
+  ProcessBatchFn process_batch_fn_;
+
+  typedef Status (*ProcessBatchStreamingFn)(PartitionedAggregationNode*, bool,
+      TPrefetchMode::type, RowBatch*, RowBatch*, HashTableCtx*, int[PARTITION_FANOUT]);
   /// Jitted ProcessBatchStreaming function pointer.  Null if codegen is disabled.
   ProcessBatchStreamingFn process_batch_streaming_fn_;
 
@@ -312,6 +316,9 @@ class PartitionedAggregationNode : public ExecNode {
   /// Current partitions we are partitioning into.
   std::vector<Partition*> hash_partitions_;
 
+  /// Cache for hash tables in 'hash_partitions_'.
+  HashTable* hash_tbls_[PARTITION_FANOUT];
+
   /// All partitions that have been spilled and need further processing.
   std::list<Partition*> spilled_partitions_;
 
@@ -390,6 +397,13 @@ class PartitionedAggregationNode : public ExecNode {
   /// a temporary buffer.
   boost::scoped_ptr<BufferedTupleStream> serialize_stream_;
 
+  /// Accessor for 'hash_tbls_' that verifies consistency with the partitions.
+  HashTable* ALWAYS_INLINE GetHashTable(int partition_idx) {
+    HashTable* ht = hash_tbls_[partition_idx];
+    DCHECK_EQ(ht, hash_partitions_[partition_idx]->hash_tbl.get());
+    return ht;
+  }
+
   /// Materializes 'row_batch' in either grouping or non-grouping case.
   Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos);
 
@@ -465,19 +479,31 @@ class PartitionedAggregationNode : public ExecNode {
                         Tuple* tuple, MemPool* pool);
 
   /// Do the aggregation for all tuple rows in the batch when there is no grouping.
-  /// The HashTableCtx argument is unused, but included so the signature matches that of
-  /// ProcessBatch() for codegen. This function is replaced by codegen.
-  Status ProcessBatchNoGrouping(RowBatch* batch, const HashTableCtx* ht_ctx = NULL);
+  /// This function is replaced by codegen.
+  Status ProcessBatchNoGrouping(RowBatch* batch);
 
   /// Processes a batch of rows. This is the core function of the algorithm. We partition
   /// the rows into hash_partitions_, spilling as necessary.
   /// If AGGREGATED_ROWS is true, it means that the rows in the batch are already
   /// pre-aggregated.
+  /// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE,
+  ///     hash table buckets will be prefetched based on the hash values computed. Note
+  ///     that 'prefetch_mode' will be substituted with constants during codegen time.
   //
-  /// This function is replaced by codegen. It's inlined into ProcessBatch_true/false in
-  /// the IR module. We pass in ht_ctx_.get() as an argument for performance.
+  /// This function is replaced by codegen. We pass in ht_ctx_.get() as an argument for
+  /// performance.
+  template<bool AGGREGATED_ROWS>
+  Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch,
+      TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx);
+
+  /// Evaluates the rows in 'batch' starting at 'start_row_idx' and stores the results in
+  /// the expression values cache in 'ht_ctx'. The number of rows evaluated depends on
+  /// the capacity of the cache. 'prefetch_mode' specifies the prefetching mode in use.
+  /// If it's not PREFETCH_NONE, hash table buckets for the computed hashes will be
+  /// prefetched. Note that codegen replaces 'prefetch_mode' with a constant.
   template<bool AGGREGATED_ROWS>
-  Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch, HashTableCtx* ht_ctx);
+  void EvalAndHashPrefetchGroup(RowBatch* batch, int start_row_idx,
+      TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx);
 
   /// This function processes each individual row in ProcessBatch(). Must be inlined into
   /// ProcessBatch for codegen to substitute function calls with codegen'd versions.
@@ -532,25 +558,28 @@ class PartitionedAggregationNode : public ExecNode {
   /// store all of the rows in 'in_batch'.
   /// 'needs_serialize' is an argument so that codegen can replace it with a constant,
   ///     rather than using the member variable 'needs_serialize_'.
+  /// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE,
+  ///     hash table buckets will be prefetched based on the hash values computed. Note
+  ///     that 'prefetch_mode' will be substituted with constants during codegen time.
   /// 'remaining_capacity' is an array with PARTITION_FANOUT entries with the number of
   ///     additional rows that can be added to the hash table per partition. It is updated
   ///     by ProcessBatchStreaming() when it inserts new rows.
   /// 'ht_ctx' is passed in as a way to avoid aliasing of 'this' confusing the optimiser.
-  Status ProcessBatchStreaming(bool needs_serialize, RowBatch* in_batch,
-      RowBatch* out_batch, HashTableCtx* ht_ctx,
+  Status ProcessBatchStreaming(bool needs_serialize, TPrefetchMode::type prefetch_mode,
+      RowBatch* in_batch, RowBatch* out_batch, HashTableCtx* ht_ctx,
       int remaining_capacity[PARTITION_FANOUT]);
 
-  /// Tries to add intermediate to the hash table of 'partition' for streaming aggregation.
-  /// The input row must have been evaluated with 'ht_ctx', with 'hash' set to the
-  /// corresponding hash. If the tuple already exists in the hash table, update the tuple
-  /// and return true. Otherwise try to create a new entry in the hash table, returning
-  /// true if successful or false if the table is full. 'remaining_capacity' keeps track
-  /// of how many more entries can be added to the hash table so we can avoid retrying
-  /// inserts. It is decremented if an insert succeeds and set to zero if an insert
-  /// fails. If an error occurs, returns false and sets 'status'.
+  /// Tries to add intermediate to the hash table 'hash_tbl' of 'partition' for streaming
+  /// aggregation. The input row must have been evaluated with 'ht_ctx', with 'hash' set
+  /// to the corresponding hash. If the tuple already exists in the hash table, update
+  /// the tuple and return true. Otherwise try to create a new entry in the hash table,
+  /// returning true if successful or false if the table is full. 'remaining_capacity'
+  /// keeps track of how many more entries can be added to the hash table so we can avoid
+  /// retrying inserts. It is decremented if an insert succeeds and set to zero if an
+  /// insert fails. If an error occurs, returns false and sets 'status'.
   bool IR_ALWAYS_INLINE TryAddToHashTable(HashTableCtx* ht_ctx,
-      Partition* partition, TupleRow* in_row, uint32_t hash, int* remaining_capacity,
-      Status* status);
+      Partition* partition, HashTable* hash_tbl, TupleRow* in_row, uint32_t hash,
+      int* remaining_capacity, Status* status);
 
   /// Initializes hash_partitions_. 'level' is the level for the partitions to create.
   /// Also sets ht_ctx_'s level to 'level'.
@@ -601,18 +630,15 @@ class PartitionedAggregationNode : public ExecNode {
   /// Codegen the non-streaming process row batch loop. The loop has already been
   /// compiled to IR and loaded into the codegen object. UpdateAggTuple has also been
   /// codegen'd to IR. This function will modify the loop subsituting the statically
-  /// compiled functions with codegen'd ones.
+  /// compiled functions with codegen'd ones. 'process_batch_fn_' or
+  /// 'process_batch_no_grouping_fn_' will be updated with the codegened function
+  /// depending on whether this is a grouping or non-grouping aggregation.
   /// Assumes AGGREGATED_ROWS = false.
-  Status CodegenProcessBatch(llvm::Function** fn);
+  Status CodegenProcessBatch();
 
   /// Codegen the materialization loop for streaming preaggregations.
-  Status CodegenProcessBatchStreaming(llvm::Function** fn);
-
-  /// Functions to instantiate templated versions of ProcessBatch().
-  /// The xcompiled versions of these functions are used in CodegenProcessBatch().
-  /// TODO: is there a better way to do this?
-  Status ProcessBatch_false(RowBatch* batch, HashTableCtx* ht_ctx);
-  Status ProcessBatch_true(RowBatch* batch, HashTableCtx* ht_ctx);
+  /// 'process_batch_streaming_fn_' will be updated with the codegened function.
+  Status CodegenProcessBatchStreaming();
 
   /// We need two buffers per partition, one for the aggregated stream and one
   /// for the unaggregated stream. We need an additional buffer to read the stream