You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/02 22:18:55 UTC
[3/6] impala git commit: IMPALA-110 (part 2): Refactor
PartitionedAggregationNode
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/non-grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/non-grouping-aggregator.cc b/be/src/exec/non-grouping-aggregator.cc
new file mode 100644
index 0000000..585c264
--- /dev/null
+++ b/be/src/exec/non-grouping-aggregator.cc
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/non-grouping-aggregator.h"
+
+#include <sstream>
+
+#include "codegen/llvm-codegen.h"
+#include "exec/exec-node.h"
+#include "exprs/agg-fn-evaluator.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/descriptors.h"
+#include "runtime/mem-pool.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "runtime/tuple.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+// The tuple starts out flagged as returned, so GetNext()/Close() skip it until Open().
+NonGroupingAggregator::NonGroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
+    const TPlanNode& tnode, const DescriptorTbl& descs)
+  : Aggregator(exec_node, pool, tnode, descs, "NonGroupingAggregator"),
+    add_batch_impl_fn_(nullptr), singleton_output_tuple_(nullptr),
+    singleton_output_tuple_returned_(true) {}
+
+Status NonGroupingAggregator::Prepare(RuntimeState* state) {
+  const Status prepared = Aggregator::Prepare(state);
+  if (prepared.ok()) singleton_tuple_pool_.reset(new MemPool(mem_tracker_.get()));
+  return prepared;  // Base-class failures are passed through without creating the pool.
+}
+
+void NonGroupingAggregator::Codegen(RuntimeState* state) {
+  LlvmCodeGen* const cg = state->codegen();
+  DCHECK(cg != nullptr);
+  // A failed codegen attempt is not propagated; it is only reported via the profile.
+  const Status result = CodegenAddBatchImpl(cg, state->query_options().prefetch_mode);
+  runtime_profile()->AddCodegenMsg(result.ok(), result);
+}
+
+Status NonGroupingAggregator::Open(RuntimeState* state) {
+ RETURN_IF_ERROR(Aggregator::Open(state));
+
+ // Create the single output tuple for this non-grouping aggregation, allocated from
+ // singleton_tuple_pool_. This must happen after opening the aggregate evaluators.
+ singleton_output_tuple_ =
+ ConstructSingletonOutputTuple(agg_fn_evals_, singleton_tuple_pool_.get());
+ // Failures during AggFnEvaluator::Init() surface via the query status; check it now.
+ RETURN_IF_ERROR(state->GetQueryStatus());
+ singleton_output_tuple_returned_ = false;
+
+ return Status::OK();
+}
+
+Status NonGroupingAggregator::GetNext(
+    RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  // Always at eos. GetNext() may be called again after eos, so emit the row only once.
+  *eos = true;
+  if (singleton_output_tuple_returned_) return Status::OK();
+  GetSingletonOutput(row_batch);
+  singleton_output_tuple_returned_ = true;
+  return Status::OK();
+}
+
+void NonGroupingAggregator::GetSingletonOutput(RowBatch* row_batch) {
+ int row_idx = row_batch->AddRow();
+ TupleRow* row = row_batch->GetRow(row_idx);
+ // The output row batch may reference memory allocated by Serialize() or Finalize().
+ // Having the evaluators allocate that memory directly from the row batch's pool means
+ // the batch can be returned safely.
+ vector<ScopedResultsPool> allocate_from_batch_pool =
+ ScopedResultsPool::Create(agg_fn_evals_, row_batch->tuple_data_pool());
+ Tuple* output_tuple = GetOutputTuple(
+ agg_fn_evals_, singleton_output_tuple_, row_batch->tuple_data_pool());
+ row->SetTuple(0, output_tuple);
+ if (ExecNode::EvalConjuncts(conjunct_evals_.data(), conjunct_evals_.size(), row)) {
+ row_batch->CommitLastRow();
+ ++num_rows_returned_;
+ COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+ }
+ // Hand the singleton tuple pool's memory to the batch. Keep the current chunk to
+ // amortize the memory allocation over a series of Reset()/Open()/GetNext()* calls.
+ row_batch->tuple_data_pool()->AcquireData(singleton_tuple_pool_.get(), true);
+ // This aggregator no longer owns the memory for singleton_output_tuple_.
+ singleton_output_tuple_ = nullptr;
+}
+
+void NonGroupingAggregator::Close(RuntimeState* state) {
+  MemPool* const tuple_pool = singleton_tuple_pool_.get();
+  // A tuple that GetNext() never returned is still passed through GetOutputTuple().
+  if (!singleton_output_tuple_returned_) {
+    GetOutputTuple(agg_fn_evals_, singleton_output_tuple_, tuple_pool);
+  }
+  if (tuple_pool != nullptr) tuple_pool->FreeAll();
+  // The pool has to be freed before this call so that mem_tracker_ can be closed.
+  Aggregator::Close(state);
+}
+
+Status NonGroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) {
+  SCOPED_TIMER(build_timer_);
+
+  // The jitted loop is only available if codegen filled in add_batch_impl_fn_.
+  // Without it, fall back to the statically compiled AddBatchImpl().
+  const bool has_jitted_fn = add_batch_impl_fn_ != nullptr;
+  if (!has_jitted_fn) return AddBatchImpl(batch);
+
+  // Either path reports failures through the returned Status.
+  return add_batch_impl_fn_(this, batch);
+}
+
+Tuple* NonGroupingAggregator::ConstructSingletonOutputTuple(
+    const vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool) {
+  Tuple* const result = Tuple::Create(intermediate_tuple_desc_->byte_size(), pool);
+  InitAggSlots(agg_fn_evals, result);  // Tuple memory comes from 'pool'.
+  return result;
+}
+
+string NonGroupingAggregator::DebugString(int indentation_level) const {
+  stringstream rendered;  // Filled in by the stream-based overload.
+  DebugString(indentation_level, &rendered);
+  return rendered.str();
+}
+
+void NonGroupingAggregator::DebugString(int indentation_level, stringstream* out) const {
+  // Two spaces of indentation per level, then a one-line summary of this aggregator.
+  stringstream& os = *out;
+  os << string(indentation_level * 2, ' ') << "NonGroupingAggregator("
+     << "intermediate_tuple_id=" << intermediate_tuple_id_
+     << " output_tuple_id=" << output_tuple_id_ << " needs_finalize=" << needs_finalize_
+     << " agg_exprs=" << AggFn::DebugString(agg_fns_) << ")";
+}
+
+Status NonGroupingAggregator::CodegenAddBatchImpl(
+    LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) {
+  // 'prefetch_mode' is not used here. Step 1: codegen the per-row UpdateTuple().
+  llvm::Function* update_fn = nullptr;
+  RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_fn));
+
+  // Step 2: get the cross compiled update row batch function.
+  llvm::Function* batch_loop =
+      codegen->GetFunction(IRFunction::NON_GROUPING_AGG_ADD_BATCH_IMPL, true);
+  DCHECK(batch_loop != nullptr);
+
+  // Step 3: point the loop's "UpdateTuple" call sites at the generated function.
+  const int replaced = codegen->ReplaceCallSites(batch_loop, update_fn, "UpdateTuple");
+  DCHECK_GE(replaced, 1);
+  // Step 4: finalize the loop, then register it so add_batch_impl_fn_ gets filled in.
+  batch_loop = codegen->FinalizeFunction(batch_loop);
+  if (batch_loop == nullptr) {
+    return Status("NonGroupingAggregator::CodegenAddBatchImpl(): codegen'd "
+                  "AddBatchImpl() function failed verification, see log");
+  }
+  codegen->AddFunctionToJit(batch_loop, reinterpret_cast<void**>(&add_batch_impl_fn_));
+  return Status::OK();
+}
+} // namespace impala
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/non-grouping-aggregator.h
----------------------------------------------------------------------
diff --git a/be/src/exec/non-grouping-aggregator.h b/be/src/exec/non-grouping-aggregator.h
new file mode 100644
index 0000000..41b3e0d
--- /dev/null
+++ b/be/src/exec/non-grouping-aggregator.h
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_NON_GROUPING_AGGREGATOR_H
+#define IMPALA_EXEC_NON_GROUPING_AGGREGATOR_H
+
+#include <memory>
+#include <vector>
+
+#include "exec/aggregator.h"
+#include "runtime/mem-pool.h"
+
+namespace impala {
+
+class AggFnEvaluator;
+class DescriptorTbl;
+class ExecNode;
+class LlvmCodeGen;
+class ObjectPool;
+class RowBatch;
+class RuntimeState;
+class TPlanNode;
+class Tuple;
+
+/// Aggregator for doing non-grouping aggregations. Input is passed to the aggregator
+/// through AddBatch(), which generates the single output row. This Aggregator does
+/// not support streaming preaggregation.
+class NonGroupingAggregator : public Aggregator {
+ public:
+ NonGroupingAggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode,
+ const DescriptorTbl& descs);
+
+ virtual Status Prepare(RuntimeState* state) override;
+ virtual void Codegen(RuntimeState* state) override;
+ virtual Status Open(RuntimeState* state) override;
+ virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+ virtual Status Reset(RuntimeState* state) override { return Status::OK(); }
+ virtual void Close(RuntimeState* state) override;
+
+ virtual Status AddBatch(RuntimeState* state, RowBatch* batch) override;
+ virtual Status InputDone() override { return Status::OK(); }
+
+ virtual int num_grouping_exprs() override { return 0; }
+
+ /// NonGroupingAggregator doesn't create a buffer pool client so it doesn't need the
+ /// debug options.
+ virtual void SetDebugOptions(const TDebugOptions& debug_options) override {}
+
+ virtual std::string DebugString(int indentation_level = 0) const override;
+ virtual void DebugString(int indentation_level, std::stringstream* out) const override;
+
+ private:
+ /// MemPool used to allocate memory for 'singleton_output_tuple_'. The ownership of the
+ /// pool's memory is transferred to the output batch on eos. The pool should not be
+ /// Reset() to allow amortizing memory allocation over a series of
+ /// Reset()/Open()/GetNext()* calls.
+ std::unique_ptr<MemPool> singleton_tuple_pool_;
+
+ typedef Status (*AddBatchImplFn)(NonGroupingAggregator*, RowBatch*);
+ /// Jitted AddBatchImpl function pointer. Null if codegen is disabled or did not succeed.
+ AddBatchImplFn add_batch_impl_fn_;
+
+ /////////////////////////////////////////
+ /// BEGIN: Members that must be Reset()
+
+ /// Result of aggregation w/o GROUP BY. Created by Open() and reset to NULL once
+ /// GetSingletonOutput() has transferred its memory to the output batch.
+ /// Note: can also be NULL if the result tuple is 0 width.
+ Tuple* singleton_output_tuple_;
+ bool singleton_output_tuple_returned_;
+
+ /// END: Members that must be Reset()
+ /////////////////////////////////////////
+
+ /// Constructs singleton output tuple, allocating memory from pool.
+ Tuple* ConstructSingletonOutputTuple(
+ const std::vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool);
+
+ /// Do the aggregation for all tuple rows in the batch when there is no grouping.
+ /// This function is replaced by codegen.
+ Status AddBatchImpl(RowBatch* batch) WARN_UNUSED_RESULT;
+
+ /// Output 'singleton_output_tuple_' and transfer memory to 'row_batch'.
+ void GetSingletonOutput(RowBatch* row_batch);
+
+ /// Codegen the non-streaming add row batch loop. The loop has already been compiled to
+ /// IR and loaded into the codegen object. UpdateTuple has also been codegen'd to IR.
+ /// This function will modify the loop substituting the statically compiled functions
+ /// with codegen'd ones. 'add_batch_impl_fn_' will be updated with the codegened
+ /// function.
+ /// Assumes AGGREGATED_ROWS = false.
+ Status CodegenAddBatchImpl(
+ LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
+};
+} // namespace impala
+
+#endif // IMPALA_EXEC_NON_GROUPING_AGGREGATOR_H
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc
deleted file mode 100644
index 69d297c..0000000
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ /dev/null
@@ -1,253 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/partitioned-aggregation-node.h"
-
-#include "exec/hash-table.inline.h"
-#include "exprs/agg-fn-evaluator.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-tuple-stream.inline.h"
-#include "runtime/row-batch.h"
-#include "runtime/tuple-row.h"
-
-using namespace impala;
-
-Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) {
- Tuple* output_tuple = singleton_output_tuple_;
- FOREACH_ROW(batch, 0, batch_iter) {
- UpdateTuple(agg_fn_evals_.data(), output_tuple, batch_iter.Get());
- }
- return Status::OK();
-}
-
-template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
- TPrefetchMode::type prefetch_mode, HashTableCtx* __restrict__ ht_ctx) {
- DCHECK(!hash_partitions_.empty());
- DCHECK(!is_streaming_preagg_);
-
- // Make sure that no resizes will happen when inserting individual rows to the hash
- // table of each partition by pessimistically assuming that all the rows in each batch
- // will end up to the same partition.
- // TODO: Once we have a histogram with the number of rows per partition, we will have
- // accurate resize calls.
- RETURN_IF_ERROR(
- CheckAndResizeHashPartitions(AGGREGATED_ROWS, batch->num_rows(), ht_ctx));
-
- HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
- const int cache_size = expr_vals_cache->capacity();
- const int num_rows = batch->num_rows();
- for (int group_start = 0; group_start < num_rows; group_start += cache_size) {
- EvalAndHashPrefetchGroup<AGGREGATED_ROWS>(batch, group_start, prefetch_mode, ht_ctx);
-
- FOREACH_ROW_LIMIT(batch, group_start, cache_size, batch_iter) {
- RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(batch_iter.Get(), ht_ctx));
- expr_vals_cache->NextRow();
- }
- DCHECK(expr_vals_cache->AtEnd());
- }
- return Status::OK();
-}
-
-template<bool AGGREGATED_ROWS>
-void IR_ALWAYS_INLINE PartitionedAggregationNode::EvalAndHashPrefetchGroup(
- RowBatch* batch, int start_row_idx, TPrefetchMode::type prefetch_mode,
- HashTableCtx* ht_ctx) {
- HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
- const int cache_size = expr_vals_cache->capacity();
-
- expr_vals_cache->Reset();
- FOREACH_ROW_LIMIT(batch, start_row_idx, cache_size, batch_iter) {
- TupleRow* row = batch_iter.Get();
- bool is_null;
- if (AGGREGATED_ROWS) {
- is_null = !ht_ctx->EvalAndHashBuild(row);
- } else {
- is_null = !ht_ctx->EvalAndHashProbe(row);
- }
- // Hoist lookups out of non-null branch to speed up non-null case.
- const uint32_t hash = expr_vals_cache->CurExprValuesHash();
- const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
- HashTable* hash_tbl = GetHashTable(partition_idx);
- if (is_null) {
- expr_vals_cache->SetRowNull();
- } else if (prefetch_mode != TPrefetchMode::NONE) {
- if (LIKELY(hash_tbl != NULL)) hash_tbl->PrefetchBucket<false>(hash);
- }
- expr_vals_cache->NextRow();
- }
-
- expr_vals_cache->ResetForRead();
-}
-
-template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
- HashTableCtx* __restrict__ ht_ctx) {
- HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
- // Hoist lookups out of non-null branch to speed up non-null case.
- const uint32_t hash = expr_vals_cache->CurExprValuesHash();
- const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
- if (expr_vals_cache->IsRowNull()) return Status::OK();
- // To process this row, we first see if it can be aggregated or inserted into this
- // partition's hash table. If we need to insert it and that fails, due to OOM, we
- // spill the partition. The partition to spill is not necessarily dst_partition,
- // so we can try again to insert the row.
- HashTable* hash_tbl = GetHashTable(partition_idx);
- Partition* dst_partition = hash_partitions_[partition_idx];
- DCHECK(dst_partition != nullptr);
- DCHECK_EQ(dst_partition->is_spilled(), hash_tbl == NULL);
- if (hash_tbl == NULL) {
- // This partition is already spilled, just append the row.
- return AppendSpilledRow<AGGREGATED_ROWS>(dst_partition, row);
- }
-
- DCHECK(dst_partition->aggregated_row_stream->is_pinned());
- bool found;
- // Find the appropriate bucket in the hash table. There will always be a free
- // bucket because we checked the size above.
- HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
- DCHECK(!it.AtEnd()) << "Hash table had no free buckets";
- if (AGGREGATED_ROWS) {
- // If the row is already an aggregate row, it cannot match anything in the
- // hash table since we process the aggregate rows first. These rows should
- // have been aggregated in the initial pass.
- DCHECK(!found);
- } else if (found) {
- // Row is already in hash table. Do the aggregation and we're done.
- UpdateTuple(dst_partition->agg_fn_evals.data(), it.GetTuple(), row);
- return Status::OK();
- }
-
- // If we are seeing this result row for the first time, we need to construct the
- // result row and initialize it.
- return AddIntermediateTuple<AGGREGATED_ROWS>(dst_partition, row, hash, it);
-}
-
-template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::AddIntermediateTuple(Partition* __restrict__ partition,
- TupleRow* __restrict__ row, uint32_t hash, HashTable::Iterator insert_it) {
- while (true) {
- DCHECK(partition->aggregated_row_stream->is_pinned());
- Tuple* intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals,
- partition->aggregated_row_stream.get(), &process_batch_status_);
-
- if (LIKELY(intermediate_tuple != NULL)) {
- UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple,
- row, AGGREGATED_ROWS);
- // After copying and initializing the tuple, insert it into the hash table.
- insert_it.SetTuple(intermediate_tuple, hash);
- return Status::OK();
- } else if (!process_batch_status_.ok()) {
- return std::move(process_batch_status_);
- }
-
- // We did not have enough memory to add intermediate_tuple to the stream.
- RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
- if (partition->is_spilled()) {
- return AppendSpilledRow<AGGREGATED_ROWS>(partition, row);
- }
- }
-}
-
-Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
- TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* out_batch,
- HashTableCtx* __restrict__ ht_ctx, int remaining_capacity[PARTITION_FANOUT]) {
- DCHECK(is_streaming_preagg_);
- DCHECK_EQ(out_batch->num_rows(), 0);
- DCHECK_LE(in_batch->num_rows(), out_batch->capacity());
-
- RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
- HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
- const int num_rows = in_batch->num_rows();
- const int cache_size = expr_vals_cache->capacity();
- for (int group_start = 0; group_start < num_rows; group_start += cache_size) {
- EvalAndHashPrefetchGroup<false>(in_batch, group_start, prefetch_mode, ht_ctx);
-
- FOREACH_ROW_LIMIT(in_batch, group_start, cache_size, in_batch_iter) {
- // Hoist lookups out of non-null branch to speed up non-null case.
- TupleRow* in_row = in_batch_iter.Get();
- const uint32_t hash = expr_vals_cache->CurExprValuesHash();
- const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
- if (!expr_vals_cache->IsRowNull() &&
- !TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx],
- GetHashTable(partition_idx), in_row, hash, &remaining_capacity[partition_idx],
- &process_batch_status_)) {
- RETURN_IF_ERROR(std::move(process_batch_status_));
- // Tuple is not going into hash table, add it to the output batch.
- Tuple* intermediate_tuple = ConstructIntermediateTuple(agg_fn_evals_,
- out_batch->tuple_data_pool(), &process_batch_status_);
- if (UNLIKELY(intermediate_tuple == NULL)) {
- DCHECK(!process_batch_status_.ok());
- return std::move(process_batch_status_);
- }
- UpdateTuple(agg_fn_evals_.data(), intermediate_tuple, in_row);
- out_batch_iterator.Get()->SetTuple(0, intermediate_tuple);
- out_batch_iterator.Next();
- out_batch->CommitLastRow();
- }
- DCHECK(process_batch_status_.ok());
- expr_vals_cache->NextRow();
- }
- DCHECK(expr_vals_cache->AtEnd());
- }
- if (needs_serialize) {
- FOREACH_ROW(out_batch, 0, out_batch_iter) {
- AggFnEvaluator::Serialize(agg_fn_evals_, out_batch_iter.Get()->GetTuple(0));
- }
- }
-
- return Status::OK();
-}
-
-bool PartitionedAggregationNode::TryAddToHashTable(
- HashTableCtx* __restrict__ ht_ctx, Partition* __restrict__ partition,
- HashTable* __restrict__ hash_tbl, TupleRow* __restrict__ in_row,
- uint32_t hash, int* __restrict__ remaining_capacity, Status* status) {
- DCHECK(remaining_capacity != NULL);
- DCHECK_EQ(hash_tbl, partition->hash_tbl.get());
- DCHECK_GE(*remaining_capacity, 0);
- bool found;
- // This is called from ProcessBatchStreaming() so the rows are not aggregated.
- HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
- Tuple* intermediate_tuple;
- if (found) {
- intermediate_tuple = it.GetTuple();
- } else if (*remaining_capacity == 0) {
- return false;
- } else {
- intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals,
- partition->aggregated_row_stream.get(), status);
- if (LIKELY(intermediate_tuple != NULL)) {
- it.SetTuple(intermediate_tuple, hash);
- --(*remaining_capacity);
- } else {
- // Avoid repeatedly trying to add tuples when under memory pressure.
- *remaining_capacity = 0;
- return false;
- }
- }
-
- UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple, in_row);
- return true;
-}
-
-// Instantiate required templates.
-template Status PartitionedAggregationNode::ProcessBatch<false>(RowBatch*,
- TPrefetchMode::type, HashTableCtx*);
-template Status PartitionedAggregationNode::ProcessBatch<true>(RowBatch*,
- TPrefetchMode::type, HashTableCtx*);