Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/02 22:18:54 UTC
[2/6] impala git commit: IMPALA-110 (part 2): Refactor PartitionedAggregationNode
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
deleted file mode 100644
index b6b1752..0000000
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ /dev/null
@@ -1,1955 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/partitioned-aggregation-node.h"
-
-#include <math.h>
-#include <algorithm>
-#include <set>
-#include <sstream>
-
-#include "codegen/codegen-anyval.h"
-#include "codegen/llvm-codegen.h"
-#include "exec/hash-table.inline.h"
-#include "exprs/agg-fn-evaluator.h"
-#include "exprs/anyval-util.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "exprs/slot-ref.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/buffered-tuple-stream.inline.h"
-#include "runtime/descriptors.h"
-#include "runtime/exec-env.h"
-#include "runtime/mem-pool.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/query-state.h"
-#include "runtime/raw-value.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-state.h"
-#include "runtime/string-value.inline.h"
-#include "runtime/tuple-row.h"
-#include "runtime/tuple.h"
-#include "udf/udf-internal.h"
-#include "util/debug-util.h"
-#include "util/runtime-profile-counters.h"
-
-#include "gen-cpp/Exprs_types.h"
-#include "gen-cpp/PlanNodes_types.h"
-
-#include "common/names.h"
-
-using namespace impala;
-using namespace strings;
-
-namespace impala {
-
-const char* PartitionedAggregationNode::LLVM_CLASS_NAME =
- "class.impala::PartitionedAggregationNode";
-
-/// The minimum reduction factor (input rows divided by output rows) to grow hash tables
-/// in a streaming preaggregation, given that the hash tables are currently the given
-/// size or above. The sizes roughly correspond to hash table sizes where the bucket
-/// arrays will fit in a cache level. Intuitively, we don't want the working set of the
-/// aggregation to expand to the next level of cache unless we're reducing the input
-/// enough to outweigh the increased memory latency we'll incur for each hash table
-/// lookup.
-///
-/// Note that the current reduction achieved is not always a good estimate of the
-/// final reduction. It may be biased either way depending on the ordering of the
-/// input. If the input order is random, we will underestimate the final reduction
-/// factor because the probability of a row having the same key as a previous row
-/// increases as more input is processed. If the input order is correlated with the
-/// key, skew may bias the estimate: if high-cardinality keys appear first we may
-/// overestimate, and if low-cardinality keys appear first we underestimate.
-/// To estimate the eventual reduction achieved, we estimate the final reduction
-/// using the planner's estimated input cardinality and the assumption that input
-/// is in a random order. This means that we assume that the reduction factor will
-/// increase over time.
-struct StreamingHtMinReductionEntry {
- // Use 'streaming_ht_min_reduction' if the total size of hash table bucket directories in
- // bytes is greater than this threshold.
- int min_ht_mem;
- // The minimum reduction factor to expand the hash tables.
- double streaming_ht_min_reduction;
-};
-
-// TODO: experimentally tune these values and also programmatically get the cache size
-// of the machine that we're running on.
-static const StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
- // Expand up to L2 cache always.
- {0, 0.0},
- // Expand into L3 cache if we look like we're getting some reduction.
- {256 * 1024, 1.1},
- // Expand into main memory if we're getting a significant reduction.
- {2 * 1024 * 1024, 2.0},
-};
-
-static const int STREAMING_HT_MIN_REDUCTION_SIZE =
- sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
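
For illustration, this is how the table above translates into a threshold (the
actual selection loop lives in ShouldExpandPreaggHashTables() further down; the
helper below is a standalone sketch, not part of the file): with 1 MB of bucket
directories the search stops at the 256 KB entry, so expansion requires an
estimated reduction above 1.1.

double MinReductionForHtMem(int64_t ht_mem) {
  // Walk to the last entry whose min_ht_mem threshold is still covered by the
  // current hash table memory footprint.
  int cache_level = 0;
  while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE &&
      ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
    ++cache_level;
  }
  return STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
}
// MinReductionForHtMem(1024 * 1024) == 1.1  (256 KB <= 1 MB < 2 MB)
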
-
-PartitionedAggregationNode::PartitionedAggregationNode(
- ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
- : ExecNode(pool, tnode, descs),
- intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
- intermediate_tuple_desc_(descs.GetTupleDescriptor(intermediate_tuple_id_)),
- intermediate_row_desc_(intermediate_tuple_desc_, false),
- output_tuple_id_(tnode.agg_node.output_tuple_id),
- output_tuple_desc_(descs.GetTupleDescriptor(output_tuple_id_)),
- needs_finalize_(tnode.agg_node.need_finalize),
- is_streaming_preagg_(tnode.agg_node.use_streaming_preaggregation),
- needs_serialize_(false),
- output_partition_(NULL),
- process_batch_no_grouping_fn_(NULL),
- process_batch_fn_(NULL),
- process_batch_streaming_fn_(NULL),
- build_timer_(NULL),
- ht_resize_timer_(NULL),
- get_results_timer_(NULL),
- num_hash_buckets_(NULL),
- partitions_created_(NULL),
- max_partition_level_(NULL),
- num_row_repartitioned_(NULL),
- num_repartitions_(NULL),
- num_spilled_partitions_(NULL),
- largest_partition_percent_(NULL),
- streaming_timer_(NULL),
- num_passthrough_rows_(NULL),
- preagg_estimated_reduction_(NULL),
- preagg_streaming_ht_min_reduction_(NULL),
- estimated_input_cardinality_(tnode.agg_node.estimated_input_cardinality),
- singleton_output_tuple_(NULL),
- singleton_output_tuple_returned_(true),
- partition_eos_(false),
- child_eos_(false),
- partition_pool_(new ObjectPool()) {
- DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
- if (is_streaming_preagg_) {
- DCHECK(conjunct_evals_.empty()) << "Preaggs have no conjuncts";
- DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping";
- DCHECK(limit_ == -1) << "Preaggs have no limits";
- }
-}
-
-Status PartitionedAggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) {
- RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-
- DCHECK(intermediate_tuple_desc_ != nullptr);
- DCHECK(output_tuple_desc_ != nullptr);
- DCHECK_EQ(intermediate_tuple_desc_->slots().size(), output_tuple_desc_->slots().size());
- const RowDescriptor& row_desc = *child(0)->row_desc();
- RETURN_IF_ERROR(ScalarExpr::Create(tnode.agg_node.grouping_exprs, row_desc,
- state, &grouping_exprs_));
-
- // Construct build exprs from intermediate_row_desc_
- for (int i = 0; i < grouping_exprs_.size(); ++i) {
- SlotDescriptor* desc = intermediate_tuple_desc_->slots()[i];
- DCHECK(desc->type().type == TYPE_NULL || desc->type() == grouping_exprs_[i]->type());
- // Hack to avoid TYPE_NULL SlotRefs.
- SlotRef* build_expr = pool_->Add(desc->type().type != TYPE_NULL ?
- new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN));
- build_exprs_.push_back(build_expr);
- RETURN_IF_ERROR(build_expr->Init(intermediate_row_desc_, state));
- if (build_expr->type().IsVarLenStringType()) string_grouping_exprs_.push_back(i);
- }
-
- int j = grouping_exprs_.size();
- for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i, ++j) {
- SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j];
- SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j];
- AggFn* agg_fn;
- RETURN_IF_ERROR(AggFn::Create(tnode.agg_node.aggregate_functions[i], row_desc,
- *intermediate_slot_desc, *output_slot_desc, state, &agg_fn));
- agg_fns_.push_back(agg_fn);
- needs_serialize_ |= agg_fn->SupportsSerialize();
- }
- return Status::OK();
-}
-
-Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
- SCOPED_TIMER(runtime_profile_->total_time_counter());
-
- RETURN_IF_ERROR(ExecNode::Prepare(state));
- state_ = state;
-
- singleton_tuple_pool_.reset(new MemPool(mem_tracker()));
-
- ht_resize_timer_ = ADD_TIMER(runtime_profile(), "HTResizeTime");
- get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime");
- num_hash_buckets_ =
- ADD_COUNTER(runtime_profile(), "HashBuckets", TUnit::UNIT);
- partitions_created_ =
- ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT);
- largest_partition_percent_ =
- runtime_profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT);
- if (is_streaming_preagg_) {
- runtime_profile()->AppendExecOption("Streaming Preaggregation");
- streaming_timer_ = ADD_TIMER(runtime_profile(), "StreamingTime");
- num_passthrough_rows_ =
- ADD_COUNTER(runtime_profile(), "RowsPassedThrough", TUnit::UNIT);
- preagg_estimated_reduction_ = ADD_COUNTER(
- runtime_profile(), "ReductionFactorEstimate", TUnit::DOUBLE_VALUE);
- preagg_streaming_ht_min_reduction_ = ADD_COUNTER(
- runtime_profile(), "ReductionFactorThresholdToExpand", TUnit::DOUBLE_VALUE);
- } else {
- build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime");
- num_row_repartitioned_ =
- ADD_COUNTER(runtime_profile(), "RowsRepartitioned", TUnit::UNIT);
- num_repartitions_ =
- ADD_COUNTER(runtime_profile(), "NumRepartitions", TUnit::UNIT);
- num_spilled_partitions_ =
- ADD_COUNTER(runtime_profile(), "SpilledPartitions", TUnit::UNIT);
- max_partition_level_ = runtime_profile()->AddHighWaterMarkCounter(
- "MaxPartitionLevel", TUnit::UNIT);
- }
-
- RETURN_IF_ERROR(AggFnEvaluator::Create(agg_fns_, state, pool_, expr_perm_pool(),
- expr_results_pool(), &agg_fn_evals_));
-
- if (!grouping_exprs_.empty()) {
- RETURN_IF_ERROR(HashTableCtx::Create(pool_, state, build_exprs_,
- grouping_exprs_, true, vector<bool>(build_exprs_.size(), true),
- state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1, expr_perm_pool(),
- expr_results_pool(), expr_results_pool(), &ht_ctx_));
- }
- state->CheckAndAddCodegenDisabledMessage(runtime_profile());
- return Status::OK();
-}
-
-void PartitionedAggregationNode::Codegen(RuntimeState* state) {
- DCHECK(state->ShouldCodegen());
- ExecNode::Codegen(state);
- if (IsNodeCodegenDisabled()) return;
-
- LlvmCodeGen* codegen = state->codegen();
- DCHECK(codegen != NULL);
- TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode;
- Status codegen_status = is_streaming_preagg_ ?
- CodegenProcessBatchStreaming(codegen, prefetch_mode) :
- CodegenProcessBatch(codegen, prefetch_mode);
- runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
-}
-
-Status PartitionedAggregationNode::Open(RuntimeState* state) {
- SCOPED_TIMER(runtime_profile_->total_time_counter());
- // Open the child before consuming resources in this node.
- RETURN_IF_ERROR(child(0)->Open(state));
- RETURN_IF_ERROR(ExecNode::Open(state));
-
- // Claim reservation after the child has been opened to reduce the peak reservation
- // requirement.
- if (!buffer_pool_client()->is_registered() && !grouping_exprs_.empty()) {
- DCHECK_GE(resource_profile_.min_reservation, MinReservation());
- RETURN_IF_ERROR(ClaimBufferReservation(state));
- }
-
- if (ht_ctx_.get() != nullptr) RETURN_IF_ERROR(ht_ctx_->Open(state));
- RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state));
- if (grouping_exprs_.empty()) {
- // Create the single output tuple for this non-grouping agg. This must happen after
- // opening the aggregate evaluators.
- singleton_output_tuple_ =
- ConstructSingletonOutputTuple(agg_fn_evals_, singleton_tuple_pool_.get());
- // Check for failures during AggFnEvaluator::Init().
- RETURN_IF_ERROR(state_->GetQueryStatus());
- singleton_output_tuple_returned_ = false;
- } else {
- if (ht_allocator_ == nullptr) {
- // Allocate 'serialize_stream_' and 'ht_allocator_' on the first Open() call.
- ht_allocator_.reset(new Suballocator(state_->exec_env()->buffer_pool(),
- buffer_pool_client(), resource_profile_.spillable_buffer_size));
-
- if (!is_streaming_preagg_ && needs_serialize_) {
- serialize_stream_.reset(new BufferedTupleStream(state, &intermediate_row_desc_,
- buffer_pool_client(), resource_profile_.spillable_buffer_size,
- resource_profile_.max_row_buffer_size));
- RETURN_IF_ERROR(serialize_stream_->Init(id(), false));
- bool got_buffer;
- // Reserve the memory for 'serialize_stream_' so we don't need to scrounge up
- // another buffer during spilling.
- RETURN_IF_ERROR(serialize_stream_->PrepareForWrite(&got_buffer));
- DCHECK(got_buffer) << "Accounted in min reservation"
- << buffer_pool_client()->DebugString();
- DCHECK(serialize_stream_->has_write_iterator());
- }
- }
- RETURN_IF_ERROR(CreateHashPartitions(0));
- }
-
- // Streaming preaggregations do all processing in GetNext().
- if (is_streaming_preagg_) return Status::OK();
-
- RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
- // Read all the rows from the child and process them.
- bool eos = false;
- do {
- RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(QueryMaintenance(state));
- RETURN_IF_ERROR(children_[0]->GetNext(state, &batch, &eos));
-
- if (UNLIKELY(VLOG_ROW_IS_ON)) {
- for (int i = 0; i < batch.num_rows(); ++i) {
- TupleRow* row = batch.GetRow(i);
- VLOG_ROW << "input row: " << PrintRow(row, *children_[0]->row_desc());
- }
- }
-
- TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
- SCOPED_TIMER(build_timer_);
- if (grouping_exprs_.empty()) {
- if (process_batch_no_grouping_fn_ != NULL) {
- RETURN_IF_ERROR(process_batch_no_grouping_fn_(this, &batch));
- } else {
- RETURN_IF_ERROR(ProcessBatchNoGrouping(&batch));
- }
- } else {
- // There is grouping, so we will do partitioned aggregation.
- if (process_batch_fn_ != NULL) {
- RETURN_IF_ERROR(process_batch_fn_(this, &batch, prefetch_mode, ht_ctx_.get()));
- } else {
- RETURN_IF_ERROR(ProcessBatch<false>(&batch, prefetch_mode, ht_ctx_.get()));
- }
- }
- batch.Reset();
- } while (!eos);
-
- // The child can be closed at this point in most cases because we have consumed all of
- // the input from the child and transferred ownership of the resources we need. The
- // exception is if we are inside a subplan expecting to call Open()/GetNext() on the
- // child again.
- if (!IsInSubplan()) child(0)->Close(state);
- child_eos_ = true;
-
- // Done consuming child(0)'s input. Move all the partitions in hash_partitions_
- // to spilled_partitions_ or aggregated_partitions_. We'll finish the processing in
- // GetNext().
- if (!grouping_exprs_.empty()) {
- RETURN_IF_ERROR(MoveHashPartitions(child(0)->rows_returned()));
- }
- return Status::OK();
-}
-
-Status PartitionedAggregationNode::GetNext(
- RuntimeState* state, RowBatch* row_batch, bool* eos) {
- SCOPED_TIMER(runtime_profile_->total_time_counter());
- RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
- RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(QueryMaintenance(state));
-
- if (ReachedLimit()) {
- *eos = true;
- return Status::OK();
- }
-
- if (grouping_exprs_.empty()) {
- // There was no grouping, so evaluate the conjuncts and return the single result row.
- // We allow calling GetNext() after eos, so don't return this row again.
- if (!singleton_output_tuple_returned_) GetSingletonOutput(row_batch);
- singleton_output_tuple_returned_ = true;
- *eos = true;
- return Status::OK();
- }
-
- if (!child_eos_) {
- // For streaming preaggregations, we process rows from the child as we go.
- DCHECK(is_streaming_preagg_);
- RETURN_IF_ERROR(GetRowsStreaming(state, row_batch));
- } else if (!partition_eos_) {
- RETURN_IF_ERROR(GetRowsFromPartition(state, row_batch));
- }
-
- *eos = partition_eos_ && child_eos_;
- COUNTER_SET(rows_returned_counter_, num_rows_returned_);
- return Status::OK();
-}
-
-void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) {
- DCHECK(grouping_exprs_.empty());
- int row_idx = row_batch->AddRow();
- TupleRow* row = row_batch->GetRow(row_idx);
- // The output row batch may reference memory allocated by Serialize() or Finalize().
- // Allocating that memory directly from the row batch's pool means we can safely
- // return the batch.
- vector<ScopedResultsPool> allocate_from_batch_pool =
- ScopedResultsPool::Create(agg_fn_evals_, row_batch->tuple_data_pool());
- Tuple* output_tuple = GetOutputTuple(agg_fn_evals_,
- singleton_output_tuple_, row_batch->tuple_data_pool());
- row->SetTuple(0, output_tuple);
- if (ExecNode::EvalConjuncts(
- conjunct_evals_.data(), conjunct_evals_.size(), row)) {
- row_batch->CommitLastRow();
- ++num_rows_returned_;
- COUNTER_SET(rows_returned_counter_, num_rows_returned_);
- }
- // Keep the current chunk to amortize the memory allocation over a series
- // of Reset()/Open()/GetNext()* calls.
- row_batch->tuple_data_pool()->AcquireData(singleton_tuple_pool_.get(), true);
- // This node no longer owns the memory for singleton_output_tuple_.
- singleton_output_tuple_ = NULL;
-}
-
-Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
- RowBatch* row_batch) {
- DCHECK(!row_batch->AtCapacity());
- if (output_iterator_.AtEnd()) {
- // Done with this partition, move onto the next one.
- if (output_partition_ != NULL) {
- output_partition_->Close(false);
- output_partition_ = NULL;
- }
- if (aggregated_partitions_.empty() && spilled_partitions_.empty()) {
- // No more partitions, all done.
- partition_eos_ = true;
- return Status::OK();
- }
- // Process next partition.
- RETURN_IF_ERROR(NextPartition());
- DCHECK(output_partition_ != NULL);
- }
-
- SCOPED_TIMER(get_results_timer_);
-
- // The output row batch may reference memory allocated by Serialize() or Finalize().
- // Allocating that memory directly from the row batch's pool means we can safely
- // return the batch.
- vector<ScopedResultsPool> allocate_from_batch_pool = ScopedResultsPool::Create(
- output_partition_->agg_fn_evals, row_batch->tuple_data_pool());
- int count = 0;
- const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
- // Keep returning rows from the current partition.
- while (!output_iterator_.AtEnd() && !row_batch->AtCapacity()) {
- // This loop can go on for a long time if the conjuncts are very selective. Do query
- // maintenance every N iterations.
- if ((count++ & (N - 1)) == 0) {
- RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(QueryMaintenance(state));
- }
-
- int row_idx = row_batch->AddRow();
- TupleRow* row = row_batch->GetRow(row_idx);
- Tuple* intermediate_tuple = output_iterator_.GetTuple();
- Tuple* output_tuple = GetOutputTuple(output_partition_->agg_fn_evals,
- intermediate_tuple, row_batch->tuple_data_pool());
- output_iterator_.Next();
- row->SetTuple(0, output_tuple);
- DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
- if (ExecNode::EvalConjuncts(conjunct_evals_.data(), conjuncts_.size(), row)) {
- row_batch->CommitLastRow();
- ++num_rows_returned_;
- if (ReachedLimit()) break;
- }
- }
-
- COUNTER_SET(rows_returned_counter_, num_rows_returned_);
- partition_eos_ = ReachedLimit();
- if (output_iterator_.AtEnd()) row_batch->MarkNeedsDeepCopy();
-
- return Status::OK();
-}
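
A note on the maintenance check in the loop above: BitUtil::RoundUpToPowerOfTwo()
makes N a power of two precisely so that count & (N - 1) equals count % N,
reducing the per-row cost to a single AND. A minimal standalone sketch of the
pattern (assuming a pre-rounded N; the maintenance body is a placeholder):

#include <cassert>
#include <cstdint>

void ProcessRows(int64_t num_rows, int64_t N) {
  assert(N > 0 && (N & (N - 1)) == 0);  // N must be a power of two
  int64_t count = 0;
  for (int64_t i = 0; i < num_rows; ++i) {
    // Fires every N iterations: count & (N - 1) == count % N when N == 2^k.
    if ((count++ & (N - 1)) == 0) {
      // Placeholder: cancellation and memory checks would go here.
    }
    // ... produce or filter one row ...
  }
}
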
-
-Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
- RowBatch* out_batch) {
- DCHECK(!child_eos_);
- DCHECK(is_streaming_preagg_);
-
- if (child_batch_ == NULL) {
- child_batch_.reset(new RowBatch(child(0)->row_desc(), state->batch_size(),
- mem_tracker()));
- }
-
- do {
- DCHECK_EQ(out_batch->num_rows(), 0);
- RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(QueryMaintenance(state));
-
- RETURN_IF_ERROR(child(0)->GetNext(state, child_batch_.get(), &child_eos_));
-
- SCOPED_TIMER(streaming_timer_);
-
- int remaining_capacity[PARTITION_FANOUT];
- bool ht_needs_expansion = false;
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- HashTable* hash_tbl = GetHashTable(i);
- remaining_capacity[i] = hash_tbl->NumInsertsBeforeResize();
- ht_needs_expansion |= remaining_capacity[i] < child_batch_->num_rows();
- }
-
- // Stop expanding hash tables if we're not reducing the input sufficiently. As our
- // hash tables expand out of each level of cache hierarchy, every hash table lookup
- // will take longer. We also may not be able to expand hash tables because of memory
- // pressure. In this case HashTable::CheckAndResize() will fail. In either case we
- // should always use the remaining space in the hash table to avoid wasting memory.
- if (ht_needs_expansion && ShouldExpandPreaggHashTables()) {
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- HashTable* ht = GetHashTable(i);
- if (remaining_capacity[i] < child_batch_->num_rows()) {
- SCOPED_TIMER(ht_resize_timer_);
- bool resized;
- RETURN_IF_ERROR(
- ht->CheckAndResize(child_batch_->num_rows(), ht_ctx_.get(), &resized));
- if (resized) {
- remaining_capacity[i] = ht->NumInsertsBeforeResize();
- }
- }
- }
- }
-
- TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
- if (process_batch_streaming_fn_ != NULL) {
- RETURN_IF_ERROR(process_batch_streaming_fn_(this, needs_serialize_, prefetch_mode,
- child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity));
- } else {
- RETURN_IF_ERROR(ProcessBatchStreaming(needs_serialize_, prefetch_mode,
- child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity));
- }
-
- child_batch_->Reset(); // All rows from child_batch_ were processed.
- } while (out_batch->num_rows() == 0 && !child_eos_);
-
- if (child_eos_) {
- child(0)->Close(state);
- child_batch_.reset();
- RETURN_IF_ERROR(MoveHashPartitions(child(0)->rows_returned()));
- }
-
- num_rows_returned_ += out_batch->num_rows();
- COUNTER_SET(num_passthrough_rows_, num_rows_returned_);
- return Status::OK();
-}
-
-bool PartitionedAggregationNode::ShouldExpandPreaggHashTables() const {
- int64_t ht_mem = 0;
- int64_t ht_rows = 0;
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- HashTable* ht = hash_partitions_[i]->hash_tbl.get();
- ht_mem += ht->CurrentMemSize();
- ht_rows += ht->size();
- }
-
- // Need some rows in tables to have valid statistics.
- if (ht_rows == 0) return true;
-
- // Find the appropriate reduction factor in our table for the current hash table sizes.
- int cache_level = 0;
- while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE &&
- ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
- ++cache_level;
- }
-
- // Compare the number of rows in the hash table with the number of input rows that
- // were aggregated into it. Exclude passed-through rows from this calculation since
- // they were not inserted into hash tables.
- const int64_t input_rows = children_[0]->rows_returned();
- const int64_t aggregated_input_rows = input_rows - num_rows_returned_;
- const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_;
- double current_reduction = static_cast<double>(aggregated_input_rows) / ht_rows;
-
- // TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be
- // inaccurate, which could lead to a divide by zero below.
- if (aggregated_input_rows <= 0) return true;
-
- // Extrapolate the current reduction factor (r) using the formula
- // R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data
- // set, N is the number of input rows, excluding passed-through rows, and n is the
- // number of rows inserted or merged into the hash tables. This is a very rough
- // approximation but is good enough to be useful.
- // TODO: consider collecting more statistics to better estimate reduction.
- double estimated_reduction = aggregated_input_rows >= expected_input_rows
- ? current_reduction
- : 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1);
- double min_reduction =
- STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
-
- COUNTER_SET(preagg_estimated_reduction_, estimated_reduction);
- COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction);
- return estimated_reduction > min_reduction;
-}
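
As a worked example of the extrapolation above: with an expected N = 1,000,000
aggregated input rows, n = 10,000 rows aggregated so far, and a current
reduction of r = 1.5, the estimate is R = 1 + (1,000,000 / 10,000) * (1.5 - 1)
= 51, well above every threshold in STREAMING_HT_MIN_REDUCTION. The same
computation as a standalone sketch:

// R = 1 + (N / n) * (r - 1), per the comment in ShouldExpandPreaggHashTables().
double EstimateFinalReduction(double expected_rows, double aggregated_rows,
    double current_reduction) {
  if (aggregated_rows >= expected_rows) return current_reduction;
  return 1 + (expected_rows / aggregated_rows) * (current_reduction - 1);
}
// EstimateFinalReduction(1e6, 1e4, 1.5) == 51.0
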
-
-void PartitionedAggregationNode::CleanupHashTbl(
- const vector<AggFnEvaluator*>& agg_fn_evals, HashTable::Iterator it) {
- if (!needs_finalize_ && !needs_serialize_) return;
-
- // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
- // them in order to free any memory allocated by UDAs.
- if (needs_finalize_) {
- // Finalize() requires a dst tuple but we don't actually need the result,
- // so allocate a single dummy tuple to avoid accumulating memory.
- Tuple* dummy_dst = Tuple::Create(
- output_tuple_desc_->byte_size(), singleton_tuple_pool_.get());
- while (!it.AtEnd()) {
- Tuple* tuple = it.GetTuple();
- AggFnEvaluator::Finalize(agg_fn_evals, tuple, dummy_dst);
- it.Next();
- // Free any expr result allocations to prevent them accumulating excessively.
- expr_results_pool_->Clear();
- }
- } else {
- while (!it.AtEnd()) {
- Tuple* tuple = it.GetTuple();
- AggFnEvaluator::Serialize(agg_fn_evals, tuple);
- it.Next();
- // Free any expr result allocations to prevent them accumulating excessively.
- expr_results_pool_->Clear();
- }
- }
-}
-
-Status PartitionedAggregationNode::Reset(RuntimeState* state) {
- DCHECK(!is_streaming_preagg_) << "Cannot reset preaggregation";
- if (!grouping_exprs_.empty()) {
- child_eos_ = false;
- partition_eos_ = false;
- // Reset the HT and the partitions for this grouping agg.
- ht_ctx_->set_level(0);
- ClosePartitions();
- }
- return ExecNode::Reset(state);
-}
-
-void PartitionedAggregationNode::Close(RuntimeState* state) {
- if (is_closed()) return;
-
- if (!singleton_output_tuple_returned_) {
- GetOutputTuple(agg_fn_evals_, singleton_output_tuple_, singleton_tuple_pool_.get());
- }
-
- // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
- // them in order to free any memory allocated by UDAs.
- if (output_partition_ != NULL) {
- CleanupHashTbl(output_partition_->agg_fn_evals, output_iterator_);
- output_partition_->Close(false);
- }
-
- ClosePartitions();
-
- child_batch_.reset();
-
- // Close all the agg-fn-evaluators
- AggFnEvaluator::Close(agg_fn_evals_, state);
-
- if (singleton_tuple_pool_.get() != nullptr) singleton_tuple_pool_->FreeAll();
- if (ht_ctx_.get() != nullptr) ht_ctx_->Close(state);
- ht_ctx_.reset();
- if (serialize_stream_.get() != nullptr) {
- serialize_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
- }
- ScalarExpr::Close(grouping_exprs_);
- ScalarExpr::Close(build_exprs_);
- AggFn::Close(agg_fns_);
- ExecNode::Close(state);
-}
-
-PartitionedAggregationNode::Partition::~Partition() {
- DCHECK(is_closed);
-}
-
-Status PartitionedAggregationNode::Partition::InitStreams() {
- agg_fn_perm_pool.reset(new MemPool(parent->expr_mem_tracker()));
- DCHECK_EQ(agg_fn_evals.size(), 0);
- AggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_perm_pool.get(),
- parent->expr_results_pool(), parent->agg_fn_evals_, &agg_fn_evals);
- // Varlen aggregate function results are stored outside of aggregated_row_stream because
- // BufferedTupleStream doesn't support relocating varlen data stored in the stream.
- auto agg_slot = parent->intermediate_tuple_desc_->slots().begin() +
- parent->grouping_exprs_.size();
- set<SlotId> external_varlen_slots;
- for (; agg_slot != parent->intermediate_tuple_desc_->slots().end(); ++agg_slot) {
- if ((*agg_slot)->type().IsVarLenStringType()) {
- external_varlen_slots.insert((*agg_slot)->id());
- }
- }
-
- aggregated_row_stream.reset(
- new BufferedTupleStream(parent->state_, &parent->intermediate_row_desc_,
- parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size,
- parent->resource_profile_.max_row_buffer_size, external_varlen_slots));
- RETURN_IF_ERROR(aggregated_row_stream->Init(parent->id(), true));
- bool got_buffer;
- RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer));
- DCHECK(got_buffer) << "Buffer included in reservation " << parent->id_ << "\n"
- << parent->buffer_pool_client()->DebugString() << "\n"
- << parent->DebugString(2);
- if (!parent->is_streaming_preagg_) {
- unaggregated_row_stream.reset(
- new BufferedTupleStream(parent->state_, parent->child(0)->row_desc(),
- parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size,
- parent->resource_profile_.max_row_buffer_size));
- // This stream is only used for spilling, so it never needs to be pinned.
- RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id(), false));
- // Save memory by waiting until we spill to allocate the write buffer for the
- // unaggregated row stream.
- DCHECK(!unaggregated_row_stream->has_write_iterator());
- }
- return Status::OK();
-}
-
-Status PartitionedAggregationNode::Partition::InitHashTable(bool* got_memory) {
- DCHECK(aggregated_row_stream != nullptr);
- DCHECK(hash_tbl == nullptr);
- // We use the upper NUM_PARTITIONING_BITS bits of the hash to pick the partition, so
- // only the remaining bits can be used for the hash table.
- // TODO: we could switch to 64 bit hashes and then we don't need a max size.
- // It might be reasonable to limit individual hash table size for other reasons
- // though. Always start with small buffers.
- hash_tbl.reset(HashTable::Create(parent->ht_allocator_.get(), false, 1, nullptr,
- 1L << (32 - NUM_PARTITIONING_BITS), PAGG_DEFAULT_HASH_TABLE_SZ));
- // Please update the error message in CreateHashPartitions() if initial size of
- // hash table changes.
- return hash_tbl->Init(got_memory);
-}
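
To make the bit accounting above concrete: the top NUM_PARTITIONING_BITS bits of
the 32-bit hash select the partition, so any single hash table can only be
addressed by the remaining low bits, hence the 1L << (32 - NUM_PARTITIONING_BITS)
cap passed to HashTable::Create(). An illustrative sketch; PartitionIdx and
BucketIdx are made-up names, not the actual HashTable interface, and the value 4
is assumed (it is consistent with the constructor's DCHECK that PARTITION_FANOUT
== 1 << NUM_PARTITIONING_BITS):

#include <cstdint>

constexpr int NUM_PARTITIONING_BITS = 4;  // assumed; fanout would then be 16

// Top bits choose one of PARTITION_FANOUT partitions...
inline uint32_t PartitionIdx(uint32_t hash) {
  return hash >> (32 - NUM_PARTITIONING_BITS);
}

// ...leaving at most 1L << (32 - NUM_PARTITIONING_BITS) addressable buckets
// for any single table.
inline uint32_t BucketIdx(uint32_t hash, uint32_t num_buckets) {
  return hash & (num_buckets - 1);  // num_buckets must be a power of two
}
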
-
-Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
- DCHECK(!parent->is_streaming_preagg_);
- if (parent->needs_serialize_) {
- // We need to do a lot more work in this case. This step effectively does a merge
- // aggregation in this node. We need to serialize the intermediates, spill the
- // intermediates and then feed them into the aggregate function's merge step.
- // This is often used when the intermediate is a string type, meaning the current
- // (before serialization) in-memory layout is not the on-disk block layout.
- // The disk layout does not support mutable rows. We need to rewrite the stream
- // into the on disk format.
- // TODO: if it happens to not be a string, we could serialize in place. This is
- // a future optimization since it is very unlikely to have a serialize phase
- // for those UDAs.
- DCHECK(parent->serialize_stream_.get() != NULL);
- DCHECK(!parent->serialize_stream_->is_pinned());
-
- // Serialize and copy the spilled partition's stream into the new stream.
- Status status;
- BufferedTupleStream* new_stream = parent->serialize_stream_.get();
- HashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get());
- while (!it.AtEnd()) {
- Tuple* tuple = it.GetTuple();
- it.Next();
- AggFnEvaluator::Serialize(agg_fn_evals, tuple);
- if (UNLIKELY(!new_stream->AddRow(reinterpret_cast<TupleRow*>(&tuple), &status))) {
- DCHECK(!status.ok()) << "Stream was unpinned - AddRow() only fails on error";
- // Even if we can't add to new_stream, finish up processing this agg stream to make
- // cleanup easier (someone has to finalize this stream and we don't want to remember
- // where we are).
- parent->CleanupHashTbl(agg_fn_evals, it);
- hash_tbl->Close();
- hash_tbl.reset();
- aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
- return status;
- }
- }
-
- aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
- aggregated_row_stream.swap(parent->serialize_stream_);
- // Recreate the serialize_stream (and reserve 1 buffer) now in preparation for
- // the next spill. Allocating it up front guarantees it is available when that
- // spill happens. This should be acquirable since we just freed at least one
- // buffer from this partition's (old) aggregated_row_stream.
- parent->serialize_stream_.reset(
- new BufferedTupleStream(parent->state_, &parent->intermediate_row_desc_,
- parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size,
- parent->resource_profile_.max_row_buffer_size));
- status = parent->serialize_stream_->Init(parent->id(), false);
- if (status.ok()) {
- bool got_buffer;
- status = parent->serialize_stream_->PrepareForWrite(&got_buffer);
- DCHECK(!status.ok() || got_buffer) << "Accounted in min reservation";
- }
- if (!status.ok()) {
- hash_tbl->Close();
- hash_tbl.reset();
- return status;
- }
- DCHECK(parent->serialize_stream_->has_write_iterator());
- }
- return Status::OK();
-}
-
-Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
- DCHECK(!parent->is_streaming_preagg_);
- DCHECK(!is_closed);
- DCHECK(!is_spilled());
- RETURN_IF_ERROR(parent->state_->StartSpilling(parent->mem_tracker()));
-
- RETURN_IF_ERROR(SerializeStreamForSpilling());
-
- // Free the in-memory result data.
- AggFnEvaluator::Close(agg_fn_evals, parent->state_);
- agg_fn_evals.clear();
-
- if (agg_fn_perm_pool.get() != nullptr) {
- agg_fn_perm_pool->FreeAll();
- agg_fn_perm_pool.reset();
- }
-
- hash_tbl->Close();
- hash_tbl.reset();
-
- // Unpin the stream to free memory, but leave a write buffer in place so we can
- // continue appending rows to one of the streams in the partition.
- DCHECK(aggregated_row_stream->has_write_iterator());
- DCHECK(!unaggregated_row_stream->has_write_iterator());
- if (more_aggregate_rows) {
- aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
- } else {
- aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
- bool got_buffer;
- RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
- DCHECK(got_buffer) << "Accounted in min reservation"
- << parent->buffer_pool_client()->DebugString();
- }
-
- COUNTER_ADD(parent->num_spilled_partitions_, 1);
- if (parent->num_spilled_partitions_->value() == 1) {
- parent->runtime_profile()->AppendExecOption("Spilled");
- }
- return Status::OK();
-}
-
-void PartitionedAggregationNode::Partition::Close(bool finalize_rows) {
- if (is_closed) return;
- is_closed = true;
- if (aggregated_row_stream.get() != NULL) {
- if (finalize_rows && hash_tbl.get() != NULL) {
- // We need to walk all the rows and Finalize them here so the UDA gets a chance
- // to cleanup. If the hash table is gone (meaning this was spilled), the rows
- // should have been finalized/serialized in Spill().
- parent->CleanupHashTbl(agg_fn_evals, hash_tbl->Begin(parent->ht_ctx_.get()));
- }
- aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
- }
- if (hash_tbl.get() != NULL) hash_tbl->Close();
- if (unaggregated_row_stream.get() != NULL) {
- unaggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
- }
- for (AggFnEvaluator* eval : agg_fn_evals) eval->Close(parent->state_);
- if (agg_fn_perm_pool.get() != nullptr) agg_fn_perm_pool->FreeAll();
-}
-
-Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple(
- const vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool) {
- DCHECK(grouping_exprs_.empty());
- Tuple* output_tuple = Tuple::Create(intermediate_tuple_desc_->byte_size(), pool);
- InitAggSlots(agg_fn_evals, output_tuple);
- return output_tuple;
-}
-
-Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
- const vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool,
- Status* status) noexcept {
- const int fixed_size = intermediate_tuple_desc_->byte_size();
- const int varlen_size = GroupingExprsVarlenSize();
- const int tuple_data_size = fixed_size + varlen_size;
- uint8_t* tuple_data = pool->TryAllocate(tuple_data_size);
- if (UNLIKELY(tuple_data == NULL)) {
- string details = Substitute("Cannot perform aggregation at node with id $0. Failed "
- "to allocate $1 bytes for intermediate tuple.", id_, tuple_data_size);
- *status = pool->mem_tracker()->MemLimitExceeded(state_, details, tuple_data_size);
- return NULL;
- }
- memset(tuple_data, 0, fixed_size);
- Tuple* intermediate_tuple = reinterpret_cast<Tuple*>(tuple_data);
- uint8_t* varlen_data = tuple_data + fixed_size;
- CopyGroupingValues(intermediate_tuple, varlen_data, varlen_size);
- InitAggSlots(agg_fn_evals, intermediate_tuple);
- return intermediate_tuple;
-}
-
-Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
- const vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStream* stream,
- Status* status) noexcept {
- DCHECK(stream != NULL && status != NULL);
- // Allocate space for the entire tuple in the stream.
- const int fixed_size = intermediate_tuple_desc_->byte_size();
- const int varlen_size = GroupingExprsVarlenSize();
- const int tuple_size = fixed_size + varlen_size;
- uint8_t* tuple_data = stream->AddRowCustomBegin(tuple_size, status);
- if (UNLIKELY(tuple_data == nullptr)) {
- // If we failed to allocate and did not hit an error (indicated by a non-ok status),
- // the caller of this function can try to free some space, e.g. through spilling, and
- // re-attempt to allocate space for this row.
- return nullptr;
- }
- Tuple* tuple = reinterpret_cast<Tuple*>(tuple_data);
- tuple->Init(fixed_size);
- uint8_t* varlen_buffer = tuple_data + fixed_size;
- CopyGroupingValues(tuple, varlen_buffer, varlen_size);
- InitAggSlots(agg_fn_evals, tuple);
- stream->AddRowCustomEnd(tuple_size);
- return tuple;
-}
-
-int PartitionedAggregationNode::GroupingExprsVarlenSize() {
- int varlen_size = 0;
- // TODO: The hash table could compute this as it hashes.
- for (int expr_idx: string_grouping_exprs_) {
- StringValue* sv = reinterpret_cast<StringValue*>(ht_ctx_->ExprValue(expr_idx));
- // Avoid branching by multiplying length by null bit.
- varlen_size += sv->len * !ht_ctx_->ExprValueNull(expr_idx);
- }
- return varlen_size;
-}
-
-// TODO: codegen this function.
-void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple,
- uint8_t* buffer, int varlen_size) {
- // Copy over all grouping slots (the variable length data is copied below).
- for (int i = 0; i < grouping_exprs_.size(); ++i) {
- SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[i];
- if (ht_ctx_->ExprValueNull(i)) {
- intermediate_tuple->SetNull(slot_desc->null_indicator_offset());
- } else {
- void* src = ht_ctx_->ExprValue(i);
- void* dst = intermediate_tuple->GetSlot(slot_desc->tuple_offset());
- memcpy(dst, src, slot_desc->slot_size());
- }
- }
-
- for (int expr_idx: string_grouping_exprs_) {
- if (ht_ctx_->ExprValueNull(expr_idx)) continue;
-
- SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[expr_idx];
- // ptr and len were already copied to the fixed-len part of string value
- StringValue* sv = reinterpret_cast<StringValue*>(
- intermediate_tuple->GetSlot(slot_desc->tuple_offset()));
- memcpy(buffer, sv->ptr, sv->len);
- sv->ptr = reinterpret_cast<char*>(buffer);
- buffer += sv->len;
- }
-}
-
-// TODO: codegen this function.
-void PartitionedAggregationNode::InitAggSlots(
- const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple) {
- vector<SlotDescriptor*>::const_iterator slot_desc =
- intermediate_tuple_desc_->slots().begin() + grouping_exprs_.size();
- for (int i = 0; i < agg_fn_evals.size(); ++i, ++slot_desc) {
- // To minimize branching on the UpdateTuple path, initialize the result value so that
- // the Add() UDA function can ignore the NULL bit of its destination value. E.g. for
- // SUM(), if we initialize the destination value to 0 (with the NULL bit set), we can
- // just start adding to the destination value (rather than repeatedly checking the
- // destination NULL bit). The codegen'd version of UpdateSlot() exploits this to
- // eliminate a branch per value.
- //
- // For boolean and numeric types, the default values are false/0, so the nullable
- // aggregate functions SUM() and AVG() produce the correct result. For MIN()/MAX(),
- // initialize the value to max/min possible value for the same effect.
- AggFnEvaluator* eval = agg_fn_evals[i];
- eval->Init(intermediate_tuple);
-
- DCHECK(agg_fns_[i] == &(eval->agg_fn()));
- const AggFn* agg_fn = agg_fns_[i];
- const AggFn::AggregationOp agg_op = agg_fn->agg_op();
- if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX) &&
- !agg_fn->intermediate_type().IsStringType() &&
- !agg_fn->intermediate_type().IsTimestampType()) {
- ExprValue default_value;
- void* default_value_ptr = NULL;
- if (agg_op == AggFn::MIN) {
- default_value_ptr = default_value.SetToMax((*slot_desc)->type());
- } else {
- DCHECK_EQ(agg_op, AggFn::MAX);
- default_value_ptr = default_value.SetToMin((*slot_desc)->type());
- }
- RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
- }
- }
-}
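
To spell out the branch elimination described in the comment above: if MIN's
destination slot starts at the type's maximum value, Add() can update it
unconditionally and the first real input still wins, with no read of the
destination's NULL bit. A minimal sketch of the idea (MinSlot/MinAdd are
illustrative, not the UDA interface):

#include <algorithm>
#include <limits>

// Destination slot for MIN(double_col), pre-initialized to the maximum value
// in place of NULL, exactly so that Add() can skip the destination null check.
struct MinSlot {
  double val = std::numeric_limits<double>::max();
  bool is_null = true;
};

inline void MinAdd(MinSlot* dst, double input) {
  // No branch on dst->is_null: the sentinel guarantees min() is correct even
  // for the first value. Clearing the null flag unconditionally is cheap.
  dst->val = std::min(dst->val, input);
  dst->is_null = false;
}
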
-
-void PartitionedAggregationNode::UpdateTuple(AggFnEvaluator** agg_fn_evals,
- Tuple* tuple, TupleRow* row, bool is_merge) noexcept {
- DCHECK(tuple != NULL || agg_fns_.empty());
- for (int i = 0; i < agg_fns_.size(); ++i) {
- if (is_merge) {
- agg_fn_evals[i]->Merge(row->GetTuple(0), tuple);
- } else {
- agg_fn_evals[i]->Add(row, tuple);
- }
- }
-}
-
-Tuple* PartitionedAggregationNode::GetOutputTuple(
- const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool) {
- DCHECK(tuple != NULL || agg_fn_evals.empty()) << tuple;
- Tuple* dst = tuple;
- if (needs_finalize_ && intermediate_tuple_id_ != output_tuple_id_) {
- dst = Tuple::Create(output_tuple_desc_->byte_size(), pool);
- }
- if (needs_finalize_) {
- AggFnEvaluator::Finalize(agg_fn_evals, tuple, dst);
- } else {
- AggFnEvaluator::Serialize(agg_fn_evals, tuple);
- }
- // Copy grouping values from tuple to dst.
- // TODO: Codegen this.
- if (dst != tuple) {
- int num_grouping_slots = grouping_exprs_.size();
- for (int i = 0; i < num_grouping_slots; ++i) {
- SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i];
- SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i];
- bool src_slot_null = tuple->IsNull(src_slot_desc->null_indicator_offset());
- void* src_slot = NULL;
- if (!src_slot_null) src_slot = tuple->GetSlot(src_slot_desc->tuple_offset());
- RawValue::Write(src_slot, dst, dst_slot_desc, NULL);
- }
- }
- return dst;
-}
-
-template <bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::AppendSpilledRow(
- Partition* __restrict__ partition, TupleRow* __restrict__ row) {
- DCHECK(!is_streaming_preagg_);
- DCHECK(partition->is_spilled());
- BufferedTupleStream* stream = AGGREGATED_ROWS ?
- partition->aggregated_row_stream.get() :
- partition->unaggregated_row_stream.get();
- DCHECK(!stream->is_pinned());
- Status status;
- if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
- RETURN_IF_ERROR(status);
-
- // Keep trying to free memory by spilling until we succeed or hit an error.
- // Running out of partitions to spill is treated as an error by SpillPartition().
- while (true) {
- RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
- if (stream->AddRow(row, &status)) return Status::OK();
- RETURN_IF_ERROR(status);
- }
-}
-
-string PartitionedAggregationNode::DebugString(int indentation_level) const {
- stringstream ss;
- DebugString(indentation_level, &ss);
- return ss.str();
-}
-
-void PartitionedAggregationNode::DebugString(
- int indentation_level, stringstream* out) const {
- *out << string(indentation_level * 2, ' ');
- *out << "PartitionedAggregationNode("
- << "intermediate_tuple_id=" << intermediate_tuple_id_
- << " output_tuple_id=" << output_tuple_id_
- << " needs_finalize=" << needs_finalize_
- << " grouping_exprs=" << ScalarExpr::DebugString(grouping_exprs_)
- << " agg_exprs=" << AggFn::DebugString(agg_fns_);
- ExecNode::DebugString(indentation_level, out);
- *out << ")";
-}
-
-Status PartitionedAggregationNode::CreateHashPartitions(
- int level, int single_partition_idx) {
- if (is_streaming_preagg_) DCHECK_EQ(level, 0);
- if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) {
- return Status(
- TErrorCode::PARTITIONED_AGG_MAX_PARTITION_DEPTH, id_, MAX_PARTITION_DEPTH);
- }
- ht_ctx_->set_level(level);
-
- DCHECK(hash_partitions_.empty());
- int num_partitions_created = 0;
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- hash_tbls_[i] = nullptr;
- if (single_partition_idx == -1 || i == single_partition_idx) {
- Partition* new_partition = partition_pool_->Add(new Partition(this, level, i));
- ++num_partitions_created;
- hash_partitions_.push_back(new_partition);
- RETURN_IF_ERROR(new_partition->InitStreams());
- } else {
- hash_partitions_.push_back(nullptr);
- }
- }
- // Now that all the streams are reserved (meaning we have enough memory to execute
- // the algorithm), allocate the hash tables. These can fail and we can still continue.
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- Partition* partition = hash_partitions_[i];
- if (partition == nullptr) continue;
- if (partition->aggregated_row_stream == nullptr) {
- // Failed to create the aggregated row stream - cannot create a hash table.
- // Just continue with a NULL hash table so rows will be passed through.
- DCHECK(is_streaming_preagg_);
- } else {
- bool got_memory;
- RETURN_IF_ERROR(partition->InitHashTable(&got_memory));
- // Spill the partition if we cannot create a hash table for a merge aggregation.
- if (UNLIKELY(!got_memory)) {
- DCHECK(!is_streaming_preagg_) << "Preagg reserves enough memory for hash tables";
- // If we're repartitioning, we will be writing aggregated rows first.
- RETURN_IF_ERROR(partition->Spill(level > 0));
- }
- }
- hash_tbls_[i] = partition->hash_tbl.get();
- }
- // In this case we did not have to repartition, so ensure that while building the hash
- // table all rows will be inserted into the partition at 'single_partition_idx' in case
- // a non-deterministic grouping expression causes a row to hash to a different
- // partition index.
- if (single_partition_idx != -1) {
- Partition* partition = hash_partitions_[single_partition_idx];
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- hash_partitions_[i] = partition;
- hash_tbls_[i] = partition->hash_tbl.get();
- }
- }
-
- COUNTER_ADD(partitions_created_, num_partitions_created);
- if (!is_streaming_preagg_) {
- COUNTER_SET(max_partition_level_, level);
- }
- return Status::OK();
-}
-
-Status PartitionedAggregationNode::CheckAndResizeHashPartitions(
- bool partitioning_aggregated_rows, int num_rows, const HashTableCtx* ht_ctx) {
- DCHECK(!is_streaming_preagg_);
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- Partition* partition = hash_partitions_[i];
- if (partition == nullptr) continue;
- while (!partition->is_spilled()) {
- {
- SCOPED_TIMER(ht_resize_timer_);
- bool resized;
- RETURN_IF_ERROR(partition->hash_tbl->CheckAndResize(num_rows, ht_ctx, &resized));
- if (resized) break;
- }
- RETURN_IF_ERROR(SpillPartition(partitioning_aggregated_rows));
- }
- }
- return Status::OK();
-}
-
-Status PartitionedAggregationNode::NextPartition() {
- DCHECK(output_partition_ == nullptr);
-
- if (!IsInSubplan() && spilled_partitions_.empty()) {
- // All partitions are in memory. Release reservation that was used for previous
- // partitions that is no longer needed. If we have spilled partitions, we want to
- // hold onto all reservation in case it is needed to process the spilled partitions.
- DCHECK(!buffer_pool_client()->has_unpinned_pages());
- Status status = ReleaseUnusedReservation();
- DCHECK(status.ok()) << "Should not fail - all partitions are in memory so there are "
- << "no unpinned pages. " << status.GetDetail();
- }
-
- // Keep looping until we get to a partition that fits in memory.
- Partition* partition = nullptr;
- while (true) {
- // First return partitions that are fully aggregated (and in memory).
- if (!aggregated_partitions_.empty()) {
- partition = aggregated_partitions_.front();
- DCHECK(!partition->is_spilled());
- aggregated_partitions_.pop_front();
- break;
- }
-
- // No aggregated partitions in memory - we should not be using any reservation aside
- // from 'serialize_stream_'.
- DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0,
- buffer_pool_client()->GetUsedReservation())
- << buffer_pool_client()->DebugString();
-
- // Try to fit a single spilled partition in memory. We can often do this because
- // we only need to fit 1/PARTITION_FANOUT of the data in memory.
- // TODO: in some cases when the partition probably won't fit in memory it could
- // be better to skip directly to repartitioning.
- RETURN_IF_ERROR(BuildSpilledPartition(&partition));
- if (partition != nullptr) break;
-
- // If we can't fit the partition in memory, repartition it.
- RETURN_IF_ERROR(RepartitionSpilledPartition());
- }
- DCHECK(!partition->is_spilled());
- DCHECK(partition->hash_tbl.get() != nullptr);
- DCHECK(partition->aggregated_row_stream->is_pinned());
-
- output_partition_ = partition;
- output_iterator_ = output_partition_->hash_tbl->Begin(ht_ctx_.get());
- COUNTER_ADD(num_hash_buckets_, output_partition_->hash_tbl->num_buckets());
- return Status::OK();
-}
-
-Status PartitionedAggregationNode::BuildSpilledPartition(Partition** built_partition) {
- DCHECK(!spilled_partitions_.empty());
- DCHECK(!is_streaming_preagg_);
- // Leave the partition in 'spilled_partitions_' to be closed if we hit an error.
- Partition* src_partition = spilled_partitions_.front();
- DCHECK(src_partition->is_spilled());
-
- // Create a new hash partition from the rows of the spilled partition. This is simpler
- // than trying to finish building a partially-built partition in place. We only
- // initialize one hash partition that all rows in 'src_partition' will hash to.
- RETURN_IF_ERROR(CreateHashPartitions(src_partition->level, src_partition->idx));
- Partition* dst_partition = hash_partitions_[src_partition->idx];
- DCHECK(dst_partition != nullptr);
-
- // Rebuild the hash table over spilled aggregate rows then start adding unaggregated
- // rows to the hash table. It's possible the partition will spill at either stage.
- // In that case we need to finish processing 'src_partition' so that all rows are
- // appended to 'dst_partition'.
- // TODO: if the partition spills again but the aggregation reduces the input
- // significantly, we could do better here by keeping the incomplete hash table in
- // memory and only spilling unaggregated rows that didn't fit in the hash table
- // (somewhat similar to the passthrough pre-aggregation).
- RETURN_IF_ERROR(ProcessStream<true>(src_partition->aggregated_row_stream.get()));
- RETURN_IF_ERROR(ProcessStream<false>(src_partition->unaggregated_row_stream.get()));
- src_partition->Close(false);
- spilled_partitions_.pop_front();
- hash_partitions_.clear();
-
- if (dst_partition->is_spilled()) {
- PushSpilledPartition(dst_partition);
- *built_partition = nullptr;
- // Spilled the partition - we should not be using any reservation except from
- // 'serialize_stream_'.
- DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0,
- buffer_pool_client()->GetUsedReservation())
- << buffer_pool_client()->DebugString();
- } else {
- *built_partition = dst_partition;
- }
- return Status::OK();
-}
-
-Status PartitionedAggregationNode::RepartitionSpilledPartition() {
- DCHECK(!spilled_partitions_.empty());
- DCHECK(!is_streaming_preagg_);
- // Leave the partition in 'spilled_partitions_' to be closed if we hit an error.
- Partition* partition = spilled_partitions_.front();
- DCHECK(partition->is_spilled());
-
- // Create the new hash partitions to repartition into. This will allocate a
- // write buffer for each partition's aggregated row stream.
- RETURN_IF_ERROR(CreateHashPartitions(partition->level + 1));
- COUNTER_ADD(num_repartitions_, 1);
-
- // Rows in this partition could have been spilled into two streams, depending on
- // whether each is an aggregated intermediate or an unaggregated row. Aggregated
- // rows are processed first to save a hash table lookup in ProcessBatch().
- RETURN_IF_ERROR(ProcessStream<true>(partition->aggregated_row_stream.get()));
-
- // Prepare write buffers so we can append spilled rows to unaggregated partitions.
- for (Partition* hash_partition : hash_partitions_) {
- if (!hash_partition->is_spilled()) continue;
- // The aggregated rows have been repartitioned. Free up at least a buffer's worth of
- // reservation and use it to pin the unaggregated write buffer.
- hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
- bool got_buffer;
- RETURN_IF_ERROR(
- hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer));
- DCHECK(got_buffer) << "Accounted in min reservation"
- << buffer_pool_client()->DebugString();
- }
- RETURN_IF_ERROR(ProcessStream<false>(partition->unaggregated_row_stream.get()));
-
- // Read the row counts before closing the partition so we don't touch its
- // streams after Close().
- int64_t num_input_rows = partition->aggregated_row_stream->num_rows()
- + partition->unaggregated_row_stream->num_rows();
- COUNTER_ADD(num_row_repartitioned_, num_input_rows);
-
- partition->Close(false);
- spilled_partitions_.pop_front();
-
- // Done processing this partition. Move the new partitions into
- // spilled_partitions_/aggregated_partitions_.
- RETURN_IF_ERROR(MoveHashPartitions(num_input_rows));
- return Status::OK();
-}
-
-template <bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stream) {
- DCHECK(!is_streaming_preagg_);
- if (input_stream->num_rows() > 0) {
- while (true) {
- bool got_buffer = false;
- RETURN_IF_ERROR(input_stream->PrepareForRead(true, &got_buffer));
- if (got_buffer) break;
- // Did not have a buffer to read the input stream. Spill and try again.
- RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
- }
-
- TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode;
- bool eos = false;
- const RowDescriptor* desc =
- AGGREGATED_ROWS ? &intermediate_row_desc_ : children_[0]->row_desc();
- RowBatch batch(desc, state_->batch_size(), mem_tracker());
- do {
- RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos));
- RETURN_IF_ERROR(
- ProcessBatch<AGGREGATED_ROWS>(&batch, prefetch_mode, ht_ctx_.get()));
- RETURN_IF_ERROR(QueryMaintenance(state_));
- batch.Reset();
- } while (!eos);
- }
- input_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
- return Status::OK();
-}
-
-Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) {
- int64_t max_freed_mem = 0;
- int partition_idx = -1;
-
- // Iterate over the partitions and pick the largest partition that is not spilled.
- for (int i = 0; i < hash_partitions_.size(); ++i) {
- if (hash_partitions_[i] == nullptr) continue;
- if (hash_partitions_[i]->is_closed) continue;
- if (hash_partitions_[i]->is_spilled()) continue;
- // Pass 'true' because we need to keep the write block pinned. See Partition::Spill().
- int64_t mem = hash_partitions_[i]->aggregated_row_stream->BytesPinned(true);
- mem += hash_partitions_[i]->hash_tbl->ByteSize();
- mem += hash_partitions_[i]->agg_fn_perm_pool->total_reserved_bytes();
- DCHECK_GT(mem, 0); // At least the hash table buckets should occupy memory.
- if (mem > max_freed_mem) {
- max_freed_mem = mem;
- partition_idx = i;
- }
- }
- DCHECK_NE(partition_idx, -1) << "Should have been able to spill a partition to "
- << "reclaim memory: "
- << buffer_pool_client()->DebugString();
- // Remove references to the destroyed hash table from 'hash_tbls_'.
- // Additionally, we might be dealing with a rebuilt spilled partition, where all
- // partitions point to a single in-memory partition. This also ensures that 'hash_tbls_'
- // remains consistent in that case.
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- if (hash_partitions_[i] == hash_partitions_[partition_idx]) hash_tbls_[i] = nullptr;
- }
- return hash_partitions_[partition_idx]->Spill(more_aggregate_rows);
-}
-
-Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
- DCHECK(!hash_partitions_.empty());
- stringstream ss;
- ss << "PA(node_id=" << id() << ") partitioned(level=" << hash_partitions_[0]->level
- << ") " << num_input_rows << " rows into:" << endl;
- for (int i = 0; i < hash_partitions_.size(); ++i) {
- Partition* partition = hash_partitions_[i];
- if (partition == nullptr) continue;
- // We might be dealing with a rebuilt spilled partition, where all partitions are
- // pointing to a single in-memory partition, so make sure we only proceed for the
- // right partition.
- if (i != partition->idx) continue;
- int64_t aggregated_rows = 0;
- if (partition->aggregated_row_stream != nullptr) {
- aggregated_rows = partition->aggregated_row_stream->num_rows();
- }
- int64_t unaggregated_rows = 0;
- if (partition->unaggregated_row_stream != nullptr) {
- unaggregated_rows = partition->unaggregated_row_stream->num_rows();
- }
- double total_rows = aggregated_rows + unaggregated_rows;
- double percent = total_rows * 100 / num_input_rows;
- ss << " " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled")
- << " (percent=" << fixed << setprecision(2) << percent << "%)" << endl
- << " #aggregated rows: " << aggregated_rows << endl
- << " #unaggregated rows: " << unaggregated_rows << endl;
-
- // TODO: update counters to support doubles.
- COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(percent));
-
- if (total_rows == 0) {
- partition->Close(false);
- } else if (partition->is_spilled()) {
- PushSpilledPartition(partition);
- } else {
- aggregated_partitions_.push_back(partition);
- }
- }
- VLOG(2) << ss.str();
- hash_partitions_.clear();
- return Status::OK();
-}
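-
-// For example, with num_input_rows = 200000, a partition holding 30000 aggregated
-// and 20000 unaggregated rows is logged as percent=25.00% (50000 * 100 / 200000),
-// and the value is truncated to 25 when stored in largest_partition_percent_.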
-
-void PartitionedAggregationNode::PushSpilledPartition(Partition* partition) {
- DCHECK(partition->is_spilled());
- DCHECK(partition->hash_tbl == nullptr);
- // Ensure all pages in the spilled partition's streams are unpinned by invalidating
- // the streams' read and write iterators. We may need all the memory to process the
- // next spilled partitions.
- partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
- partition->unaggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
- spilled_partitions_.push_front(partition);
-}
-
-void PartitionedAggregationNode::ClosePartitions() {
- for (Partition* partition : hash_partitions_) {
- if (partition != nullptr) partition->Close(true);
- }
- hash_partitions_.clear();
- for (Partition* partition : aggregated_partitions_) partition->Close(true);
- aggregated_partitions_.clear();
- for (Partition* partition : spilled_partitions_) partition->Close(true);
- spilled_partitions_.clear();
- memset(hash_tbls_, 0, sizeof(hash_tbls_));
- partition_pool_->Clear();
-}
-
-// IR Generation for updating a single aggregation slot. Signature is:
-// void UpdateSlot(AggFnEvaluator* agg_fn_eval, AggTuple* agg_tuple, TupleRow* row)
-//
-// The IR for sum(double_col), which is constructed directly with the IRBuilder, is:
-//
-// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
-// <{ double, i8 }>* %agg_tuple, %"class.impala::TupleRow"* %row) #33 {
-// entry:
-// %input_evals_vector = call %"class.impala::ScalarExprEvaluator"**
-// @_ZNK6impala14AggFnEvaluator11input_evalsEv(
-// %"class.impala::AggFnEvaluator"* %agg_fn_eval)
-// %0 = getelementptr %"class.impala::ScalarExprEvaluator"*,
-// %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0
-// %input_eval = load %"class.impala::ScalarExprEvaluator"*,
-// %"class.impala::ScalarExprEvaluator"** %0
-// %input0 = call { i8, double } @GetSlotRef(%"class.impala::ScalarExprEvaluator"*
-// %input_eval, %"class.impala::TupleRow"* %row)
-// %dst_slot_ptr = getelementptr inbounds <{ double, i8 }>,
-// <{ double, i8 }>* %agg_tuple, i32 0, i32 0
-// %dst_val = load double, double* %dst_slot_ptr
-// %1 = extractvalue { i8, double } %input0, 0
-// %is_null = trunc i8 %1 to i1
-// br i1 %is_null, label %ret, label %not_null
-//
-// ret: ; preds = %not_null, %entry
-// ret void
-//
-// not_null: ; preds = %entry
-// %val = extractvalue { i8, double } %input0, 1
-// %2 = fadd double %dst_val, %val
-// %3 = bitcast <{ double, i8 }>* %agg_tuple to i8*
-// %null_byte_ptr = getelementptr inbounds i8, i8* %3, i32 8
-// %null_byte = load i8, i8* %null_byte_ptr
-// %null_bit_cleared = and i8 %null_byte, -2
-// store i8 %null_bit_cleared, i8* %null_byte_ptr
-// store double %2, double* %dst_slot_ptr
-// br label %ret
-// }
-//
-// The IR for ndv(timestamp_col), which uses the UDA interface, is:
-//
-// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
-// <{ [1024 x i8] }>* %agg_tuple,
-// %"class.impala::TupleRow"* %row) #39 {
-// entry:
-// %dst_lowered_ptr = alloca { i64, i8* }
-// %0 = alloca { i64, i64 }
-// %input_evals_vector = call %"class.impala::ScalarExprEvaluator"**
-// @_ZNK6impala14AggFnEvaluator11input_evalsEv(
-// %"class.impala::AggFnEvaluator"* %agg_fn_eval)
-// %1 = getelementptr %"class.impala::ScalarExprEvaluator"*,
-// %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0
-// %input_eval = load %"class.impala::ScalarExprEvaluator"*,
-// %"class.impala::ScalarExprEvaluator"** %1
-// %input0 = call { i64, i64 } @GetSlotRef(
-// %"class.impala::ScalarExprEvaluator"* %input_eval,
-// %"class.impala::TupleRow"* %row)
-// %dst_slot_ptr = getelementptr inbounds <{ [1024 x i8] }>,
-// <{ [1024 x i8] }>* %agg_tuple, i32 0, i32 0
-// %2 = bitcast [1024 x i8]* %dst_slot_ptr to i8*
-// %dst = insertvalue { i64, i8* } zeroinitializer, i8* %2, 1
-// %3 = extractvalue { i64, i8* } %dst, 0
-// %4 = and i64 %3, 4294967295
-// %5 = or i64 %4, 4398046511104
-// %dst1 = insertvalue { i64, i8* } %dst, i64 %5, 0
-// %agg_fn_ctx = call %"class.impala_udf::FunctionContext"*
-// @_ZNK6impala14AggFnEvaluator10agg_fn_ctxEv(
-// %"class.impala::AggFnEvaluator"* %agg_fn_eval)
-// store { i64, i64 } %input0, { i64, i64 }* %0
-// %input_unlowered_ptr =
-// bitcast { i64, i64 }* %0 to %"struct.impala_udf::TimestampVal"*
-// store { i64, i8* } %dst1, { i64, i8* }* %dst_lowered_ptr
-// %dst_unlowered_ptr =
-// bitcast { i64, i8* }* %dst_lowered_ptr to %"struct.impala_udf::StringVal"*
-// call void @"void impala::AggregateFunctions::HllUpdate<impala_udf::TimestampVal>"(
-// %"class.impala_udf::FunctionContext"* %agg_fn_ctx,
-// %"struct.impala_udf::TimestampVal"* %input_unlowered_ptr,
-// %"struct.impala_udf::StringVal"* %dst_unlowered_ptr)
-// %anyval_result = load { i64, i8* }, { i64, i8* }* %dst_lowered_ptr
-// br label %ret
-//
-// ret: ; preds = %entry
-// ret void
-// }
-//
-Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
- SlotDescriptor* slot_desc, llvm::Function** fn) {
- llvm::PointerType* agg_fn_eval_type = codegen->GetStructPtrType<AggFnEvaluator>();
- llvm::StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
- if (tuple_struct == NULL) {
- return Status("PartitionedAggregationNode::CodegenUpdateSlot(): failed to generate "
- "intermediate tuple desc");
- }
- llvm::PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_struct);
- llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
-
- LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type());
- prototype.AddArgument(
- LlvmCodeGen::NamedVariable("agg_fn_eval", agg_fn_eval_type));
- prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type));
- prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
-
- LlvmBuilder builder(codegen->context());
- llvm::Value* args[3];
- *fn = prototype.GeneratePrototype(&builder, &args[0]);
- llvm::Value* agg_fn_eval_arg = args[0];
- llvm::Value* agg_tuple_arg = args[1];
- llvm::Value* row_arg = args[2];
-
- // Get the vector of input expressions' evaluators.
- llvm::Value* input_evals_vector = codegen->CodegenCallFunction(&builder,
- IRFunction::AGG_FN_EVALUATOR_INPUT_EVALUATORS, agg_fn_eval_arg,
- "input_evals_vector");
-
- AggFn* agg_fn = agg_fns_[agg_fn_idx];
- const int num_inputs = agg_fn->GetNumChildren();
- DCHECK_GE(num_inputs, 1);
- vector<CodegenAnyVal> input_vals;
- for (int i = 0; i < num_inputs; ++i) {
- ScalarExpr* input_expr = agg_fn->GetChild(i);
- llvm::Function* input_expr_fn;
- RETURN_IF_ERROR(input_expr->GetCodegendComputeFn(codegen, &input_expr_fn));
- DCHECK(input_expr_fn != NULL);
-
- // Call input expr function with the matching evaluator to get src slot value.
- llvm::Value* input_eval =
- codegen->CodegenArrayAt(&builder, input_evals_vector, i, "input_eval");
- string input_name = Substitute("input$0", i);
- CodegenAnyVal input_val = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
- input_expr->type(), input_expr_fn,
- llvm::ArrayRef<llvm::Value*>({input_eval, row_arg}), input_name.c_str());
- input_vals.push_back(input_val);
- }
-
- AggFn::AggregationOp agg_op = agg_fn->agg_op();
- const ColumnType& dst_type = agg_fn->intermediate_type();
- bool dst_is_int_or_float_or_bool = dst_type.IsIntegerType()
- || dst_type.IsFloatingPointType() || dst_type.IsBooleanType();
- bool dst_is_numeric_or_bool = dst_is_int_or_float_or_bool || dst_type.IsDecimalType();
-
- llvm::BasicBlock* ret_block = llvm::BasicBlock::Create(codegen->context(), "ret", *fn);
-
- // Emit the code to compute 'result' and set the NULL indicator if needed. First check
- // for special cases where we can emit a very simple instruction sequence, then fall
- // back to the general-purpose approach of calling the cross-compiled builtin UDA.
- CodegenAnyVal& src = input_vals[0];
-
- // 'dst_slot_ptr' points to the slot in the aggregate tuple to update.
- llvm::Value* dst_slot_ptr = builder.CreateStructGEP(
- NULL, agg_tuple_arg, slot_desc->llvm_field_idx(), "dst_slot_ptr");
- // TODO: consider moving the following codegen logic to AggFn.
- if (agg_op == AggFn::COUNT) {
- src.CodegenBranchIfNull(&builder, ret_block);
- llvm::Value* dst_value = builder.CreateLoad(dst_slot_ptr, "dst_val");
- llvm::Value* result = agg_fn->is_merge() ?
- builder.CreateAdd(dst_value, src.GetVal(), "count_sum") :
- builder.CreateAdd(
- dst_value, codegen->GetI64Constant(1), "count_inc");
- builder.CreateStore(result, dst_slot_ptr);
- DCHECK(!slot_desc->is_nullable());
- } else if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX) && dst_is_numeric_or_bool) {
- bool is_min = agg_op == AggFn::MIN;
- src.CodegenBranchIfNull(&builder, ret_block);
- codegen->CodegenMinMax(
- &builder, slot_desc->type(), src.GetVal(), dst_slot_ptr, is_min, *fn);
-
- // Dst may have been NULL, make sure to unset the NULL bit.
- DCHECK(slot_desc->is_nullable());
- slot_desc->CodegenSetNullIndicator(
- codegen, &builder, agg_tuple_arg, codegen->false_value());
- } else if (agg_op == AggFn::SUM && dst_is_int_or_float_or_bool) {
- src.CodegenBranchIfNull(&builder, ret_block);
- llvm::Value* dst_value = builder.CreateLoad(dst_slot_ptr, "dst_val");
- llvm::Value* result = dst_type.IsFloatingPointType() ?
- builder.CreateFAdd(dst_value, src.GetVal()) :
- builder.CreateAdd(dst_value, src.GetVal());
- builder.CreateStore(result, dst_slot_ptr);
-
- if (slot_desc->is_nullable()) {
- slot_desc->CodegenSetNullIndicator(
- codegen, &builder, agg_tuple_arg, codegen->false_value());
- } else {
- // 'slot_desc' is not nullable if the aggregate function is sum_init_zero(),
- // because the slot is initialized to zero and there is no null bit.
- DCHECK_EQ(agg_fn->fn_name(), "sum_init_zero");
- }
- } else {
- // The remaining cases are implemented using the UDA interface.
- // Create the intermediate argument 'dst' that will hold the current slot value.
- CodegenAnyVal dst = CodegenAnyVal::GetNonNullVal(codegen, &builder, dst_type, "dst");
-
- // For a subset of builtins we generate a different code sequence that exploits two
- // properties of those builtins. First, NULL input values can be skipped. Second,
- // the slot was initialized to an appropriate value in InitAggSlots() (e.g. 0 for
- // SUM), so we get the right result if UpdateSlot() pretends that the NULL bit of
- // 'dst' is unset. Empirically this optimisation makes TPC-H Q1 5-10% faster.
- bool special_null_handling = !agg_fn->intermediate_type().IsStringType()
- && !agg_fn->intermediate_type().IsTimestampType()
- && (agg_op == AggFn::MIN || agg_op == AggFn::MAX
- || agg_op == AggFn::SUM || agg_op == AggFn::AVG
- || agg_op == AggFn::NDV);
- if (slot_desc->is_nullable()) {
- if (special_null_handling) {
- src.CodegenBranchIfNull(&builder, ret_block);
- slot_desc->CodegenSetNullIndicator(
- codegen, &builder, agg_tuple_arg, codegen->false_value());
- } else {
- dst.SetIsNull(slot_desc->CodegenIsNull(codegen, &builder, agg_tuple_arg));
- }
- }
- dst.LoadFromNativePtr(dst_slot_ptr);
-
- // Get the FunctionContext object for the AggFnEvaluator.
- llvm::Function* get_agg_fn_ctx_fn =
- codegen->GetFunction(IRFunction::AGG_FN_EVALUATOR_AGG_FN_CTX, false);
- DCHECK(get_agg_fn_ctx_fn != NULL);
- llvm::Value* agg_fn_ctx_val =
- builder.CreateCall(get_agg_fn_ctx_fn, {agg_fn_eval_arg}, "agg_fn_ctx");
-
- // Call the UDA to update/merge 'src' into 'dst', with the result stored in
- // 'updated_dst_val'.
- CodegenAnyVal updated_dst_val;
- RETURN_IF_ERROR(CodegenCallUda(codegen, &builder, agg_fn, agg_fn_ctx_val,
- input_vals, dst, &updated_dst_val));
- // Copy the value back to the slot. In the FIXED_UDA_INTERMEDIATE case, the
- // UDA function writes directly to the slot so there is nothing to copy.
- if (dst_type.type != TYPE_FIXED_UDA_INTERMEDIATE) {
- updated_dst_val.StoreToNativePtr(dst_slot_ptr);
- }
-
- if (slot_desc->is_nullable() && !special_null_handling) {
- // Set NULL bit in the slot based on the return value.
- llvm::Value* result_is_null = updated_dst_val.GetIsNull("result_is_null");
- slot_desc->CodegenSetNullIndicator(
- codegen, &builder, agg_tuple_arg, result_is_null);
- }
- }
- builder.CreateBr(ret_block);
-
- builder.SetInsertPoint(ret_block);
- builder.CreateRetVoid();
-
- // Avoid producing a huge UpdateTuple() function after inlining: the LLVM
- // optimiser's memory/CPU usage scales super-linearly with function size.
- // E.g. computing stats on all columns of a 1000-column table previously took
- // 4 minutes to codegen because all the UpdateSlot() functions were inlined.
- if (agg_fn_idx >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
- codegen->SetNoInline(*fn);
- }
-
- *fn = codegen->FinalizeFunction(*fn);
- if (*fn == NULL) {
- return Status("PartitionedAggregationNode::CodegenUpdateSlot(): codegen'd "
- "UpdateSlot() function failed verification, see log");
- }
- return Status::OK();
-}
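-
-// In C++ terms, the simple instruction sequence generated above for SUM over a
-// nullable DOUBLE slot corresponds roughly to the hand-written code below. This is
-// a sketch for illustration only: 'AggTuple' and its fields model the
-// <{ double, i8 }> tuple layout from the IR example, and are not types in this file.
-//
-//   struct AggTuple { double sum; uint8_t null_byte; };
-//
-//   void UpdateSlotSumDouble(AggTuple* agg_tuple, const DoubleVal& src) {
-//     if (src.is_null) return;       // NULL inputs are skipped entirely.
-//     agg_tuple->null_byte &= ~1;    // dst may have been NULL; clear its bit.
-//     agg_tuple->sum += src.val;     // the fadd in the IR example above.
-//   }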
-
-Status PartitionedAggregationNode::CodegenCallUda(LlvmCodeGen* codegen,
- LlvmBuilder* builder, AggFn* agg_fn, llvm::Value* agg_fn_ctx_val,
- const vector<CodegenAnyVal>& input_vals, const CodegenAnyVal& dst_val,
- CodegenAnyVal* updated_dst_val) {
- llvm::Function* uda_fn;
- RETURN_IF_ERROR(agg_fn->CodegenUpdateOrMergeFunction(codegen, &uda_fn));
-
- // Set up arguments for call to UDA, which are the FunctionContext*, followed by
- // pointers to all input values, followed by a pointer to the destination value.
- vector<llvm::Value*> uda_fn_args;
- uda_fn_args.push_back(agg_fn_ctx_val);
-
- // Create pointers to input args to pass to uda_fn. We must use the unlowered type,
- // e.g. IntVal, because the UDA interface expects the values to be passed as const
- // references to the classes.
- DCHECK_EQ(agg_fn->GetNumChildren(), input_vals.size());
- for (int i = 0; i < input_vals.size(); ++i) {
- uda_fn_args.push_back(input_vals[i].GetUnloweredPtr("input_unlowered_ptr"));
- }
-
- // Create pointer to dst to pass to uda_fn. We must use the unlowered type for the
- // same reason as above.
- llvm::Value* dst_lowered_ptr = dst_val.GetLoweredPtr("dst_lowered_ptr");
- const ColumnType& dst_type = agg_fn->intermediate_type();
- llvm::Type* dst_unlowered_ptr_type =
- CodegenAnyVal::GetUnloweredPtrType(codegen, dst_type);
- llvm::Value* dst_unlowered_ptr = builder->CreateBitCast(
- dst_lowered_ptr, dst_unlowered_ptr_type, "dst_unlowered_ptr");
- uda_fn_args.push_back(dst_unlowered_ptr);
-
- // Call 'uda_fn'
- builder->CreateCall(uda_fn, uda_fn_args);
-
- // Load the updated intermediate value back from 'dst_lowered_ptr'.
- llvm::Value* anyval_result = builder->CreateLoad(dst_lowered_ptr, "anyval_result");
-
- *updated_dst_val = CodegenAnyVal(codegen, builder, dst_type, anyval_result);
- return Status::OK();
-}
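-
-// The "unlowered" types above are the C++ UDA argument classes. For instance, the
-// HllUpdate call shown in the CodegenUpdateSlot() IR example is cross-compiled from
-// a C++ signature of roughly this shape, which is why the codegen'd caller must
-// pass pointers to the class types rather than to the lowered LLVM structs:
-//
-//   void AggregateFunctions::HllUpdate(
-//       FunctionContext* ctx, const TimestampVal& src, StringVal* dst);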
-
-// IR codegen for the UpdateTuple loop. This loop is query-specific and based on the
-// aggregate functions. The function signature must match the non-codegen'd
-// UpdateTuple exactly.
-// For the query:
-// select count(*), count(int_col), sum(double_col)
-// the IR looks like:
-//
-// define void @UpdateTuple(%"class.impala::PartitionedAggregationNode"* %this_ptr,
-// %"class.impala::AggFnEvaluator"** %agg_fn_evals, %"class.impala::Tuple"* %tuple,
-// %"class.impala::TupleRow"* %row, i1 %is_merge) #33 {
-// entry:
-// %tuple1 = bitcast %"class.impala::Tuple"* %tuple to <{ i64, i64, double, i8 }>*
-// %src_slot = getelementptr inbounds <{ i64, i64, double, i8 }>,
-// <{ i64, i64, double, i8 }>* %tuple1, i32 0, i32 0
-// %count_star_val = load i64, i64* %src_slot
-// %count_star_inc = add i64 %count_star_val, 1
-// store i64 %count_star_inc, i64* %src_slot
-// %0 = getelementptr %"class.impala::AggFnEvaluator"*,
-// %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 1
-// %agg_fn_eval =
-// load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %0
-// call void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
-// <{ i64, i64, double, i8 }>* %tuple1, %"class.impala::TupleRow"* %row)
-// %1 = getelementptr %"class.impala::AggFnEvaluator"*,
-// %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 2
-// %agg_fn_eval2 =
-// load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %1
-// call void @UpdateSlot.2(%"class.impala::AggFnEvaluator"* %agg_fn_eval2,
-// <{ i64, i64, double, i8 }>* %tuple1, %"class.impala::TupleRow"* %row)
-// ret void
-// }
-//
-Status PartitionedAggregationNode::CodegenUpdateTuple(
- LlvmCodeGen* codegen, llvm::Function** fn) {
- for (const SlotDescriptor* slot_desc : intermediate_tuple_desc_->slots()) {
- if (slot_desc->type().type == TYPE_CHAR) {
- return Status::Expected("PartitionedAggregationNode::CodegenUpdateTuple(): cannot "
- "codegen CHAR in aggregations");
- }
- }
-
- if (intermediate_tuple_desc_->GetLlvmStruct(codegen) == NULL) {
- return Status::Expected("PartitionedAggregationNode::CodegenUpdateTuple(): failed to"
- " generate intermediate tuple desc");
- }
-
- // Get the types to match the UpdateTuple signature
- llvm::PointerType* agg_node_ptr_type =
- codegen->GetStructPtrType<PartitionedAggregationNode>();
- llvm::PointerType* evals_type = codegen->GetStructPtrPtrType<AggFnEvaluator>();
- llvm::PointerType* tuple_ptr_type = codegen->GetStructPtrType<Tuple>();
- llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
-
- llvm::StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
- llvm::PointerType* tuple_ptr = codegen->GetPtrType(tuple_struct);
- LlvmCodeGen::FnPrototype prototype(codegen, "UpdateTuple", codegen->void_type());
- prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", agg_node_ptr_type));
- prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_evals", evals_type));
- prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_ptr_type));
- prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
- prototype.AddArgument(LlvmCodeGen::NamedVariable("is_merge", codegen->bool_type()));
-
- LlvmBuilder builder(codegen->context());
- llvm::Value* args[5];
- *fn = prototype.GeneratePrototype(&builder, &args[0]);
- llvm::Value* agg_fn_evals_arg = args[1];
- llvm::Value* tuple_arg = args[2];
- llvm::Value* row_arg = args[3];
-
- // Cast the parameter types to the internal llvm runtime types.
- // TODO: get rid of this by using right type in function signature
- tuple_arg = builder.CreateBitCast(tuple_arg, tuple_ptr, "tuple");
-
- // Loop over each expr and generate the IR for that slot. If the expr is not
- // count(*), generate a helper IR function to update the slot and call that.
- int j = grouping_exprs_.size();
- for (int i = 0; i < agg_fns_.size(); ++i, ++j) {
- SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
- AggFn* agg_fn = agg_fns_[i];
- if (agg_fn->is_count_star()) {
- // TODO: we should be able to hoist this up to the loop over the batch and just
- // increment the slot by the number of rows in the batch.
- int field_idx = slot_desc->llvm_field_idx();
- llvm::Value* const_one = codegen->GetI64Constant(1);
- llvm::Value* slot_ptr =
- builder.CreateStructGEP(NULL, tuple_arg, field_idx, "src_slot");
- llvm::Value* slot_loaded = builder.CreateLoad(slot_ptr, "count_star_val");
- llvm::Value* count_inc =
- builder.CreateAdd(slot_loaded, const_one, "count_star_inc");
- builder.CreateStore(count_inc, slot_ptr);
- } else {
- llvm::Function* update_slot_fn;
- RETURN_IF_ERROR(CodegenUpdateSlot(codegen, i, slot_desc, &update_slot_fn));
-
- // Load agg_fn_evals_[i]
- llvm::Value* agg_fn_eval_val =
- codegen->CodegenArrayAt(&builder, agg_fn_evals_arg, i, "agg_fn_eval");
-
- // Call UpdateSlot(agg_fn_evals_[i], tuple, row);
- llvm::Value* update_slot_args[] = {agg_fn_eval_val, tuple_arg, row_arg};
- builder.CreateCall(update_slot_fn, update_slot_args);
- }
- }
- builder.CreateRetVoid();
-
- // Avoid inlining big UpdateTuple function into outer loop - we're unlikely to get
- // any benefit from it since the function call overhead will be amortized.
- if (agg_fns_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
- codegen->SetNoInline(*fn);
- }
-
- // CodegenProcessBatch() does the final optimizations.
- *fn = codegen->FinalizeFunction(*fn);
- if (*fn == NULL) {
- return Status("PartitionedAggregationNode::CodegenUpdateTuple(): codegen'd "
- "UpdateTuple() function failed verification, see log");
- }
- return Status::OK();
-}
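-
-// The TODO above about count(*) describes a potential strength reduction: rather
-// than emitting one increment per row inside the per-row loop, the increment could
-// be hoisted out and applied once per batch. A sketch of the idea in interpreted
-// form (hypothetical 'count_slot'; not code in this file):
-//
-//   // Current shape: one add per row.
-//   for (int i = 0; i < batch->num_rows(); ++i) ++count_slot;
-//   // Hoisted shape: a single add per batch.
-//   count_slot += batch->num_rows();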
-
-Status PartitionedAggregationNode::CodegenProcessBatch(LlvmCodeGen* codegen,
- TPrefetchMode::type prefetch_mode) {
- llvm::Function* update_tuple_fn;
- RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn));
-
- // Get the cross-compiled row batch processing function.
- IRFunction::Type ir_fn = (!grouping_exprs_.empty() ?
- IRFunction::PART_AGG_NODE_PROCESS_BATCH_UNAGGREGATED :
- IRFunction::PART_AGG_NODE_PROCESS_BATCH_NO_GROUPING);
- llvm::Function* process_batch_fn = codegen->GetFunction(ir_fn, true);
- DCHECK(process_batch_fn != NULL);
-
- int replaced;
- if (!grouping_exprs_.empty()) {
- // Codegen for grouping using hash table
-
- // Replace prefetch_mode with constant so branches can be optimised out.
- llvm::Value* prefetch_mode_arg = codegen->GetArgument(process_batch_fn, 3);
- prefetch_mode_arg->replaceAllUsesWith(codegen->GetI32Constant(prefetch_mode));
-
- // The codegen'd ProcessBatch function is only used in Open() with level_ = 0,
- // so don't use murmur hash
- llvm::Function* hash_fn;
- RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(codegen, /* use murmur */ false, &hash_fn));
-
- // Codegen HashTable::Equals<true>
- llvm::Function* build_equals_fn;
- RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, true, &build_equals_fn));
-
- // Codegen for evaluating input rows
- llvm::Function* eval_grouping_expr_fn;
- RETURN_IF_ERROR(ht_ctx_->CodegenEvalRow(codegen, false, &eval_grouping_expr_fn));
-
- // Replace call sites
- replaced = codegen->ReplaceCallSites(process_batch_fn, eval_grouping_expr_fn,
- "EvalProbeRow");
- DCHECK_EQ(replaced, 1);
-
- replaced = codegen->ReplaceCallSites(process_batch_fn, hash_fn, "HashRow");
- DCHECK_EQ(replaced, 1);
-
- replaced = codegen->ReplaceCallSites(process_batch_fn, build_equals_fn, "Equals");
- DCHECK_EQ(replaced, 1);
-
- HashTableCtx::HashTableReplacedConstants replaced_constants;
- const bool stores_duplicates = false;
- RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates, 1,
- process_batch_fn, &replaced_constants));
- DCHECK_GE(replaced_constants.stores_nulls, 1);
- DCHECK_GE(replaced_constants.finds_some_nulls, 1);
- DCHECK_GE(replaced_constants.stores_duplicates, 1);
- DCHECK_GE(replaced_constants.stores_tuples, 1);
- DCHECK_GE(replaced_constants.quadratic_probing, 1);
- }
-
- replaced = codegen->ReplaceCallSites(process_batch_fn, update_tuple_fn, "UpdateTuple");
- DCHECK_GE(replaced, 1);
- process_batch_fn = codegen->FinalizeFunction(process_batch_fn);
- if (process_batch_fn == NULL) {
- return Status("PartitionedAggregationNode::CodegenProcessBatch(): codegen'd "
- "ProcessBatch() function failed verification, see log");
- }
-
- void** codegened_fn_ptr = grouping_exprs_.empty() ?
- reinterpret_cast<void**>(&process_batch_no_grouping_fn_) :
- reinterpret_cast<void**>(&process_batch_fn_);
- codegen->AddFunctionToJit(process_batch_fn, codegened_fn_ptr);
- return Status::OK();
-}
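-
-// CodegenProcessBatch() follows the standard recipe for specializing a
-// cross-compiled function: fetch the template IR, replace interpreted call sites
-// and runtime constants with specialized versions, finalize (verify + optimize),
-// then register the result for JIT compilation. Condensed sketch using the calls
-// that appear above ('specialized_fn', "CalleeName" and 'jitted_fn_ptr_' are
-// placeholders; error handling omitted):
-//
-//   llvm::Function* fn = codegen->GetFunction(ir_fn, /* clone */ true);
-//   codegen->ReplaceCallSites(fn, specialized_fn, "CalleeName");
-//   fn = codegen->FinalizeFunction(fn);
-//   codegen->AddFunctionToJit(fn, reinterpret_cast<void**>(&jitted_fn_ptr_));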
-
-Status PartitionedAggregationNode::CodegenProcessBatchStreaming(
- LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) {
- DCHECK(is_streaming_preagg_);
-
- IRFunction::Type ir_fn = IRFunction::PART_AGG_NODE_PROCESS_BATCH_STREAMING;
- llvm::Function* process_batch_streaming_fn = codegen->GetFunction(ir_fn, true);
- DCHECK(process_batch_streaming_fn != NULL);
-
- // Make needs_serialize arg constant so dead code can be optimised out.
- llvm::Value* needs_serialize_arg = codegen->GetArgument(process_batch_streaming_fn, 2);
- needs_serialize_arg->replaceAllUsesWith(codegen->GetBoolConstant(needs_serialize_));
-
- // Replace prefetch_mode with constant so branches can be optimised out.
- llvm::Value* prefetch_mode_arg = codegen->GetArgument(process_batch_streaming_fn, 3);
- prefetch_mode_arg->replaceAllUsesWith(codegen->GetI32Constant(prefetch_mode));
-
- llvm::Function* update_tuple_fn;
- RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn));
-
- // We only use the top-level hash function for streaming aggregations.
- llvm::Function* hash_fn;
- RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(codegen, false, &hash_fn));
-
- // Codegen HashTable::Equals
- llvm::Function* equals_fn;
- RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, true, &equals_fn));
-
- // Codegen for evaluating input rows
- llvm::Function* eval_grouping_expr_fn;
- RETURN_IF_ERROR(ht_ctx_->CodegenEvalRow(codegen, false, &eval_grouping_expr_fn));
-
- // Replace call sites
- int replaced = codegen->ReplaceCallSites(process_batch_streaming_fn, update_tuple_fn,
- "UpdateTuple");
- DCHECK_EQ(replaced, 2);
-
- replaced = codegen->ReplaceCallSites(process_batch_streaming_fn, eval_grouping_expr_fn,
- "EvalProbeRow");
- DCHECK_EQ(replaced, 1);
-
- replaced = codegen->ReplaceCallSites(process_batch_streaming_fn, hash_fn, "HashRow");
- DCHECK_EQ(replaced, 1);
-
- replaced = codegen->ReplaceCallSites(process_batch_streaming_fn, equals_fn, "Equals");
- DCHECK_EQ(replaced, 1);
-
- HashTableCtx::HashTableReplacedConstants replaced_constants;
- const bool stores_duplicates = false;
- RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates, 1,
- process_batch_streaming_fn, &replaced_constants));
- DCHECK_GE(replaced_constants.stores_nulls, 1);
- DCHECK_GE(replaced_constants.finds_some_nulls, 1);
- DCHECK_GE(replaced_constants.stores_duplicates, 1);
- DCHECK_GE(replaced_constants.stores_tuples, 1);
- DCHECK_GE(replaced_constants.quadratic_probing, 1);
-
- DCHECK(process_batch_streaming_fn != NULL);
- process_batch_streaming_fn = codegen->FinalizeFunction(process_batch_streaming_fn);
- if (process_batch_streaming_fn == NULL) {
- return Status("PartitionedAggregationNode::CodegenProcessBatchStreaming(): codegen'd "
- "ProcessBatchStreaming() function failed verification, see log");
- }
-
- codegen->AddFunctionToJit(process_batch_streaming_fn,
- reinterpret_cast<void**>(&process_batch_streaming_fn_));
- return Status::OK();
-}
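-
-// Replacing an argument with a constant via replaceAllUsesWith(), as done for
-// 'needs_serialize' and 'prefetch_mode' above, lets LLVM fold the dependent
-// branches away when the function is finalized. Conceptually (C++-level sketch;
-// SpillAndSerialize() is a hypothetical stand-in):
-//
-//   // Before specialization: the branch depends on a runtime argument.
-//   if (needs_serialize) SpillAndSerialize();
-//   // After replacing 'needs_serialize' with the constant 'false', this becomes
-//   // 'if (false) SpillAndSerialize();' and is eliminated as dead code.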
-
-// Instantiate required templates.
-template Status PartitionedAggregationNode::AppendSpilledRow<false>(
- Partition*, TupleRow*);
-template Status PartitionedAggregationNode::AppendSpilledRow<true>(Partition*, TupleRow*);
-}