You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/02 22:18:55 UTC

[3/6] impala git commit: IMPALA-110 (part 2): Refactor PartitionedAggregationNode

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/non-grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/non-grouping-aggregator.cc b/be/src/exec/non-grouping-aggregator.cc
new file mode 100644
index 0000000..585c264
--- /dev/null
+++ b/be/src/exec/non-grouping-aggregator.cc
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/non-grouping-aggregator.h"
+
+#include <sstream>
+
+#include "codegen/llvm-codegen.h"
+#include "exec/exec-node.h"
+#include "exprs/agg-fn-evaluator.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/descriptors.h"
+#include "runtime/mem-pool.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "runtime/tuple.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+NonGroupingAggregator::NonGroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
+    const TPlanNode& tnode, const DescriptorTbl& descs)
+  : Aggregator(exec_node, pool, tnode, descs, "NonGroupingAggregator"),
+    add_batch_impl_fn_(nullptr),
+    singleton_output_tuple_(nullptr),
+    singleton_output_tuple_returned_(true) {}
+
+Status NonGroupingAggregator::Prepare(RuntimeState* state) {
+  RETURN_IF_ERROR(Aggregator::Prepare(state));
+  singleton_tuple_pool_.reset(new MemPool(mem_tracker_.get()));
+  return Status::OK();
+}
+
+void NonGroupingAggregator::Codegen(RuntimeState* state) {
+  LlvmCodeGen* codegen = state->codegen();
+  DCHECK(codegen != nullptr);
+  TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+  Status codegen_status = CodegenAddBatchImpl(codegen, prefetch_mode);
+  runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
+}
+
+Status NonGroupingAggregator::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(Aggregator::Open(state));
+
+  // Create the single output tuple for this non-grouping agg. This must happen after
+  // opening the aggregate evaluators.
+  singleton_output_tuple_ =
+      ConstructSingletonOutputTuple(agg_fn_evals_, singleton_tuple_pool_.get());
+  // Check for failures during AggFnEvaluator::Init().
+  RETURN_IF_ERROR(state->GetQueryStatus());
+  singleton_output_tuple_returned_ = false;
+
+  return Status::OK();
+}
+
+Status NonGroupingAggregator::GetNext(
+    RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  // There was no grouping, so evaluate the conjuncts and return the single result row.
+  // We allow calling GetNext() after eos, so don't return this row again.
+  if (!singleton_output_tuple_returned_) GetSingletonOutput(row_batch);
+  singleton_output_tuple_returned_ = true;
+  *eos = true;
+  return Status::OK();
+}
+
+void NonGroupingAggregator::GetSingletonOutput(RowBatch* row_batch) {
+  int row_idx = row_batch->AddRow();
+  TupleRow* row = row_batch->GetRow(row_idx);
+  // The output row batch may reference memory allocated by Serialize() or Finalize(),
+  // so allocate that memory directly from the row batch's pool. That way the batch can
+  // be returned safely.
+  vector<ScopedResultsPool> allocate_from_batch_pool =
+      ScopedResultsPool::Create(agg_fn_evals_, row_batch->tuple_data_pool());
+  Tuple* output_tuple = GetOutputTuple(
+      agg_fn_evals_, singleton_output_tuple_, row_batch->tuple_data_pool());
+  row->SetTuple(0, output_tuple);
+  if (ExecNode::EvalConjuncts(conjunct_evals_.data(), conjunct_evals_.size(), row)) {
+    row_batch->CommitLastRow();
+    ++num_rows_returned_;
+    COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  }
+  // Keep the current chunk to amortize the memory allocation over a series
+  // of Reset()/Open()/GetNext()* calls.
+  row_batch->tuple_data_pool()->AcquireData(singleton_tuple_pool_.get(), true);
+  // This node no longer owns the memory for singleton_output_tuple_.
+  singleton_output_tuple_ = nullptr;
+}
+
+void NonGroupingAggregator::Close(RuntimeState* state) {
+  if (!singleton_output_tuple_returned_) {
+    GetOutputTuple(agg_fn_evals_, singleton_output_tuple_, singleton_tuple_pool_.get());
+  }
+
+  if (singleton_tuple_pool_.get() != nullptr) singleton_tuple_pool_->FreeAll();
+  // Must be called after singleton_tuple_pool_ is freed, so that mem_tracker_ can be
+  // closed.
+  Aggregator::Close(state);
+}
+
+Status NonGroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) {
+  SCOPED_TIMER(build_timer_);
+
+  if (add_batch_impl_fn_ != nullptr) {
+    RETURN_IF_ERROR(add_batch_impl_fn_(this, batch));
+  } else {
+    RETURN_IF_ERROR(AddBatchImpl(batch));
+  }
+
+  return Status::OK();
+}
+
+Tuple* NonGroupingAggregator::ConstructSingletonOutputTuple(
+    const vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool) {
+  Tuple* output_tuple = Tuple::Create(intermediate_tuple_desc_->byte_size(), pool);
+  InitAggSlots(agg_fn_evals, output_tuple);
+  return output_tuple;
+}
+
+string NonGroupingAggregator::DebugString(int indentation_level) const {
+  stringstream ss;
+  DebugString(indentation_level, &ss);
+  return ss.str();
+}
+
+void NonGroupingAggregator::DebugString(int indentation_level, stringstream* out) const {
+  *out << string(indentation_level * 2, ' ');
+  *out << "NonGroupingAggregator("
+       << "intermediate_tuple_id=" << intermediate_tuple_id_
+       << " output_tuple_id=" << output_tuple_id_ << " needs_finalize=" << needs_finalize_
+       << " agg_exprs=" << AggFn::DebugString(agg_fns_);
+  *out << ")";
+}
+
+Status NonGroupingAggregator::CodegenAddBatchImpl(
+    LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) {
+  llvm::Function* update_tuple_fn;
+  RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn));
+
+  // Get the cross-compiled AddBatchImpl() IR function.
+  IRFunction::Type ir_fn = IRFunction::NON_GROUPING_AGG_ADD_BATCH_IMPL;
+  llvm::Function* add_batch_impl_fn = codegen->GetFunction(ir_fn, true);
+  DCHECK(add_batch_impl_fn != nullptr);
+
+  int replaced;
+  replaced = codegen->ReplaceCallSites(add_batch_impl_fn, update_tuple_fn, "UpdateTuple");
+  DCHECK_GE(replaced, 1);
+  add_batch_impl_fn = codegen->FinalizeFunction(add_batch_impl_fn);
+  if (add_batch_impl_fn == nullptr) {
+    return Status("NonGroupingAggregator::CodegenAddBatchImpl(): codegen'd "
+                  "AddBatchImpl() function failed verification, see log");
+  }
+
+  void** codegened_fn_ptr = reinterpret_cast<void**>(&add_batch_impl_fn_);
+  codegen->AddFunctionToJit(add_batch_impl_fn, codegened_fn_ptr);
+  return Status::OK();
+}
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/non-grouping-aggregator.h
----------------------------------------------------------------------
diff --git a/be/src/exec/non-grouping-aggregator.h b/be/src/exec/non-grouping-aggregator.h
new file mode 100644
index 0000000..41b3e0d
--- /dev/null
+++ b/be/src/exec/non-grouping-aggregator.h
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_NON_GROUPING_AGGREGATOR_H
+#define IMPALA_EXEC_NON_GROUPING_AGGREGATOR_H
+
+#include <memory>
+#include <vector>
+
+#include "exec/aggregator.h"
+#include "runtime/mem-pool.h"
+
+namespace impala {
+
+class AggFnEvaluator;
+class DescriptorTbl;
+class ExecNode;
+class LlvmCodeGen;
+class ObjectPool;
+class RowBatch;
+class RuntimeState;
+class TPlanNode;
+class Tuple;
+
+/// Aggregator for doing non-grouping aggregations. Input is passed to the aggregator
+/// through AddBatch() and the single output row is returned by GetNext(). This
+/// Aggregator does not support streaming preaggregation.
+class NonGroupingAggregator : public Aggregator {
+ public:
+  NonGroupingAggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode,
+      const DescriptorTbl& descs);
+
+  virtual Status Prepare(RuntimeState* state) override;
+  virtual void Codegen(RuntimeState* state) override;
+  virtual Status Open(RuntimeState* state) override;
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+  virtual Status Reset(RuntimeState* state) override { return Status::OK(); }
+  virtual void Close(RuntimeState* state) override;
+
+  virtual Status AddBatch(RuntimeState* state, RowBatch* batch) override;
+  virtual Status InputDone() override { return Status::OK(); }
+
+  virtual int num_grouping_exprs() override { return 0; }
+
+  /// NonGroupingAggregator doesn't create a buffer pool client so it doesn't need the
+  /// debug options.
+  virtual void SetDebugOptions(const TDebugOptions& debug_options) override {}
+
+  virtual std::string DebugString(int indentation_level = 0) const override;
+  virtual void DebugString(int indentation_level, std::stringstream* out) const override;
+
+ private:
+  /// MemPool used to allocate memory for 'singleton_output_tuple_'. The ownership of the
+  /// pool's memory is transferred to the output batch on eos. The pool should not be
+  /// Reset() to allow amortizing memory allocation over a series of
+  /// Reset()/Open()/GetNext()* calls.
+  std::unique_ptr<MemPool> singleton_tuple_pool_;
+
+  typedef Status (*AddBatchImplFn)(NonGroupingAggregator*, RowBatch*);
+  /// Jitted AddBatchImpl function pointer. Null if codegen is disabled.
+  AddBatchImplFn add_batch_impl_fn_;
+
+  /////////////////////////////////////////
+  /// BEGIN: Members that must be Reset()
+
+  /// Result of aggregation w/o GROUP BY. Set to NULL by GetSingletonOutput() once output.
+  /// Note: can be NULL even if there is no grouping if the result tuple is 0 width
+  /// e.g. select 1 from table group by col.
+  Tuple* singleton_output_tuple_;
+  bool singleton_output_tuple_returned_;
+
+  /// END: Members that must be Reset()
+  /////////////////////////////////////////
+
+  /// Constructs singleton output tuple, allocating memory from pool.
+  Tuple* ConstructSingletonOutputTuple(
+      const std::vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool);
+
+  /// Do the aggregation for all tuple rows in the batch when there is no grouping.
+  /// This function is replaced by codegen.
+  Status AddBatchImpl(RowBatch* batch) WARN_UNUSED_RESULT;
+
+  /// Output 'singleton_output_tuple_' and transfer memory to 'row_batch'.
+  void GetSingletonOutput(RowBatch* row_batch);
+
+  /// Codegen the non-streaming add row batch loop. The loop has already been compiled to
+  /// IR and loaded into the codegen object. UpdateTuple() is also codegen'd to IR.
+  /// This function will modify the loop, substituting the statically compiled functions
+  /// with codegen'd ones. 'add_batch_impl_fn_' will be updated with the codegen'd
+  /// function.
+  /// Assumes AGGREGATED_ROWS = false.
+  Status CodegenAddBatchImpl(
+      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
+};
+} // namespace impala
+
+#endif // IMPALA_EXEC_NON_GROUPING_AGGREGATOR_H

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc
deleted file mode 100644
index 69d297c..0000000
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ /dev/null
@@ -1,253 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/partitioned-aggregation-node.h"
-
-#include "exec/hash-table.inline.h"
-#include "exprs/agg-fn-evaluator.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-tuple-stream.inline.h"
-#include "runtime/row-batch.h"
-#include "runtime/tuple-row.h"
-
-using namespace impala;
-
-Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) {
-  Tuple* output_tuple = singleton_output_tuple_;
-  FOREACH_ROW(batch, 0, batch_iter) {
-    UpdateTuple(agg_fn_evals_.data(), output_tuple, batch_iter.Get());
-  }
-  return Status::OK();
-}
-
-template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
-    TPrefetchMode::type prefetch_mode, HashTableCtx* __restrict__ ht_ctx) {
-  DCHECK(!hash_partitions_.empty());
-  DCHECK(!is_streaming_preagg_);
-
-  // Make sure that no resizes will happen when inserting individual rows to the hash
-  // table of each partition by pessimistically assuming that all the rows in each batch
-  // will end up to the same partition.
-  // TODO: Once we have a histogram with the number of rows per partition, we will have
-  // accurate resize calls.
-  RETURN_IF_ERROR(
-      CheckAndResizeHashPartitions(AGGREGATED_ROWS, batch->num_rows(), ht_ctx));
-
-  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
-  const int cache_size = expr_vals_cache->capacity();
-  const int num_rows = batch->num_rows();
-  for (int group_start = 0; group_start < num_rows; group_start += cache_size) {
-    EvalAndHashPrefetchGroup<AGGREGATED_ROWS>(batch, group_start, prefetch_mode, ht_ctx);
-
-    FOREACH_ROW_LIMIT(batch, group_start, cache_size, batch_iter) {
-      RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(batch_iter.Get(), ht_ctx));
-      expr_vals_cache->NextRow();
-    }
-    DCHECK(expr_vals_cache->AtEnd());
-  }
-  return Status::OK();
-}
-
-template<bool AGGREGATED_ROWS>
-void IR_ALWAYS_INLINE PartitionedAggregationNode::EvalAndHashPrefetchGroup(
-    RowBatch* batch, int start_row_idx, TPrefetchMode::type prefetch_mode,
-    HashTableCtx* ht_ctx) {
-  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
-  const int cache_size = expr_vals_cache->capacity();
-
-  expr_vals_cache->Reset();
-  FOREACH_ROW_LIMIT(batch, start_row_idx, cache_size, batch_iter) {
-    TupleRow* row = batch_iter.Get();
-    bool is_null;
-    if (AGGREGATED_ROWS) {
-      is_null = !ht_ctx->EvalAndHashBuild(row);
-    } else {
-      is_null = !ht_ctx->EvalAndHashProbe(row);
-    }
-    // Hoist lookups out of non-null branch to speed up non-null case.
-    const uint32_t hash = expr_vals_cache->CurExprValuesHash();
-    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
-    HashTable* hash_tbl = GetHashTable(partition_idx);
-    if (is_null) {
-      expr_vals_cache->SetRowNull();
-    } else if (prefetch_mode != TPrefetchMode::NONE) {
-      if (LIKELY(hash_tbl != NULL)) hash_tbl->PrefetchBucket<false>(hash);
-    }
-    expr_vals_cache->NextRow();
-  }
-
-  expr_vals_cache->ResetForRead();
-}
-
-template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
-    HashTableCtx* __restrict__ ht_ctx) {
-  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
-  // Hoist lookups out of non-null branch to speed up non-null case.
-  const uint32_t hash = expr_vals_cache->CurExprValuesHash();
-  const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
-  if (expr_vals_cache->IsRowNull()) return Status::OK();
-  // To process this row, we first see if it can be aggregated or inserted into this
-  // partition's hash table. If we need to insert it and that fails, due to OOM, we
-  // spill the partition. The partition to spill is not necessarily dst_partition,
-  // so we can try again to insert the row.
-  HashTable* hash_tbl = GetHashTable(partition_idx);
-  Partition* dst_partition = hash_partitions_[partition_idx];
-  DCHECK(dst_partition != nullptr);
-  DCHECK_EQ(dst_partition->is_spilled(), hash_tbl == NULL);
-  if (hash_tbl == NULL) {
-    // This partition is already spilled, just append the row.
-    return AppendSpilledRow<AGGREGATED_ROWS>(dst_partition, row);
-  }
-
-  DCHECK(dst_partition->aggregated_row_stream->is_pinned());
-  bool found;
-  // Find the appropriate bucket in the hash table. There will always be a free
-  // bucket because we checked the size above.
-  HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
-  DCHECK(!it.AtEnd()) << "Hash table had no free buckets";
-  if (AGGREGATED_ROWS) {
-    // If the row is already an aggregate row, it cannot match anything in the
-    // hash table since we process the aggregate rows first. These rows should
-    // have been aggregated in the initial pass.
-    DCHECK(!found);
-  } else if (found) {
-    // Row is already in hash table. Do the aggregation and we're done.
-    UpdateTuple(dst_partition->agg_fn_evals.data(), it.GetTuple(), row);
-    return Status::OK();
-  }
-
-  // If we are seeing this result row for the first time, we need to construct the
-  // result row and initialize it.
-  return AddIntermediateTuple<AGGREGATED_ROWS>(dst_partition, row, hash, it);
-}
-
-template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::AddIntermediateTuple(Partition* __restrict__ partition,
-    TupleRow* __restrict__ row, uint32_t hash, HashTable::Iterator insert_it) {
-  while (true) {
-    DCHECK(partition->aggregated_row_stream->is_pinned());
-    Tuple* intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals,
-        partition->aggregated_row_stream.get(), &process_batch_status_);
-
-    if (LIKELY(intermediate_tuple != NULL)) {
-      UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple,
-          row, AGGREGATED_ROWS);
-      // After copying and initializing the tuple, insert it into the hash table.
-      insert_it.SetTuple(intermediate_tuple, hash);
-      return Status::OK();
-    } else if (!process_batch_status_.ok()) {
-      return std::move(process_batch_status_);
-    }
-
-    // We did not have enough memory to add intermediate_tuple to the stream.
-    RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
-    if (partition->is_spilled()) {
-      return AppendSpilledRow<AGGREGATED_ROWS>(partition, row);
-    }
-  }
-}
-
-Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
-    TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* out_batch,
-    HashTableCtx* __restrict__ ht_ctx, int remaining_capacity[PARTITION_FANOUT]) {
-  DCHECK(is_streaming_preagg_);
-  DCHECK_EQ(out_batch->num_rows(), 0);
-  DCHECK_LE(in_batch->num_rows(), out_batch->capacity());
-
-  RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
-  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
-  const int num_rows = in_batch->num_rows();
-  const int cache_size = expr_vals_cache->capacity();
-  for (int group_start = 0; group_start < num_rows; group_start += cache_size) {
-    EvalAndHashPrefetchGroup<false>(in_batch, group_start, prefetch_mode, ht_ctx);
-
-    FOREACH_ROW_LIMIT(in_batch, group_start, cache_size, in_batch_iter) {
-      // Hoist lookups out of non-null branch to speed up non-null case.
-      TupleRow* in_row = in_batch_iter.Get();
-      const uint32_t hash = expr_vals_cache->CurExprValuesHash();
-      const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
-      if (!expr_vals_cache->IsRowNull() &&
-          !TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx],
-            GetHashTable(partition_idx), in_row, hash, &remaining_capacity[partition_idx],
-            &process_batch_status_)) {
-        RETURN_IF_ERROR(std::move(process_batch_status_));
-        // Tuple is not going into hash table, add it to the output batch.
-        Tuple* intermediate_tuple = ConstructIntermediateTuple(agg_fn_evals_,
-            out_batch->tuple_data_pool(), &process_batch_status_);
-        if (UNLIKELY(intermediate_tuple == NULL)) {
-          DCHECK(!process_batch_status_.ok());
-          return std::move(process_batch_status_);
-        }
-        UpdateTuple(agg_fn_evals_.data(), intermediate_tuple, in_row);
-        out_batch_iterator.Get()->SetTuple(0, intermediate_tuple);
-        out_batch_iterator.Next();
-        out_batch->CommitLastRow();
-      }
-      DCHECK(process_batch_status_.ok());
-      expr_vals_cache->NextRow();
-    }
-    DCHECK(expr_vals_cache->AtEnd());
-  }
-  if (needs_serialize) {
-    FOREACH_ROW(out_batch, 0, out_batch_iter) {
-      AggFnEvaluator::Serialize(agg_fn_evals_, out_batch_iter.Get()->GetTuple(0));
-    }
-  }
-
-  return Status::OK();
-}
-
-bool PartitionedAggregationNode::TryAddToHashTable(
-    HashTableCtx* __restrict__ ht_ctx, Partition* __restrict__ partition,
-    HashTable* __restrict__ hash_tbl, TupleRow* __restrict__ in_row,
-    uint32_t hash, int* __restrict__ remaining_capacity, Status* status) {
-  DCHECK(remaining_capacity != NULL);
-  DCHECK_EQ(hash_tbl, partition->hash_tbl.get());
-  DCHECK_GE(*remaining_capacity, 0);
-  bool found;
-  // This is called from ProcessBatchStreaming() so the rows are not aggregated.
-  HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
-  Tuple* intermediate_tuple;
-  if (found) {
-    intermediate_tuple = it.GetTuple();
-  } else if (*remaining_capacity == 0) {
-    return false;
-  } else {
-    intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals,
-        partition->aggregated_row_stream.get(), status);
-    if (LIKELY(intermediate_tuple != NULL)) {
-      it.SetTuple(intermediate_tuple, hash);
-      --(*remaining_capacity);
-    } else {
-      // Avoid repeatedly trying to add tuples when under memory pressure.
-      *remaining_capacity = 0;
-      return false;
-    }
-  }
-
-  UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple, in_row);
-  return true;
-}
-
-// Instantiate required templates.
-template Status PartitionedAggregationNode::ProcessBatch<false>(RowBatch*,
-    TPrefetchMode::type, HashTableCtx*);
-template Status PartitionedAggregationNode::ProcessBatch<true>(RowBatch*,
-    TPrefetchMode::type, HashTableCtx*);