Posted to commits@doris.apache.org by zh...@apache.org on 2020/03/20 06:47:33 UTC

[incubator-doris] branch master updated: Revert "[CodeStyle] Del obsolete code of partition_aggregation_node (#3154)" (#3160)

This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d3dbc2  Revert "[CodeStyle] Del obsolete code of partition_aggregation_node (#3154)" (#3160)
2d3dbc2 is described below

commit 2d3dbc2c4250499b9787bcc21f74d4060b573ce5
Author: lichaoyong <li...@baidu.com>
AuthorDate: Fri Mar 20 14:47:25 2020 +0800

    Revert "[CodeStyle] Del obsolete code of partition_aggregation_node (#3154)" (#3160)
    
    This reverts commit dae013d797c1c2c9e54246d5ace4bdd90b297d43.
---
 be/src/common/config.h                             |    3 +-
 be/src/exec/CMakeLists.txt                         |    6 +-
 be/src/exec/exec_node.cpp                          |    3 +
 ...node.cc => new_partitioned_aggregation_node.cc} |   94 +-
 ...n_node.h => new_partitioned_aggregation_node.h} |   14 +-
 ...r.cc => new_partitioned_aggregation_node_ir.cc} |   20 +-
 be/src/exec/partitioned_aggregation_node.cc        | 2218 ++++++++------------
 be/src/exec/partitioned_aggregation_node.h         | 1085 ++++------
 be/src/exec/partitioned_aggregation_node_ir.cc     |  300 +--
 be/src/runtime/row_batch.cpp                       |   12 +-
 10 files changed, 1523 insertions(+), 2232 deletions(-)
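
For context, the configuration flags restored by this revert gate which aggregation implementation the backend constructs: with enable_partitioned_aggregation "false" and enable_new_partitioned_aggregation "true" (the defaults reinstated in be/src/common/config.h below), AGGREGATION_NODE plan nodes are built as NewPartitionedAggregationNode, and the legacy AggregationNode is used only when both flags are false. The following is a minimal standalone sketch of that precedence, mirroring the logic in the ExecNode::create_node() hunk further down; the select_aggregation_node() helper and the main() harness are illustrative only and do not exist in the Doris source.

    #include <iostream>
    #include <string>

    // Illustrative stand-ins for the flags restored in be/src/common/config.h.
    namespace config {
    bool enable_partitioned_aggregation = false;      // legacy partitioned implementation
    bool enable_new_partitioned_aggregation = true;   // implementation copied in this commit
    }

    // Hypothetical helper mirroring the precedence in ExecNode::create_node():
    // the legacy flag wins, then the new flag, then the non-partitioned fallback.
    std::string select_aggregation_node() {
        if (config::enable_partitioned_aggregation) {
            return "PartitionedAggregationNode";
        } else if (config::enable_new_partitioned_aggregation) {
            return "NewPartitionedAggregationNode";
        }
        return "AggregationNode";
    }

    int main() {
        // With the restored defaults this prints "NewPartitionedAggregationNode".
        std::cout << select_aggregation_node() << std::endl;
        return 0;
    }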

diff --git a/be/src/common/config.h b/be/src/common/config.h
index f722ff8..d1faee8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -337,7 +337,8 @@ namespace config {
 
     // for partition
     CONF_Bool(enable_partitioned_hash_join, "false")
-    CONF_Bool(enable_partitioned_aggregation, "true")
+    CONF_Bool(enable_partitioned_aggregation, "false")
+    CONF_Bool(enable_new_partitioned_aggregation, "true")
 
     // for kudu
     // "The maximum size of the row batch queue, for Kudu scanners."
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 20e00ef..705c439 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -85,10 +85,12 @@ set(EXEC_FILES
     schema_scanner/schema_helper.cpp
     partitioned_hash_table.cc
     partitioned_hash_table_ir.cc
-    new_partitioned_hash_table.cc
-    new_partitioned_hash_table_ir.cc
     partitioned_aggregation_node.cc
     partitioned_aggregation_node_ir.cc
+    new_partitioned_hash_table.cc
+    new_partitioned_hash_table_ir.cc
+    new_partitioned_aggregation_node.cc
+    new_partitioned_aggregation_node_ir.cc
     local_file_writer.cpp
     broker_writer.cpp
     parquet_scanner.cpp
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index c236401..62df715 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -26,6 +26,7 @@
 #include "exprs/expr_context.h"
 #include "exec/aggregation_node.h"
 #include "exec/partitioned_aggregation_node.h"
+#include "exec/new_partitioned_aggregation_node.h"
 #include "exec/csv_scan_node.h"
 #include "exec/es_scan_node.h"
 #include "exec/es_http_scan_node.h"
@@ -382,6 +383,8 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
     case TPlanNodeType::AGGREGATION_NODE:
         if (config::enable_partitioned_aggregation) {
             *node = pool->add(new PartitionedAggregationNode(pool, tnode, descs));
+        } else if (config::enable_new_partitioned_aggregation) {
+            *node = pool->add(new NewPartitionedAggregationNode(pool, tnode, descs));
         } else {
             *node = pool->add(new AggregationNode(pool, tnode, descs));
         }
diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/new_partitioned_aggregation_node.cc
similarity index 94%
copy from be/src/exec/partitioned_aggregation_node.cc
copy to be/src/exec/new_partitioned_aggregation_node.cc
index 4c68cdb..2468803 100644
--- a/be/src/exec/partitioned_aggregation_node.cc
+++ b/be/src/exec/new_partitioned_aggregation_node.cc
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/partitioned_aggregation_node.h"
+#include "exec/new_partitioned_aggregation_node.h"
 
 #include <math.h>
 #include <algorithm>
@@ -93,7 +93,7 @@ static const StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
 static const int STREAMING_HT_MIN_REDUCTION_SIZE =
     sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
 
-PartitionedAggregationNode::PartitionedAggregationNode(
+NewPartitionedAggregationNode::NewPartitionedAggregationNode(
     ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
   : ExecNode(pool, tnode, descs),
     intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
@@ -142,7 +142,7 @@ PartitionedAggregationNode::PartitionedAggregationNode(
     }
 }
 
-Status PartitionedAggregationNode::init(const TPlanNode& tnode, RuntimeState* state) {
+Status NewPartitionedAggregationNode::init(const TPlanNode& tnode, RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::init(tnode));
   DCHECK(intermediate_tuple_desc_ != nullptr);
   DCHECK(output_tuple_desc_ != nullptr);
@@ -177,7 +177,7 @@ Status PartitionedAggregationNode::init(const TPlanNode& tnode, RuntimeState* st
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::prepare(RuntimeState* state) {
+Status NewPartitionedAggregationNode::prepare(RuntimeState* state) {
   SCOPED_TIMER(_runtime_profile->total_time_counter());
 
   RETURN_IF_ERROR(ExecNode::prepare(state));
@@ -231,7 +231,7 @@ Status PartitionedAggregationNode::prepare(RuntimeState* state) {
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::open(RuntimeState* state) {
+Status NewPartitionedAggregationNode::open(RuntimeState* state) {
   SCOPED_TIMER(_runtime_profile->total_time_counter());
   // Open the child before consuming resources in this node.
   RETURN_IF_ERROR(child(0)->open(state));
@@ -329,7 +329,7 @@ Status PartitionedAggregationNode::open(RuntimeState* state) {
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch,
+Status NewPartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch,
     bool* eos) {
   int first_row_idx = row_batch->num_rows();
   RETURN_IF_ERROR(GetNextInternal(state, row_batch, eos));
@@ -337,7 +337,7 @@ Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_b
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
+Status NewPartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
     int first_row_idx) {
   if (!needs_finalize_ && !needs_serialize_) return Status::OK();
   // String data returned by Serialize() or Finalize() is from local expr allocations in
@@ -362,7 +362,7 @@ Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_desc,
+Status NewPartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_desc,
     RowBatch* row_batch, int first_row_idx, MemPool* pool) {
   DCHECK(slot_desc.type().is_var_len_string_type());
   DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1);
@@ -383,7 +383,7 @@ Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_des
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state,
+Status NewPartitionedAggregationNode::GetNextInternal(RuntimeState* state,
     RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(_runtime_profile->total_time_counter());
   RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
@@ -419,7 +419,7 @@ Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state,
   return Status::OK();
 }
 
-void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) {
+void NewPartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) {
   DCHECK(grouping_exprs_.empty());
   int row_idx = row_batch->add_row();
   TupleRow* row = row_batch->get_row(row_idx);
@@ -439,7 +439,7 @@ void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) {
   singleton_output_tuple_ = NULL;
 }
 
-Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
+Status NewPartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
     RowBatch* row_batch) {
   DCHECK(!row_batch->at_capacity());
   if (output_iterator_.AtEnd()) {
@@ -496,7 +496,7 @@ Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
+Status NewPartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
     RowBatch* out_batch) {
   DCHECK(!child_eos_);
   DCHECK(is_streaming_preagg_);
@@ -565,7 +565,7 @@ Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
   return Status::OK();
 }
 
-bool PartitionedAggregationNode::ShouldExpandPreaggHashTables() const {
+bool NewPartitionedAggregationNode::ShouldExpandPreaggHashTables() const {
   int64_t ht_mem = 0;
   int64_t ht_rows = 0;
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
@@ -615,7 +615,7 @@ bool PartitionedAggregationNode::ShouldExpandPreaggHashTables() const {
   return current_reduction > min_reduction;
 }
 
-void PartitionedAggregationNode::CleanupHashTbl(
+void NewPartitionedAggregationNode::CleanupHashTbl(
     const vector<NewAggFnEvaluator*>& agg_fn_evals, NewPartitionedHashTable::Iterator it) {
   if (!needs_finalize_ && !needs_serialize_) return;
 
@@ -640,7 +640,7 @@ void PartitionedAggregationNode::CleanupHashTbl(
   }
 }
 
-Status PartitionedAggregationNode::reset(RuntimeState* state) {
+Status NewPartitionedAggregationNode::reset(RuntimeState* state) {
   DCHECK(!is_streaming_preagg_) << "Cannot reset preaggregation";
   if (!grouping_exprs_.empty()) {
     child_eos_ = false;
@@ -652,7 +652,7 @@ Status PartitionedAggregationNode::reset(RuntimeState* state) {
   return ExecNode::reset(state);
 }
 
-Status PartitionedAggregationNode::close(RuntimeState* state) {
+Status NewPartitionedAggregationNode::close(RuntimeState* state) {
   if (is_closed()) return Status::OK();
 
   if (!singleton_output_tuple_returned_) {
@@ -688,11 +688,11 @@ Status PartitionedAggregationNode::close(RuntimeState* state) {
   return ExecNode::close(state);
 }
 
-PartitionedAggregationNode::Partition::~Partition() {
+NewPartitionedAggregationNode::Partition::~Partition() {
   DCHECK(is_closed);
 }
 
-Status PartitionedAggregationNode::Partition::InitStreams() {
+Status NewPartitionedAggregationNode::Partition::InitStreams() {
   agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker()));
   DCHECK_EQ(agg_fn_evals.size(), 0);
   NewAggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_pool.get(),
@@ -735,7 +735,7 @@ Status PartitionedAggregationNode::Partition::InitStreams() {
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::Partition::InitHashTable(bool* got_memory) {
+Status NewPartitionedAggregationNode::Partition::InitHashTable(bool* got_memory) {
   DCHECK(aggregated_row_stream != nullptr);
   DCHECK(hash_tbl == nullptr);
   // We use the upper PARTITION_FANOUT num bits to pick the partition so only the
@@ -750,7 +750,7 @@ Status PartitionedAggregationNode::Partition::InitHashTable(bool* got_memory) {
   return hash_tbl->Init(got_memory);
 }
 
-Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
+Status NewPartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
   DCHECK(!parent->is_streaming_preagg_);
   if (parent->needs_serialize_) {
     // We need to do a lot more work in this case. This step effectively does a merge
@@ -813,7 +813,7 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
+Status NewPartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
   DCHECK(!parent->is_streaming_preagg_);
   DCHECK(!is_closed);
   DCHECK(!is_spilled());
@@ -858,7 +858,7 @@ Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
   return Status::OK();
 }
 
-void PartitionedAggregationNode::Partition::Close(bool finalize_rows) {
+void NewPartitionedAggregationNode::Partition::Close(bool finalize_rows) {
   if (is_closed) return;
   is_closed = true;
   if (aggregated_row_stream.get() != NULL) {
@@ -879,7 +879,7 @@ void PartitionedAggregationNode::Partition::Close(bool finalize_rows) {
   if (agg_fn_pool.get() != NULL) agg_fn_pool->free_all();
 }
 
-Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple(
+Tuple* NewPartitionedAggregationNode::ConstructSingletonOutputTuple(
     const vector<NewAggFnEvaluator*>& agg_fn_evals, MemPool* pool) {
   DCHECK(grouping_exprs_.empty());
   Tuple* output_tuple = Tuple::create(intermediate_tuple_desc_->byte_size(), pool);
@@ -887,7 +887,7 @@ Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple(
   return output_tuple;
 }
 
-Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
+Tuple* NewPartitionedAggregationNode::ConstructIntermediateTuple(
     const vector<NewAggFnEvaluator*>& agg_fn_evals, MemPool* pool, Status* status) {
   const int fixed_size = intermediate_tuple_desc_->byte_size();
   const int varlen_size = GroupingExprsVarlenSize();
@@ -907,7 +907,7 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
   return intermediate_tuple;
 }
 
-Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
+Tuple* NewPartitionedAggregationNode::ConstructIntermediateTuple(
     const vector<NewAggFnEvaluator*>& agg_fn_evals, BufferedTupleStream3* stream,
     Status* status) {
   DCHECK(stream != NULL && status != NULL);
@@ -931,7 +931,7 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
   return tuple;
 }
 
-int PartitionedAggregationNode::GroupingExprsVarlenSize() {
+int NewPartitionedAggregationNode::GroupingExprsVarlenSize() {
   int varlen_size = 0;
   // TODO: The hash table could compute this as it hashes.
   for (int expr_idx: string_grouping_exprs_) {
@@ -943,7 +943,7 @@ int PartitionedAggregationNode::GroupingExprsVarlenSize() {
 }
 
 // TODO: codegen this function.
-void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple,
+void NewPartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple,
     uint8_t* buffer, int varlen_size) {
   // Copy over all grouping slots (the variable length data is copied below).
   for (int i = 0; i < grouping_exprs_.size(); ++i) {
@@ -971,7 +971,7 @@ void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple,
 }
 
 // TODO: codegen this function.
-void PartitionedAggregationNode::InitAggSlots(
+void NewPartitionedAggregationNode::InitAggSlots(
     const vector<NewAggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple) {
   vector<SlotDescriptor*>::const_iterator slot_desc =
       intermediate_tuple_desc_->slots().begin() + grouping_exprs_.size();
@@ -1008,7 +1008,7 @@ void PartitionedAggregationNode::InitAggSlots(
   }
 }
 
-void PartitionedAggregationNode::UpdateTuple(NewAggFnEvaluator** agg_fn_evals,
+void NewPartitionedAggregationNode::UpdateTuple(NewAggFnEvaluator** agg_fn_evals,
     Tuple* tuple, TupleRow* row, bool is_merge) {
   DCHECK(tuple != NULL || agg_fns_.empty());
   for (int i = 0; i < agg_fns_.size(); ++i) {
@@ -1020,7 +1020,7 @@ void PartitionedAggregationNode::UpdateTuple(NewAggFnEvaluator** agg_fn_evals,
   }
 }
 
-Tuple* PartitionedAggregationNode::GetOutputTuple(
+Tuple* NewPartitionedAggregationNode::GetOutputTuple(
     const vector<NewAggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool) {
   DCHECK(tuple != NULL || agg_fn_evals.empty()) << tuple;
   Tuple* dst = tuple;
@@ -1049,7 +1049,7 @@ Tuple* PartitionedAggregationNode::GetOutputTuple(
 }
 
 template <bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::AppendSpilledRow(
+Status NewPartitionedAggregationNode::AppendSpilledRow(
     Partition* partition, TupleRow* row) {
   DCHECK(!is_streaming_preagg_);
   DCHECK(partition->is_spilled());
@@ -1070,16 +1070,16 @@ Status PartitionedAggregationNode::AppendSpilledRow(
   }
 }
 
-string PartitionedAggregationNode::DebugString(int indentation_level) const {
+string NewPartitionedAggregationNode::DebugString(int indentation_level) const {
   stringstream ss;
   DebugString(indentation_level, &ss);
   return ss.str();
 }
 
-void PartitionedAggregationNode::DebugString(int indentation_level,
+void NewPartitionedAggregationNode::DebugString(int indentation_level,
     stringstream* out) const {
   *out << string(indentation_level * 2, ' ');
-  *out << "PartitionedAggregationNode("
+  *out << "NewPartitionedAggregationNode("
        << "intermediate_tuple_id=" << intermediate_tuple_id_
        << " output_tuple_id=" << output_tuple_id_
        << " needs_finalize=" << needs_finalize_
@@ -1089,7 +1089,7 @@ void PartitionedAggregationNode::DebugString(int indentation_level,
   *out << ")";
 }
 
-Status PartitionedAggregationNode::CreateHashPartitions(
+Status NewPartitionedAggregationNode::CreateHashPartitions(
     int level, int single_partition_idx) {
   if (is_streaming_preagg_) DCHECK_EQ(level, 0);
   if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) {
@@ -1158,7 +1158,7 @@ Status PartitionedAggregationNode::CreateHashPartitions(
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::CheckAndResizeHashPartitions(
+Status NewPartitionedAggregationNode::CheckAndResizeHashPartitions(
     bool partitioning_aggregated_rows, int num_rows, const NewPartitionedHashTableCtx* ht_ctx) {
   DCHECK(!is_streaming_preagg_);
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
@@ -1177,7 +1177,7 @@ Status PartitionedAggregationNode::CheckAndResizeHashPartitions(
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::NextPartition() {
+Status NewPartitionedAggregationNode::NextPartition() {
   DCHECK(output_partition_ == nullptr);
 
   if (!is_in_subplan() && spilled_partitions_.empty()) {
@@ -1226,7 +1226,7 @@ Status PartitionedAggregationNode::NextPartition() {
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::BuildSpilledPartition(Partition** built_partition) {
+Status NewPartitionedAggregationNode::BuildSpilledPartition(Partition** built_partition) {
   DCHECK(!spilled_partitions_.empty());
   DCHECK(!is_streaming_preagg_);
   // Leave the partition in 'spilled_partitions_' to be closed if we hit an error.
@@ -1267,7 +1267,7 @@ Status PartitionedAggregationNode::BuildSpilledPartition(Partition** built_parti
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::RepartitionSpilledPartition() {
+Status NewPartitionedAggregationNode::RepartitionSpilledPartition() {
   DCHECK(!spilled_partitions_.empty());
   DCHECK(!is_streaming_preagg_);
   // Leave the partition in 'spilled_partitions_' to be closed if we hit an error.
@@ -1313,7 +1313,7 @@ Status PartitionedAggregationNode::RepartitionSpilledPartition() {
 }
 
 template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream3* input_stream) {
+Status NewPartitionedAggregationNode::ProcessStream(BufferedTupleStream3* input_stream) {
   DCHECK(!is_streaming_preagg_);
   if (input_stream->num_rows() > 0) {
     while (true) {
@@ -1340,7 +1340,7 @@ Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream3* input_str
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) {
+Status NewPartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) {
   int64_t max_freed_mem = 0;
   int partition_idx = -1;
 
@@ -1371,7 +1371,7 @@ Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) {
   return hash_partitions_[partition_idx]->Spill(more_aggregate_rows);
 }
 
-Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
+Status NewPartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
   DCHECK(!hash_partitions_.empty());
   std::stringstream ss;
   ss << "PA(node_id=" << id() << ") partitioned(level=" << hash_partitions_[0]->level
@@ -1415,7 +1415,7 @@ Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
   return Status::OK();
 }
 
-void PartitionedAggregationNode::PushSpilledPartition(Partition* partition) {
+void NewPartitionedAggregationNode::PushSpilledPartition(Partition* partition) {
   DCHECK(partition->is_spilled());
   DCHECK(partition->hash_tbl == nullptr);
   // Ensure all pages in the spilled partition's streams are unpinned by invalidating
@@ -1426,7 +1426,7 @@ void PartitionedAggregationNode::PushSpilledPartition(Partition* partition) {
   spilled_partitions_.push_front(partition);
 }
 
-void PartitionedAggregationNode::ClosePartitions() {
+void NewPartitionedAggregationNode::ClosePartitions() {
   for (Partition* partition : hash_partitions_) {
     if (partition != nullptr) partition->Close(true);
   }
@@ -1439,7 +1439,7 @@ void PartitionedAggregationNode::ClosePartitions() {
   partition_pool_->clear();
 }
 
-//Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
+//Status NewPartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
 //  NewAggFnEvaluator::FreeLocalAllocations(agg_fn_evals_);
 //  for (Partition* partition : hash_partitions_) {
 //    if (partition != nullptr) {
@@ -1451,9 +1451,9 @@ void PartitionedAggregationNode::ClosePartitions() {
 //}
 
 // Instantiate required templates.
-template Status PartitionedAggregationNode::AppendSpilledRow<false>(
+template Status NewPartitionedAggregationNode::AppendSpilledRow<false>(
     Partition*, TupleRow*);
-template Status PartitionedAggregationNode::AppendSpilledRow<true>(Partition*, TupleRow*);
+template Status NewPartitionedAggregationNode::AppendSpilledRow<true>(Partition*, TupleRow*);
 
 }
 
diff --git a/be/src/exec/partitioned_aggregation_node.h b/be/src/exec/new_partitioned_aggregation_node.h
similarity index 98%
copy from be/src/exec/partitioned_aggregation_node.h
copy to be/src/exec/new_partitioned_aggregation_node.h
index 4a2f156..62a3da4 100644
--- a/be/src/exec/partitioned_aggregation_node.h
+++ b/be/src/exec/new_partitioned_aggregation_node.h
@@ -117,10 +117,10 @@ class SlotDescriptor;
 /// There are so many contexts in use that a plain "ctx" variable should never be used.
 /// Likewise, it's easy to mixup the agg fn ctxs, there should be a way to simplify this.
 /// TODO: support an Init() method with an initial value in the UDAF interface.
-class PartitionedAggregationNode : public ExecNode {
+class NewPartitionedAggregationNode : public ExecNode {
  public:
   
-  PartitionedAggregationNode(ObjectPool* pool,
+  NewPartitionedAggregationNode(ObjectPool* pool,
       const TPlanNode& tnode, const DescriptorTbl& descs);
 
   virtual Status init(const TPlanNode& tnode, RuntimeState* state);
@@ -245,16 +245,16 @@ class PartitionedAggregationNode : public ExecNode {
   Partition* output_partition_;
   NewPartitionedHashTable::Iterator output_iterator_;
 
-  typedef Status (*ProcessBatchNoGroupingFn)(PartitionedAggregationNode*, RowBatch*);
+  typedef Status (*ProcessBatchNoGroupingFn)(NewPartitionedAggregationNode*, RowBatch*);
   /// Jitted ProcessBatchNoGrouping function pointer. Null if codegen is disabled.
   ProcessBatchNoGroupingFn process_batch_no_grouping_fn_;
 
   typedef Status (*ProcessBatchFn)(
-      PartitionedAggregationNode*, RowBatch*, NewPartitionedHashTableCtx*);
+      NewPartitionedAggregationNode*, RowBatch*, NewPartitionedHashTableCtx*);
   /// Jitted ProcessBatch function pointer. Null if codegen is disabled.
   ProcessBatchFn process_batch_fn_;
 
-  typedef Status (*ProcessBatchStreamingFn)(PartitionedAggregationNode*, bool,
+  typedef Status (*ProcessBatchStreamingFn)(NewPartitionedAggregationNode*, bool,
       RowBatch*, RowBatch*, NewPartitionedHashTableCtx*, int[PARTITION_FANOUT]);
   /// Jitted ProcessBatchStreaming function pointer.  Null if codegen is disabled.
   ProcessBatchStreamingFn process_batch_streaming_fn_;
@@ -358,7 +358,7 @@ class PartitionedAggregationNode : public ExecNode {
   /// initially use small buffers. Streaming pre-aggregations do not spill and do not
   /// require an unaggregated stream.
   struct Partition {
-    Partition(PartitionedAggregationNode* parent, int level, int idx)
+    Partition(NewPartitionedAggregationNode* parent, int level, int idx)
       : parent(parent), is_closed(false), level(level), idx(idx) {}
 
     ~Partition();
@@ -392,7 +392,7 @@ class PartitionedAggregationNode : public ExecNode {
 
     bool is_spilled() const { return hash_tbl.get() == NULL; }
 
-    PartitionedAggregationNode* parent;
+    NewPartitionedAggregationNode* parent;
 
     /// If true, this partition is closed and there is nothing left to do.
     bool is_closed;
diff --git a/be/src/exec/partitioned_aggregation_node_ir.cc b/be/src/exec/new_partitioned_aggregation_node_ir.cc
similarity index 93%
copy from be/src/exec/partitioned_aggregation_node_ir.cc
copy to be/src/exec/new_partitioned_aggregation_node_ir.cc
index ad5a52e..6674bbd 100644
--- a/be/src/exec/partitioned_aggregation_node_ir.cc
+++ b/be/src/exec/new_partitioned_aggregation_node_ir.cc
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/partitioned_aggregation_node.h"
+#include "exec/new_partitioned_aggregation_node.h"
 
 #include "exec/new_partitioned_hash_table.inline.h"
 #include "exprs/new_agg_fn_evaluator.h"
@@ -27,7 +27,7 @@
 
 using namespace doris;
 
-Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) {
+Status NewPartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) {
   Tuple* output_tuple = singleton_output_tuple_;
   FOREACH_ROW(batch, 0, batch_iter) {
     UpdateTuple(agg_fn_evals_.data(), output_tuple, batch_iter.get());
@@ -36,7 +36,7 @@ Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) {
 }
 
 template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
+Status NewPartitionedAggregationNode::ProcessBatch(RowBatch* batch,
     NewPartitionedHashTableCtx* ht_ctx) {
   DCHECK(!hash_partitions_.empty());
   DCHECK(!is_streaming_preagg_);
@@ -64,7 +64,7 @@ Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
 }
 
 template<bool AGGREGATED_ROWS>
-void IR_ALWAYS_INLINE PartitionedAggregationNode::EvalAndHashPrefetchGroup(
+void IR_ALWAYS_INLINE NewPartitionedAggregationNode::EvalAndHashPrefetchGroup(
     RowBatch* batch, int start_row_idx,
     NewPartitionedHashTableCtx* ht_ctx) {
   NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
@@ -95,7 +95,7 @@ void IR_ALWAYS_INLINE PartitionedAggregationNode::EvalAndHashPrefetchGroup(
 }
 
 template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessRow(TupleRow* row,
+Status NewPartitionedAggregationNode::ProcessRow(TupleRow* row,
     NewPartitionedHashTableCtx* ht_ctx) {
   NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
   // Hoist lookups out of non-null branch to speed up non-null case.
@@ -138,7 +138,7 @@ Status PartitionedAggregationNode::ProcessRow(TupleRow* row,
 }
 
 template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::AddIntermediateTuple(Partition* partition,
+Status NewPartitionedAggregationNode::AddIntermediateTuple(Partition* partition,
     TupleRow* row, uint32_t hash, NewPartitionedHashTable::Iterator insert_it) {
   while (true) {
     DCHECK(partition->aggregated_row_stream->is_pinned());
@@ -162,7 +162,7 @@ Status PartitionedAggregationNode::AddIntermediateTuple(Partition* partition,
   }
 }
 
-Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
+Status NewPartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
     RowBatch* in_batch, RowBatch* out_batch,
     NewPartitionedHashTableCtx* ht_ctx, int remaining_capacity[PARTITION_FANOUT]) {
   DCHECK(is_streaming_preagg_);
@@ -212,7 +212,7 @@ Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
   return Status::OK();
 }
 
-bool PartitionedAggregationNode::TryAddToHashTable(
+bool NewPartitionedAggregationNode::TryAddToHashTable(
     NewPartitionedHashTableCtx* ht_ctx, Partition* partition,
     NewPartitionedHashTable* hash_tbl, TupleRow* in_row,
     uint32_t hash, int* remaining_capacity, Status* status) {
@@ -244,8 +244,8 @@ bool PartitionedAggregationNode::TryAddToHashTable(
 }
 
 // Instantiate required templates.
-template Status PartitionedAggregationNode::ProcessBatch<false>(RowBatch*,
+template Status NewPartitionedAggregationNode::ProcessBatch<false>(RowBatch*,
     NewPartitionedHashTableCtx*);
-template Status PartitionedAggregationNode::ProcessBatch<true>(RowBatch*,
+template Status NewPartitionedAggregationNode::ProcessBatch<true>(RowBatch*,
     NewPartitionedHashTableCtx*);
 
diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc
index 4c68cdb..8d31a90 100644
--- a/be/src/exec/partitioned_aggregation_node.cc
+++ b/be/src/exec/partitioned_aggregation_node.cc
@@ -18,1081 +18,795 @@
 #include "exec/partitioned_aggregation_node.h"
 
 #include <math.h>
-#include <algorithm>
-#include <set>
 #include <sstream>
+#include <thrift/protocol/TDebugProtocol.h>
 
-#include "exec/new_partitioned_hash_table.h"
-#include "exec/new_partitioned_hash_table.inline.h"
-#include "exprs/new_agg_fn_evaluator.h"
-#include "exprs/anyval_util.h"
+#include "exec/partitioned_hash_table.inline.h"
+#include "exprs/agg_fn_evaluator.h"
+#include "exprs/expr.h"
 #include "exprs/expr_context.h"
-// #include "exprs/scalar_expr_evaluator.h"
 #include "exprs/slot_ref.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/buffered_tuple_stream3.inline.h"
+#include "runtime/buffered_tuple_stream2.inline.h"
 #include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
 #include "runtime/mem_pool.h"
-#include "runtime/mem_tracker.h"
 #include "runtime/raw_value.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
-#include "runtime/string_value.h"
-#include "runtime/tuple_row.h"
 #include "runtime/tuple.h"
+#include "runtime/tuple_row.h"
 #include "udf/udf_internal.h"
+#include "util/runtime_profile.h"
+#include "util/stack_util.h"
 
 #include "gen_cpp/Exprs_types.h"
 #include "gen_cpp/PlanNodes_types.h"
 
-#include "common/names.h"
-
-using namespace strings;
+using std::list;
 
 namespace doris {
 
-/// The minimum reduction factor (input rows divided by output rows) to grow hash tables
-/// in a streaming preaggregation, given that the hash tables are currently the given
-/// size or above. The sizes roughly correspond to hash table sizes where the bucket
-/// arrays will fit in  a cache level. Intuitively, we don't want the working set of the
-/// aggregation to expand to the next level of cache unless we're reducing the input
-/// enough to outweigh the increased memory latency we'll incur for each hash table
-/// lookup.
-///
-/// Note that the current reduction achieved is not always a good estimate of the
-/// final reduction. It may be biased either way depending on the ordering of the
-/// input. If the input order is random, we will underestimate the final reduction
-/// factor because the probability of a row having the same key as a previous row
-/// increases as more input is processed.  If the input order is correlated with the
-/// key, skew may bias the estimate. If high cardinality keys appear first, we
-/// may overestimate and if low cardinality keys appear first, we underestimate.
-/// To estimate the eventual reduction achieved, we estimate the final reduction
-/// using the planner's estimated input cardinality and the assumption that input
-/// is in a random order. This means that we assume that the reduction factor will
-/// increase over time.
-struct StreamingHtMinReductionEntry {
-  // Use 'streaming_ht_min_reduction' if the total size of hash table bucket directories in
-  // bytes is greater than this threshold.
-  int min_ht_mem;
-  // The minimum reduction factor to expand the hash tables.
-  double streaming_ht_min_reduction;
-};
-
-// TODO: experimentally tune these values and also programmatically get the cache size
-// of the machine that we're running on.
-static const StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
-  // Expand up to L2 cache always.
-  {0, 0.0},
-  // Expand into L3 cache if we look like we're getting some reduction.
-  {256 * 1024, 1.1},
-  // Expand into main memory if we're getting a significant reduction.
-  {2 * 1024 * 1024, 2.0},
-};
-
-static const int STREAMING_HT_MIN_REDUCTION_SIZE =
-    sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
-
 PartitionedAggregationNode::PartitionedAggregationNode(
-    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : ExecNode(pool, tnode, descs),
-    intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
-    intermediate_tuple_desc_(descs.get_tuple_descriptor(intermediate_tuple_id_)),
-    intermediate_row_desc_(intermediate_tuple_desc_, false),
-    output_tuple_id_(tnode.agg_node.output_tuple_id),
-    output_tuple_desc_(descs.get_tuple_descriptor(output_tuple_id_)),
-    needs_finalize_(tnode.agg_node.need_finalize),
-    needs_serialize_(false),
-    output_partition_(NULL),
-    process_batch_no_grouping_fn_(NULL),
-    process_batch_fn_(NULL),
-    process_batch_streaming_fn_(NULL),
-    build_timer_(NULL),
-    ht_resize_timer_(NULL),
-    get_results_timer_(NULL),
-    num_hash_buckets_(NULL),
-    partitions_created_(NULL),
-    max_partition_level_(NULL),
-    num_row_repartitioned_(NULL),
-    num_repartitions_(NULL),
-    num_spilled_partitions_(NULL),
-    largest_partition_percent_(NULL),
-    streaming_timer_(NULL),
-    num_passthrough_rows_(NULL),
-    preagg_estimated_reduction_(NULL),
-    preagg_streaming_ht_min_reduction_(NULL),
-//    estimated_input_cardinality_(tnode.agg_node.estimated_input_cardinality),
-    singleton_output_tuple_(NULL),
-    singleton_output_tuple_returned_(true),
-    partition_eos_(false),
-    child_eos_(false),
-    partition_pool_(new ObjectPool()) {
-
-  DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
-
-    if (tnode.agg_node.__isset.use_streaming_preaggregation) {
-        is_streaming_preagg_ = tnode.agg_node.use_streaming_preaggregation;
-        if (is_streaming_preagg_) {
-            DCHECK(_conjunct_ctxs.empty()) << "Preaggs have no conjuncts";
-            DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping";
-            DCHECK(_limit == -1) << "Preaggs have no limits";
-        }
-    } else {
-        is_streaming_preagg_ = false;
-    }
+        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) :
+        ExecNode(pool, tnode, descs),
+        _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
+        _intermediate_tuple_desc(NULL),
+        _output_tuple_id(tnode.agg_node.output_tuple_id),
+        _output_tuple_desc(NULL),
+        _needs_finalize(tnode.agg_node.need_finalize),
+        _needs_serialize(false),
+        _block_mgr_client(NULL),
+        _output_partition(NULL),
+        _process_row_batch_fn(NULL),
+        _build_timer(NULL),
+        _ht_resize_timer(NULL),
+        _get_results_timer(NULL),
+        _num_hash_buckets(NULL),
+        _partitions_created(NULL),
+        // _max_partition_level(NULL),
+        _num_row_repartitioned(NULL),
+        _num_repartitions(NULL),
+        _singleton_output_tuple(NULL),
+        _singleton_output_tuple_returned(true),
+        _partition_pool(new ObjectPool()) {
+    DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
 }
 
 Status PartitionedAggregationNode::init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(ExecNode::init(tnode));
-  DCHECK(intermediate_tuple_desc_ != nullptr);
-  DCHECK(output_tuple_desc_ != nullptr);
-  DCHECK_EQ(intermediate_tuple_desc_->slots().size(), output_tuple_desc_->slots().size());
-
-  const RowDescriptor& row_desc = child(0)->row_desc();
-  RETURN_IF_ERROR(Expr::create(tnode.agg_node.grouping_exprs, row_desc,
-      state, &grouping_exprs_, mem_tracker()));
-  // Construct build exprs from intermediate_row_desc_
-  for (int i = 0; i < grouping_exprs_.size(); ++i) {
-    SlotDescriptor* desc = intermediate_tuple_desc_->slots()[i];
-    //DCHECK(desc->type().type == TYPE_NULL || desc->type() == grouping_exprs_[i]->type());
-    // Hack to avoid TYPE_NULL SlotRefs.
-    SlotRef* build_expr = _pool->add(desc->type().type != TYPE_NULL ?
-        new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN));
-    build_exprs_.push_back(build_expr);
-    // TODO chenhao 
-    RETURN_IF_ERROR(build_expr->prepare(state, intermediate_row_desc_, nullptr));
-    if (build_expr->type().is_var_len_string_type()) string_grouping_exprs_.push_back(i);
-  }
-
-  int j = grouping_exprs_.size();
-  for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i, ++j) {
-    SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j];
-    SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j];
-    AggFn* agg_fn;
-    RETURN_IF_ERROR(AggFn::Create(tnode.agg_node.aggregate_functions[i], row_desc,
-        *intermediate_slot_desc, *output_slot_desc, state, &agg_fn));
-    agg_fns_.push_back(agg_fn);
-    needs_serialize_ |= agg_fn->SupportsSerialize();
-  }
-  return Status::OK();
+    RETURN_IF_ERROR(ExecNode::init(tnode, state));
+    RETURN_IF_ERROR(
+            Expr::create_expr_trees(_pool, tnode.agg_node.grouping_exprs, &_probe_expr_ctxs));
+    for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
+        AggFnEvaluator* evaluator = NULL;
+        RETURN_IF_ERROR(AggFnEvaluator::create(
+                    _pool, tnode.agg_node.aggregate_functions[i], &evaluator));
+        _aggregate_evaluators.push_back(evaluator);
+    }
+    return Status::OK();
 }
 
 Status PartitionedAggregationNode::prepare(RuntimeState* state) {
-  SCOPED_TIMER(_runtime_profile->total_time_counter());
-
-  RETURN_IF_ERROR(ExecNode::prepare(state));
-  state_ = state;
-
-  mem_pool_.reset(new MemPool(mem_tracker()));
-  agg_fn_pool_.reset(new MemPool(expr_mem_tracker()));
-
-  ht_resize_timer_ = ADD_TIMER(runtime_profile(), "HTResizeTime");
-  get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime");
-  num_hash_buckets_ =
-      ADD_COUNTER(runtime_profile(), "HashBuckets", TUnit::UNIT);
-  partitions_created_ =
-      ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT);
-  largest_partition_percent_ =
-      runtime_profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT);
-  if (is_streaming_preagg_) {
-    runtime_profile()->append_exec_option("Streaming Preaggregation");
-    streaming_timer_ = ADD_TIMER(runtime_profile(), "StreamingTime");
-    num_passthrough_rows_ =
-        ADD_COUNTER(runtime_profile(), "RowsPassedThrough", TUnit::UNIT);
-    preagg_estimated_reduction_ = ADD_COUNTER(
-        runtime_profile(), "ReductionFactorEstimate", TUnit::DOUBLE_VALUE);
-    preagg_streaming_ht_min_reduction_ = ADD_COUNTER(
-        runtime_profile(), "ReductionFactorThresholdToExpand", TUnit::DOUBLE_VALUE);
-  } else {
-    build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime");
-    num_row_repartitioned_ =
-        ADD_COUNTER(runtime_profile(), "RowsRepartitioned", TUnit::UNIT);
-    num_repartitions_ =
-        ADD_COUNTER(runtime_profile(), "NumRepartitions", TUnit::UNIT);
-    num_spilled_partitions_ =
-        ADD_COUNTER(runtime_profile(), "SpilledPartitions", TUnit::UNIT);
-    max_partition_level_ = runtime_profile()->AddHighWaterMarkCounter(
-        "MaxPartitionLevel", TUnit::UNIT);
-  }
-  // TODO chenhao
-  const RowDescriptor& row_desc = child(0)->row_desc();
-  RETURN_IF_ERROR(NewAggFnEvaluator::Create(agg_fns_, state, _pool, agg_fn_pool_.get(),
-      &agg_fn_evals_, expr_mem_tracker(), row_desc));
-  
-  expr_results_pool_.reset(new MemPool(_expr_mem_tracker.get()));
-  if (!grouping_exprs_.empty()) {
-    RowDescriptor build_row_desc(intermediate_tuple_desc_, false);
-    RETURN_IF_ERROR(NewPartitionedHashTableCtx::Create(_pool, state, build_exprs_,
-        grouping_exprs_, true, vector<bool>(build_exprs_.size(), true),
-        state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1, expr_mem_pool(),
-        expr_results_pool_.get(), expr_mem_tracker(), build_row_desc, row_desc, &ht_ctx_));
-  }
-  // AddCodegenDisabledMessage(state);
-  return Status::OK();
-}
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _state = state;
+
+    _mem_pool.reset(new MemPool(mem_tracker()));
+    _agg_fn_pool.reset(new MemPool(expr_mem_tracker()));
+
+    _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
+    _ht_resize_timer = ADD_TIMER(runtime_profile(), "HTResizeTime");
+    _get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime");
+    _num_hash_buckets = ADD_COUNTER(runtime_profile(), "HashBuckets", TUnit::UNIT);
+    _partitions_created = ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT);
+    // _max_partition_level = runtime_profile()->AddHighWaterMarkCounter(
+    //         "MaxPartitionLevel", TUnit::UNIT);
+    _num_row_repartitioned = ADD_COUNTER(
+            runtime_profile(), "RowsRepartitioned", TUnit::UNIT);
+    _num_repartitions = ADD_COUNTER(runtime_profile(), "NumRepartitions", TUnit::UNIT);
+    _num_spilled_partitions = ADD_COUNTER(
+            runtime_profile(), "SpilledPartitions", TUnit::UNIT);
+    // _largest_partition_percent = runtime_profile()->AddHighWaterMarkCounter(
+    //         "LargestPartitionPercent", TUnit::UNIT);
+
+    _intermediate_tuple_desc =
+        state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
+    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+    DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
 
-Status PartitionedAggregationNode::open(RuntimeState* state) {
-  SCOPED_TIMER(_runtime_profile->total_time_counter());
-  // Open the child before consuming resources in this node.
-  RETURN_IF_ERROR(child(0)->open(state));
-  RETURN_IF_ERROR(ExecNode::open(state));
-
-  // Claim reservation after the child has been opened to reduce the peak reservation
-  // requirement.
-  if (!_buffer_pool_client.is_registered() && !grouping_exprs_.empty()) {
-    DCHECK_GE(_resource_profile.min_reservation, MinReservation());
-    RETURN_IF_ERROR(claim_buffer_reservation(state));
-  }
-
-  if (ht_ctx_.get() != nullptr) RETURN_IF_ERROR(ht_ctx_->Open(state));
-  RETURN_IF_ERROR(NewAggFnEvaluator::Open(agg_fn_evals_, state));
-  if (grouping_exprs_.empty()) {
-    // Create the single output tuple for this non-grouping agg. This must happen after
-    // opening the aggregate evaluators.
-    singleton_output_tuple_ =
-        ConstructSingletonOutputTuple(agg_fn_evals_, mem_pool_.get());
-    // Check for failures during NewAggFnEvaluator::Init().
-    RETURN_IF_ERROR(state_->query_status());
-    singleton_output_tuple_returned_ = false;
-  } else {
-    if (ht_allocator_ == nullptr) {
-      // Allocate 'serialize_stream_' and 'ht_allocator_' on the first Open() call.
-      ht_allocator_.reset(new Suballocator(state_->exec_env()->buffer_pool(),
-          &_buffer_pool_client, _resource_profile.spillable_buffer_size));
-
-      if (!is_streaming_preagg_ && needs_serialize_) {
-        serialize_stream_.reset(new BufferedTupleStream3(state, &intermediate_row_desc_,
-            &_buffer_pool_client, _resource_profile.spillable_buffer_size,
-            _resource_profile.max_row_buffer_size));
-        RETURN_IF_ERROR(serialize_stream_->Init(id(), false));
-        bool got_buffer;
-        // Reserve the memory for 'serialize_stream_' so we don't need to scrounge up
-        // another buffer during spilling.
-        RETURN_IF_ERROR(serialize_stream_->PrepareForWrite(&got_buffer));
-        DCHECK(got_buffer)
-            << "Accounted in min reservation" << _buffer_pool_client.DebugString();
-        DCHECK(serialize_stream_->has_write_iterator());
-      }
-    }
-    RETURN_IF_ERROR(CreateHashPartitions(0));
-  }
-
-  // Streaming preaggregations do all processing in GetNext().
-  if (is_streaming_preagg_) return Status::OK();
-
-  RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
-  // Read all the rows from the child and process them.
-  bool eos = false;
-  do {
-    RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(state->check_query_state(
-            "New partitioned aggregation, while getting next from child 0."));
-    RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos));
-    if (UNLIKELY(VLOG_ROW_IS_ON)) {
-      for (int i = 0; i < batch.num_rows(); ++i) {
-        TupleRow* row = batch.get_row(i);
-        VLOG_ROW << "input row: " << row->to_string(_children[0]->row_desc());
-      }
+    RETURN_IF_ERROR(
+            Expr::prepare(_probe_expr_ctxs, state, child(0)->row_desc(), expr_mem_tracker()));
+    // AddExprCtxsToFree(_probe_expr_ctxs);
+
+    _contains_var_len_grouping_exprs = false;
+    // Construct build exprs from _intermediate_agg_tuple_desc
+    for (int i = 0; i < _probe_expr_ctxs.size(); ++i) {
+        SlotDescriptor* desc = _intermediate_tuple_desc->slots()[i];
+        DCHECK(desc->type().type == TYPE_NULL ||
+                desc->type().type == _probe_expr_ctxs[i]->root()->type().type);
+        // Hack to avoid TYPE_NULL SlotRefs.
+        Expr* expr = desc->type().type != TYPE_NULL ?
+            new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN);
+        state->obj_pool()->add(expr);
+        _build_expr_ctxs.push_back(new ExprContext(expr));
+        state->obj_pool()->add(_build_expr_ctxs.back());
+        _contains_var_len_grouping_exprs |= expr->type().is_string_type();
     }
-
-    SCOPED_TIMER(build_timer_);
-    if (grouping_exprs_.empty()) {
-      if (process_batch_no_grouping_fn_ != NULL) {
-        RETURN_IF_ERROR(process_batch_no_grouping_fn_(this, &batch));
-      } else {
-        RETURN_IF_ERROR(ProcessBatchNoGrouping(&batch));
-      }
-    } else {
-      // There is grouping, so we will do partitioned aggregation.
-      if (process_batch_fn_ != NULL) {
-        RETURN_IF_ERROR(process_batch_fn_(this, &batch, ht_ctx_.get()));
-      } else {
-        RETURN_IF_ERROR(ProcessBatch<false>(&batch, ht_ctx_.get()));
-      }
+    // Construct a new row desc for preparing the build exprs because neither the child's
+    // nor this node's output row desc may contain the intermediate tuple, e.g.,
+    // in a single-node plan with an intermediate tuple different from the output tuple.
+    _intermediate_row_desc.reset(new RowDescriptor(_intermediate_tuple_desc, false));
+    RETURN_IF_ERROR(
+            Expr::prepare(_build_expr_ctxs, state, *_intermediate_row_desc, expr_mem_tracker()));
+    // AddExprCtxsToFree(_build_expr_ctxs);
+
+    int j = _probe_expr_ctxs.size();
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) {
+        // Skip non-materialized slots; we don't have evaluators instantiated for those.
+        while (!_intermediate_tuple_desc->slots()[j]->is_materialized()) {
+            DCHECK_LT(j, _intermediate_tuple_desc->slots().size() - 1)
+                << "#eval= " << _aggregate_evaluators.size()
+                << " #probe=" << _probe_expr_ctxs.size();
+            ++j;
+        }
+        // SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
+        SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
+        FunctionContext* agg_fn_ctx = NULL;
+        // RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(state, child(0)->row_desc(),
+        //             intermediate_slot_desc, output_slot_desc, _agg_fn_pool.get(), &agg_fn_ctx));
+        RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(state, child(0)->row_desc(),
+                        _agg_fn_pool.get(), output_slot_desc, output_slot_desc,
+                        expr_mem_tracker(), &agg_fn_ctx));
+        _agg_fn_ctxs.push_back(agg_fn_ctx);
+        state->obj_pool()->add(agg_fn_ctx);
+        _needs_serialize |= _aggregate_evaluators[i]->supports_serialize();
     }
-    batch.reset();
-  } while (!eos);
-
-  // The child can be closed at this point in most cases because we have consumed all of
-  // the input from the child and transfered ownership of the resources we need. The
-  // exception is if we are inside a subplan expecting to call Open()/GetNext() on the
-  // child again,
-  if (!is_in_subplan()) child(0)->close(state);
-  child_eos_ = true;
-
-  // Done consuming child(0)'s input. Move all the partitions in hash_partitions_
-  // to spilled_partitions_ or aggregated_partitions_. We'll finish the processing in
-  // GetNext().
-  if (!grouping_exprs_.empty()) {
-    RETURN_IF_ERROR(MoveHashPartitions(child(0)->rows_returned()));
-  }
-  return Status::OK();
-}
 
-Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch,
-    bool* eos) {
-  int first_row_idx = row_batch->num_rows();
-  RETURN_IF_ERROR(GetNextInternal(state, row_batch, eos));
-  RETURN_IF_ERROR(HandleOutputStrings(row_batch, first_row_idx));
-  return Status::OK();
-}
-
-Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
-    int first_row_idx) {
-  if (!needs_finalize_ && !needs_serialize_) return Status::OK();
-  // String data returned by Serialize() or Finalize() is from local expr allocations in
-  // the agg function contexts, and will be freed on the next GetNext() call by
-  // FreeLocalAllocations(). The data either needs to be copied out now or sent up the
-  // plan and copied out by a blocking ancestor. (See IMPALA-3311)
-  for (const AggFn* agg_fn : agg_fns_) {
-    const SlotDescriptor& slot_desc = agg_fn->output_slot_desc();
-    DCHECK(!slot_desc.type().is_collection_type()) << "producing collections NYI";
-    if (!slot_desc.type().is_var_len_string_type()) continue;
-    if (is_in_subplan()) {
-      // Copy string data to the row batch's pool. This is more efficient than
-      // MarkNeedsDeepCopy() in a subplan since we are likely producing many small
-      // batches.
-      RETURN_IF_ERROR(CopyStringData(slot_desc, row_batch,
-          first_row_idx, row_batch->tuple_data_pool()));
+    if (_probe_expr_ctxs.empty()) {
+        // Create single output tuple now; we need to output something
+        // even if our input is empty.
+        _singleton_output_tuple =
+                construct_intermediate_tuple(_agg_fn_ctxs, _mem_pool.get(), NULL, NULL);
+        // Check for failures during AggFnEvaluator::init().
+        RETURN_IF_ERROR(_state->query_status());
+        _singleton_output_tuple_returned = false;
     } else {
-      row_batch->mark_needs_deep_copy();
-      break;
+        _ht_ctx.reset(new PartitionedHashTableCtx(_build_expr_ctxs, _probe_expr_ctxs, true, true,
+                state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1));
+        RETURN_IF_ERROR(_state->block_mgr2()->register_client(
+                    min_required_buffers(), mem_tracker(), state, &_block_mgr_client));
+        RETURN_IF_ERROR(create_hash_partitions(0));
     }
-  }
-  return Status::OK();
-}
 
-Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_desc,
-    RowBatch* row_batch, int first_row_idx, MemPool* pool) {
-  DCHECK(slot_desc.type().is_var_len_string_type());
-  DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1);
-  FOREACH_ROW(row_batch, first_row_idx, batch_iter) {
-    Tuple* tuple = batch_iter.get()->get_tuple(0);
-    StringValue* sv = reinterpret_cast<StringValue*>(
-        tuple->get_slot(slot_desc.tuple_offset()));
-    if (sv == NULL || sv->len == 0) continue;
-    char* new_ptr = reinterpret_cast<char*>(pool->try_allocate(sv->len));
-    if (UNLIKELY(new_ptr == NULL)) {
-      string details = Substitute("Cannot perform aggregation at node with id $0."
-          " Failed to allocate $1 output bytes.", _id, sv->len);
-      return pool->mem_tracker()->MemLimitExceeded(state_, details, sv->len);
+    // TODO: Is there a need to create the stream here? If memory reservations work we may
+    // be able to create this stream lazily and only whenever we need to spill.
+    if (_needs_serialize && _block_mgr_client != NULL) {
+        _serialize_stream.reset(new BufferedTupleStream2(state, *_intermediate_row_desc,
+                    state->block_mgr2(), _block_mgr_client, false /* use_initial_small_buffers */,
+                    false /* read_write */));
+        RETURN_IF_ERROR(_serialize_stream->init(id(), runtime_profile(), false));
+        DCHECK(_serialize_stream->has_write_block());
     }
-    memcpy(new_ptr, sv->ptr, sv->len);
-    sv->ptr = new_ptr;
-  }
-  return Status::OK();
-}
 
-Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state,
-    RowBatch* row_batch, bool* eos) {
-  SCOPED_TIMER(_runtime_profile->total_time_counter());
-  RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
-  RETURN_IF_CANCELLED(state);
-  RETURN_IF_ERROR(state->check_query_state("New partitioned aggregation, while getting next."));
-  // clear tmp expr result alocations
-  expr_results_pool_->clear();
-
-  if (reached_limit()) {
-    *eos = true;
-    return Status::OK();
-  }
-
-  if (grouping_exprs_.empty()) {
-    // There was no grouping, so evaluate the conjuncts and return the single result row.
-    // We allow calling GetNext() after eos, so don't return this row again.
-    if (!singleton_output_tuple_returned_) GetSingletonOutput(row_batch);
-    singleton_output_tuple_returned_ = true;
-    *eos = true;
     return Status::OK();
-  }
-
-  if (!child_eos_) {
-    // For streaming preaggregations, we process rows from the child as we go.
-    DCHECK(is_streaming_preagg_);
-    RETURN_IF_ERROR(GetRowsStreaming(state, row_batch));
-  } else if (!partition_eos_) {
-    RETURN_IF_ERROR(GetRowsFromPartition(state, row_batch));
-  }
-
-  *eos = partition_eos_ && child_eos_;
-  COUNTER_SET(_rows_returned_counter, _num_rows_returned);
-  return Status::OK();
 }
 
-void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) {
-  DCHECK(grouping_exprs_.empty());
-  int row_idx = row_batch->add_row();
-  TupleRow* row = row_batch->get_row(row_idx);
-  Tuple* output_tuple = GetOutputTuple(agg_fn_evals_,
-      singleton_output_tuple_, row_batch->tuple_data_pool());
-  row->set_tuple(0, output_tuple);
-  if (ExecNode::eval_conjuncts(
-          _conjunct_ctxs.data(), _conjunct_ctxs.size(), row)) {
-    row_batch->commit_last_row();
-    ++_num_rows_returned;
-    COUNTER_SET(_rows_returned_counter, _num_rows_returned);
-  }
-  // Keep the current chunk to amortize the memory allocation over a series
-  // of Reset()/Open()/GetNext()* calls.
-  row_batch->tuple_data_pool()->acquire_data(mem_pool_.get(), true);
-  // This node no longer owns the memory for singleton_output_tuple_.
-  singleton_output_tuple_ = NULL;
-}
+Status PartitionedAggregationNode::open(RuntimeState* state) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_ERROR(ExecNode::open(state));
 
-Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
-    RowBatch* row_batch) {
-  DCHECK(!row_batch->at_capacity());
-  if (output_iterator_.AtEnd()) {
-    // Done with this partition, move onto the next one.
-    if (output_partition_ != NULL) {
-      output_partition_->Close(false);
-      output_partition_ = NULL;
-    }
-    if (aggregated_partitions_.empty() && spilled_partitions_.empty()) {
-      // No more partitions, all done.
-      partition_eos_ = true;
-      return Status::OK();
-    }
-    // Process next partition.
-    RETURN_IF_ERROR(NextPartition());
-    DCHECK(output_partition_ != NULL);
-  }
-
-  SCOPED_TIMER(get_results_timer_);
-  int count = 0;
-  const int N = BitUtil::next_power_of_two(state->batch_size());
-  // Keeping returning rows from the current partition.
-  while (!output_iterator_.AtEnd()) {
-    // This loop can go on for a long time if the conjuncts are very selective. Do query
-    // maintenance every N iterations.
-    if ((count++ & (N - 1)) == 0) {
-      RETURN_IF_CANCELLED(state);
-      RETURN_IF_ERROR(state->check_query_state(
-            "New partitioned aggregation, while getting rows from partition."));
-    }
+    RETURN_IF_ERROR(Expr::open(_probe_expr_ctxs, state));
+    RETURN_IF_ERROR(Expr::open(_build_expr_ctxs, state));
 
-    int row_idx = row_batch->add_row();
-    TupleRow* row = row_batch->get_row(row_idx);
-    Tuple* intermediate_tuple = output_iterator_.GetTuple();
-    Tuple* output_tuple = GetOutputTuple(
-        output_partition_->agg_fn_evals, intermediate_tuple, row_batch->tuple_data_pool());
-    output_iterator_.Next();
-    row->set_tuple(0, output_tuple);
-    // TODO chenhao
-    // DCHECK_EQ(_conjunct_ctxs.size(), _conjuncts.size());
-    if (ExecNode::eval_conjuncts(_conjunct_ctxs.data(), _conjunct_ctxs.size(), row)) {
-      row_batch->commit_last_row();
-      ++_num_rows_returned;
-      if (reached_limit() || row_batch->at_capacity()) {
-        break;
-      }
+    DCHECK_EQ(_aggregate_evaluators.size(), _agg_fn_ctxs.size());
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        RETURN_IF_ERROR(_aggregate_evaluators[i]->open(state, _agg_fn_ctxs[i]));
     }
-  }
 
-  COUNTER_SET(_rows_returned_counter, _num_rows_returned);
-  partition_eos_ = reached_limit();
-  if (output_iterator_.AtEnd()) row_batch->mark_needs_deep_copy();
+    // Read all the rows from the child and process them.
+    RETURN_IF_ERROR(_children[0]->open(state));
+    RowBatch batch(_children[0]->row_desc(), state->batch_size(), mem_tracker());
+    bool eos = false;
+    do {
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(state->check_query_state("Partitioned aggregation, while getting next from child 0."));
+        RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos));
+
+        if (UNLIKELY(VLOG_ROW_IS_ON)) {
+            for (int i = 0; i < batch.num_rows(); ++i) {
+                TupleRow* row = batch.get_row(i);
+                VLOG_ROW << "partition-agg-node input row: "
+                        << row->to_string(_children[0]->row_desc());
+            }
+        }
 
-  return Status::OK();
-}
+        SCOPED_TIMER(_build_timer);
+        if (_process_row_batch_fn != NULL) {
+            RETURN_IF_ERROR(_process_row_batch_fn(this, &batch, _ht_ctx.get()));
+        } else if (_probe_expr_ctxs.empty()) {
+            RETURN_IF_ERROR(process_batch_no_grouping(&batch));
+        } else {
+            // VLOG_ROW << "partition-agg-node batch: " << batch->to_string();
+            // There is grouping, so we will do partitioned aggregation.
+            RETURN_IF_ERROR(process_batch<false>(&batch, _ht_ctx.get()));
+        }
+        batch.reset();
+    } while (!eos);
 
-Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
-    RowBatch* out_batch) {
-  DCHECK(!child_eos_);
-  DCHECK(is_streaming_preagg_);
+    // Unless we are inside a subplan expecting to call open()/get_next() on the child
+    // again, the child can be closed at this point. We have consumed all of the input
+    // from the child and transferred ownership of the resources we need.
+    // if (!IsInSubplan()) {
+    child(0)->close(state);
+    // }
 
-  if (child_batch_ == NULL) {
-    child_batch_.reset(new RowBatch(child(0)->row_desc(), state->batch_size(),
-        mem_tracker()));
-  }
+    // Done consuming child(0)'s input. Move all the partitions in _hash_partitions
+    // to _spilled_partitions/_aggregated_partitions. We'll finish the processing in
+    // get_next().
+    if (!_probe_expr_ctxs.empty()) {
+        RETURN_IF_ERROR(move_hash_partitions(child(0)->rows_returned()));
+    }
+    return Status::OK();
+}
 
-  do {
-    DCHECK_EQ(out_batch->num_rows(), 0);
+Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(state->check_query_state(
-            "New partitioned aggregation, while getting rows in streaming."));
-
-    RETURN_IF_ERROR(child(0)->get_next(state, child_batch_.get(), &child_eos_));
-    SCOPED_TIMER(streaming_timer_);
+    RETURN_IF_ERROR(state->check_query_state("Partitioned aggregation, before evaluating conjuncts."));
 
-    int remaining_capacity[PARTITION_FANOUT];
-    bool ht_needs_expansion = false;
-    for (int i = 0; i < PARTITION_FANOUT; ++i) {
-      NewPartitionedHashTable* hash_tbl = GetHashTable(i);
-      remaining_capacity[i] = hash_tbl->NumInsertsBeforeResize();
-      ht_needs_expansion |= remaining_capacity[i] < child_batch_->num_rows();
+    if (reached_limit()) {
+        *eos = true;
+        return Status::OK();
     }
 
-    // Stop expanding hash tables if we're not reducing the input sufficiently. As our
-    // hash tables expand out of each level of cache hierarchy, every hash table lookup
-    // will take longer. We also may not be able to expand hash tables because of memory
-    // pressure. In this case HashTable::CheckAndResize() will fail. In either case we
-    // should always use the remaining space in the hash table to avoid wasting memory.
-    if (ht_needs_expansion && ShouldExpandPreaggHashTables()) {
-      for (int i = 0; i < PARTITION_FANOUT; ++i) {
-        NewPartitionedHashTable* ht = GetHashTable(i);
-        if (remaining_capacity[i] < child_batch_->num_rows()) {
-          SCOPED_TIMER(ht_resize_timer_);
-          bool resized;
-          RETURN_IF_ERROR(
-              ht->CheckAndResize(child_batch_->num_rows(), ht_ctx_.get(), &resized));
-          if (resized) {
-            remaining_capacity[i] = ht->NumInsertsBeforeResize();
-          }
+    ExprContext** ctxs = &_conjunct_ctxs[0];
+    int num_ctxs = _conjunct_ctxs.size();
+    if (_probe_expr_ctxs.empty()) {
+        // There was no grouping, so evaluate the conjuncts and return the single result row.
+        // We allow calling get_next() after eos, so don't return this row again.
+        if (!_singleton_output_tuple_returned) {
+            int row_idx = row_batch->add_row();
+            TupleRow* row = row_batch->get_row(row_idx);
+            Tuple* output_tuple = get_output_tuple(
+                    _agg_fn_ctxs, _singleton_output_tuple, row_batch->tuple_data_pool());
+            row->set_tuple(0, output_tuple);
+            if (ExecNode::eval_conjuncts(ctxs, num_ctxs, row)) {
+                row_batch->commit_last_row();
+                ++_num_rows_returned;
+            }
+            _singleton_output_tuple_returned = true;
         }
-      }
+        // Keep the current chunk to amortize the memory allocation over a series
+        // of reset()/open()/get_next()* calls.
+        row_batch->tuple_data_pool()->acquire_data(_mem_pool.get(), true);
+        *eos = true;
+        COUNTER_SET(_rows_returned_counter, _num_rows_returned);
+        return Status::OK();
     }
 
-    if (process_batch_streaming_fn_ != NULL) {
-      RETURN_IF_ERROR(process_batch_streaming_fn_(this, needs_serialize_,
-          child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity));
-    } else {
-      RETURN_IF_ERROR(ProcessBatchStreaming(needs_serialize_,
-          child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity));
+    if (_output_iterator.at_end()) {
+        // Done with this partition, move onto the next one.
+        if (_output_partition != NULL) {
+            _output_partition->close(false);
+            _output_partition = NULL;
+        }
+        if (_aggregated_partitions.empty() && _spilled_partitions.empty()) {
+            // No more partitions, all done.
+            *eos = true;
+            return Status::OK();
+        }
+        // Process next partition.
+        RETURN_IF_ERROR(next_partition());
+        DCHECK(_output_partition != NULL);
     }
 
-    child_batch_->reset(); // All rows from child_batch_ were processed.
-  } while (out_batch->num_rows() == 0 && !child_eos_);
-
-  if (child_eos_) {
-    child(0)->close(state);
-    child_batch_.reset();
-     RETURN_IF_ERROR(MoveHashPartitions(child(0)->rows_returned()));
-  }
-
-  _num_rows_returned += out_batch->num_rows();
-  COUNTER_SET(num_passthrough_rows_, _num_rows_returned);
-  return Status::OK();
-}
+    SCOPED_TIMER(_get_results_timer);
+    int count = 0;
+    const int N = BitUtil::next_power_of_two(state->batch_size());
+    // Keep returning rows from the current partition.
+    while (!_output_iterator.at_end() && !row_batch->at_capacity()) {
+        // This loop can go on for a long time if the conjuncts are very selective. Do query
+        // maintenance every N iterations.
+        if ((count++ & (N - 1)) == 0) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(state->check_query_state("Partitioned aggregation, while evaluating conjuncts."));
+        }
 
-bool PartitionedAggregationNode::ShouldExpandPreaggHashTables() const {
-  int64_t ht_mem = 0;
-  int64_t ht_rows = 0;
-  for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    NewPartitionedHashTable* ht = hash_partitions_[i]->hash_tbl.get();
-    ht_mem += ht->CurrentMemSize();
-    ht_rows += ht->size();
-  }
-
-  // Need some rows in tables to have valid statistics.
-  if (ht_rows == 0) return true;
-
-  // Find the appropriate reduction factor in our table for the current hash table sizes.
-  int cache_level = 0;
-  while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE &&
-      ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
-    ++cache_level;
-  }
-
-  // Compare the number of rows in the hash table with the number of input rows that
-  // were aggregated into it. Exclude passed through rows from this calculation since
-  // they were not in hash tables.
-  const int64_t input_rows = _children[0]->rows_returned();
-  const int64_t aggregated_input_rows = input_rows - _num_rows_returned;
-  // TODO chenhao
-//  const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_;
-  double current_reduction = static_cast<double>(aggregated_input_rows) / ht_rows;
-
-  // TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be
-  // inaccurate, which could lead to a divide by zero below.
-  if (aggregated_input_rows <= 0) return true;
-
-  // Extrapolate the current reduction factor (r) using the formula
-  // R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data
-  // set, N is the number of input rows, excluding passed-through rows, and n is the
-  // number of rows inserted or merged into the hash tables. This is a very rough
-  // approximation but is good enough to be useful.
-  // TODO: consider collecting more statistics to better estimate reduction.
-//  double estimated_reduction = aggregated_input_rows >= expected_input_rows
-//      ? current_reduction
-//      : 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1);
-  double min_reduction =
-    STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
-
-//  COUNTER_SET(preagg_estimated_reduction_, estimated_reduction);
-  COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction);
-//  return estimated_reduction > min_reduction;
-  return current_reduction > min_reduction;
+        int row_idx = row_batch->add_row();
+        TupleRow* row = row_batch->get_row(row_idx);
+        Tuple* intermediate_tuple = _output_iterator.get_tuple();
+        Tuple* output_tuple = get_output_tuple(
+                _output_partition->agg_fn_ctxs, intermediate_tuple, row_batch->tuple_data_pool());
+        _output_iterator.next();
+        row->set_tuple(0, output_tuple);
+        if (ExecNode::eval_conjuncts(ctxs, num_ctxs, row)) {
+            row_batch->commit_last_row();
+            ++_num_rows_returned;
+            if (reached_limit()) {
+                break; // TODO: remove this check? is this expensive?
+            }
+        }
+    }
+    COUNTER_SET(_rows_returned_counter, _num_rows_returned);
+    *eos = reached_limit();
+    if (_output_iterator.at_end()) {
+        row_batch->mark_need_to_return();
+    }
+    return Status::OK();
 }
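
A side note on the heuristic described in the removed ShouldExpandPreaggHashTables() comments
above: the reduction factor seen so far is extrapolated with R = 1 + (N / n) * (r - 1), where
N is the number of input rows excluding passed-through rows, n is the number of rows already
inserted or merged into the hash tables, and r is the observed reduction. The standalone
sketch below plugs in made-up numbers to show how the estimate behaves; note that the removed
code ultimately compares the observed reduction (the extrapolation is commented out there)
against the per-cache-level minimum.

    #include <iostream>

    int main() {
        const double n = 100 * 1000;       // rows aggregated into the hash tables so far
        const double ht_rows = 25 * 1000;  // distinct groups produced from them
        const double N = 1000 * 1000;      // estimated total input rows (made-up value)

        const double r = n / ht_rows;                // observed reduction: 4.0
        const double R = 1 + (N / n) * (r - 1);      // extrapolated reduction: 31.0
        std::cout << "observed r=" << r << " extrapolated R=" << R << std::endl;
        return 0;
    }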
 
-void PartitionedAggregationNode::CleanupHashTbl(
-    const vector<NewAggFnEvaluator*>& agg_fn_evals, NewPartitionedHashTable::Iterator it) {
-  if (!needs_finalize_ && !needs_serialize_) return;
-
-  // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
-  // them in order to free any memory allocated by UDAs.
-  if (needs_finalize_) {
-    // Finalize() requires a dst tuple but we don't actually need the result,
-    // so allocate a single dummy tuple to avoid accumulating memory.
-    Tuple* dummy_dst = NULL;
-    dummy_dst = Tuple::create(output_tuple_desc_->byte_size(), mem_pool_.get());
-    while (!it.AtEnd()) {
-      Tuple* tuple = it.GetTuple();
-      NewAggFnEvaluator::Finalize(agg_fn_evals, tuple, dummy_dst);
-      it.Next();
+void PartitionedAggregationNode::cleanup_hash_tbl(
+        const vector<FunctionContext*>& agg_fn_ctxs, PartitionedHashTable::Iterator it) {
+    if (!_needs_finalize && !_needs_serialize) {
+        return;
     }
-  } else {
-    while (!it.AtEnd()) {
-      Tuple* tuple = it.GetTuple();
-      NewAggFnEvaluator::Serialize(agg_fn_evals, tuple);
-      it.Next();
+
+    // Iterate through the remaining rows in the hash table and call serialize/finalize on
+    // them in order to free any memory allocated by UDAs.
+    if (_needs_finalize) {
+        // finalize() requires a dst tuple but we don't actually need the result,
+        // so allocate a single dummy tuple to avoid accumulating memory.
+        Tuple* dummy_dst = NULL;
+        dummy_dst = Tuple::create(_output_tuple_desc->byte_size(), _mem_pool.get());
+        while (!it.at_end()) {
+            Tuple* tuple = it.get_tuple();
+            AggFnEvaluator::finalize(_aggregate_evaluators, agg_fn_ctxs, tuple, dummy_dst);
+            it.next();
+        }
+    } else {
+        while (!it.at_end()) {
+            Tuple* tuple = it.get_tuple();
+            AggFnEvaluator::serialize(_aggregate_evaluators, agg_fn_ctxs, tuple);
+            it.next();
+        }
     }
-  }
 }
 
 Status PartitionedAggregationNode::reset(RuntimeState* state) {
-  DCHECK(!is_streaming_preagg_) << "Cannot reset preaggregation";
-  if (!grouping_exprs_.empty()) {
-    child_eos_ = false;
-    partition_eos_ = false;
-    // Reset the HT and the partitions for this grouping agg.
-    ht_ctx_->set_level(0);
-    ClosePartitions();
-  }
-  return ExecNode::reset(state);
+    if (_probe_expr_ctxs.empty()) {
+        // Re-create the single output tuple for this non-grouping agg.
+        _singleton_output_tuple =
+            construct_intermediate_tuple(_agg_fn_ctxs, _mem_pool.get(), NULL, NULL);
+        // Check for failures during AggFnEvaluator::init().
+        RETURN_IF_ERROR(_state->query_status());
+        _singleton_output_tuple_returned = false;
+    } else {
+        // Reset the HT and the partitions for this grouping agg.
+        _ht_ctx->set_level(0);
+        close_partitions();
+        create_hash_partitions(0);
+    }
+    // return ExecNode::reset(state);
+    return Status::OK();
 }
 
 Status PartitionedAggregationNode::close(RuntimeState* state) {
-  if (is_closed()) return Status::OK();
-
-  if (!singleton_output_tuple_returned_) {
-    GetOutputTuple(agg_fn_evals_, singleton_output_tuple_, mem_pool_.get());
-  }
-
-  // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
-  // them in order to free any memory allocated by UDAs
-  if (output_partition_ != NULL) {
-    CleanupHashTbl(output_partition_->agg_fn_evals, output_iterator_);
-    output_partition_->Close(false);
-  }
-
-  ClosePartitions();
-  child_batch_.reset();
-
-  // Close all the agg-fn-evaluators
-  NewAggFnEvaluator::Close(agg_fn_evals_, state);
-
-  if (expr_results_pool_.get() != nullptr) {
-      expr_results_pool_->free_all();
-  }
-  if (agg_fn_pool_.get() != nullptr) agg_fn_pool_->free_all();
-  if (mem_pool_.get() != nullptr) mem_pool_->free_all();
-  if (ht_ctx_.get() != nullptr) ht_ctx_->Close(state);
-  ht_ctx_.reset();
-  if (serialize_stream_.get() != nullptr) {
-    serialize_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-  }
-  Expr::close(grouping_exprs_);
-  Expr::close(build_exprs_);
-  AggFn::Close(agg_fns_);
-  return ExecNode::close(state);
-}
+    if (is_closed()) {
+        return Status::OK();
+    }
+
+    if (!_singleton_output_tuple_returned) {
+        DCHECK_EQ(_agg_fn_ctxs.size(), _aggregate_evaluators.size());
+        get_output_tuple(_agg_fn_ctxs, _singleton_output_tuple, _mem_pool.get());
+    }
+
+    // Iterate through the remaining rows in the hash table and call serialize/finalize on
+    // them in order to free any memory allocated by UDAs
+    if (_output_partition != NULL) {
+        cleanup_hash_tbl(_output_partition->agg_fn_ctxs, _output_iterator);
+        _output_partition->close(false);
+    }
+
+    close_partitions();
+
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        _aggregate_evaluators[i]->close(state);
+    }
+    for (int i = 0; i < _agg_fn_ctxs.size(); ++i) {
+        _agg_fn_ctxs[i]->impl()->close();
+    }
+    if (_agg_fn_pool.get() != NULL) {
+        _agg_fn_pool->free_all();
+    }
+    if (_mem_pool.get() != NULL) {
+        _mem_pool->free_all();
+    }
+    if (_ht_ctx.get() != NULL) {
+        _ht_ctx->close();
+    }
+    if (_serialize_stream.get() != NULL) {
+        _serialize_stream->close();
+    }
+
+    if (_block_mgr_client != NULL) {
+        state->block_mgr2()->clear_reservations(_block_mgr_client);
+    }
 
-PartitionedAggregationNode::Partition::~Partition() {
-  DCHECK(is_closed);
+    Expr::close(_probe_expr_ctxs, state);
+    Expr::close(_build_expr_ctxs, state);
+    return ExecNode::close(state);
 }
 
-Status PartitionedAggregationNode::Partition::InitStreams() {
-  agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker()));
-  DCHECK_EQ(agg_fn_evals.size(), 0);
-  NewAggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_pool.get(),
-      parent->agg_fn_evals_, &agg_fn_evals);
-
-  // Varlen aggregate function results are stored outside of aggregated_row_stream because
-  // BufferedTupleStream3 doesn't support relocating varlen data stored in the stream.
-  auto agg_slot = parent->intermediate_tuple_desc_->slots().begin() +
-      parent->grouping_exprs_.size();
-  std::set<SlotId> external_varlen_slots;
-  for (; agg_slot != parent->intermediate_tuple_desc_->slots().end(); ++agg_slot) {
-    if ((*agg_slot)->type().is_var_len_string_type()) {
-      external_varlen_slots.insert((*agg_slot)->id());
+Status PartitionedAggregationNode::Partition::init_streams() {
+    agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker()));
+    DCHECK_EQ(agg_fn_ctxs.size(), 0);
+    for (int i = 0; i < parent->_agg_fn_ctxs.size(); ++i) {
+        agg_fn_ctxs.push_back(parent->_agg_fn_ctxs[i]->impl()->clone(agg_fn_pool.get()));
+        parent->_partition_pool->add(agg_fn_ctxs[i]);
     }
-  }
-
-  aggregated_row_stream.reset(new BufferedTupleStream3(parent->state_,
-      &parent->intermediate_row_desc_, &parent->_buffer_pool_client,
-      parent->_resource_profile.spillable_buffer_size,
-      parent->_resource_profile.max_row_buffer_size, external_varlen_slots));
-  RETURN_IF_ERROR(
-      aggregated_row_stream->Init(parent->id(), true));
-  bool got_buffer;
-  RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer));
-  DCHECK(got_buffer) << "Buffer included in reservation " << parent->_id << "\n"
-                     << parent->_buffer_pool_client.DebugString() << "\n"
-                     << parent->DebugString(2);
-
-  if (!parent->is_streaming_preagg_) {
-    unaggregated_row_stream.reset(new BufferedTupleStream3(parent->state_,
-        &(parent->child(0)->row_desc()), &parent->_buffer_pool_client,
-        parent->_resource_profile.spillable_buffer_size,
-        parent->_resource_profile.max_row_buffer_size));
+
+    aggregated_row_stream.reset(new BufferedTupleStream2(parent->_state,
+                *parent->_intermediate_row_desc, parent->_state->block_mgr2(),
+                parent->_block_mgr_client, true /* use_initial_small_buffers */,
+                false /* read_write */));
+    RETURN_IF_ERROR(aggregated_row_stream->init(parent->id(), parent->runtime_profile(), true));
+
+    unaggregated_row_stream.reset(new BufferedTupleStream2(parent->_state,
+                parent->child(0)->row_desc(), parent->_state->block_mgr2(),
+                parent->_block_mgr_client, true /* use_initial_small_buffers */,
+                false /* read_write */));
     // This stream is only used to spill, no need to ever have this pinned.
-    RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id(), false));
-    // Save memory by waiting until we spill to allocate the write buffer for the
-    // unaggregated row stream.
-    DCHECK(!unaggregated_row_stream->has_write_iterator());
-  }
-  return Status::OK();
+    RETURN_IF_ERROR(unaggregated_row_stream->init(parent->id(), parent->runtime_profile(), false));
+    DCHECK(unaggregated_row_stream->has_write_block());
+    return Status::OK();
 }
 
-Status PartitionedAggregationNode::Partition::InitHashTable(bool* got_memory) {
-  DCHECK(aggregated_row_stream != nullptr);
-  DCHECK(hash_tbl == nullptr);
-  // We use the upper PARTITION_FANOUT num bits to pick the partition so only the
-  // remaining bits can be used for the hash table.
-  // TODO: we could switch to 64 bit hashes and then we don't need a max size.
-  // It might be reasonable to limit individual hash table size for other reasons
-  // though. Always start with small buffers.
-  hash_tbl.reset(NewPartitionedHashTable::Create(parent->ht_allocator_.get(), false, 1, nullptr,
-      1L << (32 - NUM_PARTITIONING_BITS), PAGG_DEFAULT_HASH_TABLE_SZ));
-  // Please update the error message in CreateHashPartitions() if initial size of
-  // hash table changes.
-  return hash_tbl->Init(got_memory);
+bool PartitionedAggregationNode::Partition::init_hash_table() {
+    DCHECK(hash_tbl.get() == NULL);
+    // We use the upper PARTITION_FANOUT num bits to pick the partition so only the
+    // remaining bits can be used for the hash table.
+    // TODO: we could switch to 64 bit hashes and then we don't need a max size.
+    // It might be reasonable to limit individual hash table size for other reasons
+    // though. Always start with small buffers.
+    // TODO: How many buckets? We currently use a default value, 1024.
+    static const int64_t PAGG_DEFAULT_HASH_TABLE_SZ = 1024;
+    hash_tbl.reset(PartitionedHashTable::create(parent->_state, parent->_block_mgr_client, 1,
+                NULL, 1 << (32 - NUM_PARTITIONING_BITS), PAGG_DEFAULT_HASH_TABLE_SZ));
+    return hash_tbl->init();
 }
 
-Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
-  DCHECK(!parent->is_streaming_preagg_);
-  if (parent->needs_serialize_) {
-    // We need to do a lot more work in this case. This step effectively does a merge
-    // aggregation in this node. We need to serialize the intermediates, spill the
-    // intermediates and then feed them into the aggregate function's merge step.
-    // This is often used when the intermediate is a string type, meaning the current
-    // (before serialization) in-memory layout is not the on-disk block layout.
-    // The disk layout does not support mutable rows. We need to rewrite the stream
-    // into the on disk format.
-    // TODO: if it happens to not be a string, we could serialize in place. This is
-    // a future optimization since it is very unlikely to have a serialize phase
-    // for those UDAs.
-    DCHECK(parent->serialize_stream_.get() != NULL);
-    DCHECK(!parent->serialize_stream_->is_pinned());
-
-    // Serialize and copy the spilled partition's stream into the new stream.
-    Status status = Status::OK();
-    BufferedTupleStream3* new_stream = parent->serialize_stream_.get();
-    NewPartitionedHashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get());
-    while (!it.AtEnd()) {
-      Tuple* tuple = it.GetTuple();
-      it.Next();
-      NewAggFnEvaluator::Serialize(agg_fn_evals, tuple);
-      if (UNLIKELY(!new_stream->AddRow(reinterpret_cast<TupleRow*>(&tuple), &status))) {
-        DCHECK(!status.ok()) << "Stream was unpinned - AddRow() only fails on error";
+Status PartitionedAggregationNode::Partition::clean_up() {
+    if (parent->_needs_serialize && aggregated_row_stream->num_rows() != 0) {
+        // We need to do a lot more work in this case. This step effectively does a merge
+        // aggregation in this node. We need to serialize the intermediates, spill the
+        // intermediates and then feed them into the aggregate function's merge step.
+        // This is often used when the intermediate is a string type, meaning the current
+        // (before serialization) in-memory layout is not the on-disk block layout.
+        // The disk layout does not support mutable rows. We need to rewrite the stream
+        // into the on disk format.
+        // TODO: if it happens to not be a string, we could serialize in place. This is
+        // a future optimization since it is very unlikely to have a serialize phase
+        // for those UDAs.
+        DCHECK(parent->_serialize_stream.get() != NULL);
+        DCHECK(!parent->_serialize_stream->is_pinned());
+        DCHECK(parent->_serialize_stream->has_write_block());
+
+        const vector<AggFnEvaluator*>& evaluators = parent->_aggregate_evaluators;
+
+        // serialize and copy the spilled partition's stream into the new stream.
+        Status status = Status::OK();
+        bool failed_to_add = false;
+        BufferedTupleStream2* new_stream = parent->_serialize_stream.get();
+        PartitionedHashTable::Iterator it = hash_tbl->begin(parent->_ht_ctx.get());
+        while (!it.at_end()) {
+            Tuple* tuple = it.get_tuple();
+            it.next();
+            AggFnEvaluator::serialize(evaluators, agg_fn_ctxs, tuple);
+            if (UNLIKELY(!new_stream->add_row(reinterpret_cast<TupleRow*>(&tuple), &status))) {
+                failed_to_add = true;
+                break;
+            }
+        }
+
         // Even if we can't add to new_stream, finish up processing this agg stream to make
         // clean up easier (someone has to finalize this stream and we don't want to remember
         // where we are).
-        parent->CleanupHashTbl(agg_fn_evals, it);
-        hash_tbl->Close();
-        hash_tbl.reset();
-        aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-        return status;
-      }
+        if (failed_to_add) {
+            parent->cleanup_hash_tbl(agg_fn_ctxs, it);
+            hash_tbl->close();
+            hash_tbl.reset();
+            aggregated_row_stream->close();
+            RETURN_IF_ERROR(status);
+            return parent->_state->block_mgr2()->mem_limit_too_low_error(parent->_block_mgr_client,
+                    parent->id());
+        }
+        DCHECK(status.ok());
+
+        aggregated_row_stream->close();
+        aggregated_row_stream.swap(parent->_serialize_stream);
+        // Recreate the serialize_stream (and reserve 1 buffer) now in preparation for
+        // when we need to spill again. It must already be available before the next spill
+        // happens. This should be acquirable since we just
+        // freed at least one buffer from this partition's (old) aggregated_row_stream.
+        parent->_serialize_stream.reset(new BufferedTupleStream2(parent->_state,
+                    *parent->_intermediate_row_desc, parent->_state->block_mgr2(),
+                    parent->_block_mgr_client, false /* use_initial_small_buffers */,
+                    false /* read_write */));
+        status = parent->_serialize_stream->init(parent->id(), parent->runtime_profile(), false);
+        if (!status.ok()) {
+            hash_tbl->close();
+            hash_tbl.reset();
+            return status;
+        }
+        DCHECK(parent->_serialize_stream->has_write_block());
     }
+    return Status::OK();
+}
+
+Status PartitionedAggregationNode::Partition::spill() {
+    DCHECK(!is_closed);
+    DCHECK(!is_spilled());
 
-    aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-    aggregated_row_stream.swap(parent->serialize_stream_);
-    // Recreate the serialize_stream (and reserve 1 buffer) now in preparation for
-    // when we need to spill again. We need to have this available before we need
-    // to spill to make sure it is available. This should be acquirable since we just
-    // freed at least one buffer from this partition's (old) aggregated_row_stream.
-    parent->serialize_stream_.reset(new BufferedTupleStream3(parent->state_,
-        &parent->intermediate_row_desc_, &parent->_buffer_pool_client,
-        parent->_resource_profile.spillable_buffer_size,
-        parent->_resource_profile.max_row_buffer_size));
-    status = parent->serialize_stream_->Init(parent->id(), false);
-    if (status.ok()) {
-      bool got_buffer;
-      status = parent->serialize_stream_->PrepareForWrite(&got_buffer);
-      DCHECK(!status.ok() || got_buffer) << "Accounted in min reservation";
+    RETURN_IF_ERROR(clean_up());
+
+    // Free the in-memory result data.
+    for (int i = 0; i < agg_fn_ctxs.size(); ++i) {
+        agg_fn_ctxs[i]->impl()->close();
     }
-    if (!status.ok()) {
-      hash_tbl->Close();
-      hash_tbl.reset();
-      return status;
+
+    if (agg_fn_pool.get() != NULL) {
+        agg_fn_pool->free_all();
+        agg_fn_pool.reset();
     }
-    DCHECK(parent->serialize_stream_->has_write_iterator());
-  }
-  return Status::OK();
-}
 
-Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
-  DCHECK(!parent->is_streaming_preagg_);
-  DCHECK(!is_closed);
-  DCHECK(!is_spilled());
-  // TODO(ml): enable spill
-  std::stringstream msg;
-  msg << "New partitioned Aggregation in spill";
-  LIMIT_EXCEEDED(parent->mem_tracker(), parent->state_, msg.str());
-  // RETURN_IF_ERROR(parent->state_->StartSpilling(parent->mem_tracker()));
-
-  RETURN_IF_ERROR(SerializeStreamForSpilling());
-
-  // Free the in-memory result data.
-  NewAggFnEvaluator::Close(agg_fn_evals, parent->state_);
-  agg_fn_evals.clear();
-
-  if (agg_fn_pool.get() != NULL) {
-    agg_fn_pool->free_all();
-    agg_fn_pool.reset();
-  }
-
-  hash_tbl->Close();
-  hash_tbl.reset();
-
-  // Unpin the stream to free memory, but leave a write buffer in place so we can
-  // continue appending rows to one of the streams in the partition.
-  DCHECK(aggregated_row_stream->has_write_iterator());
-  DCHECK(!unaggregated_row_stream->has_write_iterator());
-  if (more_aggregate_rows) {
-//    aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL_EXCEPT_CURRENT);
-  } else {
-//    aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL);
-    bool got_buffer;
-    RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
-    DCHECK(got_buffer)
-        << "Accounted in min reservation" << parent->_buffer_pool_client.DebugString();
-  }
-
-  COUNTER_UPDATE(parent->num_spilled_partitions_, 1);
-  if (parent->num_spilled_partitions_->value() == 1) {
-    parent->add_runtime_exec_option("Spilled");
-  }
-  return Status::OK();
-}
+    hash_tbl->close();
+    hash_tbl.reset();
 
-void PartitionedAggregationNode::Partition::Close(bool finalize_rows) {
-  if (is_closed) return;
-  is_closed = true;
-  if (aggregated_row_stream.get() != NULL) {
-    if (finalize_rows && hash_tbl.get() != NULL) {
-      // We need to walk all the rows and Finalize them here so the UDA gets a chance
-      // to cleanup. If the hash table is gone (meaning this was spilled), the rows
-      // should have been finalized/serialized in Spill().
-      parent->CleanupHashTbl(agg_fn_evals, hash_tbl->Begin(parent->ht_ctx_.get()));
+    // Try to switch both streams to IO-sized buffers to avoid allocating small buffers
+    // for the spilled partition.
+    bool got_buffer = true;
+    if (aggregated_row_stream->using_small_buffers()) {
+        RETURN_IF_ERROR(aggregated_row_stream->switch_to_io_buffers(&got_buffer));
+    }
+    // Unpin the stream as soon as possible to increase the chances that the
+    // switch_to_io_buffers() call below will succeed.
+    DCHECK(!got_buffer || aggregated_row_stream->has_write_block())
+            << aggregated_row_stream->debug_string();
+    RETURN_IF_ERROR(aggregated_row_stream->unpin_stream(false));
+
+    if (got_buffer && unaggregated_row_stream->using_small_buffers()) {
+        RETURN_IF_ERROR(unaggregated_row_stream->switch_to_io_buffers(&got_buffer));
+    }
+    if (!got_buffer) {
+        // We'll try again to get the buffers when the stream fills up the small buffers.
+        VLOG_QUERY << "Not enough memory to switch to IO-sized buffer for partition "
+            << this << " of agg=" << parent->_id << " agg small buffers="
+            << aggregated_row_stream->using_small_buffers()
+            << " unagg small buffers="
+            << unaggregated_row_stream->using_small_buffers();
+        VLOG_FILE << get_stack_trace();
     }
-    aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-  }
-  if (hash_tbl.get() != NULL) hash_tbl->Close();
-  if (unaggregated_row_stream.get() != NULL) {
-    unaggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-  }
-
-  for (NewAggFnEvaluator* eval : agg_fn_evals) eval->Close(parent->state_);
-  if (agg_fn_pool.get() != NULL) agg_fn_pool->free_all();
-}
-
-Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple(
-    const vector<NewAggFnEvaluator*>& agg_fn_evals, MemPool* pool) {
-  DCHECK(grouping_exprs_.empty());
-  Tuple* output_tuple = Tuple::create(intermediate_tuple_desc_->byte_size(), pool);
-  InitAggSlots(agg_fn_evals, output_tuple);
-  return output_tuple;
-}
 
-Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
-    const vector<NewAggFnEvaluator*>& agg_fn_evals, MemPool* pool, Status* status) {
-  const int fixed_size = intermediate_tuple_desc_->byte_size();
-  const int varlen_size = GroupingExprsVarlenSize();
-  const int tuple_data_size = fixed_size + varlen_size;
-  uint8_t* tuple_data = pool->try_allocate(tuple_data_size);
-  if (UNLIKELY(tuple_data == NULL)) {
-    string details = Substitute("Cannot perform aggregation at node with id $0. Failed "
-        "to allocate $1 bytes for intermediate tuple.", _id, tuple_data_size);
-    *status = pool->mem_tracker()->MemLimitExceeded(state_, details, tuple_data_size);
-    return NULL;
-  }
-  memset(tuple_data, 0, fixed_size);
-  Tuple* intermediate_tuple = reinterpret_cast<Tuple*>(tuple_data);
-  uint8_t* varlen_data = tuple_data + fixed_size;
-  CopyGroupingValues(intermediate_tuple, varlen_data, varlen_size);
-  InitAggSlots(agg_fn_evals, intermediate_tuple);
-  return intermediate_tuple;
+    COUNTER_UPDATE(parent->_num_spilled_partitions, 1);
+    if (parent->_num_spilled_partitions->value() == 1) {
+        parent->add_runtime_exec_option("Spilled");
+    }
+    return Status::OK();
 }
 
-Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
-    const vector<NewAggFnEvaluator*>& agg_fn_evals, BufferedTupleStream3* stream,
-    Status* status) {
-  DCHECK(stream != NULL && status != NULL);
-  // Allocate space for the entire tuple in the stream.
-  const int fixed_size = intermediate_tuple_desc_->byte_size();
-  const int varlen_size = GroupingExprsVarlenSize();
-  const int tuple_size = fixed_size + varlen_size;
-  uint8_t* tuple_data = stream->AddRowCustomBegin(tuple_size, status);
-  if (UNLIKELY(tuple_data == nullptr)) {
-    // If we failed to allocate and did not hit an error (indicated by a non-ok status),
-    // the caller of this function can try to free some space, e.g. through spilling, and
-    // re-attempt to allocate space for this row.
-    return nullptr;
-  }
-  Tuple* tuple = reinterpret_cast<Tuple*>(tuple_data);
-  tuple->init(fixed_size);
-  uint8_t* varlen_buffer = tuple_data + fixed_size;
-  CopyGroupingValues(tuple, varlen_buffer, varlen_size);
-  InitAggSlots(agg_fn_evals, tuple);
-  stream->AddRowCustomEnd(tuple_size);
-  return tuple;
-}
+void PartitionedAggregationNode::Partition::close(bool finalize_rows) {
+    if (is_closed) {
+        return;
+    }
+    is_closed = true;
+    if (aggregated_row_stream.get() != NULL) {
+        if (finalize_rows && hash_tbl.get() != NULL) {
+            // We need to walk all the rows and finalize them here so the UDA gets a chance
+            // to clean up. If the hash table is gone (meaning this was spilled), the rows
+            // should have been finalized/serialized in spill().
+            parent->cleanup_hash_tbl(agg_fn_ctxs, hash_tbl->begin(parent->_ht_ctx.get()));
+        }
+        aggregated_row_stream->close();
+    }
+    if (hash_tbl.get() != NULL) {
+        hash_tbl->close();
+    }
+    if (unaggregated_row_stream.get() != NULL) {
+        unaggregated_row_stream->close();
+    }
 
-int PartitionedAggregationNode::GroupingExprsVarlenSize() {
-  int varlen_size = 0;
-  // TODO: The hash table could compute this as it hashes.
-  for (int expr_idx: string_grouping_exprs_) {
-    StringValue* sv = reinterpret_cast<StringValue*>(ht_ctx_->ExprValue(expr_idx));
-    // Avoid branching by multiplying length by null bit.
-    varlen_size += sv->len * !ht_ctx_->ExprValueNull(expr_idx);
-  }
-  return varlen_size;
+    for (int i = 0; i < agg_fn_ctxs.size(); ++i) {
+        agg_fn_ctxs[i]->impl()->close();
+    }
+    if (agg_fn_pool.get() != NULL) {
+        agg_fn_pool->free_all();
+    }
 }
 
-// TODO: codegen this function.
-void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple,
-    uint8_t* buffer, int varlen_size) {
-  // Copy over all grouping slots (the variable length data is copied below).
-  for (int i = 0; i < grouping_exprs_.size(); ++i) {
-    SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[i];
-    if (ht_ctx_->ExprValueNull(i)) {
-      intermediate_tuple->set_null(slot_desc->null_indicator_offset());
+Tuple* PartitionedAggregationNode::construct_intermediate_tuple(
+        const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool,
+        BufferedTupleStream2* stream, Status* status) {
+    Tuple* intermediate_tuple = NULL;
+    uint8_t* buffer = NULL;
+    if (pool != NULL) {
+        DCHECK(stream == NULL && status == NULL);
+        intermediate_tuple = Tuple::create(_intermediate_tuple_desc->byte_size(), pool);
     } else {
-      void* src = ht_ctx_->ExprValue(i);
-      void* dst = intermediate_tuple->get_slot(slot_desc->tuple_offset());
-      memcpy(dst, src, slot_desc->slot_size());
+        DCHECK(stream != NULL && status != NULL);
+        // Figure out how big it will be to copy the entire tuple. We need the tuple to end
+        // up in one block in the stream.
+        int size = _intermediate_tuple_desc->byte_size();
+        if (_contains_var_len_grouping_exprs) {
+            // TODO: This is likely to be too slow. The hash table could maintain this as
+            // it hashes.
+            for (int i = 0; i < _probe_expr_ctxs.size(); ++i) {
+                if (!_probe_expr_ctxs[i]->root()->type().is_string_type()) {
+                    continue;
+                }
+                if (_ht_ctx->last_expr_value_null(i)) {
+                    continue;
+                }
+                StringValue* sv = reinterpret_cast<StringValue*>(_ht_ctx->last_expr_value(i));
+                size += sv->len;
+            }
+        }
+
+        // Now that we know the size of the row, allocate space for it in the stream.
+        buffer = stream->allocate_row(size, status);
+        if (buffer == NULL) {
+            if (!status->ok() || !stream->using_small_buffers()) {
+                return NULL;
+            }
+            // IMPALA-2352: Make a best effort to switch to IO buffers and re-allocate.
+            // If switch_to_io_buffers() fails the caller of this function can try to free
+            // some space, e.g. through spilling, and re-attempt to allocate space for
+            // this row.
+            bool got_buffer = false;
+            *status = stream->switch_to_io_buffers(&got_buffer);
+            if (!status->ok() || !got_buffer) {
+                return NULL;
+            }
+            buffer = stream->allocate_row(size, status);
+            if (buffer == NULL) {
+                return NULL;
+            }
+        }
+        intermediate_tuple = reinterpret_cast<Tuple*>(buffer);
+        // TODO: remove this. we shouldn't need to zero the entire tuple.
+        intermediate_tuple->init(size);
+        buffer += _intermediate_tuple_desc->byte_size();
+    }
+
+    // Copy grouping values.
+    vector<SlotDescriptor*>::const_iterator slot_desc = _intermediate_tuple_desc->slots().begin();
+    for (int i = 0; i < _probe_expr_ctxs.size(); ++i, ++slot_desc) {
+        if (_ht_ctx->last_expr_value_null(i)) {
+            intermediate_tuple->set_null((*slot_desc)->null_indicator_offset());
+        } else {
+            void* src = _ht_ctx->last_expr_value(i);
+            void* dst = intermediate_tuple->get_slot((*slot_desc)->tuple_offset());
+            if (stream == NULL) {
+                RawValue::write(src, dst, (*slot_desc)->type(), pool);
+            } else {
+                RawValue::write(src, (*slot_desc)->type(), dst, &buffer);
+            }
+        }
     }
-  }
-
-  for (int expr_idx: string_grouping_exprs_) {
-    if (ht_ctx_->ExprValueNull(expr_idx)) continue;
-
-    SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[expr_idx];
-    // ptr and len were already copied to the fixed-len part of string value
-    StringValue* sv = reinterpret_cast<StringValue*>(
-        intermediate_tuple->get_slot(slot_desc->tuple_offset()));
-    memcpy(buffer, sv->ptr, sv->len);
-    sv->ptr = reinterpret_cast<char*>(buffer);
-    buffer += sv->len;
-  }
+
+    // Initialize aggregate output.
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++slot_desc) {
+        while (!(*slot_desc)->is_materialized()) {
+            ++slot_desc;
+        }
+        AggFnEvaluator* evaluator = _aggregate_evaluators[i];
+        evaluator->init(agg_fn_ctxs[i], intermediate_tuple);
+        // Codegen specific path for min/max.
+        // To minimize branching on the update_tuple path, initialize the result value
+        // so that update_tuple doesn't have to check if the aggregation
+        // dst slot is null.
+        // TODO: remove when we don't use the irbuilder for codegen here.  This optimization
+        // will no longer be necessary when all aggregates are implemented with the UDA
+        // interface.
+        // if ((*slot_desc)->type().type != TYPE_STRING &&
+        //         (*slot_desc)->type().type != TYPE_VARCHAR &&
+        //         (*slot_desc)->type().type != TYPE_TIMESTAMP &&
+        //         (*slot_desc)->type().type != TYPE_CHAR &&
+        //         (*slot_desc)->type().type != TYPE_DECIMAL) {
+        if (!(*slot_desc)->type().is_string_type()
+                && !(*slot_desc)->type().is_date_type()) {
+            ExprValue default_value;
+            void* default_value_ptr = NULL;
+            switch (evaluator->agg_op()) {
+                case AggFnEvaluator::MIN:
+                    default_value_ptr = default_value.set_to_max((*slot_desc)->type());
+                    RawValue::write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
+                    break;
+                case AggFnEvaluator::MAX:
+                    default_value_ptr = default_value.set_to_min((*slot_desc)->type());
+                    RawValue::write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+    return intermediate_tuple;
 }
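
A minimal standalone sketch (using a simplified stand-in struct, not the Doris Tuple or
StringValue types) of the single-allocation row layout that construct_intermediate_tuple()
builds when writing into a stream: the fixed-size slots come first, the variable-length
grouping strings are appended after them in the same buffer, and each string slot's pointer
is redirected into that trailing region so the whole row lives in one contiguous block. The
removed CopyGroupingValues() above performs the same pointer fix-up.

    #include <cstdint>
    #include <cstring>
    #include <iostream>
    #include <string>
    #include <vector>

    struct StrSlot {   // simplified stand-in for a string slot in the intermediate tuple
        char* ptr;
        int len;
    };

    int main() {
        const char* group_key = "doris";
        const int fixed_size = sizeof(StrSlot);  // fixed-length portion of the "tuple"
        const int varlen_size = 5;               // bytes of variable-length string data
        std::vector<uint8_t> row(fixed_size + varlen_size, 0);

        // Fixed part: the slot initially points at the original string.
        StrSlot* slot = reinterpret_cast<StrSlot*>(row.data());
        slot->ptr = const_cast<char*>(group_key);
        slot->len = 5;

        // Varlen part: copy the bytes after the fixed part and redirect the pointer.
        uint8_t* varlen = row.data() + fixed_size;
        std::memcpy(varlen, slot->ptr, slot->len);
        slot->ptr = reinterpret_cast<char*>(varlen);

        std::cout << std::string(slot->ptr, slot->len) << std::endl;  // "doris"
        return 0;
    }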
 
-// TODO: codegen this function.
-void PartitionedAggregationNode::InitAggSlots(
-    const vector<NewAggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple) {
-  vector<SlotDescriptor*>::const_iterator slot_desc =
-      intermediate_tuple_desc_->slots().begin() + grouping_exprs_.size();
-  for (int i = 0; i < agg_fn_evals.size(); ++i, ++slot_desc) {
-    // To minimize branching on the UpdateTuple path, initialize the result value so that
-    // the Add() UDA function can ignore the NULL bit of its destination value. E.g. for
-    // SUM(), if we initialize the destination value to 0 (with the NULL bit set), we can
-    // just start adding to the destination value (rather than repeatedly checking the
-    // destination NULL bit. The codegen'd version of UpdateSlot() exploits this to
-    // eliminate a branch per value.
-    //
-    // For boolean and numeric types, the default values are false/0, so the nullable
-    // aggregate functions SUM() and AVG() produce the correct result. For MIN()/MAX(),
-    // initialize the value to max/min possible value for the same effect.
-    NewAggFnEvaluator* eval = agg_fn_evals[i];
-    eval->Init(intermediate_tuple);
-
-    DCHECK(agg_fns_[i] == &(eval->agg_fn()));
-    const AggFn* agg_fn = agg_fns_[i];
-    const AggFn::AggregationOp agg_op = agg_fn->agg_op();
-    if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX) &&
-        !agg_fn->intermediate_type().is_string_type() &&
-        !agg_fn->intermediate_type().is_date_type()) {
-      ExprValue default_value;
-      void* default_value_ptr = NULL;
-      if (agg_op == AggFn::MIN) {
-        default_value_ptr = default_value.set_to_max((*slot_desc)->type());
-      } else {
-        DCHECK_EQ(agg_op, AggFn::MAX);
-        default_value_ptr = default_value.set_to_min((*slot_desc)->type());
-      }
-      RawValue::write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
+void PartitionedAggregationNode::update_tuple(FunctionContext** agg_fn_ctxs,
+        Tuple* tuple, TupleRow* row, bool is_merge) {
+    DCHECK(tuple != NULL || _aggregate_evaluators.empty());
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        if (is_merge) {
+            _aggregate_evaluators[i]->merge(agg_fn_ctxs[i], row->get_tuple(0), tuple);
+        } else {
+            _aggregate_evaluators[i]->add(agg_fn_ctxs[i], row, tuple);
+        }
     }
-  }
 }
 
-void PartitionedAggregationNode::UpdateTuple(NewAggFnEvaluator** agg_fn_evals,
-    Tuple* tuple, TupleRow* row, bool is_merge) {
-  DCHECK(tuple != NULL || agg_fns_.empty());
-  for (int i = 0; i < agg_fns_.size(); ++i) {
-    if (is_merge) {
-      agg_fn_evals[i]->Merge(row->get_tuple(0), tuple);
+Tuple* PartitionedAggregationNode::get_output_tuple(
+        const vector<FunctionContext*>& agg_fn_ctxs, Tuple* tuple, MemPool* pool) {
+    DCHECK(tuple != NULL || _aggregate_evaluators.empty()) << tuple;
+    Tuple* dst = tuple;
+    // if (_needs_finalize && _intermediate_tuple_id != _output_tuple_id) {
+    if (_needs_finalize) {
+        dst = Tuple::create(_output_tuple_desc->byte_size(), pool);
+    }
+    if (_needs_finalize) {
+        AggFnEvaluator::finalize(_aggregate_evaluators, agg_fn_ctxs, tuple, dst);
     } else {
-      agg_fn_evals[i]->Add(row, tuple);
+        AggFnEvaluator::serialize(_aggregate_evaluators, agg_fn_ctxs, tuple);
+    }
+    // Copy grouping values from tuple to dst.
+    // TODO: Codegen this.
+    if (dst != tuple) {
+        int num_grouping_slots = _probe_expr_ctxs.size();
+        for (int i = 0; i < num_grouping_slots; ++i) {
+            SlotDescriptor* src_slot_desc = _intermediate_tuple_desc->slots()[i];
+            SlotDescriptor* dst_slot_desc = _output_tuple_desc->slots()[i];
+            bool src_slot_null = tuple->is_null(src_slot_desc->null_indicator_offset());
+            void* src_slot = NULL;
+            if (!src_slot_null) {
+                src_slot = tuple->get_slot(src_slot_desc->tuple_offset());
+            }
+            RawValue::write(src_slot, dst, dst_slot_desc, NULL);
+        }
     }
-  }
+    return dst;
 }
 
-Tuple* PartitionedAggregationNode::GetOutputTuple(
-    const vector<NewAggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool) {
-  DCHECK(tuple != NULL || agg_fn_evals.empty()) << tuple;
-  Tuple* dst = tuple;
-  if (needs_finalize_ && intermediate_tuple_id_ != output_tuple_id_) {
-    dst = Tuple::create(output_tuple_desc_->byte_size(), pool);
-  }
-  if (needs_finalize_) {
-    NewAggFnEvaluator::Finalize(agg_fn_evals, tuple, dst);
-  } else {
-    NewAggFnEvaluator::Serialize(agg_fn_evals, tuple);
-  }
-  // Copy grouping values from tuple to dst.
-  // TODO: Codegen this.
-  if (dst != tuple) {
-    int num_grouping_slots = grouping_exprs_.size();
-    for (int i = 0; i < num_grouping_slots; ++i) {
-      SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i];
-      SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i];
-      bool src_slot_null = tuple->is_null(src_slot_desc->null_indicator_offset());
-      void* src_slot = NULL;
-      if (!src_slot_null) src_slot = tuple->get_slot(src_slot_desc->tuple_offset());
-      RawValue::write(src_slot, dst, dst_slot_desc, NULL);
+Status PartitionedAggregationNode::append_spilled_row(BufferedTupleStream2* stream, TupleRow* row) {
+    DCHECK(stream != NULL);
+    DCHECK(!stream->is_pinned());
+    DCHECK(stream->has_write_block());
+    if (LIKELY(stream->add_row(row, &_process_batch_status))) {
+        return Status::OK();
     }
-  }
-  return dst;
-}
 
-template <bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::AppendSpilledRow(
-    Partition* partition, TupleRow* row) {
-  DCHECK(!is_streaming_preagg_);
-  DCHECK(partition->is_spilled());
-  BufferedTupleStream3* stream = AGGREGATED_ROWS ?
-      partition->aggregated_row_stream.get() :
-      partition->unaggregated_row_stream.get();
-  DCHECK(!stream->is_pinned());
-  Status status;
-  if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
-  RETURN_IF_ERROR(status);
-
-  // Keep trying to free memory by spilling until we succeed or hit an error.
-  // Running out of partitions to spill is treated as an error by SpillPartition().
-  while (true) {
-    RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
-    if (stream->AddRow(row, &status)) return Status::OK();
-    RETURN_IF_ERROR(status);
-  }
-}
+    // Adding fails iff either we hit an error or haven't switched to I/O buffers.
+    RETURN_IF_ERROR(_process_batch_status);
+    while (true) {
+        bool got_buffer = false;
+        RETURN_IF_ERROR(stream->switch_to_io_buffers(&got_buffer));
+        if (got_buffer) {
+            break;
+        }
+        RETURN_IF_ERROR(spill_partition());
+    }
 
-string PartitionedAggregationNode::DebugString(int indentation_level) const {
-  stringstream ss;
-  DebugString(indentation_level, &ss);
-  return ss.str();
+    // Adding the row should succeed after the I/O buffer switch.
+    if (stream->add_row(row, &_process_batch_status)) {
+        return Status::OK();
+    }
+    DCHECK(!_process_batch_status.ok());
+    return _process_batch_status;
 }
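
The control flow in append_spilled_row() above, reduced to a standalone shape: try to append
the row, and if the stream has no room, free memory (switch the stream to I/O-sized buffers or
spill another partition) and try again, failing only when nothing more can be freed. The helper
names try_add_row and free_some_memory below are placeholders for illustration, not Doris APIs.

    #include <iostream>

    // Placeholder: pretend the first two append attempts fail for lack of buffer space.
    static bool try_add_row(int attempt) { return attempt >= 2; }

    // Placeholder for switch_to_io_buffers()/spill_partition(): returns false once
    // nothing more can be freed, which the real code surfaces as an error Status.
    static bool free_some_memory(int attempt) { return attempt < 8; }

    int main() {
        int attempt = 0;
        while (!try_add_row(attempt)) {
            if (!free_some_memory(attempt)) {
                std::cerr << "out of memory" << std::endl;
                return 1;
            }
            ++attempt;
        }
        std::cout << "row appended after " << attempt << " retries" << std::endl;
        return 0;
    }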
 
-void PartitionedAggregationNode::DebugString(int indentation_level,
-    stringstream* out) const {
-  *out << string(indentation_level * 2, ' ');
-  *out << "PartitionedAggregationNode("
-       << "intermediate_tuple_id=" << intermediate_tuple_id_
-       << " output_tuple_id=" << output_tuple_id_
-       << " needs_finalize=" << needs_finalize_
-       << " grouping_exprs=" << Expr::debug_string(grouping_exprs_)
-       << " agg_exprs=" << AggFn::DebugString(agg_fns_);
-  ExecNode::debug_string(indentation_level, out);
-  *out << ")";
+void PartitionedAggregationNode::debug_string(int indentation_level, stringstream* out) const {
+    *out << string(indentation_level * 2, ' ');
+    *out << "PartitionedAggregationNode("
+        << "intermediate_tuple_id=" << _intermediate_tuple_id
+        << " output_tuple_id=" << _output_tuple_id
+        << " needs_finalize=" << _needs_finalize
+        << " probe_exprs=" << Expr::debug_string(_probe_expr_ctxs)
+        << " agg_exprs=" << AggFnEvaluator::debug_string(_aggregate_evaluators);
+    ExecNode::debug_string(indentation_level, out);
+    *out << ")";
 }
 
-Status PartitionedAggregationNode::CreateHashPartitions(
-    int level, int single_partition_idx) {
-  if (is_streaming_preagg_) DCHECK_EQ(level, 0);
-  if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) {
+Status PartitionedAggregationNode::create_hash_partitions(int level) {
+    if (level >= MAX_PARTITION_DEPTH) {
         stringstream error_msg;
         error_msg << "Cannot perform aggregation at hash aggregation node with id "
                 << _id << '.'
@@ -1100,360 +814,280 @@ Status PartitionedAggregationNode::CreateHashPartitions(
                 << MAX_PARTITION_DEPTH << " times."
                 << " This could mean there is significant skew in the data or the memory limit is"
                 << " set too low.";
-        return state_->set_mem_limit_exceeded(error_msg.str());
-  }
-  ht_ctx_->set_level(level);
-
-  DCHECK(hash_partitions_.empty());
-  int num_partitions_created = 0;
-  for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    hash_tbls_[i] = nullptr;
-    if (single_partition_idx == -1 || i == single_partition_idx) {
-      Partition* new_partition = partition_pool_->add(new Partition(this, level, i));
-      ++num_partitions_created;
-      hash_partitions_.push_back(new_partition);
-      RETURN_IF_ERROR(new_partition->InitStreams());
-    } else {
-      hash_partitions_.push_back(nullptr);
-    }
-  }
-
-  // Now that all the streams are reserved (meaning we have enough memory to execute
-  // the algorithm), allocate the hash tables. These can fail and we can still continue.
-  for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    Partition* partition = hash_partitions_[i];
-    if (partition == nullptr) continue;
-    if (partition->aggregated_row_stream == nullptr) {
-      // Failed to create the aggregated row stream - cannot create a hash table.
-      // Just continue with a NULL hash table so rows will be passed through.
-      DCHECK(is_streaming_preagg_);
-    } else {
-      bool got_memory;
-      RETURN_IF_ERROR(partition->InitHashTable(&got_memory));
-      // Spill the partition if we cannot create a hash table for a merge aggregation.
-      if (UNLIKELY(!got_memory)) {
-        DCHECK(!is_streaming_preagg_) << "Preagg reserves enough memory for hash tables";
-        // If we're repartitioning, we will be writing aggregated rows first.
-        RETURN_IF_ERROR(partition->Spill(level > 0));
-      }
+        return _state->set_mem_limit_exceeded(error_msg.str());
     }
-    hash_tbls_[i] = partition->hash_tbl.get();
-  }
-  // In this case we did not have to repartition, so ensure that while building the hash
-  // table all rows will be inserted into the partition at 'single_partition_idx' in case
-  // a non deterministic grouping expression causes a row to hash to a different
-  // partition index.
-  if (single_partition_idx != -1) {
-    Partition* partition = hash_partitions_[single_partition_idx];
+    _ht_ctx->set_level(level);
+
+    DCHECK(_hash_partitions.empty());
     for (int i = 0; i < PARTITION_FANOUT; ++i) {
-      hash_partitions_[i] = partition;
-      hash_tbls_[i] = partition->hash_tbl.get();
+        Partition* new_partition = new Partition(this, level);
+        DCHECK(new_partition != NULL);
+        _hash_partitions.push_back(_partition_pool->add(new_partition));
+        RETURN_IF_ERROR(new_partition->init_streams());
     }
-  }
-
-  COUNTER_UPDATE(partitions_created_, num_partitions_created);
-  if (!is_streaming_preagg_) {
-    COUNTER_SET(max_partition_level_, level);
-  }
-  return Status::OK();
-}
+    DCHECK_GT(_state->block_mgr2()->num_reserved_buffers_remaining(_block_mgr_client), 0);
 
-Status PartitionedAggregationNode::CheckAndResizeHashPartitions(
-    bool partitioning_aggregated_rows, int num_rows, const NewPartitionedHashTableCtx* ht_ctx) {
-  DCHECK(!is_streaming_preagg_);
-  for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    Partition* partition = hash_partitions_[i];
-    if (partition == nullptr) continue;
-    while (!partition->is_spilled()) {
-      {
-        SCOPED_TIMER(ht_resize_timer_);
-        bool resized;
-        RETURN_IF_ERROR(partition->hash_tbl->CheckAndResize(num_rows, ht_ctx, &resized));
-        if (resized) break;
-      }
-      RETURN_IF_ERROR(SpillPartition(partitioning_aggregated_rows));
+    // Now that all the streams are reserved (meaning we have enough memory to execute
+    // the algorithm), allocate the hash tables. These can fail and we can still continue.
+    for (int i = 0; i < PARTITION_FANOUT; ++i) {
+        if (!_hash_partitions[i]->init_hash_table()) {
+            RETURN_IF_ERROR(_hash_partitions[i]->spill());
+        }
     }
-  }
-  return Status::OK();
+    COUNTER_UPDATE(_partitions_created, PARTITION_FANOUT);
+    // COUNTER_SET(_max_partition_level, level);
+    return Status::OK();
 }
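
A minimal sketch (not part of this commit) of how the fanout above works: the upper bits
of a row's 32-bit hash select one of the hash partitions just created, leaving the lower
bits for that partition's own hash table. It assumes the PARTITION_FANOUT = 16 and
NUM_PARTITIONING_BITS = 4 constants declared in the header further below; the helper name
pick_partition_idx is illustrative only.

    #include <cstdint>

    static const int PARTITION_FANOUT = 16;      // 2^NUM_PARTITIONING_BITS
    static const int NUM_PARTITIONING_BITS = 4;

    // Upper NUM_PARTITIONING_BITS bits pick the partition; the remaining bits are
    // left for bucket selection inside that partition's hash table.
    inline int pick_partition_idx(uint32_t hash) {
        return hash >> (32 - NUM_PARTITIONING_BITS);  // value in [0, PARTITION_FANOUT)
    }
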
 
-Status PartitionedAggregationNode::NextPartition() {
-  DCHECK(output_partition_ == nullptr);
-
-  if (!is_in_subplan() && spilled_partitions_.empty()) {
-    // All partitions are in memory. Release reservation that was used for previous
-    // partitions that is no longer needed. If we have spilled partitions, we want to
-    // hold onto all reservation in case it is needed to process the spilled partitions.
-    DCHECK(!_buffer_pool_client.has_unpinned_pages());
-    Status status = release_unused_reservation();
-    DCHECK(status.ok()) << "Should not fail - all partitions are in memory so there are "
-                        << "no unpinned pages. " << status.get_error_msg();
-  }
-
-  // Keep looping until we get to a partition that fits in memory.
-  Partition* partition = nullptr;
-  while (true) {
-    // First return partitions that are fully aggregated (and in memory).
-    if (!aggregated_partitions_.empty()) {
-      partition = aggregated_partitions_.front();
-      DCHECK(!partition->is_spilled());
-      aggregated_partitions_.pop_front();
-      break;
+Status PartitionedAggregationNode::check_and_resize_hash_partitions(int num_rows,
+        PartitionedHashTableCtx* ht_ctx) {
+    for (int i = 0; i < PARTITION_FANOUT; ++i) {
+        Partition* partition = _hash_partitions[i];
+        while (!partition->is_spilled()) {
+            {
+                SCOPED_TIMER(_ht_resize_timer);
+                if (partition->hash_tbl->check_and_resize(num_rows, ht_ctx)) {
+                    break;
+                }
+            }
+            // There was not enough memory for the resize. Spill a partition and retry.
+            RETURN_IF_ERROR(spill_partition());
+        }
     }
-
-    // No aggregated partitions in memory - we should not be using any reservation aside
-    // from 'serialize_stream_'.
-    DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0,
-        _buffer_pool_client.GetUsedReservation()) << _buffer_pool_client.DebugString();
-
-    // Try to fit a single spilled partition in memory. We can often do this because
-    // we only need to fit 1/PARTITION_FANOUT of the data in memory.
-    // TODO: in some cases when the partition probably won't fit in memory it could
-    // be better to skip directly to repartitioning.
-    RETURN_IF_ERROR(BuildSpilledPartition(&partition));
-    if (partition != nullptr) break;
-
-    // If we can't fit the partition in memory, repartition it.
-    RETURN_IF_ERROR(RepartitionSpilledPartition());
-  }
-  DCHECK(!partition->is_spilled());
-  DCHECK(partition->hash_tbl.get() != nullptr);
-  DCHECK(partition->aggregated_row_stream->is_pinned());
-
-  output_partition_ = partition;
-  output_iterator_ = output_partition_->hash_tbl->Begin(ht_ctx_.get());
-  COUNTER_UPDATE(num_hash_buckets_, output_partition_->hash_tbl->num_buckets());
-  return Status::OK();
+    return Status::OK();
 }
 
-Status PartitionedAggregationNode::BuildSpilledPartition(Partition** built_partition) {
-  DCHECK(!spilled_partitions_.empty());
-  DCHECK(!is_streaming_preagg_);
-  // Leave the partition in 'spilled_partitions_' to be closed if we hit an error.
-  Partition* src_partition = spilled_partitions_.front();
-  DCHECK(src_partition->is_spilled());
-
-  // Create a new hash partition from the rows of the spilled partition. This is simpler
-  // than trying to finish building a partially-built partition in place. We only
-  // initialise one hash partition that all rows in 'src_partition' will hash to.
-  RETURN_IF_ERROR(CreateHashPartitions(src_partition->level, src_partition->idx));
-  Partition* dst_partition = hash_partitions_[src_partition->idx];
-  DCHECK(dst_partition != nullptr);
-
-  // Rebuild the hash table over spilled aggregate rows then start adding unaggregated
-  // rows to the hash table. It's possible the partition will spill at either stage.
-  // In that case we need to finish processing 'src_partition' so that all rows are
-  // appended to 'dst_partition'.
-  // TODO: if the partition spills again but the aggregation reduces the input
-  // significantly, we could do better here by keeping the incomplete hash table in
-  // memory and only spilling unaggregated rows that didn't fit in the hash table
-  // (somewhat similar to the passthrough pre-aggregation).
-  RETURN_IF_ERROR(ProcessStream<true>(src_partition->aggregated_row_stream.get()));
-  RETURN_IF_ERROR(ProcessStream<false>(src_partition->unaggregated_row_stream.get()));
-  src_partition->Close(false);
-  spilled_partitions_.pop_front();
-  hash_partitions_.clear();
-
-  if (dst_partition->is_spilled()) {
-    PushSpilledPartition(dst_partition);
-    *built_partition = nullptr;
-    // Spilled the partition - we should not be using any reservation except from
-    // 'serialize_stream_'.
-    DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0,
-        _buffer_pool_client.GetUsedReservation()) << _buffer_pool_client.DebugString();
-  } else {
-    *built_partition = dst_partition;
-  }
-  return Status::OK();
+int64_t PartitionedAggregationNode::largest_spilled_partition() const {
+    int64_t max_rows = 0;
+    for (int i = 0; i < _hash_partitions.size(); ++i) {
+        Partition* partition = _hash_partitions[i];
+        if (partition->is_closed || !partition->is_spilled()) {
+            continue;
+        }
+        int64_t rows = partition->aggregated_row_stream->num_rows() +
+                partition->unaggregated_row_stream->num_rows();
+        if (rows > max_rows) {
+            max_rows = rows;
+        }
+    }
+    return max_rows;
 }
 
-Status PartitionedAggregationNode::RepartitionSpilledPartition() {
-  DCHECK(!spilled_partitions_.empty());
-  DCHECK(!is_streaming_preagg_);
-  // Leave the partition in 'spilled_partitions_' to be closed if we hit an error.
-  Partition* partition = spilled_partitions_.front();
-  DCHECK(partition->is_spilled());
-
-  // Create the new hash partitions to repartition into. This will allocate a
-  // write buffer for each partition's aggregated row stream.
-  RETURN_IF_ERROR(CreateHashPartitions(partition->level + 1));
-  COUNTER_UPDATE(num_repartitions_, 1);
-
-  // Rows in this partition could have been spilled into two streams, depending
-  // on if it is an aggregated intermediate, or an unaggregated row. Aggregated
-  // rows are processed first to save a hash table lookup in ProcessBatch().
-  RETURN_IF_ERROR(ProcessStream<true>(partition->aggregated_row_stream.get()));
-
-  // Prepare write buffers so we can append spilled rows to unaggregated partitions.
-  for (Partition* hash_partition : hash_partitions_) {
-    if (!hash_partition->is_spilled()) continue;
-    // The aggregated rows have been repartitioned. Free up at least a buffer's worth of
-    // reservation and use it to pin the unaggregated write buffer.
-//    hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL);
-    bool got_buffer;
-    RETURN_IF_ERROR(
-        hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer));
-    DCHECK(got_buffer)
-        << "Accounted in min reservation" << _buffer_pool_client.DebugString();
-  }
-  RETURN_IF_ERROR(ProcessStream<false>(partition->unaggregated_row_stream.get()));
-
-  COUNTER_UPDATE(num_row_repartitioned_, partition->aggregated_row_stream->num_rows());
-  COUNTER_UPDATE(num_row_repartitioned_, partition->unaggregated_row_stream->num_rows());
-
-  partition->Close(false);
-  spilled_partitions_.pop_front();
-
-  // Done processing this partition. Move the new partitions into
-  // spilled_partitions_/aggregated_partitions_.
-  int64_t num_input_rows = partition->aggregated_row_stream->num_rows()
-      + partition->unaggregated_row_stream->num_rows();
-  RETURN_IF_ERROR(MoveHashPartitions(num_input_rows));
-  return Status::OK();
-}
+Status PartitionedAggregationNode::next_partition() {
+    DCHECK(_output_partition == NULL);
 
-template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream3* input_stream) {
-  DCHECK(!is_streaming_preagg_);
-  if (input_stream->num_rows() > 0) {
+    // Keep looping until we get to a partition that fits in memory.
+    Partition* partition = NULL;
     while (true) {
-      bool got_buffer = false;
-      RETURN_IF_ERROR(input_stream->PrepareForRead(true, &got_buffer));
-      if (got_buffer) break;
-      // Did not have a buffer to read the input stream. Spill and try again.
-      RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
+        partition = NULL;
+        // First return partitions that are fully aggregated (and in memory).
+        if (!_aggregated_partitions.empty()) {
+            partition = _aggregated_partitions.front();
+            DCHECK(!partition->is_spilled());
+            _aggregated_partitions.pop_front();
+            break;
+        }
+
+        if (partition == NULL) {
+            DCHECK(!_spilled_partitions.empty());
+            DCHECK_EQ(_state->block_mgr2()->num_pinned_buffers(_block_mgr_client),
+                    _needs_serialize ? 1 : 0);
+
+            // TODO: we can probably do better than just picking the first partition. We
+            // can base this on the amount written to disk, etc.
+            partition = _spilled_partitions.front();
+            DCHECK(partition->is_spilled());
+
+            // Create the new hash partitions to repartition into.
+            // TODO: we don't need to repartition here. We are now working on 1 / FANOUT
+            // of the input so it's reasonably likely it can fit. We should look at this
+            // partitions size and just do the aggregation if it fits in memory.
+            RETURN_IF_ERROR(create_hash_partitions(partition->level + 1));
+            COUNTER_UPDATE(_num_repartitions, 1);
+
+            // Rows in this partition could have been spilled into two streams, depending
+            // on if it is an aggregated intermediate, or an unaggregated row.
+            // Note: we must process the aggregated rows first to save a hash table lookup
+            // in process_batch().
+            RETURN_IF_ERROR(process_stream<true>(partition->aggregated_row_stream.get()));
+            RETURN_IF_ERROR(process_stream<false>(partition->unaggregated_row_stream.get()));
+
+            COUNTER_UPDATE(_num_row_repartitioned, partition->aggregated_row_stream->num_rows());
+            COUNTER_UPDATE(_num_row_repartitioned, partition->unaggregated_row_stream->num_rows());
+
+            partition->close(false);
+            _spilled_partitions.pop_front();
+
+            // Done processing this partition. Move the new partitions into
+            // _spilled_partitions/_aggregated_partitions.
+            int64_t num_input_rows = partition->aggregated_row_stream->num_rows() +
+                partition->unaggregated_row_stream->num_rows();
+
+            // Check if there was any reduction in the size of partitions after repartitioning.
+            int64_t largest_partition = largest_spilled_partition();
+            DCHECK_GE(num_input_rows, largest_partition) << "Cannot have a partition with "
+                "more rows than the input";
+            if (num_input_rows == largest_partition) {
+                // Status status = Status::MemTrackerExceeded();
+                // status.AddDetail(Substitute("Cannot perform aggregation at node with id $0. "
+                //             "Repartitioning did not reduce the size of a spilled partition. "
+                //             "Repartitioning level $1. Number of rows $2.",
+                //             _id, partition->level + 1, num_input_rows));
+                // _state->SetMemTrackerExceeded();
+                stringstream error_msg;
+                error_msg << "Cannot perform aggregation at node with id " << _id << ". "
+                        << "Repartitioning did not reduce the size of a spilled partition. "
+                        << "Repartitioning level " << partition->level + 1
+                        << ". Number of rows " << num_input_rows << " .";
+                return Status::MemoryLimitExceeded(error_msg.str());
+            }
+            RETURN_IF_ERROR(move_hash_partitions(num_input_rows));
+        }
     }
 
-    bool eos = false;
-    const RowDescriptor* desc =
-        AGGREGATED_ROWS ? &intermediate_row_desc_ : &(_children[0]->row_desc());
-    RowBatch batch(*desc, state_->batch_size(), const_cast<MemTracker*>(mem_tracker()));
-    do {
-      RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos));
-      RETURN_IF_ERROR(
-          ProcessBatch<AGGREGATED_ROWS>(&batch, ht_ctx_.get()));
-      RETURN_IF_ERROR(state_->check_query_state("New partitioned aggregation, while processing stream."));
-      batch.reset();
-    } while (!eos);
-  }
-  input_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-  return Status::OK();
+    DCHECK(partition->hash_tbl.get() != NULL);
+    DCHECK(partition->aggregated_row_stream->is_pinned());
+
+    _output_partition = partition;
+    _output_iterator = _output_partition->hash_tbl->begin(_ht_ctx.get());
+    COUNTER_UPDATE(_num_hash_buckets, _output_partition->hash_tbl->num_buckets());
+    return Status::OK();
 }
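
For intuition on the no-progress check above (figures are hypothetical, not from the
code): if a spilled partition holding 1,000,000 rows is repartitioned across 16 new
partitions, an even hash split leaves each new partition with roughly 62,500 rows. If the
largest new partition instead still holds all 1,000,000 rows, every row hashed to the same
partition (e.g. the grouping keys all collide), so further repartitioning cannot shrink it
and the node fails with the memory-limit error rather than recursing forever.
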
 
-Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) {
-  int64_t max_freed_mem = 0;
-  int partition_idx = -1;
-
-  // Iterate over the partitions and pick the largest partition that is not spilled.
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    if (hash_partitions_[i] == nullptr) continue;
-    if (hash_partitions_[i]->is_closed) continue;
-    if (hash_partitions_[i]->is_spilled()) continue;
-    // Pass 'true' because we need to keep the write block pinned. See Partition::Spill().
-    int64_t mem = hash_partitions_[i]->aggregated_row_stream->BytesPinned(true);
-    mem += hash_partitions_[i]->hash_tbl->ByteSize();
-    mem += hash_partitions_[i]->agg_fn_pool->total_reserved_bytes();
-    DCHECK_GT(mem, 0); // At least the hash table buckets should occupy memory.
-    if (mem > max_freed_mem) {
-      max_freed_mem = mem;
-      partition_idx = i;
+template<bool AGGREGATED_ROWS>
+Status PartitionedAggregationNode::process_stream(BufferedTupleStream2* input_stream) {
+    if (input_stream->num_rows() > 0) {
+        while (true) {
+            bool got_buffer = false;
+            RETURN_IF_ERROR(input_stream->prepare_for_read(true, &got_buffer));
+            if (got_buffer) {
+                break;
+            }
+            // Did not have a buffer to read the input stream. Spill and try again.
+            RETURN_IF_ERROR(spill_partition());
+        }
+
+        bool eos = false;
+        RowBatch batch(AGGREGATED_ROWS ? *_intermediate_row_desc : _children[0]->row_desc(),
+                _state->batch_size(), mem_tracker());
+        do {
+            RETURN_IF_ERROR(input_stream->get_next(&batch, &eos));
+            RETURN_IF_ERROR(process_batch<AGGREGATED_ROWS>(&batch, _ht_ctx.get()));
+            RETURN_IF_ERROR(_state->query_status());
+            // free_local_allocations();
+            batch.reset();
+        } while (!eos);
     }
-  }
-  DCHECK_NE(partition_idx, -1) << "Should have been able to spill a partition to "
-                               << "reclaim memory: " << _buffer_pool_client.DebugString();
-  // Remove references to the destroyed hash table from 'hash_tbls_'.
-  // Additionally, we might be dealing with a rebuilt spilled partition, where all
-  // partitions point to a single in-memory partition. This also ensures that 'hash_tbls_'
-  // remains consistent in that case.
-  for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    if (hash_partitions_[i] == hash_partitions_[partition_idx]) hash_tbls_[i] = nullptr;
-  }
-  return hash_partitions_[partition_idx]->Spill(more_aggregate_rows);
+    input_stream->close();
+    return Status::OK();
 }
 
-Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
-  DCHECK(!hash_partitions_.empty());
-  std::stringstream ss;
-  ss << "PA(node_id=" << id() << ") partitioned(level=" << hash_partitions_[0]->level
-     << ") " << num_input_rows << " rows into:" << std::endl;
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    Partition* partition = hash_partitions_[i];
-    if (partition == nullptr) continue;
-    // We might be dealing with a rebuilt spilled partition, where all partitions are
-    // pointing to a single in-memory partition, so make sure we only proceed for the
-    // right partition.
-    if(i != partition->idx) continue;
-    int64_t aggregated_rows = 0;
-    if (partition->aggregated_row_stream != nullptr) {
-      aggregated_rows = partition->aggregated_row_stream->num_rows();
-    }
-    int64_t unaggregated_rows = 0;
-    if (partition->unaggregated_row_stream != nullptr) {
-      unaggregated_rows = partition->unaggregated_row_stream->num_rows();
+Status PartitionedAggregationNode::spill_partition() {
+    int64_t max_freed_mem = 0;
+    int partition_idx = -1;
+
+    // Iterate over the partitions and pick the largest partition that is not spilled.
+    for (int i = 0; i < _hash_partitions.size(); ++i) {
+        if (_hash_partitions[i]->is_closed) {
+            continue;
+        }
+        if (_hash_partitions[i]->is_spilled()) {
+            continue;
+        }
+        // TODO: In PHJ the bytes_in_mem() call also calculates the mem used by the
+        // _write_block, why do we ignore it here?
+        int64_t mem = _hash_partitions[i]->aggregated_row_stream->bytes_in_mem(true);
+        mem += _hash_partitions[i]->hash_tbl->byte_size();
+        mem += _hash_partitions[i]->agg_fn_pool->total_reserved_bytes();
+        if (mem > max_freed_mem) {
+            max_freed_mem = mem;
+            partition_idx = i;
+        }
     }
-    double total_rows = aggregated_rows + unaggregated_rows;
-    double percent = total_rows * 100 / num_input_rows;
-    ss << "  " << i << " "  << (partition->is_spilled() ? "spilled" : "not spilled")
-       << " (fraction=" << std::fixed << std::setprecision(2) << percent << "%)" << std::endl
-       << "    #aggregated rows:" << aggregated_rows << std::endl
-       << "    #unaggregated rows: " << unaggregated_rows << std::endl;
-
-    // TODO: update counters to support doubles.
-    COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(percent));
-
-    if (total_rows == 0) {
-      partition->Close(false);
-    } else if (partition->is_spilled()) {
-      PushSpilledPartition(partition);
-    } else {
-      aggregated_partitions_.push_back(partition);
+    if (partition_idx == -1) {
+        // Could not find a partition to spill. This means the mem limit was just too low.
+        return _state->block_mgr2()->mem_limit_too_low_error(_block_mgr_client, id());
     }
 
-  }
-  VLOG(2) << ss.str();
-  hash_partitions_.clear();
-  return Status::OK();
+    return _hash_partitions[partition_idx]->spill();
 }
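
Rough illustration of the victim-selection estimate above (numbers are hypothetical): a
partition pinning 8 MB of aggregated stream buffers, with a 2 MB hash table and 1 MB
reserved in its agg-fn MemPool, is scored at about 11 MB of reclaimable memory, so it is
preferred as the spill victim over a partition scoring, say, 3 MB.
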
 
-void PartitionedAggregationNode::PushSpilledPartition(Partition* partition) {
-  DCHECK(partition->is_spilled());
-  DCHECK(partition->hash_tbl == nullptr);
-  // Ensure all pages in the spilled partition's streams are unpinned by invalidating
-  // the streams' read and write iterators. We may need all the memory to process the
-  // next spilled partitions.
-//  partition->aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL);
-//  partition->unaggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL);
-  spilled_partitions_.push_front(partition);
+Status PartitionedAggregationNode::move_hash_partitions(int64_t num_input_rows) {
+    DCHECK(!_hash_partitions.empty());
+    stringstream ss;
+    ss << "PA(node_id=" << id() << ") partitioned(level="
+        << _hash_partitions[0]->level << ") "
+        << num_input_rows << " rows into:" << std::endl;
+    for (int i = 0; i < _hash_partitions.size(); ++i) {
+        Partition* partition = _hash_partitions[i];
+        int64_t aggregated_rows = partition->aggregated_row_stream->num_rows();
+        int64_t unaggregated_rows = partition->unaggregated_row_stream->num_rows();
+        int64_t total_rows = aggregated_rows + unaggregated_rows;
+        double percent = static_cast<double>(total_rows * 100) / num_input_rows;
+        ss << "  " << i << " "  << (partition->is_spilled() ? "spilled" : "not spilled")
+            << " (fraction=" << std::fixed << std::setprecision(2) << percent << "%)" << std::endl
+            << "    #aggregated rows:" << aggregated_rows << std::endl
+            << "    #unaggregated rows: " << unaggregated_rows << std::endl;
+
+        // TODO: update counters to support doubles.
+        // COUNTER_SET(_largest_partition_percent, static_cast<int64_t>(percent));
+
+        if (total_rows == 0) {
+            partition->close(false);
+        } else if (partition->is_spilled()) {
+            DCHECK(partition->hash_tbl.get() == NULL);
+            // We need to unpin all the spilled partitions to make room to allocate new
+            // _hash_partitions when we repartition the spilled partitions.
+            // TODO: we only need to do this when we have memory pressure. This might be
+            // okay though since the block mgr should only write these to disk if there
+            // is memory pressure.
+            RETURN_IF_ERROR(partition->aggregated_row_stream->unpin_stream(true));
+            RETURN_IF_ERROR(partition->unaggregated_row_stream->unpin_stream(true));
+
+            // Push newly created partitions at the front. This means a depth-first walk
+            // (more finely partitioned partitions are processed first). This allows us
+            // to delete blocks earlier and bottom out the recursion earlier.
+            _spilled_partitions.push_front(partition);
+        } else {
+            _aggregated_partitions.push_back(partition);
+        }
+
+    }
+    VLOG(2) << ss.str();
+    _hash_partitions.clear();
+    return Status::OK();
 }
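
As a reading aid for the per-partition fractions logged above: with PARTITION_FANOUT = 16,
an even split gives each partition roughly 100 / 16 = 6.25% of the input rows. A partition
reported at a much larger fraction (say 40%) indicates skew in the grouping keys, which is
what the _largest_partition_percent counter (commented out above) is meant to surface.
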
 
-void PartitionedAggregationNode::ClosePartitions() {
-  for (Partition* partition : hash_partitions_) {
-    if (partition != nullptr) partition->Close(true);
-  }
-  hash_partitions_.clear();
-  for (Partition* partition : aggregated_partitions_) partition->Close(true);
-  aggregated_partitions_.clear();
-  for (Partition* partition : spilled_partitions_) partition->Close(true);
-  spilled_partitions_.clear();
-  memset(hash_tbls_, 0, sizeof(hash_tbls_));
-  partition_pool_->clear();
+void PartitionedAggregationNode::close_partitions() {
+    for (int i = 0; i < _hash_partitions.size(); ++i) {
+        _hash_partitions[i]->close(true);
+    }
+    for (list<Partition*>::iterator it = _aggregated_partitions.begin();
+            it != _aggregated_partitions.end(); ++it) {
+        (*it)->close(true);
+    }
+    for (list<Partition*>::iterator it = _spilled_partitions.begin();
+            it != _spilled_partitions.end(); ++it) {
+        (*it)->close(true);
+    }
+    _aggregated_partitions.clear();
+    _spilled_partitions.clear();
+    _hash_partitions.clear();
+    _partition_pool->clear();
 }
 
-//Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
-//  NewAggFnEvaluator::FreeLocalAllocations(agg_fn_evals_);
-//  for (Partition* partition : hash_partitions_) {
-//    if (partition != nullptr) {
-//      NewAggFnEvaluator::FreeLocalAllocations(partition->agg_fn_evals);
-//    }
-//  }
-//  if (ht_ctx_.get() != nullptr) ht_ctx_->FreeLocalAllocations();
-//  return ExecNode::QueryMaintenance(state);
-//}
-
-// Instantiate required templates.
-template Status PartitionedAggregationNode::AppendSpilledRow<false>(
-    Partition*, TupleRow*);
-template Status PartitionedAggregationNode::AppendSpilledRow<true>(Partition*, TupleRow*);
+#if 0
+// Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
+//   for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+//     ExprContext::free_local_allocations(_aggregate_evaluators[i]->input_expr_ctxs());
+//   }
+//   ExprContext::free_local_allocations(_agg_fn_ctxs);
+//   for (int i = 0; i < _hash_partitions.size(); ++i) {
+//     ExprContext::free_local_allocations(_hash_partitions[i]->agg_fn_ctxs);
+//   }
+//   return ExecNode::QueryMaintenance(state);
+// }
+//
+#endif
 
 }
-
diff --git a/be/src/exec/partitioned_aggregation_node.h b/be/src/exec/partitioned_aggregation_node.h
index 4a2f156..bbcc3e0 100644
--- a/be/src/exec/partitioned_aggregation_node.h
+++ b/be/src/exec/partitioned_aggregation_node.h
@@ -15,26 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef DORIS_BE_SRC_EXEC_NEW_PARTITIONED_AGGREGATION_NODE_H
-#define DORIS_BE_SRC_EXEC_NEW_PARTITIONED_AGGREGATION_NODE_H
-
-#include <deque>
+#ifndef DORIS_BE_SRC_EXEC_PARTITIONED_AGGREGATION_NODE_H
+#define DORIS_BE_SRC_EXEC_PARTITIONED_AGGREGATION_NODE_H
 
+#include <functional>
 #include <boost/scoped_ptr.hpp>
 
 #include "exec/exec_node.h"
-#include "exec/new_partitioned_hash_table.h"
-#include "runtime/buffered_tuple_stream3.h"
-#include "runtime/bufferpool/suballocator.h"
+#include "exec/partitioned_hash_table.inline.h"
+#include "runtime/buffered_block_mgr2.h"
+#include "runtime/buffered_tuple_stream2.h"
 #include "runtime/descriptors.h"  // for TupleId
 #include "runtime/mem_pool.h"
 #include "runtime/string_value.h"
 
 namespace doris {
 
-class AggFn;
-class NewAggFnEvaluator;
-class CodegenAnyVal;
+class AggFnEvaluator;
 class RowBatch;
 class RuntimeState;
 struct StringValue;
@@ -42,661 +39,423 @@ class Tuple;
 class TupleDescriptor;
 class SlotDescriptor;
 
-/// Node for doing partitioned hash aggregation.
-/// This node consumes the input (which can be from the child(0) or a spilled partition).
-///  1. Each row is hashed and we pick a dst partition (hash_partitions_).
-///  2. If the dst partition is not spilled, we probe into the partitions hash table
-///  to aggregate/insert the row.
-///  3. If the partition is already spilled, the input row is spilled.
-///  4. When all the input is consumed, we walk hash_partitions_, put the spilled ones
-///  into spilled_partitions_ and the non-spilled ones into aggregated_partitions_.
-///  aggregated_partitions_ contain partitions that are fully processed and the result
-///  can just be returned. Partitions in spilled_partitions_ need to be repartitioned
-///  and we just repeat these steps.
+// Node for doing partitioned hash aggregation.
+// This node consumes the input (which can be from the child(0) or a spilled partition).
+//  1. Each row is hashed and we pick a dst partition (_hash_partitions).
+//  2. If the dst partition is not spilled, we probe into the partitions hash table
+//  to aggregate/insert the row.
+//  3. If the partition is already spilled, the input row is spilled.
+//  4. When all the input is consumed, we walk _hash_partitions, put the spilled ones
+//  into _spilled_partitions and the non-spilled ones into _aggregated_partitions.
+//  _aggregated_partitions contain partitions that are fully processed and the result
+//  can just be returned. Partitions in _spilled_partitions need to be repartitioned
+//  and we just repeat these steps.
+//
+// Each partition contains these structures:
+// 1) Hash Table for aggregated rows. This contains just the hash table directory
+//    structure but not the rows themselves. This is NULL for spilled partitions when
+//    we stop maintaining the hash table.
+// 2) MemPool for var-len result data for rows in the hash table. If the aggregate
+//    function returns a string, we cannot append it to the tuple stream as that
+//    structure is immutable. Instead, when we need to spill, we sweep and copy the
+//    rows into a tuple stream.
+// 3) Aggregated tuple stream for rows that are/were in the hash table. This stream
+//    contains rows that are aggregated. When the partition is not spilled, this stream
+//    is pinned and contains the memory referenced by the hash table.
+//    In the case where the aggregate function does not return a string (meaning the
+//    size of all the slots is known when the row is constructed), this stream contains
+//    all the memory for the result rows and the MemPool (2) is not used.
+// 4) Unaggregated tuple stream. Stream to spill unaggregated rows.
+//    Rows in this stream always have child(0)'s layout.
+//
+// Buffering: Each stream and hash table needs to maintain at least one buffer for
+// some duration of the processing. To minimize the memory requirements of small queries
+// (i.e. memory usage is less than one IO-buffer per partition), the streams and hash
+// tables of each partition start using small (less than IO-sized) buffers, regardless
+// of the level.
 //
-/// Each partition contains these structures:
-/// 1) Hash Table for aggregated rows. This contains just the hash table directory
-///    structure but not the rows themselves. This is NULL for spilled partitions when
-///    we stop maintaining the hash table.
-/// 2) MemPool for var-len result data for rows in the hash table. If the aggregate
-///    function returns a string, we cannot append it to the tuple stream as that
-///    structure is immutable. Instead, when we need to spill, we sweep and copy the
-///    rows into a tuple stream.
-/// 3) Aggregated tuple stream for rows that are/were in the hash table. This stream
-///    contains rows that are aggregated. When the partition is not spilled, this stream
-///    is pinned and contains the memory referenced by the hash table.
-///    In the case where the aggregate function does not return a string (meaning the
-///    size of all the slots is known when the row is constructed), this stream contains
-///    all the memory for the result rows and the MemPool (2) is not used.
-/// 4) Unaggregated tuple stream. Stream to spill unaggregated rows.
-///    Rows in this stream always have child(0)'s layout.
-///
-/// Buffering: Each stream and hash table needs to maintain at least one buffer for
-/// some duration of the processing. To minimize the memory requirements of small queries
-/// (i.e. memory usage is less than one IO-buffer per partition), the streams and hash
-/// tables of each partition start using small (less than IO-sized) buffers, regardless
-/// of the level.
-///
-/// Two-phase aggregation: we support two-phase distributed aggregations, where
-/// pre-aggregations attempt to reduce the size of data before shuffling data across the
-/// network to be merged by the merge aggregation node. This exec node supports a
-/// streaming mode for pre-aggregations where it maintains a hash table of aggregated
-/// rows, but can pass through unaggregated rows (after transforming them into the
-/// same tuple format as aggregated rows) when a heuristic determines that it is better
-/// to send rows across the network instead of consuming additional memory and CPU
-/// resources to expand its hash table. The planner decides whether a given
-/// pre-aggregation should use the streaming preaggregation algorithm or the same
-/// blocking aggregation algorithm as used in merge aggregations.
-/// TODO: make this less of a heuristic by factoring in the cost of the exchange vs the
-/// cost of the pre-aggregation.
-///
-/// If there are no grouping expressions, there is only a single output row for both
-/// preaggregations and merge aggregations. This case is handled separately to avoid
-/// building hash tables. There is also no need to do streaming preaggregations.
-///
-/// Handling memory pressure: the node uses two different strategies for responding to
-/// memory pressure, depending on whether it is a streaming pre-aggregation or not. If
-/// the node is a streaming preaggregation, it stops growing its hash table further by
-/// converting unaggregated rows into the aggregated tuple format and passing them
-/// through. If the node is not a streaming pre-aggregation, it responds to memory
-/// pressure by spilling partitions to disk.
-///
-/// TODO: Buffer rows before probing into the hash table?
-/// TODO: After spilling, we can still maintain a very small hash table just to remove
-/// some number of rows (from likely going to disk).
-/// TODO: Consider allowing to spill the hash table structure in addition to the rows.
-/// TODO: Do we want to insert a buffer before probing into the partition's hash table?
-/// TODO: Use a prefetch/batched probe interface.
-/// TODO: Return rows from the aggregated_row_stream rather than the HT.
-/// TODO: Think about spilling heuristic.
-/// TODO: When processing a spilled partition, we have a lot more information and can
-/// size the partitions/hash tables better.
-/// TODO: Start with unpartitioned (single partition) and switch to partitioning and
-/// spilling only if the size gets large, say larger than the LLC.
-/// TODO: Simplify or cleanup the various uses of agg_fn_ctx, agg_fn_ctx_, and ctx.
-/// There are so many contexts in use that a plain "ctx" variable should never be used.
-/// Likewise, it's easy to mix up the agg fn ctxs; there should be a way to simplify this.
-/// TODO: support an Init() method with an initial value in the UDAF interface.
+// TODO: Buffer rows before probing into the hash table?
+// TODO: After spilling, we can still maintain a very small hash table just to remove
+// some number of rows (from likely going to disk).
+// TODO: Consider allowing to spill the hash table structure in addition to the rows.
+// TODO: Do we want to insert a buffer before probing into the partition's hash table?
+// TODO: Use a prefetch/batched probe interface.
+// TODO: Return rows from the aggregated_row_stream rather than the HT.
+// TODO: Think about spilling heuristic.
+// TODO: When processing a spilled partition, we have a lot more information and can
+// size the partitions/hash tables better.
+// TODO: Start with unpartitioned (single partition) and switch to partitioning and
+// spilling only if the size gets large, say larger than the LLC.
+// TODO: Simplify or cleanup the various uses of agg_fn_ctx, _agg_fn_ctx, and ctx.
+// There are so many contexts in use that a plain "ctx" variable should never be used.
+// Likewise, it's easy to mix up the agg fn ctxs; there should be a way to simplify this.
 class PartitionedAggregationNode : public ExecNode {
- public:
-  
-  PartitionedAggregationNode(ObjectPool* pool,
-      const TPlanNode& tnode, const DescriptorTbl& descs);
-
-  virtual Status init(const TPlanNode& tnode, RuntimeState* state);
-  virtual Status prepare(RuntimeState* state);
-//  virtual void Codegen(RuntimeState* state);
-  virtual Status open(RuntimeState* state);
-  virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status reset(RuntimeState* state);
-  virtual Status close(RuntimeState* state);
-
- protected:
-  /// Frees local allocations from aggregate_evals_ and agg_fn_evals
-//  virtual Status QueryMaintenance(RuntimeState* state);
-  virtual std::string DebugString(int indentation_level) const;
-  virtual void DebugString(int indentation_level, std::stringstream* out) const;
-
- private:
-  struct Partition;
-
-  /// Number of initial partitions to create. Must be a power of 2.
-  static const int PARTITION_FANOUT = 16;
-
-  /// Needs to be the log(PARTITION_FANOUT).
-  /// We use the upper bits to pick the partition and lower bits in the HT.
-  /// TODO: different hash functions here too? We don't need that many bits to pick
-  /// the partition so this might be okay.
-  static const int NUM_PARTITIONING_BITS = 4;
-
-  /// Maximum number of times we will repartition. The maximum build table we can process
-  /// (if we have enough scratch disk space) in case there is no skew is:
-  ///  MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH).
-  /// In the case where there is skew, repartitioning is unlikely to help (assuming a
-  /// reasonable hash function).
-  /// Note that we need to have at least as many SEED_PRIMES in NewPartitionedHashTableCtx.
-  /// TODO: we can revisit and try harder to explicitly detect skew.
-  static const int MAX_PARTITION_DEPTH = 16;
-
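(Worked illustration of the bound above, not taken from the source: each repartitioning
level multiplies the skew-free input a single in-memory pass can cover by PARTITION_FANOUT,
so with a 1 GB memory limit, level-1 partitions cover roughly 16 GB of input, level-2
roughly 256 GB, and so on up to MAX_PARTITION_DEPTH levels, assuming sufficient scratch
disk space.)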
-  /// Default initial number of buckets in a hash table.
-  /// TODO: rethink this ?
-  static const int64_t PAGG_DEFAULT_HASH_TABLE_SZ = 1024;
-
-  /// Codegen doesn't allow for automatic Status variables because then exception
-  /// handling code is needed to destruct the Status, and our function call substitution
-  /// doesn't know how to deal with the LLVM IR 'invoke' instruction. Workaround that by
-  /// placing the Status here so exceptions won't need to destruct it.
-  /// TODO: fix IMPALA-1948 and remove this.
-  Status process_batch_status_;
-
-  /// Tuple into which Update()/Merge()/Serialize() results are stored.
-  TupleId intermediate_tuple_id_;
-  TupleDescriptor* intermediate_tuple_desc_;
-
-  /// Row with the intermediate tuple as its only tuple.
-  /// Construct a new row desc for preparing the build exprs because neither the child's
-  /// nor this node's output row desc may contain the intermediate tuple, e.g.,
-  /// in a single-node plan with an intermediate tuple different from the output tuple.
-  /// Lives in the query state's obj_pool.
-  RowDescriptor intermediate_row_desc_;
-
-  /// Tuple into which Finalize() results are stored. Possibly the same as
-  /// the intermediate tuple.
-  TupleId output_tuple_id_;
-  TupleDescriptor* output_tuple_desc_;
-
-  /// Certain aggregates require a finalize step, which is the final step of the
-  /// aggregate after consuming all input rows. The finalize step converts the aggregate
-  /// value into its final form. This is true if this node contains aggregate that
-  /// requires a finalize step.
-  const bool needs_finalize_;
-
-    /// True if this is the first phase of a two-phase distributed aggregation for which we
-    /// are doing a streaming preaggregation.
-    bool is_streaming_preagg_;
-
-  /// True if any of the evaluators require the serialize step.
-  bool needs_serialize_;
-
-  /// The list of all aggregate operations for this exec node.
-  std::vector<AggFn*> agg_fns_;
-
-  /// Evaluators for each aggregate function. If this is a grouping aggregation, these
-  /// evaluators are only used to create cloned per-partition evaluators. The cloned
-  /// evaluators are then used to evaluate the functions. If this is a non-grouping
-  /// aggregation these evaluators are used directly to evaluate the functions.
-  ///
-  /// Permanent and result allocations for these allocators are allocated from
-  /// 'expr_perm_pool_' and 'expr_results_pool_' respectively.
-  std::vector<NewAggFnEvaluator*> agg_fn_evals_;
-  boost::scoped_ptr<MemPool> agg_fn_pool_;
-
-  /// Exprs used to evaluate input rows
-  std::vector<Expr*> grouping_exprs_;
-
-  /// Exprs used to insert constructed aggregation tuple into the hash table.
-  /// All the exprs are simply SlotRefs for the intermediate tuple.
-  std::vector<Expr*> build_exprs_;
-
-  /// Exprs used to evaluate input rows
-  /// TODO (pengyubing) Is this variable useful?
-  std::vector<ExprContext*> grouping_expr_ctxs_;
-
-  /// Indices of grouping exprs with var-len string types in grouping_expr_ctxs_. We need
-  /// to do more work for var-len expressions when allocating and spilling rows. All
-  /// var-len grouping exprs have type string.
-  std::vector<int> string_grouping_exprs_;
-
-  RuntimeState* state_;
-  /// Allocator for hash table memory.
-  boost::scoped_ptr<Suballocator> ht_allocator_;
-  /// MemPool used to allocate memory for when we don't have grouping and don't initialize
-  /// the partitioning structures, or during Close() when creating new output tuples.
-  /// For non-grouping aggregations, the ownership of the pool's memory is transferred
-  /// to the output batch on eos. The pool should not be Reset() to allow amortizing
-  /// memory allocation over a series of Reset()/Open()/GetNext()* calls.
-  boost::scoped_ptr<MemPool> mem_pool_;
-
-  // MemPool for allocations made by copying expr results
-  boost::scoped_ptr<MemPool> expr_results_pool_;
-
-  /// The current partition and iterator to the next row in its hash table that we need
-  /// to return in GetNext()
-  Partition* output_partition_;
-  NewPartitionedHashTable::Iterator output_iterator_;
-
-  typedef Status (*ProcessBatchNoGroupingFn)(PartitionedAggregationNode*, RowBatch*);
-  /// Jitted ProcessBatchNoGrouping function pointer. Null if codegen is disabled.
-  ProcessBatchNoGroupingFn process_batch_no_grouping_fn_;
-
-  typedef Status (*ProcessBatchFn)(
-      PartitionedAggregationNode*, RowBatch*, NewPartitionedHashTableCtx*);
-  /// Jitted ProcessBatch function pointer. Null if codegen is disabled.
-  ProcessBatchFn process_batch_fn_;
-
-  typedef Status (*ProcessBatchStreamingFn)(PartitionedAggregationNode*, bool,
-      RowBatch*, RowBatch*, NewPartitionedHashTableCtx*, int[PARTITION_FANOUT]);
-  /// Jitted ProcessBatchStreaming function pointer.  Null if codegen is disabled.
-  ProcessBatchStreamingFn process_batch_streaming_fn_;
-
-  /// Time spent processing the child rows
-  RuntimeProfile::Counter* build_timer_;
-
-  /// Total time spent resizing hash tables.
-  RuntimeProfile::Counter* ht_resize_timer_;
-
-  /// Time spent returning the aggregated rows
-  RuntimeProfile::Counter* get_results_timer_;
-
-  /// Total number of hash buckets across all partitions.
-  RuntimeProfile::Counter* num_hash_buckets_;
-
-  /// Total number of partitions created.
-  RuntimeProfile::Counter* partitions_created_;
-
-  /// Level of max partition (i.e. number of repartitioning steps).
-  RuntimeProfile::HighWaterMarkCounter* max_partition_level_;
-
-  /// Number of rows that have been repartitioned.
-  RuntimeProfile::Counter* num_row_repartitioned_;
-
-  /// Number of partitions that have been repartitioned.
-  RuntimeProfile::Counter* num_repartitions_;
-
-  /// Number of partitions that have been spilled.
-  RuntimeProfile::Counter* num_spilled_partitions_;
-
-  /// The largest fraction after repartitioning. This is expected to be
-  /// 1 / PARTITION_FANOUT. A value much larger indicates skew.
-  RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_;
-
-  /// Time spent in streaming preagg algorithm.
-  RuntimeProfile::Counter* streaming_timer_;
-
-  /// The number of rows passed through without aggregation.
-  RuntimeProfile::Counter* num_passthrough_rows_;
-
-  /// The estimated reduction of the preaggregation.
-  RuntimeProfile::Counter* preagg_estimated_reduction_;
-
-  /// Expose the minimum reduction factor to continue growing the hash tables.
-  RuntimeProfile::Counter* preagg_streaming_ht_min_reduction_;
-
-  /// The estimated number of input rows from the planner.
-  int64_t estimated_input_cardinality_;
-
-  /////////////////////////////////////////
-  /// BEGIN: Members that must be Reset()
-
-  /// Result of aggregation w/o GROUP BY.
-  /// Note: can be NULL even if there is no grouping if the result tuple is 0 width
-  /// e.g. select 1 from table group by col.
-  Tuple* singleton_output_tuple_;
-  bool singleton_output_tuple_returned_;
-
-  /// Row batch used as argument to GetNext() for the child node preaggregations. Store
-  /// in node to avoid reallocating for every GetNext() call when streaming.
-  boost::scoped_ptr<RowBatch> child_batch_;
-
-  /// If true, no more rows to output from partitions.
-  bool partition_eos_;
-
-  /// True if no more rows to process from child.
-  bool child_eos_;
-
-  /// Used for hash-related functionality, such as evaluating rows and calculating hashes.
-  /// It also owns the evaluators for the grouping and build expressions used during hash
-  /// table insertion and probing.
-  boost::scoped_ptr<NewPartitionedHashTableCtx> ht_ctx_;
-
-  /// Object pool that holds the Partition objects in hash_partitions_.
-  boost::scoped_ptr<ObjectPool> partition_pool_;
-
-  /// Current partitions we are partitioning into. IMPALA-5788: For the case where we
-  /// rebuild a spilled partition that fits in memory, all pointers in this vector will
-  /// point to a single in-memory partition.
-  std::vector<Partition*> hash_partitions_;
-
-  /// Cache for hash tables in 'hash_partitions_'. IMPALA-5788: For the case where we
-  /// rebuild a spilled partition that fits in memory, all pointers in this array will
-  /// point to the hash table that is a part of a single in-memory partition.
-  NewPartitionedHashTable* hash_tbls_[PARTITION_FANOUT];
-
-  /// All partitions that have been spilled and need further processing.
-  std::deque<Partition*> spilled_partitions_;
-
-  /// All partitions that are aggregated and can just return the results in GetNext().
-  /// After consuming all the input, hash_partitions_ is split into spilled_partitions_
-  /// and aggregated_partitions_, depending on if it was spilled or not.
-  std::deque<Partition*> aggregated_partitions_;
-
-  /// END: Members that must be Reset()
-  /////////////////////////////////////////
-
-  /// The hash table and streams (aggregated and unaggregated) for an individual
-  /// partition. The streams of each partition always (i.e. regardless of level)
-  /// initially use small buffers. Streaming pre-aggregations do not spill and do not
-  /// require an unaggregated stream.
-  struct Partition {
-    Partition(PartitionedAggregationNode* parent, int level, int idx)
-      : parent(parent), is_closed(false), level(level), idx(idx) {}
-
-    ~Partition();
-
-    /// Initializes aggregated_row_stream and unaggregated_row_stream (if a spilling
-    /// aggregation), allocating one buffer for each. Spilling merge aggregations must
-    /// have enough reservation for the initial buffer for the stream, so this should
-    /// not fail due to OOM. Preaggregations do not reserve any buffers: if there is not
-    /// enough reservation for the initial buffer, the aggregated row stream is not
-    /// created and an OK status is returned.
-    Status InitStreams();
-
-    /// Initializes the hash table. 'aggregated_row_stream' must be non-NULL.
-    /// Sets 'got_memory' to true if the hash table was initialised or false on OOM.
-    Status InitHashTable(bool* got_memory);
-
-    /// Called in case we need to serialize aggregated rows. This step effectively does
-    /// a merge aggregation in this node.
-    Status SerializeStreamForSpilling();
-
-    /// Closes this partition. If finalize_rows is true, this iterates over all rows
-    /// in aggregated_row_stream and finalizes them (this is only used in the cancellation
-    /// path).
-    void Close(bool finalize_rows);
-
-    /// Spill this partition. 'more_aggregate_rows' = true means that more aggregate rows
-    /// may be appended to the partition before appending unaggregated rows. On
-    /// success, one of the streams is left with a write iterator: the aggregated stream
-    /// if 'more_aggregate_rows' is true or the unaggregated stream otherwise.
-    Status Spill(bool more_aggregate_rows);
-
-    bool is_spilled() const { return hash_tbl.get() == NULL; }
-
-    PartitionedAggregationNode* parent;
-
-    /// If true, this partition is closed and there is nothing left to do.
-    bool is_closed;
-
-    /// How many times rows in this partition have been repartitioned. Partitions created
-    /// from the node's children's input are level 0, 1 after the first repartitioning,
-    /// etc.
-    const int level;
-
-    /// The index of this partition within 'hash_partitions_' at its level.
-    const int idx;
-
-    /// Hash table for this partition.
-    /// Can be NULL if this partition is no longer maintaining a hash table (i.e.
-    /// is spilled or we are passing through all rows for this partition).
-    boost::scoped_ptr<NewPartitionedHashTable> hash_tbl;
-
-    /// Clone of parent's agg_fn_evals_. Permanent allocations come from
-    /// 'agg_fn_perm_pool' and result allocations come from the ExecNode's
-    /// 'expr_results_pool_'.
-    std::vector<NewAggFnEvaluator*> agg_fn_evals;
-    boost::scoped_ptr<MemPool> agg_fn_pool;
-
-    /// Tuple stream used to store aggregated rows. When the partition is not spilled,
-    /// (meaning the hash table is maintained), this stream is pinned and contains the
-    /// memory referenced by the hash table. When it is spilled, this consumes reservation
-    /// for a write buffer only during repartitioning of aggregated rows.
-    ///
-    /// For streaming preaggs, this may be NULL if sufficient memory is not available.
-    /// In that case hash_tbl is also NULL and all rows for the partition will be passed
-    /// through.
-    boost::scoped_ptr<BufferedTupleStream3> aggregated_row_stream;
-
-    /// Unaggregated rows that are spilled. Always NULL for streaming pre-aggregations.
-    /// Always unpinned. Has a write buffer allocated when the partition is spilled and
-    /// unaggregated rows are being processed.
-    boost::scoped_ptr<BufferedTupleStream3> unaggregated_row_stream;
-  };
-
-  /// Stream used to store serialized spilled rows. Only used if needs_serialize_
-  /// is set. This stream is never pinned and only used in Partition::Spill as a
-  /// a temporary buffer.
-  boost::scoped_ptr<BufferedTupleStream3> serialize_stream_;
-
-  /// Accessor for 'hash_tbls_' that verifies consistency with the partitions.
-  NewPartitionedHashTable* ALWAYS_INLINE GetHashTable(int partition_idx) {
-    NewPartitionedHashTable* ht = hash_tbls_[partition_idx];
-    DCHECK_EQ(ht, hash_partitions_[partition_idx]->hash_tbl.get());
-    return ht;
-  }
-
-  /// Materializes 'row_batch' in either grouping or non-grouping case.
-  Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos);
-
-  /// Helper function called by GetNextInternal() to ensure that string data referenced in
-  /// 'row_batch' will live as long as 'row_batch's tuples. 'first_row_idx' indexes the
-  /// first row that should be processed in 'row_batch'.
-  Status HandleOutputStrings(RowBatch* row_batch, int first_row_idx);
-
-  /// Copies string data from the specified slot into 'pool', and sets the StringValues'
-  /// ptrs to the copied data. Copies data from all tuples in 'row_batch' from
-  /// 'first_row_idx' onwards. 'slot_desc' must have a var-len string type.
-  Status CopyStringData(const SlotDescriptor& slot_desc, RowBatch* row_batch,
-      int first_row_idx, MemPool* pool);
-
-  /// Constructs singleton output tuple, allocating memory from pool.
-  Tuple* ConstructSingletonOutputTuple(
-      const std::vector<NewAggFnEvaluator*>& agg_fn_evals, MemPool* pool);
-
-  /// Copies grouping values stored in 'ht_ctx_' that were computed over 'current_row_'
-  /// using 'grouping_expr_evals_'. Aggregation expr slots are set to their initial
-  /// values. Returns NULL if there was not enough memory to allocate the tuple or errors
-  /// occurred, in which case 'status' is set. Allocates the tuple and var-len data for
-  /// grouping exprs from stream. Var-len data for aggregate exprs is allocated from the
-  /// FunctionContexts, so is stored outside the stream. If stream's small buffers get
-  /// full, it will attempt to switch to IO-buffers.
-  Tuple* ConstructIntermediateTuple(const std::vector<NewAggFnEvaluator*>& agg_fn_evals,
-      BufferedTupleStream3* stream, Status* status);
-
-  /// Constructs intermediate tuple, allocating memory from pool instead of the stream.
-  /// Returns NULL and sets status if there is not enough memory to allocate the tuple.
-  Tuple* ConstructIntermediateTuple(const std::vector<NewAggFnEvaluator*>& agg_fn_evals,
-      MemPool* pool, Status* status);
-
-  /// Returns the number of bytes of variable-length data for the grouping values stored
-  /// in 'ht_ctx_'.
-  int GroupingExprsVarlenSize();
-
-  /// Initializes the intermediate tuple by copying grouping values stored in 'ht_ctx_'
-  /// that were computed over 'current_row_' using 'grouping_expr_evals_'. Writes the
-  /// var-len data into buffer. 'buffer' points to the start of a buffer of at least the
-  /// size of the variable-length data: 'varlen_size'.
-  void CopyGroupingValues(Tuple* intermediate_tuple, uint8_t* buffer, int varlen_size);
-
-  /// Initializes the aggregate function slots of an intermediate tuple.
-  /// Any var-len data is allocated from the FunctionContexts.
-  void InitAggSlots(const std::vector<NewAggFnEvaluator*>& agg_fn_evals,
-      Tuple* intermediate_tuple);
-
-  /// Updates the given aggregation intermediate tuple with aggregation values computed
-  /// over 'row' using 'agg_fn_evals'. Whether the agg fn evaluator calls Update() or
-  /// Merge() is controlled by the evaluator itself, unless enforced explicitly by passing
-  /// in is_merge == true.  The override is needed to merge spilled and non-spilled rows
-  /// belonging to the same partition independent of whether the agg fn evaluators have
-  /// is_merge() == true.
-  /// This function is replaced by codegen (which is why we don't use a vector argument
-  /// for agg_fn_evals). Any var-len data is allocated from the FunctionContexts.
-  void UpdateTuple(NewAggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row,
-      bool is_merge = false);
-
-  /// Called on the intermediate tuple of each group after all input rows have been
-  /// consumed and aggregated. Computes the final aggregate values to be returned in
-  /// GetNext() using the agg fn evaluators' Serialize() or Finalize().
-  /// For the Finalize() case if the output tuple is different from the intermediate
-  /// tuple, then a new tuple is allocated from 'pool' to hold the final result.
-  /// Grouping values are copied into the output tuple and the output tuple holding
-  /// the finalized/serialized aggregate values is returned.
-  /// TODO: Coordinate the allocation of new tuples with the release of memory
-  /// so as not to make memory consumption blow up.
-  Tuple* GetOutputTuple(const std::vector<NewAggFnEvaluator*>& agg_fn_evals,
-      Tuple* tuple, MemPool* pool);
-
-  /// Do the aggregation for all tuple rows in the batch when there is no grouping.
-  /// This function is replaced by codegen.
-  Status ProcessBatchNoGrouping(RowBatch* batch);
-
-  /// Processes a batch of rows. This is the core function of the algorithm. We partition
-  /// the rows into hash_partitions_, spilling as necessary.
-  /// If AGGREGATED_ROWS is true, it means that the rows in the batch are already
-  /// pre-aggregated.
-  /// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE,
-  ///     hash table buckets will be prefetched based on the hash values computed. Note
-  ///     that 'prefetch_mode' will be substituted with constants during codegen time.
-  //
-  /// This function is replaced by codegen. We pass in ht_ctx_.get() as an argument for
-  /// performance.
-  template<bool AGGREGATED_ROWS>
-  Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch, NewPartitionedHashTableCtx* ht_ctx);
-
-  /// Evaluates the rows in 'batch' starting at 'start_row_idx' and stores the results in
-  /// the expression values cache in 'ht_ctx'. The number of rows evaluated depends on
-  /// the capacity of the cache. 'prefetch_mode' specifies the prefetching mode in use.
-  /// If it's not PREFETCH_NONE, hash table buckets for the computed hashes will be
-  /// prefetched. Note that codegen replaces 'prefetch_mode' with a constant.
-  template<bool AGGREGATED_ROWS>
-  void EvalAndHashPrefetchGroup(RowBatch* batch, int start_row_idx, NewPartitionedHashTableCtx* ht_ctx);
-
-  /// This function processes each individual row in ProcessBatch(). Must be inlined into
-  /// ProcessBatch for codegen to substitute function calls with codegen'd versions.
-  /// May spill partitions if not enough memory is available.
-  template <bool AGGREGATED_ROWS>
-  Status IR_ALWAYS_INLINE ProcessRow(TupleRow* row, NewPartitionedHashTableCtx* ht_ctx);
-
-  /// Create a new intermediate tuple in partition, initialized with row. ht_ctx is
-  /// the context for the partition's hash table and hash is the precomputed hash of
-  /// the row. The row can be an unaggregated or aggregated row depending on
-  /// AGGREGATED_ROWS. Spills partitions if necessary to append the new intermediate
-  /// tuple to the partition's stream. Must be inlined into ProcessBatch for codegen
-  /// to substitute function calls with codegen'd versions.  insert_it is an iterator
-  /// for insertion returned from NewPartitionedHashTable::FindBuildRowBucket().
-  template<bool AGGREGATED_ROWS>
-  Status IR_ALWAYS_INLINE AddIntermediateTuple(Partition* partition,
-      TupleRow* row, uint32_t hash, NewPartitionedHashTable::Iterator insert_it);
-
-  /// Append a row to a spilled partition. May spill partitions if needed to switch to
-  /// I/O buffers. Selects the correct stream according to the argument. Inlined into
-  /// ProcessBatch().
-  template<bool AGGREGATED_ROWS>
-  Status IR_ALWAYS_INLINE AppendSpilledRow(Partition* partition, TupleRow* row);
-
-  /// Reads all the rows from input_stream and process them by calling ProcessBatch().
-  template<bool AGGREGATED_ROWS>
-  Status ProcessStream(BufferedTupleStream3* input_stream);
-
-  /// Output 'singleton_output_tuple_' and transfer memory to 'row_batch'.
-  void GetSingletonOutput(RowBatch* row_batch);
-
-  /// Get rows for the next rowbatch from the next partition. Sets 'partition_eos_' to
-  /// true if all rows from all partitions have been returned or the limit is reached.
-  Status GetRowsFromPartition(RuntimeState* state, RowBatch* row_batch);
-
-  /// Get output rows from child for streaming pre-aggregation. Aggregates some rows with the
-  /// hash table and passes through other rows converted into the intermediate
-  /// tuple format. Sets 'child_eos_' once all rows from child have been returned.
-  Status GetRowsStreaming(RuntimeState* state, RowBatch* row_batch);
-
-  /// Return true if we should keep expanding hash tables in the preagg. If false,
-  /// the preagg should pass through any rows it can't fit in its tables.
-  bool ShouldExpandPreaggHashTables() const;
-
-  /// Streaming processing of in_batch from child. Rows from child are either aggregated
-  /// into the hash table or added to 'out_batch' in the intermediate tuple format.
-  /// 'in_batch' is processed entirely, and 'out_batch' must have enough capacity to
-  /// store all of the rows in 'in_batch'.
-  /// 'needs_serialize' is an argument so that codegen can replace it with a constant,
-  ///     rather than using the member variable 'needs_serialize_'.
-  /// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE,
-  ///     hash table buckets will be prefetched based on the hash values computed. Note
-  ///     that 'prefetch_mode' will be substituted with constants during codegen time.
-  /// 'remaining_capacity' is an array with PARTITION_FANOUT entries with the number of
-  ///     additional rows that can be added to the hash table per partition. It is updated
-  ///     by ProcessBatchStreaming() when it inserts new rows.
-  /// 'ht_ctx' is passed in as a way to avoid aliasing of 'this' confusing the optimiser.
-  Status ProcessBatchStreaming(bool needs_serialize,
-      RowBatch* in_batch, RowBatch* out_batch, NewPartitionedHashTableCtx* ht_ctx,
-      int remaining_capacity[PARTITION_FANOUT]);
-
-  /// Tries to add intermediate to the hash table 'hash_tbl' of 'partition' for streaming
-  /// aggregation. The input row must have been evaluated with 'ht_ctx', with 'hash' set
-  /// to the corresponding hash. If the tuple already exists in the hash table, update
-  /// the tuple and return true. Otherwise try to create a new entry in the hash table,
-  /// returning true if successful or false if the table is full. 'remaining_capacity'
-  /// keeps track of how many more entries can be added to the hash table so we can avoid
-  /// retrying inserts. It is decremented if an insert succeeds and set to zero if an
-  /// insert fails. If an error occurs, returns false and sets 'status'.
-  bool IR_ALWAYS_INLINE TryAddToHashTable(NewPartitionedHashTableCtx* ht_ctx,
-      Partition* partition, NewPartitionedHashTable* hash_tbl, TupleRow* in_row, uint32_t hash,
-      int* remaining_capacity, Status* status);
-
-  /// Initializes hash_partitions_. 'level' is the level for the partitions to create.
-  /// If 'single_partition_idx' is provided, it must be a number in range
-  /// [0, PARTITION_FANOUT), and only that partition is created - all others point to it.
-  /// Also sets ht_ctx_'s level to 'level'.
-  Status CreateHashPartitions(int level, int single_partition_idx = -1);
-
-  /// Ensure that hash tables for all in-memory partitions are large enough to fit
-  /// 'num_rows' additional hash table entries. If there is not enough memory to
-  /// resize the hash tables, may spill partitions. 'aggregated_rows' is true if
-  /// we're currently partitioning aggregated rows.
-  Status CheckAndResizeHashPartitions(bool aggregated_rows, int num_rows, const NewPartitionedHashTableCtx* ht_ctx);
-
-  /// Prepares the next partition to return results from. On return, this function
-  /// initializes output_iterator_ and output_partition_. This either removes
-  /// a partition from aggregated_partitions_ (and is done) or removes the next
-  /// partition from aggregated_partitions_ and repartitions it.
-  Status NextPartition();
-
-  /// Tries to build the first partition in 'spilled_partitions_'.
-  /// If successful, set *built_partition to the partition. The caller owns the partition
-  /// and is responsible for closing it. If unsuccessful because the partition could not
-  /// fit in memory, set *built_partition to NULL and append the spilled partition to the
-  /// head of 'spilled_partitions_' so it can be processed by
-  /// RepartitionSpilledPartition().
-  Status BuildSpilledPartition(Partition** built_partition);
-
-  /// Repartitions the first partition in 'spilled_partitions_' into PARTITION_FANOUT
-  /// output partitions. On success, each output partition is either:
-  /// * closed, if no rows were added to the partition.
-  /// * in 'spilled_partitions_', if the partition spilled.
-  /// * in 'aggregated_partitions_', if the output partition was not spilled.
-  Status RepartitionSpilledPartition();
-
-  /// Picks a partition from 'hash_partitions_' to spill. 'more_aggregate_rows' is passed
-  /// to Partition::Spill() when spilling the partition. See the Partition::Spill()
-  /// comment for further explanation.
-  Status SpillPartition(bool more_aggregate_rows);
-
-  /// Moves the partitions in hash_partitions_ to aggregated_partitions_ or
-  /// spilled_partitions_. Partitions moved to spilled_partitions_ are unpinned.
-  /// input_rows is the number of input rows that have been repartitioned.
-  /// Used for diagnostics.
-  Status MoveHashPartitions(int64_t input_rows);
-
-  /// Adds a partition to the front of 'spilled_partitions_' for later processing.
-  /// 'spilled_partitions_' uses LIFO so more finely partitioned partitions are processed
-  /// first. This allows us to delete pages earlier and bottom out the recursion
-  /// earlier and also improves time locality of access to spilled data on disk.
-  void PushSpilledPartition(Partition* partition);
-
-  /// Calls Close() on every Partition in 'aggregated_partitions_',
-  /// 'spilled_partitions_', and 'hash_partitions_' and then resets the lists,
-  /// the vector and the partition pool.
-  void ClosePartitions();
-
-  /// Calls Finalize() on all tuples starting at 'it'.
-  void CleanupHashTbl(const std::vector<NewAggFnEvaluator*>& agg_fn_evals,
-      NewPartitionedHashTable::Iterator it);
-
-    /// Compute minimum buffer reservation for grouping aggregations.
-    /// We need one buffer per partition, which is used either as the write buffer for the
-    /// aggregated stream or the unaggregated stream. We need an additional buffer to read
-    /// the stream we are currently repartitioning. The read buffer needs to be a max-sized
-    /// buffer to hold a max-sized row and we need one max-sized write buffer that is used
-    /// temporarily to append a row to any stream.
-    ///
-    /// If we need to serialize, we need an additional buffer while spilling a partition
-    /// as the partition's aggregated stream needs to be serialized and rewritten.
-    /// We do not spill streaming preaggregations, so we do not need to reserve any buffers.
-  int64_t MinReservation() const {
-      //DCHECK(!grouping_exprs_.empty());
-      // Must be kept in sync with AggregationNode.computeNodeResourceProfile() in fe.
-      //if (is_streaming_preagg_) {
-          // Reserve at least one buffer and a 64kb hash table per partition.
-      //    return (_resource_profile.spillable_buffer_size + 64 * 1024) * PARTITION_FANOUT;
-      //}
-      //int num_buffers = PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0);
-      // Two of the buffers must fit the maximum row.
-      //return _resource_profile.spillable_buffer_size * (num_buffers - 2) +
-      //_resource_profile.max_row_buffer_size * 2;
-      return 0;
-  }
+public:
+    PartitionedAggregationNode(ObjectPool* pool,
+            const TPlanNode& tnode, const DescriptorTbl& descs);
+    // a null dtor to pass codestyle check
+    virtual ~PartitionedAggregationNode() {}
+
+    virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
+    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state);
+    virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos);
+    virtual Status reset(RuntimeState* state);
+    // virtual void close(RuntimeState* state);
+    virtual Status close(RuntimeState* state);
+
+protected:
+    // Frees local allocations from _aggregate_evaluators and agg_fn_ctxs
+    // virtual Status QueryMaintenance(RuntimeState* state);
+
+    virtual void debug_string(int indentation_level, std::stringstream* out) const;
+
+private:
+    struct Partition;
+
+    // Number of initial partitions to create. Must be a power of 2.
+    static const int PARTITION_FANOUT = 16;
+
+    // Needs to be log2(PARTITION_FANOUT).
+    // We use the upper bits to pick the partition and lower bits in the HT.
+    // TODO: different hash functions here too? We don't need that many bits to pick
+    // the partition so this might be okay.
+    static const int NUM_PARTITIONING_BITS = 4;
+
+    // Maximum number of times we will repartition. The maximum build table we can process
+    // (if we have enough scratch disk space) in case there is no skew is:
+    //  MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH).
+    // In the case where there is skew, repartitioning is unlikely to help (assuming a
+    // reasonable hash function).
+    // Note that PartitionedHashTableCtx needs at least as many SEED_PRIMES as MAX_PARTITION_DEPTH.
+    // TODO: we can revisit and try harder to explicitly detect skew.
+    static const int MAX_PARTITION_DEPTH = 16;
+
+    // Codegen doesn't allow for automatic Status variables because then exception
+    // handling code is needed to destruct the Status, and our function call substitution
+    // doesn't know how to deal with the LLVM IR 'invoke' instruction. Work around that by
+    // placing the Status here so exceptions won't need to destruct it.
+    // TODO: fix IMPALA-1948 and remove this.
+    Status _process_batch_status;
+
+    // Tuple into which Update()/Merge()/Serialize() results are stored.
+    TupleId _intermediate_tuple_id;
+    TupleDescriptor* _intermediate_tuple_desc;
+
+    // Row with the intermediate tuple as its only tuple.
+    boost::scoped_ptr<RowDescriptor> _intermediate_row_desc;
+
+    // Tuple into which Finalize() results are stored. Possibly the same as
+    // the intermediate tuple.
+    TupleId _output_tuple_id;
+    TupleDescriptor* _output_tuple_desc;
+
+    // Certain aggregates require a finalize step, which is the final step of the
+    // aggregate after consuming all input rows. The finalize step converts the aggregate
+    // value into its final form. This is true if this node contains an aggregate that
+    // requires a finalize step.
+    const bool _needs_finalize;
+
+    // Contains any evaluators that require the serialize step.
+    bool _needs_serialize;
+
+    std::vector<AggFnEvaluator*> _aggregate_evaluators;
+
+    // FunctionContext for each aggregate function and backing MemPool. String data
+    // returned by the aggregate functions is allocated via these contexts.
+    // These contexts are only passed to the evaluators in the non-partitioned
+    // (non-grouping) case. Otherwise they are only used to clone FunctionContexts for the
+    // partitions.
+    // TODO: we really need to plumb through CHAR(N) for intermediate types.
+    std::vector<doris_udf::FunctionContext*> _agg_fn_ctxs;
+    boost::scoped_ptr<MemPool> _agg_fn_pool;
+
+    // Exprs used to evaluate input rows
+    std::vector<ExprContext*> _probe_expr_ctxs;
+
+    // Exprs used to insert constructed aggregation tuple into the hash table.
+    // All the exprs are simply SlotRefs for the intermediate tuple.
+    std::vector<ExprContext*> _build_expr_ctxs;
+
+    // True if the resulting tuple contains var-len agg/grouping values. This
+    // means we need to do more work when allocating and spilling these rows.
+    bool _contains_var_len_grouping_exprs;
+
+    RuntimeState* _state;
+    BufferedBlockMgr2::Client* _block_mgr_client;
+
+    // MemPool used to allocate memory when there is no grouping (so the partitioning
+    // structures are never initialized), or during close() when creating new output tuples.
+    // For non-grouping aggregations, the ownership of the pool's memory is transferred
+    // to the output batch on eos. The pool should not be Reset() to allow amortizing
+    // memory allocation over a series of Reset()/open()/get_next()* calls.
+    boost::scoped_ptr<MemPool> _mem_pool;
+
+    // The current partition and iterator to the next row in its hash table that we need
+    // to return in get_next()
+    Partition* _output_partition;
+    PartitionedHashTable::Iterator _output_iterator;
+
+    typedef Status (*ProcessRowBatchFn)(
+            PartitionedAggregationNode*, RowBatch*, PartitionedHashTableCtx*);
+    // Jitted ProcessRowBatch function pointer.  Null if codegen is disabled.
+    ProcessRowBatchFn _process_row_batch_fn;
+
+    // Time spent processing the child rows
+    RuntimeProfile::Counter* _build_timer;
+
+    // Total time spent resizing hash tables.
+    RuntimeProfile::Counter* _ht_resize_timer;
+
+    // Time spent returning the aggregated rows
+    RuntimeProfile::Counter* _get_results_timer;
+
+    // Total number of hash buckets across all partitions.
+    RuntimeProfile::Counter* _num_hash_buckets;
+
+    // Total number of partitions created.
+    RuntimeProfile::Counter* _partitions_created;
+
+    // Level of max partition (i.e. number of repartitioning steps).
+    // RuntimeProfile::HighWaterMarkCounter* _max_partition_level;
+
+    // Number of rows that have been repartitioned.
+    RuntimeProfile::Counter* _num_row_repartitioned;
+
+    // Number of partitions that have been repartitioned.
+    RuntimeProfile::Counter* _num_repartitions;
+
+    // Number of partitions that have been spilled.
+    RuntimeProfile::Counter* _num_spilled_partitions;
+
+    // The largest fraction after repartitioning. This is expected to be
+    // 1 / PARTITION_FANOUT. A value much larger indicates skew.
+    // RuntimeProfile::HighWaterMarkCounter* _largest_partition_percent;
+
+    ////////////////////////////
+    // BEGIN: Members that must be Reset()
+
+    // Result of aggregation w/o GROUP BY.
+    // Note: can be NULL even if there is no grouping if the result tuple is 0 width
+    // e.g. select 1 from table group by col.
+    Tuple* _singleton_output_tuple;
+    bool _singleton_output_tuple_returned;
+
+    // Used for hash-related functionality, such as evaluating rows and calculating hashes.
+    // TODO: If we want to multi-thread then this context should be thread-local and not
+    // associated with the node.
+    boost::scoped_ptr<PartitionedHashTableCtx> _ht_ctx;
+
+    // Object pool that holds the Partition objects in _hash_partitions.
+    boost::scoped_ptr<ObjectPool> _partition_pool;
+
+    // Current partitions we are partitioning into.
+    std::vector<Partition*> _hash_partitions;
+
+    // All partitions that have been spilled and need further processing.
+    std::list<Partition*> _spilled_partitions;
+
+    // All partitions that are aggregated and can just return the results in get_next().
+    // After consuming all the input, _hash_partitions is split into _spilled_partitions
+    // and _aggregated_partitions, depending on if it was spilled or not.
+    std::list<Partition*> _aggregated_partitions;
+
+    // END: Members that must be Reset()
+    ////////////////////////////
+
+    // The hash table and streams (aggregated and unaggregated) for an individual
+    // partition. The streams of each partition always (i.e. regardless of level)
+    // initially use small buffers.
+    struct Partition {
+        Partition(PartitionedAggregationNode* parent, int level) :
+                parent(parent), is_closed(false), level(level) {}
+
+        // Initializes aggregated_row_stream and unaggregated_row_stream, reserving
+        // one buffer for each. The buffers backing these streams are reserved, so this
+        // function will not fail with a continuable OOM. If we fail to init these buffers,
+        // the mem limit is too low to run this algorithm.
+        Status init_streams();
+
+        // Initializes the hash table. Returns false on OOM.
+        bool init_hash_table();
+
+        // Called in case we need to serialize aggregated rows. This step effectively does
+        // a merge aggregation in this node.
+        Status clean_up();
+
+        // Closes this partition. If finalize_rows is true, this iterates over all rows
+        // in aggregated_row_stream and finalizes them (this is only used in the cancellation
+        // path).
+        void close(bool finalize_rows);
+
+        // Spills this partition, unpinning streams and cleaning up hash tables as necessary.
+        Status spill();
+
+        bool is_spilled() const {
+            return hash_tbl.get() == NULL;
+        }
+
+        PartitionedAggregationNode* parent;
+
+        // If true, this partition is closed and there is nothing left to do.
+        bool is_closed;
+
+        // How many times rows in this partition have been repartitioned. Partitions created
+        // from the node's child's input are level 0, 1 after the first repartitioning,
+        // etc.
+        const int level;
+
+        // Hash table for this partition.
+        // Can be NULL if this partition is no longer maintaining a hash table (i.e.
+        // is spilled).
+        boost::scoped_ptr<PartitionedHashTable> hash_tbl;
+
+        // Clone of parent's _agg_fn_ctxs and backing MemPool.
+        std::vector<doris_udf::FunctionContext*> agg_fn_ctxs;
+        boost::scoped_ptr<MemPool> agg_fn_pool;
+
+        // Tuple stream used to store aggregated rows. When the partition is not spilled,
+        // (meaning the hash table is maintained), this stream is pinned and contains the
+        // memory referenced by the hash table. When it is spilled, aggregate rows are
+        // just appended to this stream.
+        boost::scoped_ptr<BufferedTupleStream2> aggregated_row_stream;
+
+        // Unaggregated rows that are spilled.
+        boost::scoped_ptr<BufferedTupleStream2> unaggregated_row_stream;
+    };
+
+    // Stream used to store serialized spilled rows. Only used if _needs_serialize
+    // is set. This stream is never pinned and only used in Partition::spill as a
+    // temporary buffer.
+    boost::scoped_ptr<BufferedTupleStream2> _serialize_stream;
+
+    // Allocates a new aggregation intermediate tuple.
+    // Initialized to grouping values computed over '_current_row' using 'agg_fn_ctxs'.
+    // Aggregation expr slots are set to their initial values.
+    // Pool/Stream specify where the memory (tuple and var len slots) should be allocated
+    // from. Only one can be set.
+    // Returns NULL if there was not enough memory to allocate the tuple or an error
+    // occurred. When returning NULL, sets *status. If 'stream' is set and its small
+    // buffers get full, it will attempt to switch to IO-buffers.
+    Tuple* construct_intermediate_tuple(
+            const std::vector<doris_udf::FunctionContext*>& agg_fn_ctxs,
+            MemPool* pool, BufferedTupleStream2* stream, Status* status);
+
+    // Updates the given aggregation intermediate tuple with aggregation values computed
+    // over 'row' using 'agg_fn_ctxs'. Whether the agg fn evaluator calls Update() or
+    // Merge() is controlled by the evaluator itself, unless enforced explicitly by passing
+    // in is_merge == true.  The override is needed to merge spilled and non-spilled rows
+    // belonging to the same partition independent of whether the agg fn evaluators have
+    // is_merge() == true.
+    // This function is replaced by codegen (which is why we don't use a vector argument
+    // for agg_fn_ctxs).
+    void update_tuple(doris_udf::FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row,
+            bool is_merge = false);
+
+    // Called on the intermediate tuple of each group after all input rows have been
+    // consumed and aggregated. Computes the final aggregate values to be returned in
+    // get_next() using the agg fn evaluators' Serialize() or Finalize().
+    // For the Finalize() case if the output tuple is different from the intermediate
+    // tuple, then a new tuple is allocated from 'pool' to hold the final result.
+    // Grouping values are copied into the output tuple and the output tuple holding
+    // the finalized/serialized aggregate values is returned.
+    // TODO: Coordinate the allocation of new tuples with the release of memory
+    // so as not to make memory consumption blow up.
+    Tuple* get_output_tuple(const std::vector<doris_udf::FunctionContext*>& agg_fn_ctxs,
+            Tuple* tuple, MemPool* pool);
+
+    // Do the aggregation for all tuple rows in the batch when there is no grouping.
+    // The PartitionedHashTableCtx argument is unused, but included so the signature matches that of
+    // process_batch() for codegen. This function is replaced by codegen.
+    Status process_batch_no_grouping(RowBatch* batch, PartitionedHashTableCtx* ht_ctx = NULL);
+
+    // Processes a batch of rows. This is the core function of the algorithm. We partition
+    // the rows into _hash_partitions, spilling as necessary.
+    // If AGGREGATED_ROWS is true, it means that the rows in the batch are already
+    // pre-aggregated.
+    //
+    // This function is replaced by codegen. It's inlined into ProcessBatch_true/false in
+    // the IR module. We pass in _ht_ctx.get() as an argument for performance.
+    template<bool AGGREGATED_ROWS>
+    Status IR_ALWAYS_INLINE process_batch(RowBatch* batch, PartitionedHashTableCtx* ht_ctx);
+
+    // This function processes each individual row in process_batch(). Must be inlined
+    // into process_batch for codegen to substitute function calls with codegen'd versions.
+    template<bool AGGREGATED_ROWS>
+    Status IR_ALWAYS_INLINE process_row(TupleRow* row, PartitionedHashTableCtx* ht_ctx);
+
+    // Create a new intermediate tuple in partition, initialized with row. ht_ctx is
+    // the context for the partition's hash table and hash is the precomputed hash of
+    // the row. The row can be an unaggregated or aggregated row depending on
+    // AGGREGATED_ROWS. Spills partitions if necessary to append the new intermediate
+    // tuple to the partition's stream. Must be inlined into process_batch for codegen to
+    // substitute function calls with codegen'd versions. insert_it is an iterator for
+    // insertion returned from PartitionedHashTable::find_bucket().
+    template<bool AGGREGATED_ROWS>
+    Status IR_ALWAYS_INLINE add_intermediate_tuple(Partition* partition,
+            PartitionedHashTableCtx* ht_ctx, TupleRow* row, uint32_t hash, PartitionedHashTable::Iterator insert_it);
+
+    // Append a row to a spilled partition. May spill partitions if needed to switch to
+    // I/O buffers. Selects the correct stream according to the argument. Inlined into
+    // process_batch().
+    template<bool AGGREGATED_ROWS>
+    Status IR_ALWAYS_INLINE append_spilled_row(Partition* partition, TupleRow* row);
+
+    // Append a row to a stream of a spilled partition. May spill partitions if needed
+    // to append the row.
+    Status append_spilled_row(BufferedTupleStream2* stream, TupleRow* row);
+
+    // Reads all the rows from input_stream and processes them by calling process_batch().
+    template<bool AGGREGATED_ROWS>
+    Status process_stream(BufferedTupleStream2* input_stream);
+
+    // Initializes _hash_partitions. 'level' is the level for the partitions to create.
+    // Also sets _ht_ctx's level to 'level'.
+    Status create_hash_partitions(int level);
+
+    // Ensure that hash tables for all in-memory partitions are large enough to fit
+    // num_rows additional rows.
+    Status check_and_resize_hash_partitions(int num_rows, PartitionedHashTableCtx* ht_ctx);
+
+    // Iterates over all the partitions in _hash_partitions and returns the number of rows
+    // of the largest spilled partition (in terms of number of aggregated and unaggregated
+    // rows).
+    int64_t largest_spilled_partition() const;
+
+    // Prepares the next partition to return results from. On return, this function
+    // initializes _output_iterator and _output_partition. This either removes
+    // a partition from _aggregated_partitions (and is done) or removes the next
+    // partition from _aggregated_partitions and repartitions it.
+    Status next_partition();
+
+    // Picks a partition from _hash_partitions to spill.
+    Status spill_partition();
+
+    // Moves the partitions in _hash_partitions to _aggregated_partitions or
+    // _spilled_partitions. Partitions moved to _spilled_partitions are unpinned.
+    // input_rows is the number of input rows that have been repartitioned.
+    // Used for diagnostics.
+    Status move_hash_partitions(int64_t input_rows);
+
+    // Calls close() on every Partition in '_aggregated_partitions',
+    // '_spilled_partitions', and '_hash_partitions' and then resets the lists,
+    // the vector and the partition pool.
+    void close_partitions();
+
+    // Calls Finalize() on all tuples starting at 'it'.
+    void cleanup_hash_tbl(const std::vector<doris_udf::FunctionContext*>& agg_fn_ctxs,
+            PartitionedHashTable::Iterator it);
+
+    // We need two buffers per partition, one for the aggregated stream and one
+    // for the unaggregated stream. We need an additional buffer to read the stream
+    // we are currently repartitioning.
+    // If we need to serialize, we need an additional buffer while spilling a partition
+    // as the partition's aggregated stream needs to be serialized and rewritten.
+    int min_required_buffers() const {
+        return 2 * PARTITION_FANOUT + 1 + (_needs_serialize ? 1 : 0);
+    }
 };
 
-}
-
-#endif
+} // end namespace doris
 
+#endif // DORIS_BE_SRC_EXEC_PARTITIONED_AGGREGATION_NODE_H
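Two of the calculations documented in the header above are easy to check in isolation: the upper NUM_PARTITIONING_BITS of the 32-bit hash select one of the PARTITION_FANOUT partitions, and min_required_buffers() counts two stream buffers per partition, one read buffer for the stream being repartitioned, and an optional serialize buffer. The following is a standalone sketch of both, not part of the patch; the constants simply mirror the header and all other names are illustrative.

    // Standalone sketch, not Doris code: mirrors the constants and buffer math
    // described in the PartitionedAggregationNode header above.
    #include <cstdint>
    #include <iostream>

    static const int PARTITION_FANOUT = 16;      // must be a power of 2
    static const int NUM_PARTITIONING_BITS = 4;  // log2(PARTITION_FANOUT)

    // Upper bits of the hash pick the partition; the lower bits remain for the
    // partition's hash table.
    inline uint32_t partition_index(uint32_t hash) {
        return hash >> (32 - NUM_PARTITIONING_BITS);
    }

    // Two stream buffers per partition, one buffer to read the stream being
    // repartitioned, and one more if spilled rows must be serialized first.
    inline int min_required_buffers(bool needs_serialize) {
        return 2 * PARTITION_FANOUT + 1 + (needs_serialize ? 1 : 0);
    }

    int main() {
        std::cout << partition_index(0xF0000000u) << std::endl;  // prints 15
        std::cout << min_required_buffers(true) << std::endl;    // prints 34
        return 0;
    }
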
diff --git a/be/src/exec/partitioned_aggregation_node_ir.cc b/be/src/exec/partitioned_aggregation_node_ir.cc
index ad5a52e..e9837d5 100644
--- a/be/src/exec/partitioned_aggregation_node_ir.cc
+++ b/be/src/exec/partitioned_aggregation_node_ir.cc
@@ -17,235 +17,127 @@
 
 #include "exec/partitioned_aggregation_node.h"
 
-#include "exec/new_partitioned_hash_table.inline.h"
-#include "exprs/new_agg_fn_evaluator.h"
-#include "exprs/expr_context.h"
-#include "runtime/buffered_tuple_stream3.inline.h"
+#include "exec/partitioned_hash_table.inline.h"
+#include "runtime/buffered_tuple_stream2.inline.h"
 #include "runtime/row_batch.h"
 #include "runtime/tuple_row.h"
-#include "util/runtime_profile.h"
 
-using namespace doris;
+namespace doris {
 
-Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) {
-  Tuple* output_tuple = singleton_output_tuple_;
-  FOREACH_ROW(batch, 0, batch_iter) {
-    UpdateTuple(agg_fn_evals_.data(), output_tuple, batch_iter.get());
-  }
-  return Status::OK();
+Status PartitionedAggregationNode::process_batch_no_grouping(
+        RowBatch* batch, PartitionedHashTableCtx* ht_ctx) {
+    for (int i = 0; i < batch->num_rows(); ++i) {
+        update_tuple(&_agg_fn_ctxs[0], _singleton_output_tuple, batch->get_row(i));
+    }
+    return Status::OK();
 }
 
 template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
-    NewPartitionedHashTableCtx* ht_ctx) {
-  DCHECK(!hash_partitions_.empty());
-  DCHECK(!is_streaming_preagg_);
-
-  // Make sure that no resizes will happen when inserting individual rows to the hash
-  // table of each partition by pessimistically assuming that all the rows in each batch
-  // will end up in the same partition.
-  // TODO: Once we have a histogram with the number of rows per partition, we will have
-  // accurate resize calls.
-  RETURN_IF_ERROR(CheckAndResizeHashPartitions(AGGREGATED_ROWS, batch->num_rows(), ht_ctx));
-
-  NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
-  const int cache_size = expr_vals_cache->capacity();
-  const int num_rows = batch->num_rows();
-  for (int group_start = 0; group_start < num_rows; group_start += cache_size) {
-    EvalAndHashPrefetchGroup<AGGREGATED_ROWS>(batch, group_start, ht_ctx);
-
-    FOREACH_ROW_LIMIT(batch, group_start, cache_size, batch_iter) {
-      RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(batch_iter.get(), ht_ctx));
-      expr_vals_cache->NextRow();
+Status PartitionedAggregationNode::process_batch(RowBatch* batch, PartitionedHashTableCtx* ht_ctx) {
+    DCHECK(!_hash_partitions.empty());
+
+    // Make sure that no resizes will happen when inserting individual rows to the hash
+    // table of each partition by pessimistically assuming that all the rows in each batch
+    // will end up in the same partition.
+    // TODO: Once we have a histogram with the number of rows per partition, we will have
+    // accurate resize calls.
+    int num_rows = batch->num_rows();
+    RETURN_IF_ERROR(check_and_resize_hash_partitions(num_rows, ht_ctx));
+
+    for (int i = 0; i < num_rows; ++i) {
+        RETURN_IF_ERROR(process_row<AGGREGATED_ROWS>(batch->get_row(i), ht_ctx));
     }
-    DCHECK(expr_vals_cache->AtEnd());
-  }
-  return Status::OK();
+
+    return Status::OK();
 }
 
 template<bool AGGREGATED_ROWS>
-void IR_ALWAYS_INLINE PartitionedAggregationNode::EvalAndHashPrefetchGroup(
-    RowBatch* batch, int start_row_idx,
-    NewPartitionedHashTableCtx* ht_ctx) {
-  NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
-  const int cache_size = expr_vals_cache->capacity();
-
-  expr_vals_cache->Reset();
-  FOREACH_ROW_LIMIT(batch, start_row_idx, cache_size, batch_iter) {
-    TupleRow* row = batch_iter.get();
-    bool is_null;
+Status PartitionedAggregationNode::process_row(TupleRow* row, PartitionedHashTableCtx* ht_ctx) {
+    uint32_t hash = 0;
     if (AGGREGATED_ROWS) {
-      is_null = !ht_ctx->EvalAndHashBuild(row);
+        if (!ht_ctx->eval_and_hash_build(row, &hash)) {
+            return Status::OK();
+        }
     } else {
-      is_null = !ht_ctx->EvalAndHashProbe(row);
-    }
-    // Hoist lookups out of non-null branch to speed up non-null case.
-    const uint32_t hash = expr_vals_cache->CurExprValuesHash();
-    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
-    NewPartitionedHashTable* hash_tbl = GetHashTable(partition_idx);
-    if (is_null) {
-      expr_vals_cache->SetRowNull();
-    } else if (config::enable_prefetch) {
-      if (LIKELY(hash_tbl != NULL)) hash_tbl->PrefetchBucket<false>(hash);
+        if (!ht_ctx->eval_and_hash_probe(row, &hash)) {
+            return Status::OK();
+        }
     }
-    expr_vals_cache->NextRow();
-  }
 
-  expr_vals_cache->ResetForRead();
-}
+    // To process this row, we first see if it can be aggregated or inserted into this
+    // partition's hash table. If we need to insert it and that fails, due to OOM, we
+    // spill the partition. The partition to spill is not necessarily dst_partition,
+    // so we can try again to insert the row.
+    Partition* dst_partition = _hash_partitions[hash >> (32 - NUM_PARTITIONING_BITS)];
+    if (dst_partition->is_spilled()) {
+        // This partition is already spilled, just append the row.
+        return append_spilled_row<AGGREGATED_ROWS>(dst_partition, row);
+    }
 
-template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessRow(TupleRow* row,
-    NewPartitionedHashTableCtx* ht_ctx) {
-  NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
-  // Hoist lookups out of non-null branch to speed up non-null case.
-  const uint32_t hash = expr_vals_cache->CurExprValuesHash();
-  const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
-  if (expr_vals_cache->IsRowNull()) return Status::OK();
-  // To process this row, we first see if it can be aggregated or inserted into this
-  // partition's hash table. If we need to insert it and that fails, due to OOM, we
-  // spill the partition. The partition to spill is not necessarily dst_partition,
-  // so we can try again to insert the row.
-  NewPartitionedHashTable* hash_tbl = GetHashTable(partition_idx);
-  Partition* dst_partition = hash_partitions_[partition_idx];
-  DCHECK(dst_partition != nullptr);
-  DCHECK_EQ(dst_partition->is_spilled(), hash_tbl == NULL);
-  if (hash_tbl == NULL) {
-    // This partition is already spilled, just append the row.
-    return AppendSpilledRow<AGGREGATED_ROWS>(dst_partition, row);
-  }
-
-  DCHECK(dst_partition->aggregated_row_stream->is_pinned());
-  bool found;
-  // Find the appropriate bucket in the hash table. There will always be a free
-  // bucket because we checked the size above.
-  NewPartitionedHashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
-  DCHECK(!it.AtEnd()) << "Hash table had no free buckets";
-  if (AGGREGATED_ROWS) {
-    // If the row is already an aggregate row, it cannot match anything in the
-    // hash table since we process the aggregate rows first. These rows should
-    // have been aggregated in the initial pass.
-    DCHECK(!found);
-  } else if (found) {
-    // Row is already in hash table. Do the aggregation and we're done.
-    UpdateTuple(dst_partition->agg_fn_evals.data(), it.GetTuple(), row);
-    return Status::OK();
-  }
+    PartitionedHashTable* ht = dst_partition->hash_tbl.get();
+    DCHECK(ht != NULL);
+    DCHECK(dst_partition->aggregated_row_stream->is_pinned());
+    bool found;
+    // Find the appropriate bucket in the hash table. There will always be a free
+    // bucket because we checked the size above.
+    PartitionedHashTable::Iterator it = ht->find_bucket(ht_ctx, hash, &found);
+    DCHECK(!it.at_end()) << "Hash table had no free buckets";
+    if (AGGREGATED_ROWS) {
+        // If the row is already an aggregate row, it cannot match anything in the
+        // hash table since we process the aggregate rows first. These rows should
+        // have been aggregated in the initial pass.
+        DCHECK(!found);
+    } else if (found) {
+        // Row is already in hash table. Do the aggregation and we're done.
+        update_tuple(&dst_partition->agg_fn_ctxs[0], it.get_tuple(), row);
+        return Status::OK();
+    }
 
-  // If we are seeing this result row for the first time, we need to construct the
-  // result row and initialize it.
-  return AddIntermediateTuple<AGGREGATED_ROWS>(dst_partition, row, hash, it);
+    // If we are seeing this result row for the first time, we need to construct the
+    // result row and initialize it.
+    return add_intermediate_tuple<AGGREGATED_ROWS>(dst_partition, ht_ctx, row, hash, it);
 }
 
 template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::AddIntermediateTuple(Partition* partition,
-    TupleRow* row, uint32_t hash, NewPartitionedHashTable::Iterator insert_it) {
-  while (true) {
-    DCHECK(partition->aggregated_row_stream->is_pinned());
-    Tuple* intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals,
-        partition->aggregated_row_stream.get(), &process_batch_status_);
-
-    if (LIKELY(intermediate_tuple != NULL)) {
-      UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple, row, AGGREGATED_ROWS);
-      // After copying and initializing the tuple, insert it into the hash table.
-      insert_it.SetTuple(intermediate_tuple, hash);
-      return Status::OK();
-    } else if (!process_batch_status_.ok()) {
-      return std::move(process_batch_status_);
-    }
-
-    // We did not have enough memory to add intermediate_tuple to the stream.
-    RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
-    if (partition->is_spilled()) {
-      return AppendSpilledRow<AGGREGATED_ROWS>(partition, row);
-    }
-  }
-}
+Status PartitionedAggregationNode::add_intermediate_tuple(
+        Partition* partition,
+        PartitionedHashTableCtx* ht_ctx,
+        TupleRow* row,
+        uint32_t hash,
+        PartitionedHashTable::Iterator insert_it) {
+    while (true) {
+        DCHECK(partition->aggregated_row_stream->is_pinned());
+        Tuple* intermediate_tuple = construct_intermediate_tuple(partition->agg_fn_ctxs,
+                NULL, partition->aggregated_row_stream.get(), &_process_batch_status);
+
+        if (LIKELY(intermediate_tuple != NULL)) {
+            update_tuple(&partition->agg_fn_ctxs[0], intermediate_tuple, row, AGGREGATED_ROWS);
+            // After copying and initializing the tuple, insert it into the hash table.
+            insert_it.set_tuple(intermediate_tuple, hash);
+            return Status::OK();
+        } else if (!_process_batch_status.ok()) {
+            return _process_batch_status;
+        }
 
-Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
-    RowBatch* in_batch, RowBatch* out_batch,
-    NewPartitionedHashTableCtx* ht_ctx, int remaining_capacity[PARTITION_FANOUT]) {
-  DCHECK(is_streaming_preagg_);
-  DCHECK_EQ(out_batch->num_rows(), 0);
-  DCHECK_LE(in_batch->num_rows(), out_batch->capacity());
-
-  RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
-  NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
-  const int num_rows = in_batch->num_rows();
-  const int cache_size = expr_vals_cache->capacity();
-  for (int group_start = 0; group_start < num_rows; group_start += cache_size) {
-    EvalAndHashPrefetchGroup<false>(in_batch, group_start, ht_ctx);
-
-    FOREACH_ROW_LIMIT(in_batch, group_start, cache_size, in_batch_iter) {
-      // Hoist lookups out of non-null branch to speed up non-null case.
-      TupleRow* in_row = in_batch_iter.get();
-      const uint32_t hash = expr_vals_cache->CurExprValuesHash();
-      const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
-      if (!expr_vals_cache->IsRowNull() &&
-          !TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx],
-            GetHashTable(partition_idx), in_row, hash, &remaining_capacity[partition_idx],
-            &process_batch_status_)) {
-        RETURN_IF_ERROR(std::move(process_batch_status_));
-        // Tuple is not going into hash table, add it to the output batch.
-        Tuple* intermediate_tuple = ConstructIntermediateTuple(agg_fn_evals_,
-            out_batch->tuple_data_pool(), &process_batch_status_);
-        if (UNLIKELY(intermediate_tuple == NULL)) {
-          DCHECK(!process_batch_status_.ok());
-          return std::move(process_batch_status_);
+        // We did not have enough memory to add intermediate_tuple to the stream.
+        RETURN_IF_ERROR(spill_partition());
+        if (partition->is_spilled()) {
+            return append_spilled_row<AGGREGATED_ROWS>(partition, row);
         }
-        UpdateTuple(agg_fn_evals_.data(), intermediate_tuple, in_row);
-        out_batch_iterator.get()->set_tuple(0, intermediate_tuple);
-        out_batch_iterator.next();
-        out_batch->commit_last_row();
-      }
-      DCHECK(process_batch_status_.ok());
-      expr_vals_cache->NextRow();
     }
-    DCHECK(expr_vals_cache->AtEnd());
-  }
-  if (needs_serialize) {
-    FOREACH_ROW(out_batch, 0, out_batch_iter) {
-      NewAggFnEvaluator::Serialize(agg_fn_evals_, out_batch_iter.get()->get_tuple(0));
-    }
-  }
-
-  return Status::OK();
 }
 
-bool PartitionedAggregationNode::TryAddToHashTable(
-    NewPartitionedHashTableCtx* ht_ctx, Partition* partition,
-    NewPartitionedHashTable* hash_tbl, TupleRow* in_row,
-    uint32_t hash, int* remaining_capacity, Status* status) {
-  DCHECK(remaining_capacity != NULL);
-  DCHECK_EQ(hash_tbl, partition->hash_tbl.get());
-  DCHECK_GE(*remaining_capacity, 0);
-  bool found;
-  // This is called from ProcessBatchStreaming() so the rows are not aggregated.
-  NewPartitionedHashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
-  Tuple* intermediate_tuple;
-  if (found) {
-    intermediate_tuple = it.GetTuple();
-  } else if (*remaining_capacity == 0) {
-    return false;
-  } else {
-    intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals,
-        partition->aggregated_row_stream.get(), status);
-    if (LIKELY(intermediate_tuple != NULL)) {
-      it.SetTuple(intermediate_tuple, hash);
-      --(*remaining_capacity);
-    } else {
-      // Avoid repeatedly trying to add tuples when under memory pressure.
-      *remaining_capacity = 0;
-      return false;
-    }
-  }
-  UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple, in_row);
-  return true;
+template<bool AGGREGATED_ROWS>
+Status PartitionedAggregationNode::append_spilled_row(Partition* partition, TupleRow* row) {
+    DCHECK(partition->is_spilled());
+    BufferedTupleStream2* stream = AGGREGATED_ROWS ?
+            partition->aggregated_row_stream.get() : partition->unaggregated_row_stream.get();
+    return append_spilled_row(stream, row);
 }
 
-// Instantiate required templates.
-template Status PartitionedAggregationNode::ProcessBatch<false>(RowBatch*,
-    NewPartitionedHashTableCtx*);
-template Status PartitionedAggregationNode::ProcessBatch<true>(RowBatch*,
-    NewPartitionedHashTableCtx*);
+template Status PartitionedAggregationNode::process_batch<false>(
+    RowBatch*, PartitionedHashTableCtx*);
+template Status PartitionedAggregationNode::process_batch<true>(
+    RowBatch*, PartitionedHashTableCtx*);
 
+} // end namespace doris
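The process_row() path in the hunk above boils down to: hash the row's grouping values, pick the destination partition from the top bits of the hash, append to the partition's stream if it has already spilled, and otherwise either update the matching group in the partition's hash table or insert a new intermediate tuple. The toy program below walks the same aggregate-or-insert decision using one std::unordered_map per partition; it is a self-contained sketch rather than Doris code, with spilling and codegen omitted and all identifiers illustrative.

    // Toy illustration of the per-row flow in process_row(): hash, pick a
    // partition from the upper bits, then update-or-insert the group.
    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <string>
    #include <unordered_map>
    #include <vector>

    static const int NUM_PARTITIONING_BITS = 4;
    static const int PARTITION_FANOUT = 1 << NUM_PARTITIONING_BITS;

    int main() {
        // One "hash table" (group key -> running COUNT(*)) per partition.
        std::vector<std::unordered_map<std::string, int64_t>> partitions(PARTITION_FANOUT);
        std::vector<std::string> rows = {"a", "b", "a", "c", "b", "a"};

        for (const std::string& key : rows) {
            uint32_t hash = static_cast<uint32_t>(std::hash<std::string>()(key));
            uint32_t idx = hash >> (32 - NUM_PARTITIONING_BITS);  // upper bits pick the partition
            ++partitions[idx][key];  // update the existing group or insert a new one
        }

        for (const auto& part : partitions) {
            for (const auto& kv : part) {
                std::cout << kv.first << " -> " << kv.second << std::endl;
            }
        }
        return 0;
    }
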
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 7f4ce70..3d96636 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -55,7 +55,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
     _tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*);
     DCHECK_GT(_tuple_ptrs_size, 0);
     // TODO: switch to Init() pattern so we can check memory limit and return Status.
-    if (config::enable_partitioned_aggregation) {
+    if (config::enable_partitioned_aggregation || config::enable_new_partitioned_aggregation) {
         _mem_tracker->consume(_tuple_ptrs_size);
         _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
         DCHECK(_tuple_ptrs != NULL);
@@ -89,7 +89,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc,
     _tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*);
     DCHECK_GT(_tuple_ptrs_size, 0);
     // TODO: switch to Init() pattern so we can check memory limit and return Status.
-    if (config::enable_partitioned_aggregation) {
+    if (config::enable_partitioned_aggregation || config::enable_new_partitioned_aggregation) {
         _mem_tracker->consume(_tuple_ptrs_size);
         _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
         DCHECK(_tuple_ptrs != nullptr);
@@ -187,7 +187,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
     _tuple_ptrs_size = _num_rows * input_batch.row_tuples.size() * sizeof(Tuple*);
     DCHECK_GT(_tuple_ptrs_size, 0);
     // TODO: switch to Init() pattern so we can check memory limit and return Status.
-    if (config::enable_partitioned_aggregation) {
+    if (config::enable_partitioned_aggregation || config::enable_new_partitioned_aggregation) {
         _mem_tracker->consume(_tuple_ptrs_size);
         _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
         DCHECK(_tuple_ptrs != NULL);
@@ -288,7 +288,7 @@ void RowBatch::clear() {
     for (int i = 0; i < _blocks.size(); ++i) {
         _blocks[i]->del();
     }
-    if (config::enable_partitioned_aggregation ) {
+    if (config::enable_partitioned_aggregation || config::enable_new_partitioned_aggregation) {
         DCHECK(_tuple_ptrs != NULL);
         free(_tuple_ptrs);
         _mem_tracker->release(_tuple_ptrs_size);
@@ -498,7 +498,7 @@ void RowBatch::reset() {
     }
     _blocks.clear();
     _auxiliary_mem_usage = 0;
-    if (!config::enable_partitioned_aggregation) {
+    if (!config::enable_partitioned_aggregation && !config::enable_new_partitioned_aggregation) {
         _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
     }
     _need_to_return = false;
@@ -590,7 +590,7 @@ void RowBatch::acquire_state(RowBatch* src) {
     _num_rows = src->_num_rows;
     _capacity = src->_capacity;
     _need_to_return = src->_need_to_return;
-    if (!config::enable_partitioned_aggregation) {
+    if (!config::enable_partitioned_aggregation && !config::enable_new_partitioned_aggregation) {
         // Tuple pointers are allocated from tuple_data_pool_ so are transferred.
         _tuple_ptrs = src->_tuple_ptrs;
         src->_tuple_ptrs = NULL;
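
The row_batch.cpp hunks above all extend one ownership rule to the new flag: when either partitioned-aggregation mode is enabled, the tuple-pointer array is malloc()'d, accounted against the MemTracker, and freed in clear(); otherwise it is carved out of the batch's tuple data pool and released together with the pool. A compressed, hypothetical sketch of that split follows; the names are illustrative and do not reproduce the RowBatch API.

    // Hypothetical sketch of the two ownership modes for the tuple-pointer
    // array; illustrative only, not the RowBatch implementation.
    #include <cstddef>
    #include <cstdio>
    #include <cstdlib>

    struct Tuple;  // opaque

    Tuple** alloc_tuple_ptrs(std::size_t count, bool partitioned_agg_enabled,
                             Tuple** pool_backed_storage) {
        if (partitioned_agg_enabled) {
            // Explicitly owned: caller must free() it and release the tracked bytes.
            return static_cast<Tuple**>(malloc(count * sizeof(Tuple*)));
        }
        // Pool-owned: released together with the pool, never free()'d directly.
        return pool_backed_storage;
    }

    int main() {
        Tuple* pool_backing[8] = {};  // stand-in for MemPool-backed memory
        Tuple** owned = alloc_tuple_ptrs(8, true, nullptr);
        Tuple** pooled = alloc_tuple_ptrs(8, false, pool_backing);
        std::printf("owned=%p pooled=%p\n", static_cast<void*>(owned),
                    static_cast<void*>(pooled));
        free(owned);  // only the malloc'd path is freed explicitly
        return 0;
    }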


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org