You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/02 22:18:57 UTC

[5/6] impala git commit: IMPALA-110 (part 2): Refactor PartitionedAggregationNode

IMPALA-110 (part 2): Refactor PartitionedAggregationNode

This patch refactors PartitionedAggregationNode in preparation for
supporting multiple distinct operators in a query.

The primary goal of the refactor is to separate out the core
aggregation functionality into a new type of object called an
Aggregator. For now, each aggregation ExecNode will contain a single
Aggregator. Then, future patches will extend the aggregation ExecNode
to support taking a single input and processing it with multiple
Aggregators, allowing us to support more exotic combinations of
aggregate functions and groupings.

Specifically, this patch splits PartitionedAggregationNode into five
new classes:
- Aggregator: a superclass containing the functionality that's shared
  between GroupingAggregator and NonGroupingAggregator.
- GroupingAggregator: this class contains the bulk of the interesting
  aggregation code, including everything related to creating and
  updating partitions and hash tables, spilling, etc.
- NonGroupingAggregator: this class handles the case of aggregations
  that don't have grouping exprs. Since these aggregations always
  result in just a single output row, the functionality here is
  relatively simple (eg. no spilling or streaming).
- StreamingAggregationNode: this node performs a streaming
  preaggregation, where the input is retrieved from the child during
  GetNext() and passed to the GroupingAggregator (non-grouping do not
  support streaming) Eventually, we'll support a list of
  GroupingAggregators.
- AggregationNode: this node performs a final aggregation, where the
  input is retrieved from the child during Open() and passed to the
  Aggregator. Currently the Aggregator can be either grouping or
  non-grouping. Eventually we'll support a list of GroupingAggregator
  and/or a single NonGroupingAggregator.

Testing:
- Passed a full exhaustive run.

Change-Id: I9e7bb583f54aa4add3738bde7f57cf3511ac567e
Reviewed-on: http://gerrit.cloudera.org:8080/10394
Reviewed-by: Thomas Marshall <th...@cmu.edu>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/010321d4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/010321d4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/010321d4

Branch: refs/heads/master
Commit: 010321d404a9ad5b7338eeb24a4e9ac576cf4dff
Parents: dde9308
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Thu May 10 19:39:57 2018 +0000
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon Jul 2 21:07:35 2018 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py          |   14 +-
 be/src/codegen/impala-ir.cc                    |    3 +-
 be/src/exec/CMakeLists.txt                     |   10 +-
 be/src/exec/aggregation-node.cc                |  132 ++
 be/src/exec/aggregation-node.h                 |   60 +
 be/src/exec/aggregator.cc                      |  609 ++++++
 be/src/exec/aggregator.h                       |  211 ++
 be/src/exec/exec-node.cc                       |    9 +-
 be/src/exec/exec-node.h                        |    9 +-
 be/src/exec/grouping-aggregator-ir.cc          |  241 +++
 be/src/exec/grouping-aggregator-partition.cc   |  218 +++
 be/src/exec/grouping-aggregator.cc             | 1098 +++++++++++
 be/src/exec/grouping-aggregator.h              |  624 ++++++
 be/src/exec/non-grouping-aggregator-ir.cc      |   30 +
 be/src/exec/non-grouping-aggregator.cc         |  174 ++
 be/src/exec/non-grouping-aggregator.h          |  111 ++
 be/src/exec/partitioned-aggregation-node-ir.cc |  253 ---
 be/src/exec/partitioned-aggregation-node.cc    | 1955 -------------------
 be/src/exec/partitioned-aggregation-node.h     |  734 -------
 be/src/exec/streaming-aggregation-node.cc      |  153 ++
 be/src/exec/streaming-aggregation-node.h       |   85 +
 21 files changed, 3774 insertions(+), 2959 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index dd2df9e..99a97ae 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -47,14 +47,12 @@ ir_functions = [
    "_ZNK6impala14AggFnEvaluator11input_evalsEv"],
   ["AGG_FN_EVALUATOR_AGG_FN_CTX",
    "_ZNK6impala14AggFnEvaluator10agg_fn_ctxEv"],
-  ["PART_AGG_NODE_PROCESS_BATCH_UNAGGREGATED",
-   "_ZN6impala26PartitionedAggregationNode12ProcessBatchILb0EEENS_6StatusEPNS_8RowBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxE"],
-  ["PART_AGG_NODE_PROCESS_BATCH_AGGREGATED",
-   "_ZN6impala26PartitionedAggregationNode12ProcessBatchILb1EEENS_6StatusEPNS_8RowBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxE"],
-  ["PART_AGG_NODE_PROCESS_BATCH_NO_GROUPING",
-   "_ZN6impala26PartitionedAggregationNode22ProcessBatchNoGroupingEPNS_8RowBatchE"],
-  ["PART_AGG_NODE_PROCESS_BATCH_STREAMING",
-   "_ZN6impala26PartitionedAggregationNode21ProcessBatchStreamingEbNS_13TPrefetchMode4typeEPNS_8RowBatchES4_PNS_12HashTableCtxEPi"],
+  ["GROUPING_AGG_ADD_BATCH_IMPL",
+   "_ZN6impala18GroupingAggregator12AddBatchImplILb0EEENS_6StatusEPNS_8RowBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxE"],
+  ["NON_GROUPING_AGG_ADD_BATCH_IMPL",
+   "_ZN6impala21NonGroupingAggregator12AddBatchImplEPNS_8RowBatchE"],
+  ["GROUPING_AGG_ADD_BATCH_STREAMING_IMPL",
+   "_ZN6impala18GroupingAggregator21AddBatchStreamingImplEbNS_13TPrefetchMode4typeEPNS_8RowBatchES4_PNS_12HashTableCtxEPi"],
   ["AVG_UPDATE_BIGINT",
    "_ZN6impala18AggregateFunctions9AvgUpdateIN10impala_udf9BigIntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE"],
   ["AVG_UPDATE_DOUBLE",

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/codegen/impala-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 9c5b3eb..0fa4fe9 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -26,11 +26,12 @@
 #pragma clang diagnostic ignored "-Wheader-hygiene"
 
 #include "codegen/codegen-anyval-ir.cc"
+#include "exec/grouping-aggregator-ir.cc"
 #include "exec/hash-table-ir.cc"
 #include "exec/hdfs-avro-scanner-ir.cc"
 #include "exec/hdfs-parquet-scanner-ir.cc"
 #include "exec/hdfs-scanner-ir.cc"
-#include "exec/partitioned-aggregation-node-ir.cc"
+#include "exec/non-grouping-aggregator-ir.cc"
 #include "exec/partitioned-hash-join-builder-ir.cc"
 #include "exec/partitioned-hash-join-node-ir.cc"
 #include "exec/select-node-ir.cc"

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 77c6e15..1753cb0 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -25,6 +25,8 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec")
 
 add_library(Exec
+  aggregation-node.cc
+  aggregator.cc
   analytic-eval-node.cc
   base-sequence-scanner.cc
   blocking-join-node.cc
@@ -38,6 +40,9 @@ add_library(Exec
   exchange-node.cc
   external-data-source-executor.cc
   filter-context.cc
+  grouping-aggregator.cc
+  grouping-aggregator-ir.cc
+  grouping-aggregator-partition.cc
   hash-table.cc
   hbase-table-sink.cc
   hbase-table-writer.cc
@@ -66,12 +71,12 @@ add_library(Exec
   incr-stats-util.cc
   nested-loop-join-builder.cc
   nested-loop-join-node.cc
+  non-grouping-aggregator.cc
+  non-grouping-aggregator-ir.cc
   parquet-column-readers.cc
   parquet-column-stats.cc
   parquet-metadata-utils.cc
   partial-sort-node.cc
-  partitioned-aggregation-node.cc
-  partitioned-aggregation-node-ir.cc
   partitioned-hash-join-builder.cc
   partitioned-hash-join-builder-ir.cc
   partitioned-hash-join-node.cc
@@ -91,6 +96,7 @@ add_library(Exec
   select-node-ir.cc
   singular-row-src-node.cc
   sort-node.cc
+  streaming-aggregation-node.cc
   subplan-node.cc
   text-converter.cc
   topn-node.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
new file mode 100644
index 0000000..d25284d
--- /dev/null
+++ b/be/src/exec/aggregation-node.cc
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/aggregation-node.h"
+
+#include <sstream>
+
+#include "exec/grouping-aggregator.h"
+#include "exec/non-grouping-aggregator.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "util/debug-util.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+AggregationNode::AggregationNode(
+    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+  : ExecNode(pool, tnode, descs) {}
+
+Status AggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
+  if (tnode.agg_node.grouping_exprs.empty()) {
+    aggregator_.reset(new NonGroupingAggregator(this, pool_, tnode, state->desc_tbl()));
+  } else {
+    aggregator_.reset(new GroupingAggregator(this, pool_, tnode, state->desc_tbl()));
+  }
+  RETURN_IF_ERROR(aggregator_->Init(tnode, state));
+  runtime_profile_->AddChild(aggregator_->runtime_profile());
+  return Status::OK();
+}
+
+Status AggregationNode::Prepare(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecNode::Prepare(state));
+  aggregator_->SetDebugOptions(debug_options_);
+  RETURN_IF_ERROR(aggregator_->Prepare(state));
+  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
+  return Status::OK();
+}
+
+void AggregationNode::Codegen(RuntimeState* state) {
+  DCHECK(state->ShouldCodegen());
+  ExecNode::Codegen(state);
+  if (IsNodeCodegenDisabled()) return;
+  aggregator_->Codegen(state);
+}
+
+Status AggregationNode::Open(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  // Open the child before consuming resources in this node.
+  RETURN_IF_ERROR(child(0)->Open(state));
+  RETURN_IF_ERROR(ExecNode::Open(state));
+
+  RETURN_IF_ERROR(aggregator_->Open(state));
+
+  RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+  // Read all the rows from the child and process them.
+  bool eos = false;
+  do {
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(QueryMaintenance(state));
+    RETURN_IF_ERROR(children_[0]->GetNext(state, &batch, &eos));
+    RETURN_IF_ERROR(aggregator_->AddBatch(state, &batch));
+    batch.Reset();
+  } while (!eos);
+
+  // The child can be closed at this point in most cases because we have consumed all of
+  // the input from the child and transfered ownership of the resources we need. The
+  // exception is if we are inside a subplan expecting to call Open()/GetNext() on the
+  // child again,
+  if (!IsInSubplan()) child(0)->Close(state);
+
+  RETURN_IF_ERROR(aggregator_->InputDone());
+  return Status::OK();
+}
+
+Status AggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+
+  if (ReachedLimit()) {
+    *eos = true;
+    return Status::OK();
+  }
+
+  RETURN_IF_ERROR(aggregator_->GetNext(state, row_batch, eos));
+  num_rows_returned_ += row_batch->num_rows();
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  return Status::OK();
+}
+
+Status AggregationNode::Reset(RuntimeState* state) {
+  RETURN_IF_ERROR(aggregator_->Reset(state));
+  return ExecNode::Reset(state);
+}
+
+void AggregationNode::Close(RuntimeState* state) {
+  if (is_closed()) return;
+  aggregator_->Close(state);
+  ExecNode::Close(state);
+}
+
+void AggregationNode::DebugString(int indentation_level, stringstream* out) const {
+  *out << string(indentation_level * 2, ' ');
+  *out << "AggregationNode("
+       << "aggregator=" << aggregator_->DebugString();
+  ExecNode::DebugString(indentation_level, out);
+  *out << ")";
+}
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.h b/be/src/exec/aggregation-node.h
new file mode 100644
index 0000000..527a62d
--- /dev/null
+++ b/be/src/exec/aggregation-node.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_AGGREGATION_NODE_H
+#define IMPALA_EXEC_AGGREGATION_NODE_H
+
+#include <memory>
+
+#include "exec/aggregator.h"
+#include "exec/exec-node.h"
+
+namespace impala {
+
+class RowBatch;
+class RuntimeState;
+
+/// Node for doing partitioned hash aggregation.
+/// This node consumes the input from child(0) during Open() and then passes it to the
+/// Aggregator, which does the actual work of aggregating.
+class AggregationNode : public ExecNode {
+ public:
+  AggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Prepare(RuntimeState* state) override;
+  virtual void Codegen(RuntimeState* state) override;
+  virtual Status Open(RuntimeState* state) override;
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+  virtual Status Reset(RuntimeState* state) override;
+  virtual void Close(RuntimeState* state) override;
+
+  virtual void DebugString(int indentation_level, std::stringstream* out) const override;
+
+ private:
+  /////////////////////////////////////////
+  /// BEGIN: Members that must be Reset()
+
+  /// Performs the actual work of aggregating input rows.
+  std::unique_ptr<Aggregator> aggregator_;
+
+  /// END: Members that must be Reset()
+  /////////////////////////////////////////
+};
+} // namespace impala
+
+#endif // IMPALA_EXEC_AGGREGATION_NODE_H

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregator.cc b/be/src/exec/aggregator.cc
new file mode 100644
index 0000000..70178cc
--- /dev/null
+++ b/be/src/exec/aggregator.cc
@@ -0,0 +1,609 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/aggregator.h"
+
+#include <sstream>
+
+#include "codegen/codegen-anyval.h"
+#include "codegen/llvm-codegen.h"
+#include "exec/exec-node.h"
+#include "exprs/agg-fn-evaluator.h"
+#include "exprs/expr-value.h"
+#include "exprs/scalar-expr.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/descriptors.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/raw-value.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "runtime/tuple.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+const char* Aggregator::LLVM_CLASS_NAME = "class.impala::Aggregator";
+
+Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode,
+    const DescriptorTbl& descs, const std::string& name)
+  : id_(exec_node->id()),
+    pool_(pool),
+    intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
+    intermediate_tuple_desc_(descs.GetTupleDescriptor(intermediate_tuple_id_)),
+    output_tuple_id_(tnode.agg_node.output_tuple_id),
+    output_tuple_desc_(descs.GetTupleDescriptor(output_tuple_id_)),
+    row_desc_(*exec_node->row_desc()),
+    input_row_desc_(*exec_node->child(0)->row_desc()),
+    needs_finalize_(tnode.agg_node.need_finalize),
+    runtime_profile_(RuntimeProfile::Create(pool_, name)),
+    num_rows_returned_(0),
+    rows_returned_counter_(nullptr),
+    build_timer_(nullptr) {}
+
+Aggregator::~Aggregator() {}
+
+Status Aggregator::Init(const TPlanNode& tnode, RuntimeState* state) {
+  DCHECK(intermediate_tuple_desc_ != nullptr);
+  DCHECK(output_tuple_desc_ != nullptr);
+  DCHECK_EQ(intermediate_tuple_desc_->slots().size(), output_tuple_desc_->slots().size());
+
+  int j = num_grouping_exprs();
+  for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i, ++j) {
+    SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j];
+    SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j];
+    AggFn* agg_fn;
+    RETURN_IF_ERROR(AggFn::Create(tnode.agg_node.aggregate_functions[i], input_row_desc_,
+        *intermediate_slot_desc, *output_slot_desc, state, &agg_fn));
+    agg_fns_.push_back(agg_fn);
+  }
+
+  RETURN_IF_ERROR(ScalarExpr::Create(tnode.conjuncts, row_desc_, state, &conjuncts_));
+  return Status::OK();
+}
+
+Status Aggregator::Prepare(RuntimeState* state) {
+  mem_tracker_.reset(new MemTracker(
+      runtime_profile_, -1, runtime_profile_->name(), state->instance_mem_tracker()));
+  expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false));
+  expr_perm_pool_.reset(new MemPool(expr_mem_tracker_.get()));
+  expr_results_pool_.reset(new MemPool(expr_mem_tracker_.get()));
+
+  RETURN_IF_ERROR(AggFnEvaluator::Create(agg_fns_, state, pool_, expr_perm_pool_.get(),
+      expr_results_pool_.get(), &agg_fn_evals_));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(conjuncts_, state, pool_,
+      expr_perm_pool_.get(), expr_results_pool_.get(), &conjunct_evals_));
+  DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
+
+  rows_returned_counter_ = ADD_COUNTER(runtime_profile_, "RowsReturned", TUnit::UNIT);
+  build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime");
+
+  return Status::OK();
+}
+
+Status Aggregator::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Open(conjunct_evals_, state));
+  return Status::OK();
+}
+
+void Aggregator::Close(RuntimeState* state) {
+  // Close all the agg-fn-evaluators
+  AggFnEvaluator::Close(agg_fn_evals_, state);
+  AggFn::Close(agg_fns_);
+  ScalarExprEvaluator::Close(conjunct_evals_, state);
+  ScalarExpr::Close(conjuncts_);
+
+  if (expr_perm_pool_.get() != nullptr) expr_perm_pool_->FreeAll();
+  if (expr_results_pool_.get() != nullptr) expr_results_pool_->FreeAll();
+  if (expr_mem_tracker_.get() != nullptr) expr_mem_tracker_->Close();
+  if (mem_tracker_.get() != nullptr) mem_tracker_->Close();
+}
+
+// TODO: codegen this function.
+void Aggregator::InitAggSlots(
+    const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple) {
+  vector<SlotDescriptor*>::const_iterator slot_desc =
+      intermediate_tuple_desc_->slots().begin() + num_grouping_exprs();
+  for (int i = 0; i < agg_fn_evals.size(); ++i, ++slot_desc) {
+    // To minimize branching on the UpdateTuple path, initialize the result value so that
+    // the Add() UDA function can ignore the NULL bit of its destination value. E.g. for
+    // SUM(), if we initialize the destination value to 0 (with the NULL bit set), we can
+    // just start adding to the destination value (rather than repeatedly checking the
+    // destination NULL bit. The codegen'd version of UpdateSlot() exploits this to
+    // eliminate a branch per value.
+    //
+    // For boolean and numeric types, the default values are false/0, so the nullable
+    // aggregate functions SUM() and AVG() produce the correct result. For MIN()/MAX(),
+    // initialize the value to max/min possible value for the same effect.
+    AggFnEvaluator* eval = agg_fn_evals[i];
+    eval->Init(intermediate_tuple);
+
+    DCHECK(agg_fns_[i] == &(eval->agg_fn()));
+    const AggFn* agg_fn = agg_fns_[i];
+    const AggFn::AggregationOp agg_op = agg_fn->agg_op();
+    if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX)
+        && !agg_fn->intermediate_type().IsStringType()
+        && !agg_fn->intermediate_type().IsTimestampType()) {
+      ExprValue default_value;
+      void* default_value_ptr = nullptr;
+      if (agg_op == AggFn::MIN) {
+        default_value_ptr = default_value.SetToMax((*slot_desc)->type());
+      } else {
+        DCHECK_EQ(agg_op, AggFn::MAX);
+        default_value_ptr = default_value.SetToMin((*slot_desc)->type());
+      }
+      RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, nullptr);
+    }
+  }
+}
+
+void Aggregator::UpdateTuple(
+    AggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row, bool is_merge) noexcept {
+  DCHECK(tuple != nullptr || agg_fns_.empty());
+  for (int i = 0; i < agg_fns_.size(); ++i) {
+    if (is_merge) {
+      agg_fn_evals[i]->Merge(row->GetTuple(0), tuple);
+    } else {
+      agg_fn_evals[i]->Add(row, tuple);
+    }
+  }
+}
+
+Tuple* Aggregator::GetOutputTuple(
+    const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool) {
+  DCHECK(tuple != nullptr || agg_fn_evals.empty()) << tuple;
+  Tuple* dst = tuple;
+  if (needs_finalize_ && intermediate_tuple_id_ != output_tuple_id_) {
+    dst = Tuple::Create(output_tuple_desc_->byte_size(), pool);
+  }
+  if (needs_finalize_) {
+    AggFnEvaluator::Finalize(agg_fn_evals, tuple, dst);
+  } else {
+    AggFnEvaluator::Serialize(agg_fn_evals, tuple);
+  }
+  // Copy grouping values from tuple to dst.
+  // TODO: Codegen this.
+  if (dst != tuple) {
+    int num_grouping_slots = num_grouping_exprs();
+    for (int i = 0; i < num_grouping_slots; ++i) {
+      SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i];
+      SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i];
+      bool src_slot_null = tuple->IsNull(src_slot_desc->null_indicator_offset());
+      void* src_slot = nullptr;
+      if (!src_slot_null) src_slot = tuple->GetSlot(src_slot_desc->tuple_offset());
+      RawValue::Write(src_slot, dst, dst_slot_desc, nullptr);
+    }
+  }
+  return dst;
+}
+
+// IR Generation for updating a single aggregation slot. Signature is:
+// void UpdateSlot(AggFnEvaluator* agg_expr_eval, AggTuple* agg_tuple, char** row)
+//
+// The IR for sum(double_col), which is constructed directly with the IRBuilder, is:
+//
+// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
+//     <{ double, i8 }>* %agg_tuple, %"class.impala::TupleRow"* %row) #33 {
+// entry:
+//   %input_evals_vector = call %"class.impala::ScalarExprEvaluator"**
+//       @_ZNK6impala14AggFnEvaluator11input_evalsEv(
+//           %"class.impala::AggFnEvaluator"* %agg_fn_eval)
+//   %0 = getelementptr %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0
+//   %input_eval = load %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %0
+//   %input0 = call { i8, double } @GetSlotRef(%"class.impala::ScalarExprEvaluator"*
+//       %input_eval, %"class.impala::TupleRow"* %row)
+//   %dst_slot_ptr = getelementptr inbounds <{ double, i8 }>,
+//       <{ double, i8 }>* %agg_tuple, i32 0, i32 0
+//   %dst_val = load double, double* %dst_slot_ptr
+//   %1 = extractvalue { i8, double } %input0, 0
+//   %is_null = trunc i8 %1 to i1
+//   br i1 %is_null, label %ret, label %not_null
+//
+// ret:                                              ; preds = %not_null, %entry
+//   ret void
+//
+// not_null:                                         ; preds = %entry
+//   %val = extractvalue { i8, double } %input0, 1
+//   %2 = fadd double %dst_val, %val
+//   %3 = bitcast <{ double, i8 }>* %agg_tuple to i8*
+//   %null_byte_ptr = getelementptr inbounds i8, i8* %3, i32 8
+//   %null_byte = load i8, i8* %null_byte_ptr
+//   %null_bit_cleared = and i8 %null_byte, -2
+//   store i8 %null_bit_cleared, i8* %null_byte_ptr
+//   store double %2, double* %dst_slot_ptr
+//   br label %ret
+// }
+//
+// The IR for ndv(timestamp_col), which uses the UDA interface, is:
+//
+// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
+//     <{ [1024 x i8] }>* %agg_tuple,
+//     %"class.impala::TupleRow"* %row) #39 {
+// entry:
+//   %dst_lowered_ptr = alloca { i64, i8* }
+//   %0 = alloca { i64, i64 }
+//   %input_evals_vector = call %"class.impala::ScalarExprEvaluator"**
+//       @_ZNK6impala14AggFnEvaluator11input_evalsEv(
+//           %"class.impala::AggFnEvaluator"* %agg_fn_eval)
+//   %1 = getelementptr %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0
+//   %input_eval = load %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %1
+//   %input0 = call { i64, i64 } @GetSlotRef(
+//       %"class.impala::ScalarExprEvaluator"* %input_eval,
+//       %"class.impala::TupleRow"* %row)
+//   %dst_slot_ptr = getelementptr inbounds <{ [1024 x i8] }>,
+//       <{ [1024 x i8] }>* %agg_tuple, i32 0, i32 0
+//   %2 = bitcast [1024 x i8]* %dst_slot_ptr to i8*
+//   %dst = insertvalue { i64, i8* } zeroinitializer, i8* %2, 1
+//   %3 = extractvalue { i64, i8* } %dst, 0
+//   %4 = and i64 %3, 4294967295
+//   %5 = or i64 %4, 4398046511104
+//   %dst1 = insertvalue { i64, i8* } %dst, i64 %5, 0
+//   %agg_fn_ctx = call %"class.impala_udf::FunctionContext"*
+//       @_ZNK6impala14AggFnEvaluator10agg_fn_ctxEv(
+//          %"class.impala::AggFnEvaluator"* %agg_fn_eval)
+//   store { i64, i64 } %input0, { i64, i64 }* %0
+//   %input_unlowered_ptr =
+//       bitcast { i64, i64 }* %0 to %"struct.impala_udf::TimestampVal"*
+//   store { i64, i8* } %dst1, { i64, i8* }* %dst_lowered_ptr
+//   %dst_unlowered_ptr =
+//       bitcast { i64, i8* }* %dst_lowered_ptr to %"struct.impala_udf::StringVal"*
+//   call void @"void impala::AggregateFunctions::HllUpdate<impala_udf::TimestampVal>"(
+//       %"class.impala_udf::FunctionContext"* %agg_fn_ctx,
+//       %"struct.impala_udf::TimestampVal"* %input_unlowered_ptr,
+//       %"struct.impala_udf::StringVal"* %dst_unlowered_ptr)
+//   %anyval_result = load { i64, i8* }, { i64, i8* }* %dst_lowered_ptr
+//   br label %ret
+//
+// ret:                                              ; preds = %entry
+//   ret void
+// }
+//
+Status Aggregator::CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
+    SlotDescriptor* slot_desc, llvm::Function** fn) {
+  llvm::PointerType* agg_fn_eval_type = codegen->GetStructPtrType<AggFnEvaluator>();
+  llvm::StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
+  if (tuple_struct == nullptr) {
+    return Status("Aggregator::CodegenUpdateSlot(): failed to generate "
+                  "intermediate tuple desc");
+  }
+  llvm::PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_struct);
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
+
+  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type());
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_eval", agg_fn_eval_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
+
+  LlvmBuilder builder(codegen->context());
+  llvm::Value* args[3];
+  *fn = prototype.GeneratePrototype(&builder, &args[0]);
+  llvm::Value* agg_fn_eval_arg = args[0];
+  llvm::Value* agg_tuple_arg = args[1];
+  llvm::Value* row_arg = args[2];
+
+  // Get the vector of input expressions' evaluators.
+  llvm::Value* input_evals_vector = codegen->CodegenCallFunction(&builder,
+      IRFunction::AGG_FN_EVALUATOR_INPUT_EVALUATORS, agg_fn_eval_arg,
+      "input_evals_vector");
+
+  AggFn* agg_fn = agg_fns_[agg_fn_idx];
+  const int num_inputs = agg_fn->GetNumChildren();
+  DCHECK_GE(num_inputs, 1);
+  vector<CodegenAnyVal> input_vals;
+  for (int i = 0; i < num_inputs; ++i) {
+    ScalarExpr* input_expr = agg_fn->GetChild(i);
+    llvm::Function* input_expr_fn;
+    RETURN_IF_ERROR(input_expr->GetCodegendComputeFn(codegen, &input_expr_fn));
+    DCHECK(input_expr_fn != nullptr);
+
+    // Call input expr function with the matching evaluator to get src slot value.
+    llvm::Value* input_eval =
+        codegen->CodegenArrayAt(&builder, input_evals_vector, i, "input_eval");
+    string input_name = Substitute("input$0", i);
+    CodegenAnyVal input_val = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
+        input_expr->type(), input_expr_fn,
+        llvm::ArrayRef<llvm::Value*>({input_eval, row_arg}), input_name.c_str());
+    input_vals.push_back(input_val);
+  }
+
+  AggFn::AggregationOp agg_op = agg_fn->agg_op();
+  const ColumnType& dst_type = agg_fn->intermediate_type();
+  bool dst_is_int_or_float_or_bool = dst_type.IsIntegerType()
+      || dst_type.IsFloatingPointType() || dst_type.IsBooleanType();
+  bool dst_is_numeric_or_bool = dst_is_int_or_float_or_bool || dst_type.IsDecimalType();
+
+  llvm::BasicBlock* ret_block = llvm::BasicBlock::Create(codegen->context(), "ret", *fn);
+
+  // Emit the code to compute 'result' and set the NULL indicator if needed. First check
+  // for special cases where we can emit a very simple instruction sequence, then fall
+  // back to the general-purpose approach of calling the cross-compiled builtin UDA.
+  CodegenAnyVal& src = input_vals[0];
+
+  // 'dst_slot_ptr' points to the slot in the aggregate tuple to update.
+  llvm::Value* dst_slot_ptr = builder.CreateStructGEP(
+      nullptr, agg_tuple_arg, slot_desc->llvm_field_idx(), "dst_slot_ptr");
+  // TODO: consider moving the following codegen logic to AggFn.
+  if (agg_op == AggFn::COUNT) {
+    src.CodegenBranchIfNull(&builder, ret_block);
+    llvm::Value* dst_value = builder.CreateLoad(dst_slot_ptr, "dst_val");
+    llvm::Value* result = agg_fn->is_merge() ?
+        builder.CreateAdd(dst_value, src.GetVal(), "count_sum") :
+        builder.CreateAdd(dst_value, codegen->GetI64Constant(1), "count_inc");
+    builder.CreateStore(result, dst_slot_ptr);
+    DCHECK(!slot_desc->is_nullable());
+  } else if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX) && dst_is_numeric_or_bool) {
+    bool is_min = agg_op == AggFn::MIN;
+    src.CodegenBranchIfNull(&builder, ret_block);
+    codegen->CodegenMinMax(
+        &builder, slot_desc->type(), src.GetVal(), dst_slot_ptr, is_min, *fn);
+
+    // Dst may have been NULL, make sure to unset the NULL bit.
+    DCHECK(slot_desc->is_nullable());
+    slot_desc->CodegenSetNullIndicator(
+        codegen, &builder, agg_tuple_arg, codegen->false_value());
+  } else if (agg_op == AggFn::SUM && dst_is_int_or_float_or_bool) {
+    src.CodegenBranchIfNull(&builder, ret_block);
+    llvm::Value* dst_value = builder.CreateLoad(dst_slot_ptr, "dst_val");
+    llvm::Value* result = dst_type.IsFloatingPointType() ?
+        builder.CreateFAdd(dst_value, src.GetVal()) :
+        builder.CreateAdd(dst_value, src.GetVal());
+    builder.CreateStore(result, dst_slot_ptr);
+
+    if (slot_desc->is_nullable()) {
+      slot_desc->CodegenSetNullIndicator(
+          codegen, &builder, agg_tuple_arg, codegen->false_value());
+    } else {
+      // 'slot_desc' is not nullable if the aggregate function is sum_init_zero(),
+      // because the slot is initialized to be zero and the null bit is nonexistent.
+      DCHECK_EQ(agg_fn->fn_name(), "sum_init_zero");
+    }
+  } else {
+    // The remaining cases are implemented using the UDA interface.
+    // Create intermediate argument 'dst' from 'dst_value'
+    CodegenAnyVal dst = CodegenAnyVal::GetNonNullVal(codegen, &builder, dst_type, "dst");
+
+    // For a subset of builtins we generate a different code sequence that exploits two
+    // properties of the builtins. First, NULL input values can be skipped. Second, the
+    // value of the slot was initialized in the right way in InitAggSlots() (e.g. 0 for
+    // SUM) that we get the right result if UpdateSlot() pretends that the NULL bit of
+    // 'dst' is unset. Empirically this optimisation makes TPC-H Q1 5-10% faster.
+    bool special_null_handling = !agg_fn->intermediate_type().IsStringType()
+        && !agg_fn->intermediate_type().IsTimestampType()
+        && (agg_op == AggFn::MIN || agg_op == AggFn::MAX || agg_op == AggFn::SUM
+               || agg_op == AggFn::AVG || agg_op == AggFn::NDV);
+    if (slot_desc->is_nullable()) {
+      if (special_null_handling) {
+        src.CodegenBranchIfNull(&builder, ret_block);
+        slot_desc->CodegenSetNullIndicator(
+            codegen, &builder, agg_tuple_arg, codegen->false_value());
+      } else {
+        dst.SetIsNull(slot_desc->CodegenIsNull(codegen, &builder, agg_tuple_arg));
+      }
+    }
+    dst.LoadFromNativePtr(dst_slot_ptr);
+
+    // Get the FunctionContext object for the AggFnEvaluator.
+    llvm::Function* get_agg_fn_ctx_fn =
+        codegen->GetFunction(IRFunction::AGG_FN_EVALUATOR_AGG_FN_CTX, false);
+    DCHECK(get_agg_fn_ctx_fn != nullptr);
+    llvm::Value* agg_fn_ctx_val =
+        builder.CreateCall(get_agg_fn_ctx_fn, {agg_fn_eval_arg}, "agg_fn_ctx");
+
+    // Call the UDA to update/merge 'src' into 'dst', with the result stored in
+    // 'updated_dst_val'.
+    CodegenAnyVal updated_dst_val;
+    RETURN_IF_ERROR(CodegenCallUda(
+        codegen, &builder, agg_fn, agg_fn_ctx_val, input_vals, dst, &updated_dst_val));
+    // Copy the value back to the slot. In the FIXED_UDA_INTERMEDIATE case, the
+    // UDA function writes directly to the slot so there is nothing to copy.
+    if (dst_type.type != TYPE_FIXED_UDA_INTERMEDIATE) {
+      updated_dst_val.StoreToNativePtr(dst_slot_ptr);
+    }
+
+    if (slot_desc->is_nullable() && !special_null_handling) {
+      // Set NULL bit in the slot based on the return value.
+      llvm::Value* result_is_null = updated_dst_val.GetIsNull("result_is_null");
+      slot_desc->CodegenSetNullIndicator(
+          codegen, &builder, agg_tuple_arg, result_is_null);
+    }
+  }
+  builder.CreateBr(ret_block);
+
+  builder.SetInsertPoint(ret_block);
+  builder.CreateRetVoid();
+
+  // Avoid producing huge UpdateTuple() function after inlining - LLVM's optimiser
+  // memory/CPU usage scales super-linearly with function size.
+  // E.g. compute stats on all columns of a 1000-column table previously took 4 minutes to
+  // codegen because all the UpdateSlot() functions were inlined.
+  if (agg_fn_idx >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
+    codegen->SetNoInline(*fn);
+  }
+
+  *fn = codegen->FinalizeFunction(*fn);
+  if (*fn == nullptr) {
+    return Status("Aggregator::CodegenUpdateSlot(): codegen'd "
+                  "UpdateSlot() function failed verification, see log");
+  }
+  return Status::OK();
+}
+
+Status Aggregator::CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder,
+    AggFn* agg_fn, llvm::Value* agg_fn_ctx_val, const vector<CodegenAnyVal>& input_vals,
+    const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) {
+  llvm::Function* uda_fn;
+  RETURN_IF_ERROR(agg_fn->CodegenUpdateOrMergeFunction(codegen, &uda_fn));
+
+  // Set up arguments for call to UDA, which are the FunctionContext*, followed by
+  // pointers to all input values, followed by a pointer to the destination value.
+  vector<llvm::Value*> uda_fn_args;
+  uda_fn_args.push_back(agg_fn_ctx_val);
+
+  // Create pointers to input args to pass to uda_fn. We must use the unlowered type,
+  // e.g. IntVal, because the UDA interface expects the values to be passed as const
+  // references to the classes.
+  DCHECK_EQ(agg_fn->GetNumChildren(), input_vals.size());
+  for (int i = 0; i < input_vals.size(); ++i) {
+    uda_fn_args.push_back(input_vals[i].GetUnloweredPtr("input_unlowered_ptr"));
+  }
+
+  // Create pointer to dst to pass to uda_fn. We must use the unlowered type for the
+  // same reason as above.
+  llvm::Value* dst_lowered_ptr = dst_val.GetLoweredPtr("dst_lowered_ptr");
+  const ColumnType& dst_type = agg_fn->intermediate_type();
+  llvm::Type* dst_unlowered_ptr_type =
+      CodegenAnyVal::GetUnloweredPtrType(codegen, dst_type);
+  llvm::Value* dst_unlowered_ptr = builder->CreateBitCast(
+      dst_lowered_ptr, dst_unlowered_ptr_type, "dst_unlowered_ptr");
+  uda_fn_args.push_back(dst_unlowered_ptr);
+
+  // Call 'uda_fn'
+  builder->CreateCall(uda_fn, uda_fn_args);
+
+  // Convert intermediate 'dst_arg' back to the native type.
+  llvm::Value* anyval_result = builder->CreateLoad(dst_lowered_ptr, "anyval_result");
+
+  *updated_dst_val = CodegenAnyVal(codegen, builder, dst_type, anyval_result);
+  return Status::OK();
+}
+
+// IR codegen for the UpdateTuple loop.  This loop is query specific and based on the
+// aggregate functions.  The function signature must match the non- codegen'd UpdateTuple
+// exactly.
+// For the query:
+// select count(*), count(int_col), sum(double_col) the IR looks like:
+//
+// define void @UpdateTuple(%"class.impala::Aggregator"* %this_ptr,
+//     %"class.impala::AggFnEvaluator"** %agg_fn_evals, %"class.impala::Tuple"* %tuple,
+//     %"class.impala::TupleRow"* %row, i1 %is_merge) #33 {
+// entry:
+//   %tuple1 = bitcast %"class.impala::Tuple"* %tuple to <{ i64, i64, double, i8 }>*
+//   %src_slot = getelementptr inbounds <{ i64, i64, double, i8 }>,
+//       <{ i64, i64, double, i8 }>* %tuple1, i32 0, i32 0
+//   %count_star_val = load i64, i64* %src_slot
+//   %count_star_inc = add i64 %count_star_val, 1
+//   store i64 %count_star_inc, i64* %src_slot
+//   %0 = getelementptr %"class.impala::AggFnEvaluator"*,
+//       %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 1
+//   %agg_fn_eval =
+//       load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %0
+//   call void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
+//       <{ i64, i64, double, i8 }>* %tuple1, %"class.impala::TupleRow"* %row)
+//   %1 = getelementptr %"class.impala::AggFnEvaluator"*,
+//       %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 2
+//   %agg_fn_eval2 =
+//       load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %1
+//   call void @UpdateSlot.2(%"class.impala::AggFnEvaluator"* %agg_fn_eval2,
+//       <{ i64, i64, double, i8 }>* %tuple1, %"class.impala::TupleRow"* %row)
+//   ret void
+// }
+//
+Status Aggregator::CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) {
+  for (const SlotDescriptor* slot_desc : intermediate_tuple_desc_->slots()) {
+    if (slot_desc->type().type == TYPE_CHAR) {
+      return Status::Expected("Aggregator::CodegenUpdateTuple(): cannot "
+                              "codegen CHAR in aggregations");
+    }
+  }
+
+  if (intermediate_tuple_desc_->GetLlvmStruct(codegen) == nullptr) {
+    return Status::Expected("Aggregator::CodegenUpdateTuple(): failed to"
+                            " generate intermediate tuple desc");
+  }
+
+  // Get the types to match the UpdateTuple signature
+  llvm::PointerType* agg_node_ptr_type = codegen->GetStructPtrType<Aggregator>();
+  llvm::PointerType* evals_type = codegen->GetStructPtrPtrType<AggFnEvaluator>();
+  llvm::PointerType* tuple_ptr_type = codegen->GetStructPtrType<Tuple>();
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
+
+  llvm::StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
+  llvm::PointerType* tuple_ptr = codegen->GetPtrType(tuple_struct);
+  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateTuple", codegen->void_type());
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", agg_node_ptr_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_evals", evals_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_ptr_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("is_merge", codegen->bool_type()));
+
+  LlvmBuilder builder(codegen->context());
+  llvm::Value* args[5];
+  *fn = prototype.GeneratePrototype(&builder, &args[0]);
+  llvm::Value* agg_fn_evals_arg = args[1];
+  llvm::Value* tuple_arg = args[2];
+  llvm::Value* row_arg = args[3];
+
+  // Cast the parameter types to the internal llvm runtime types.
+  // TODO: get rid of this by using right type in function signature
+  tuple_arg = builder.CreateBitCast(tuple_arg, tuple_ptr, "tuple");
+
+  // Loop over each expr and generate the IR for that slot.  If the expr is not
+  // count(*), generate a helper IR function to update the slot and call that.
+  int j = num_grouping_exprs();
+  for (int i = 0; i < agg_fns_.size(); ++i, ++j) {
+    SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
+    AggFn* agg_fn = agg_fns_[i];
+    if (agg_fn->is_count_star()) {
+      // TODO: we should be able to hoist this up to the loop over the batch and just
+      // increment the slot by the number of rows in the batch.
+      int field_idx = slot_desc->llvm_field_idx();
+      llvm::Value* const_one = codegen->GetI64Constant(1);
+      llvm::Value* slot_ptr =
+          builder.CreateStructGEP(nullptr, tuple_arg, field_idx, "src_slot");
+      llvm::Value* slot_loaded = builder.CreateLoad(slot_ptr, "count_star_val");
+      llvm::Value* count_inc =
+          builder.CreateAdd(slot_loaded, const_one, "count_star_inc");
+      builder.CreateStore(count_inc, slot_ptr);
+    } else {
+      llvm::Function* update_slot_fn;
+      RETURN_IF_ERROR(CodegenUpdateSlot(codegen, i, slot_desc, &update_slot_fn));
+
+      // Load agg_fn_evals_[i]
+      llvm::Value* agg_fn_eval_val =
+          codegen->CodegenArrayAt(&builder, agg_fn_evals_arg, i, "agg_fn_eval");
+
+      // Call UpdateSlot(agg_fn_evals_[i], tuple, row);
+      llvm::Value* update_slot_args[] = {agg_fn_eval_val, tuple_arg, row_arg};
+      builder.CreateCall(update_slot_fn, update_slot_args);
+    }
+  }
+  builder.CreateRetVoid();
+
+  // Avoid inlining big UpdateTuple function into outer loop - we're unlikely to get
+  // any benefit from it since the function call overhead will be amortized.
+  if (agg_fns_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
+    codegen->SetNoInline(*fn);
+  }
+
+  // CodegenProcessBatch() does the final optimizations.
+  *fn = codegen->FinalizeFunction(*fn);
+  if (*fn == nullptr) {
+    return Status("Aggregator::CodegenUpdateTuple(): codegen'd "
+                  "UpdateTuple() function failed verification, see log");
+  }
+  return Status::OK();
+}
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/aggregator.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
new file mode 100644
index 0000000..ab13d45
--- /dev/null
+++ b/be/src/exec/aggregator.h
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_AGGREGATOR_H
+#define IMPALA_EXEC_AGGREGATOR_H
+
+#include <vector>
+
+#include "common/global-types.h"
+#include "common/status.h"
+#include "gen-cpp/Types_types.h"
+#include "util/runtime-profile.h"
+
+namespace llvm {
+class Function;
+class Value;
+} // namespace llvm
+
+namespace impala {
+
+class AggFn;
+class AggFnEvaluator;
+class CodegenAnyVal;
+class DescriptorTbl;
+class ExecNode;
+class LlvmBuilder;
+class LlvmCodeGen;
+class MemPool;
+class MemTracker;
+class ObjectPool;
+class RowBatch;
+class RowDescriptor;
+class RuntimeState;
+class ScalarExpr;
+class ScalarExprEvaluator;
+class SlotDescriptor;
+class TPlanNode;
+class Tuple;
+class TupleDescriptor;
+class TupleRow;
+
+/// Base class for aggregating rows. Used in the AggregationNode and
+/// StreamingAggregationNode.
+///
+/// Rows are added by calling AddBatch(). Once all rows have been added, InputDone() must
+/// be called and the results can be fetched with GetNext().
+class Aggregator {
+ public:
+  Aggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode,
+      const DescriptorTbl& descs, const std::string& name);
+  virtual ~Aggregator();
+
+  /// Aggregators follow the same lifecycle as ExecNodes, except that after Open() and
+  /// before GetNext() rows should be added with AddBatch(), followed by InputDone()[
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) WARN_UNUSED_RESULT;
+  virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
+  virtual void Codegen(RuntimeState* state) = 0;
+  virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
+  virtual Status GetNext(
+      RuntimeState* state, RowBatch* row_batch, bool* eos) WARN_UNUSED_RESULT = 0;
+  virtual Status Reset(RuntimeState* state) WARN_UNUSED_RESULT = 0;
+  virtual void Close(RuntimeState* state);
+
+  /// Adds all of the rows in 'batch' to the aggregation.
+  virtual Status AddBatch(RuntimeState* state, RowBatch* batch) = 0;
+  /// Indicates that all batches have been added. Must be called before GetNext().
+  virtual Status InputDone() = 0;
+
+  virtual int num_grouping_exprs() = 0;
+  RuntimeProfile* runtime_profile() { return runtime_profile_; }
+
+  virtual void SetDebugOptions(const TDebugOptions& debug_options) = 0;
+
+  virtual std::string DebugString(int indentation_level = 0) const = 0;
+  virtual void DebugString(int indentation_level, std::stringstream* out) const = 0;
+
+  static const char* LLVM_CLASS_NAME;
+
+ protected:
+  /// The id of the ExecNode this Aggregator corresponds to.
+  int id_;
+  ObjectPool* pool_;
+
+  /// Account for peak memory used by this aggregator.
+  std::unique_ptr<MemTracker> mem_tracker_;
+
+  /// MemTracker used by 'expr_perm_pool_' and 'expr_results_pool_'.
+  std::unique_ptr<MemTracker> expr_mem_tracker_;
+
+  /// MemPool for allocations made by expression evaluators in this aggregator that are
+  /// "permanent" and live until Close() is called. Created in Prepare().
+  std::unique_ptr<MemPool> expr_perm_pool_;
+
+  /// MemPool for allocations made by expression evaluators in this aggregator that hold
+  /// intermediate or final results of expression evaluation. Should be cleared
+  /// periodically to free accumulated memory. QueryMaintenance() clears this pool, but
+  /// it may be appropriate for Aggregator implementation to clear it at other points in
+  /// execution where the memory is not needed.
+  std::unique_ptr<MemPool> expr_results_pool_;
+
+  /// Tuple into which Update()/Merge()/Serialize() results are stored.
+  TupleId intermediate_tuple_id_;
+  TupleDescriptor* intermediate_tuple_desc_;
+
+  /// Tuple into which Finalize() results are stored. Possibly the same as
+  /// the intermediate tuple.
+  TupleId output_tuple_id_;
+  TupleDescriptor* output_tuple_desc_;
+
+  /// The RowDescriptor for the exec node this aggregator corresponds to.
+  const RowDescriptor& row_desc_;
+  /// The RowDescriptor for the child of the exec node this aggregator corresponds to.
+  const RowDescriptor& input_row_desc_;
+
+  /// Certain aggregates require a finalize step, which is the final step of the
+  /// aggregate after consuming all input rows. The finalize step converts the aggregate
+  /// value into its final form. This is true if this aggregator contains aggregate that
+  /// requires a finalize step.
+  const bool needs_finalize_;
+
+  /// The list of all aggregate operations for this aggregator.
+  std::vector<AggFn*> agg_fns_;
+
+  /// Evaluators for each aggregate function. If this is a grouping aggregation, these
+  /// evaluators are only used to create cloned per-partition evaluators. The cloned
+  /// evaluators are then used to evaluate the functions. If this is a non-grouping
+  /// aggregation these evaluators are used directly to evaluate the functions.
+  ///
+  /// Permanent and result allocations for these allocators are allocated from
+  /// 'expr_perm_pool_' and 'expr_results_pool_' respectively.
+  std::vector<AggFnEvaluator*> agg_fn_evals_;
+
+  /// Conjuncts and their evaluators in this aggregator. 'conjuncts_' live in the
+  /// query-state's object pool while the evaluators live in this aggregator's
+  /// object pool.
+  std::vector<ScalarExpr*> conjuncts_;
+  std::vector<ScalarExprEvaluator*> conjunct_evals_;
+
+  /// Runtime profile for this aggregator. Owned by 'pool_'.
+  RuntimeProfile* const runtime_profile_;
+
+  int64_t num_rows_returned_;
+  RuntimeProfile::Counter* rows_returned_counter_;
+
+  /// Time spent processing the child rows
+  RuntimeProfile::Counter* build_timer_;
+
+  /// Initializes the aggregate function slots of an intermediate tuple.
+  /// Any var-len data is allocated from the FunctionContexts.
+  void InitAggSlots(
+      const std::vector<AggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple);
+
+  /// Updates the given aggregation intermediate tuple with aggregation values computed
+  /// over 'row' using 'agg_fn_evals'. Whether the agg fn evaluator calls Update() or
+  /// Merge() is controlled by the evaluator itself, unless enforced explicitly by passing
+  /// in is_merge == true.  The override is needed to merge spilled and non-spilled rows
+  /// belonging to the same partition independent of whether the agg fn evaluators have
+  /// is_merge() == true.
+  /// This function is replaced by codegen (which is why we don't use a vector argument
+  /// for agg_fn_evals).. Any var-len data is allocated from the FunctionContexts.
+  /// TODO: Fix the arguments order. Need to update CodegenUpdateTuple() too.
+  void UpdateTuple(AggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row,
+      bool is_merge = false) noexcept;
+
+  /// Called on the intermediate tuple of each group after all input rows have been
+  /// consumed and aggregated. Computes the final aggregate values to be returned in
+  /// GetNext() using the agg fn evaluators' Serialize() or Finalize().
+  /// For the Finalize() case if the output tuple is different from the intermediate
+  /// tuple, then a new tuple is allocated from 'pool' to hold the final result.
+  /// Grouping values are copied into the output tuple and the the output tuple holding
+  /// the finalized/serialized aggregate values is returned.
+  /// TODO: Coordinate the allocation of new tuples with the release of memory
+  /// so as not to make memory consumption blow up.
+  Tuple* GetOutputTuple(
+      const std::vector<AggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool);
+
+  /// Codegen for updating aggregate expressions agg_fns_[agg_fn_idx]
+  /// and returns the IR function in 'fn'. Returns non-OK status if codegen
+  /// is unsuccessful.
+  Status CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
+      SlotDescriptor* slot_desc, llvm::Function** fn) WARN_UNUSED_RESULT;
+
+  /// Codegen a call to a function implementing the UDA interface with input values
+  /// from 'input_vals'. 'dst_val' should contain the previous value of the aggregate
+  /// function, and 'updated_dst_val' is set to the new value after the Update or Merge
+  /// operation is applied. The instruction sequence for the UDA call is inserted at
+  /// the insert position of 'builder'.
+  Status CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder, AggFn* agg_fn,
+      llvm::Value* agg_fn_ctx_arg, const std::vector<CodegenAnyVal>& input_vals,
+      const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) WARN_UNUSED_RESULT;
+
+  /// Codegen UpdateTuple(). Returns non-OK status if codegen is unsuccessful.
+  Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) WARN_UNUSED_RESULT;
+};
+} // namespace impala
+
+#endif // IMPALA_EXEC_AGGREGATOR_H

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index eeefaed..384d65d 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -29,6 +29,7 @@
 #include "common/status.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
+#include "exec/aggregation-node.h"
 #include "exec/analytic-eval-node.h"
 #include "exec/cardinality-check-node.h"
 #include "exec/data-source-scan-node.h"
@@ -42,11 +43,11 @@
 #include "exec/kudu-util.h"
 #include "exec/nested-loop-join-node.h"
 #include "exec/partial-sort-node.h"
-#include "exec/partitioned-aggregation-node.h"
 #include "exec/partitioned-hash-join-node.h"
 #include "exec/select-node.h"
 #include "exec/singular-row-src-node.h"
 #include "exec/sort-node.h"
+#include "exec/streaming-aggregation-node.h"
 #include "exec/subplan-node.h"
 #include "exec/topn-node.h"
 #include "exec/union-node.h"
@@ -303,7 +304,11 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
       }
       break;
     case TPlanNodeType::AGGREGATION_NODE:
-      *node = pool->Add(new PartitionedAggregationNode(pool, tnode, descs));
+      if (tnode.agg_node.use_streaming_preaggregation) {
+        *node = pool->Add(new StreamingAggregationNode(pool, tnode, descs));
+      } else {
+        *node = pool->Add(new AggregationNode(pool, tnode, descs));
+      }
       break;
     case TPlanNodeType::HASH_JOIN_NODE:
       *node = pool->Add(new PartitionedHashJoinNode(pool, tnode, descs));

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index ad9ae10..9a87a56 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -211,6 +211,7 @@ class ExecNode {
   MemTracker* expr_mem_tracker() { return expr_mem_tracker_.get(); }
   MemPool* expr_perm_pool() { return expr_perm_pool_.get(); }
   MemPool* expr_results_pool() { return expr_results_pool_.get(); }
+  const TBackendResourceProfile& resource_profile() { return resource_profile_; }
   bool is_closed() const { return is_closed_; }
 
   /// Return true if codegen was disabled by the planner for this ExecNode. Does not
@@ -220,6 +221,10 @@ class ExecNode {
   /// Extract node id from p->name().
   static int GetNodeIdFromProfile(RuntimeProfile* p);
 
+  /// Returns true if this node is inside the right-hand side plan tree of a SubplanNode.
+  /// Valid to call in or after Prepare().
+  bool IsInSubplan() const { return containing_subplan_ != NULL; }
+
   /// Names of counters shared by all exec nodes
   static const std::string ROW_THROUGHPUT_COUNTER;
 
@@ -322,10 +327,6 @@ class ExecNode {
   /// Set by SubplanNode::Init(). Not owned.
   SubplanNode* containing_subplan_;
 
-  /// Returns true if this node is inside the right-hand side plan tree of a SubplanNode.
-  /// Valid to call in or after Prepare().
-  bool IsInSubplan() const { return containing_subplan_ != NULL; }
-
   /// If true, codegen should be disabled for this exec node.
   const bool disable_codegen_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/grouping-aggregator-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator-ir.cc b/be/src/exec/grouping-aggregator-ir.cc
new file mode 100644
index 0000000..d3dbf17
--- /dev/null
+++ b/be/src/exec/grouping-aggregator-ir.cc
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/grouping-aggregator.h"
+
+#include "exec/hash-table.inline.h"
+#include "exprs/agg-fn-evaluator.h"
+#include "runtime/row-batch.h"
+#include "runtime/tuple-row.h"
+
+using namespace impala;
+
+template <bool AGGREGATED_ROWS>
+Status GroupingAggregator::AddBatchImpl(RowBatch* batch,
+    TPrefetchMode::type prefetch_mode, HashTableCtx* __restrict__ ht_ctx) {
+  DCHECK(!hash_partitions_.empty());
+  DCHECK(!is_streaming_preagg_);
+
+  // Make sure that no resizes will happen when inserting individual rows to the hash
+  // table of each partition by pessimistically assuming that all the rows in each batch
+  // will end up to the same partition.
+  // TODO: Once we have a histogram with the number of rows per partition, we will have
+  // accurate resize calls.
+  RETURN_IF_ERROR(
+      CheckAndResizeHashPartitions(AGGREGATED_ROWS, batch->num_rows(), ht_ctx));
+
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int cache_size = expr_vals_cache->capacity();
+  const int num_rows = batch->num_rows();
+  for (int group_start = 0; group_start < num_rows; group_start += cache_size) {
+    EvalAndHashPrefetchGroup<AGGREGATED_ROWS>(batch, group_start, prefetch_mode, ht_ctx);
+
+    FOREACH_ROW_LIMIT(batch, group_start, cache_size, batch_iter) {
+      RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(batch_iter.Get(), ht_ctx));
+      expr_vals_cache->NextRow();
+    }
+    DCHECK(expr_vals_cache->AtEnd());
+  }
+  return Status::OK();
+}
+
+template <bool AGGREGATED_ROWS>
+void IR_ALWAYS_INLINE GroupingAggregator::EvalAndHashPrefetchGroup(RowBatch* batch,
+    int start_row_idx, TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx) {
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int cache_size = expr_vals_cache->capacity();
+
+  expr_vals_cache->Reset();
+  FOREACH_ROW_LIMIT(batch, start_row_idx, cache_size, batch_iter) {
+    TupleRow* row = batch_iter.Get();
+    bool is_null;
+    if (AGGREGATED_ROWS) {
+      is_null = !ht_ctx->EvalAndHashBuild(row);
+    } else {
+      is_null = !ht_ctx->EvalAndHashProbe(row);
+    }
+    // Hoist lookups out of non-null branch to speed up non-null case.
+    const uint32_t hash = expr_vals_cache->CurExprValuesHash();
+    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+    HashTable* hash_tbl = GetHashTable(partition_idx);
+    if (is_null) {
+      expr_vals_cache->SetRowNull();
+    } else if (prefetch_mode != TPrefetchMode::NONE) {
+      if (LIKELY(hash_tbl != nullptr)) hash_tbl->PrefetchBucket<false>(hash);
+    }
+    expr_vals_cache->NextRow();
+  }
+
+  expr_vals_cache->ResetForRead();
+}
+
+template <bool AGGREGATED_ROWS>
+Status GroupingAggregator::ProcessRow(
+    TupleRow* __restrict__ row, HashTableCtx* __restrict__ ht_ctx) {
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  // Hoist lookups out of non-null branch to speed up non-null case.
+  const uint32_t hash = expr_vals_cache->CurExprValuesHash();
+  const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+  if (expr_vals_cache->IsRowNull()) return Status::OK();
+  // To process this row, we first see if it can be aggregated or inserted into this
+  // partition's hash table. If we need to insert it and that fails, due to OOM, we
+  // spill the partition. The partition to spill is not necessarily dst_partition,
+  // so we can try again to insert the row.
+  HashTable* hash_tbl = GetHashTable(partition_idx);
+  Partition* dst_partition = hash_partitions_[partition_idx];
+  DCHECK(dst_partition != nullptr);
+  DCHECK_EQ(dst_partition->is_spilled(), hash_tbl == nullptr);
+  if (hash_tbl == nullptr) {
+    // This partition is already spilled, just append the row.
+    return AppendSpilledRow<AGGREGATED_ROWS>(dst_partition, row);
+  }
+
+  DCHECK(dst_partition->aggregated_row_stream->is_pinned());
+  bool found;
+  // Find the appropriate bucket in the hash table. There will always be a free
+  // bucket because we checked the size above.
+  HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
+  DCHECK(!it.AtEnd()) << "Hash table had no free buckets";
+  if (AGGREGATED_ROWS) {
+    // If the row is already an aggregate row, it cannot match anything in the
+    // hash table since we process the aggregate rows first. These rows should
+    // have been aggregated in the initial pass.
+    DCHECK(!found);
+  } else if (found) {
+    // Row is already in hash table. Do the aggregation and we're done.
+    UpdateTuple(dst_partition->agg_fn_evals.data(), it.GetTuple(), row);
+    return Status::OK();
+  }
+
+  // If we are seeing this result row for the first time, we need to construct the
+  // result row and initialize it.
+  return AddIntermediateTuple<AGGREGATED_ROWS>(dst_partition, row, hash, it);
+}
+
+template <bool AGGREGATED_ROWS>
+Status GroupingAggregator::AddIntermediateTuple(Partition* __restrict__ partition,
+    TupleRow* __restrict__ row, uint32_t hash, HashTable::Iterator insert_it) {
+  while (true) {
+    DCHECK(partition->aggregated_row_stream->is_pinned());
+    Tuple* intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals,
+        partition->aggregated_row_stream.get(), &add_batch_status_);
+
+    if (LIKELY(intermediate_tuple != nullptr)) {
+      UpdateTuple(
+          partition->agg_fn_evals.data(), intermediate_tuple, row, AGGREGATED_ROWS);
+      // After copying and initializing the tuple, insert it into the hash table.
+      insert_it.SetTuple(intermediate_tuple, hash);
+      return Status::OK();
+    } else if (!add_batch_status_.ok()) {
+      return std::move(add_batch_status_);
+    }
+
+    // We did not have enough memory to add intermediate_tuple to the stream.
+    RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
+    if (partition->is_spilled()) {
+      return AppendSpilledRow<AGGREGATED_ROWS>(partition, row);
+    }
+  }
+}
+
+Status GroupingAggregator::AddBatchStreamingImpl(bool needs_serialize,
+    TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* out_batch,
+    HashTableCtx* __restrict__ ht_ctx, int remaining_capacity[PARTITION_FANOUT]) {
+  DCHECK(is_streaming_preagg_);
+  DCHECK_EQ(out_batch->num_rows(), 0);
+  DCHECK_LE(in_batch->num_rows(), out_batch->capacity());
+
+  RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int num_rows = in_batch->num_rows();
+  const int cache_size = expr_vals_cache->capacity();
+  for (int group_start = 0; group_start < num_rows; group_start += cache_size) {
+    EvalAndHashPrefetchGroup<false>(in_batch, group_start, prefetch_mode, ht_ctx);
+
+    FOREACH_ROW_LIMIT(in_batch, group_start, cache_size, in_batch_iter) {
+      // Hoist lookups out of non-null branch to speed up non-null case.
+      TupleRow* in_row = in_batch_iter.Get();
+      const uint32_t hash = expr_vals_cache->CurExprValuesHash();
+      const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+      if (!expr_vals_cache->IsRowNull()
+          && !TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx],
+                 GetHashTable(partition_idx), in_row, hash,
+                 &remaining_capacity[partition_idx], &add_batch_status_)) {
+        RETURN_IF_ERROR(std::move(add_batch_status_));
+        // Tuple is not going into hash table, add it to the output batch.
+        Tuple* intermediate_tuple = ConstructIntermediateTuple(
+            agg_fn_evals_, out_batch->tuple_data_pool(), &add_batch_status_);
+        if (UNLIKELY(intermediate_tuple == nullptr)) {
+          DCHECK(!add_batch_status_.ok());
+          return std::move(add_batch_status_);
+        }
+        UpdateTuple(agg_fn_evals_.data(), intermediate_tuple, in_row);
+        out_batch_iterator.Get()->SetTuple(0, intermediate_tuple);
+        out_batch_iterator.Next();
+        out_batch->CommitLastRow();
+      }
+      DCHECK(add_batch_status_.ok());
+      expr_vals_cache->NextRow();
+    }
+    DCHECK(expr_vals_cache->AtEnd());
+  }
+  if (needs_serialize) {
+    FOREACH_ROW(out_batch, 0, out_batch_iter) {
+      AggFnEvaluator::Serialize(agg_fn_evals_, out_batch_iter.Get()->GetTuple(0));
+    }
+  }
+
+  return Status::OK();
+}
+
+bool GroupingAggregator::TryAddToHashTable(HashTableCtx* __restrict__ ht_ctx,
+    Partition* __restrict__ partition, HashTable* __restrict__ hash_tbl,
+    TupleRow* __restrict__ in_row, uint32_t hash, int* __restrict__ remaining_capacity,
+    Status* status) {
+  DCHECK(remaining_capacity != nullptr);
+  DCHECK_EQ(hash_tbl, partition->hash_tbl.get());
+  DCHECK_GE(*remaining_capacity, 0);
+  bool found;
+  // This is called from ProcessBatchStreaming() so the rows are not aggregated.
+  HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
+  Tuple* intermediate_tuple;
+  if (found) {
+    intermediate_tuple = it.GetTuple();
+  } else if (*remaining_capacity == 0) {
+    return false;
+  } else {
+    intermediate_tuple = ConstructIntermediateTuple(
+        partition->agg_fn_evals, partition->aggregated_row_stream.get(), status);
+    if (LIKELY(intermediate_tuple != nullptr)) {
+      it.SetTuple(intermediate_tuple, hash);
+      --(*remaining_capacity);
+    } else {
+      // Avoid repeatedly trying to add tuples when under memory pressure.
+      *remaining_capacity = 0;
+      return false;
+    }
+  }
+
+  UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple, in_row);
+  return true;
+}
+
+// Instantiate required templates.
+template Status GroupingAggregator::AddBatchImpl<false>(
+    RowBatch*, TPrefetchMode::type, HashTableCtx*);
+template Status GroupingAggregator::AddBatchImpl<true>(
+    RowBatch*, TPrefetchMode::type, HashTableCtx*);

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/grouping-aggregator-partition.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator-partition.cc b/be/src/exec/grouping-aggregator-partition.cc
new file mode 100644
index 0000000..8fe08f4
--- /dev/null
+++ b/be/src/exec/grouping-aggregator-partition.cc
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/grouping-aggregator.h"
+
+#include <set>
+#include <sstream>
+
+#include "exec/exec-node.h"
+#include "exec/hash-table.inline.h"
+#include "exprs/agg-fn-evaluator.h"
+#include "runtime/descriptors.h"
+#include "runtime/mem-pool.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+namespace impala {
+
+GroupingAggregator::Partition::~Partition() {
+  DCHECK(is_closed);
+}
+
+Status GroupingAggregator::Partition::InitStreams() {
+  agg_fn_perm_pool.reset(new MemPool(parent->expr_mem_tracker_.get()));
+  DCHECK_EQ(agg_fn_evals.size(), 0);
+  AggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_perm_pool.get(),
+      parent->expr_results_pool_.get(), parent->agg_fn_evals_, &agg_fn_evals);
+  // Varlen aggregate function results are stored outside of aggregated_row_stream because
+  // BufferedTupleStream doesn't support relocating varlen data stored in the stream.
+  auto agg_slot =
+      parent->intermediate_tuple_desc_->slots().begin() + parent->grouping_exprs_.size();
+  std::set<SlotId> external_varlen_slots;
+  for (; agg_slot != parent->intermediate_tuple_desc_->slots().end(); ++agg_slot) {
+    if ((*agg_slot)->type().IsVarLenStringType()) {
+      external_varlen_slots.insert((*agg_slot)->id());
+    }
+  }
+
+  aggregated_row_stream.reset(
+      new BufferedTupleStream(parent->state_, &parent->intermediate_row_desc_,
+          parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size,
+          parent->resource_profile_.max_row_buffer_size, external_varlen_slots));
+  RETURN_IF_ERROR(aggregated_row_stream->Init(parent->id_, true));
+  bool got_buffer;
+  RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer));
+  DCHECK(got_buffer) << "Buffer included in reservation " << parent->id_ << "\n"
+                     << parent->buffer_pool_client()->DebugString() << "\n"
+                     << parent->DebugString(2);
+  if (!parent->is_streaming_preagg_) {
+    unaggregated_row_stream.reset(
+        new BufferedTupleStream(parent->state_, &parent->input_row_desc_,
+            parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size,
+            parent->resource_profile_.max_row_buffer_size));
+    // This stream is only used to spill, no need to ever have this pinned.
+    RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id_, false));
+    // Save memory by waiting until we spill to allocate the write buffer for the
+    // unaggregated row stream.
+    DCHECK(!unaggregated_row_stream->has_write_iterator());
+  }
+  return Status::OK();
+}
+
+Status GroupingAggregator::Partition::InitHashTable(bool* got_memory) {
+  DCHECK(aggregated_row_stream != nullptr);
+  DCHECK(hash_tbl == nullptr);
+  // We use the upper PARTITION_FANOUT num bits to pick the partition so only the
+  // remaining bits can be used for the hash table.
+  // TODO: we could switch to 64 bit hashes and then we don't need a max size.
+  // It might be reasonable to limit individual hash table size for other reasons
+  // though. Always start with small buffers.
+  hash_tbl.reset(HashTable::Create(parent->ht_allocator_.get(), false, 1, nullptr,
+      1L << (32 - NUM_PARTITIONING_BITS), PAGG_DEFAULT_HASH_TABLE_SZ));
+  // Please update the error message in CreateHashPartitions() if initial size of
+  // hash table changes.
+  return hash_tbl->Init(got_memory);
+}
+
+Status GroupingAggregator::Partition::SerializeStreamForSpilling() {
+  DCHECK(!parent->is_streaming_preagg_);
+  if (parent->needs_serialize_) {
+    // We need to do a lot more work in this case. This step effectively does a merge
+    // aggregation in this node. We need to serialize the intermediates, spill the
+    // intermediates and then feed them into the aggregate function's merge step.
+    // This is often used when the intermediate is a string type, meaning the current
+    // (before serialization) in-memory layout is not the on-disk block layout.
+    // The disk layout does not support mutable rows. We need to rewrite the stream
+    // into the on disk format.
+    // TODO: if it happens to not be a string, we could serialize in place. This is
+    // a future optimization since it is very unlikely to have a serialize phase
+    // for those UDAs.
+    DCHECK(parent->serialize_stream_.get() != nullptr);
+    DCHECK(!parent->serialize_stream_->is_pinned());
+
+    // Serialize and copy the spilled partition's stream into the new stream.
+    Status status;
+    BufferedTupleStream* new_stream = parent->serialize_stream_.get();
+    HashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get());
+    while (!it.AtEnd()) {
+      Tuple* tuple = it.GetTuple();
+      it.Next();
+      AggFnEvaluator::Serialize(agg_fn_evals, tuple);
+      if (UNLIKELY(!new_stream->AddRow(reinterpret_cast<TupleRow*>(&tuple), &status))) {
+        DCHECK(!status.ok()) << "Stream was unpinned - AddRow() only fails on error";
+        // Even if we can't add to new_stream, finish up processing this agg stream to
+        // make clean up easier (someone has to finalize this stream and we don't want to
+        // remember where we are).
+        parent->CleanupHashTbl(agg_fn_evals, it);
+        hash_tbl->Close();
+        hash_tbl.reset();
+        aggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+        return status;
+      }
+    }
+
+    aggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+    aggregated_row_stream.swap(parent->serialize_stream_);
+    // Recreate the serialize_stream (and reserve 1 buffer) now in preparation for
+    // when we need to spill again. We need to have this available before we need
+    // to spill to make sure it is available. This should be acquirable since we just
+    // freed at least one buffer from this partition's (old) aggregated_row_stream.
+    parent->serialize_stream_.reset(
+        new BufferedTupleStream(parent->state_, &parent->intermediate_row_desc_,
+            parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size,
+            parent->resource_profile_.max_row_buffer_size));
+    status = parent->serialize_stream_->Init(parent->id_, false);
+    if (status.ok()) {
+      bool got_buffer;
+      status = parent->serialize_stream_->PrepareForWrite(&got_buffer);
+      DCHECK(!status.ok() || got_buffer) << "Accounted in min reservation";
+    }
+    if (!status.ok()) {
+      hash_tbl->Close();
+      hash_tbl.reset();
+      return status;
+    }
+    DCHECK(parent->serialize_stream_->has_write_iterator());
+  }
+  return Status::OK();
+}
+
+Status GroupingAggregator::Partition::Spill(bool more_aggregate_rows) {
+  DCHECK(!parent->is_streaming_preagg_);
+  DCHECK(!is_closed);
+  DCHECK(!is_spilled());
+  RETURN_IF_ERROR(parent->state_->StartSpilling(parent->mem_tracker_.get()));
+
+  RETURN_IF_ERROR(SerializeStreamForSpilling());
+
+  // Free the in-memory result data.
+  AggFnEvaluator::Close(agg_fn_evals, parent->state_);
+  agg_fn_evals.clear();
+
+  if (agg_fn_perm_pool.get() != nullptr) {
+    agg_fn_perm_pool->FreeAll();
+    agg_fn_perm_pool.reset();
+  }
+
+  hash_tbl->Close();
+  hash_tbl.reset();
+
+  // Unpin the stream to free memory, but leave a write buffer in place so we can
+  // continue appending rows to one of the streams in the partition.
+  DCHECK(aggregated_row_stream->has_write_iterator());
+  DCHECK(!unaggregated_row_stream->has_write_iterator());
+  if (more_aggregate_rows) {
+    aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+  } else {
+    aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+    bool got_buffer;
+    RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
+    DCHECK(got_buffer) << "Accounted in min reservation"
+                       << parent->buffer_pool_client()->DebugString();
+  }
+
+  COUNTER_ADD(parent->num_spilled_partitions_, 1);
+  if (parent->num_spilled_partitions_->value() == 1) {
+    parent->runtime_profile()->AppendExecOption("Spilled");
+  }
+  return Status::OK();
+}
+
+void GroupingAggregator::Partition::Close(bool finalize_rows) {
+  if (is_closed) return;
+  is_closed = true;
+  if (aggregated_row_stream.get() != nullptr) {
+    if (finalize_rows && hash_tbl.get() != nullptr) {
+      // We need to walk all the rows and Finalize them here so the UDA gets a chance
+      // to cleanup. If the hash table is gone (meaning this was spilled), the rows
+      // should have been finalized/serialized in Spill().
+      parent->CleanupHashTbl(agg_fn_evals, hash_tbl->Begin(parent->ht_ctx_.get()));
+    }
+    aggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  }
+  if (hash_tbl.get() != nullptr) hash_tbl->Close();
+  if (unaggregated_row_stream.get() != nullptr) {
+    unaggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  }
+  for (AggFnEvaluator* eval : agg_fn_evals) eval->Close(parent->state_);
+  if (agg_fn_perm_pool.get() != nullptr) agg_fn_perm_pool->FreeAll();
+}
+} // namespace impala