You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/02 22:18:57 UTC
[5/6] impala git commit: IMPALA-110 (part 2): Refactor
PartitionedAggregationNode
IMPALA-110 (part 2): Refactor PartitionedAggregationNode
This patch refactors PartitionedAggregationNode in preparation for
supporting multiple distinct operators in a query.
The primary goal of the refactor is to separate out the core
aggregation functionality into a new type of object called an
Aggregator. For now, each aggregation ExecNode will contain a single
Aggregator. Then, future patches will extend the aggregation ExecNode
to support taking a single input and processing it with multiple
Aggregators, allowing us to support more exotic combinations of
aggregate functions and groupings.
Specifically, this patch splits PartitionedAggregationNode into five
new classes:
- Aggregator: a superclass containing the functionality that's shared
between GroupingAggregator and NonGroupingAggregator.
- GroupingAggregator: this class contains the bulk of the interesting
aggregation code, including everything related to creating and
updating partitions and hash tables, spilling, etc.
- NonGroupingAggregator: this class handles the case of aggregations
that don't have grouping exprs. Since these aggregations always
result in just a single output row, the functionality here is
relatively simple (e.g. no spilling or streaming).
- StreamingAggregationNode: this node performs a streaming
preaggregation, where the input is retrieved from the child during
GetNext() and passed to the GroupingAggregator (non-grouping aggregators do not
support streaming). Eventually, we'll support a list of
GroupingAggregators.
- AggregationNode: this node performs a final aggregation, where the
input is retrieved from the child during Open() and passed to the
Aggregator. Currently the Aggregator can be either grouping or
non-grouping. Eventually we'll support a list of GroupingAggregators
and/or a single NonGroupingAggregator.
Testing:
- Passed a full exhaustive run.
Change-Id: I9e7bb583f54aa4add3738bde7f57cf3511ac567e
Reviewed-on: http://gerrit.cloudera.org:8080/10394
Reviewed-by: Thomas Marshall <th...@cmu.edu>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/010321d4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/010321d4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/010321d4
Branch: refs/heads/master
Commit: 010321d404a9ad5b7338eeb24a4e9ac576cf4dff
Parents: dde9308
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Thu May 10 19:39:57 2018 +0000
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon Jul 2 21:07:35 2018 +0000
----------------------------------------------------------------------
be/src/codegen/gen_ir_descriptions.py | 14 +-
be/src/codegen/impala-ir.cc | 3 +-
be/src/exec/CMakeLists.txt | 10 +-
be/src/exec/aggregation-node.cc | 132 ++
be/src/exec/aggregation-node.h | 60 +
be/src/exec/aggregator.cc | 609 ++++++
be/src/exec/aggregator.h | 211 ++
be/src/exec/exec-node.cc | 9 +-
be/src/exec/exec-node.h | 9 +-
be/src/exec/grouping-aggregator-ir.cc | 241 +++
be/src/exec/grouping-aggregator-partition.cc | 218 +++
be/src/exec/grouping-aggregator.cc | 1098 +++++++++++
be/src/exec/grouping-aggregator.h | 624 ++++++
be/src/exec/non-grouping-aggregator-ir.cc | 30 +
be/src/exec/non-grouping-aggregator.cc | 174 ++
be/src/exec/non-grouping-aggregator.h | 111 ++
be/src/exec/partitioned-aggregation-node-ir.cc | 253 ---
be/src/exec/partitioned-aggregation-node.cc | 1955 -------------------
be/src/exec/partitioned-aggregation-node.h | 734 -------
be/src/exec/streaming-aggregation-node.cc | 153 ++
be/src/exec/streaming-aggregation-node.h | 85 +
21 files changed, 3774 insertions(+), 2959 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index dd2df9e..99a97ae 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -47,14 +47,12 @@ ir_functions = [
"_ZNK6impala14AggFnEvaluator11input_evalsEv"],
["AGG_FN_EVALUATOR_AGG_FN_CTX",
"_ZNK6impala14AggFnEvaluator10agg_fn_ctxEv"],
- ["PART_AGG_NODE_PROCESS_BATCH_UNAGGREGATED",
- "_ZN6impala26PartitionedAggregationNode12ProcessBatchILb0EEENS_6StatusEPNS_8RowBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxE"],
- ["PART_AGG_NODE_PROCESS_BATCH_AGGREGATED",
- "_ZN6impala26PartitionedAggregationNode12ProcessBatchILb1EEENS_6StatusEPNS_8RowBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxE"],
- ["PART_AGG_NODE_PROCESS_BATCH_NO_GROUPING",
- "_ZN6impala26PartitionedAggregationNode22ProcessBatchNoGroupingEPNS_8RowBatchE"],
- ["PART_AGG_NODE_PROCESS_BATCH_STREAMING",
- "_ZN6impala26PartitionedAggregationNode21ProcessBatchStreamingEbNS_13TPrefetchMode4typeEPNS_8RowBatchES4_PNS_12HashTableCtxEPi"],
+ ["GROUPING_AGG_ADD_BATCH_IMPL",
+ "_ZN6impala18GroupingAggregator12AddBatchImplILb0EEENS_6StatusEPNS_8RowBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxE"],
+ ["NON_GROUPING_AGG_ADD_BATCH_IMPL",
+ "_ZN6impala21NonGroupingAggregator12AddBatchImplEPNS_8RowBatchE"],
+ ["GROUPING_AGG_ADD_BATCH_STREAMING_IMPL",
+ "_ZN6impala18GroupingAggregator21AddBatchStreamingImplEbNS_13TPrefetchMode4typeEPNS_8RowBatchES4_PNS_12HashTableCtxEPi"],
["AVG_UPDATE_BIGINT",
"_ZN6impala18AggregateFunctions9AvgUpdateIN10impala_udf9BigIntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE"],
["AVG_UPDATE_DOUBLE",
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/codegen/impala-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 9c5b3eb..0fa4fe9 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -26,11 +26,12 @@
#pragma clang diagnostic ignored "-Wheader-hygiene"
#include "codegen/codegen-anyval-ir.cc"
+#include "exec/grouping-aggregator-ir.cc"
#include "exec/hash-table-ir.cc"
#include "exec/hdfs-avro-scanner-ir.cc"
#include "exec/hdfs-parquet-scanner-ir.cc"
#include "exec/hdfs-scanner-ir.cc"
-#include "exec/partitioned-aggregation-node-ir.cc"
+#include "exec/non-grouping-aggregator-ir.cc"
#include "exec/partitioned-hash-join-builder-ir.cc"
#include "exec/partitioned-hash-join-node-ir.cc"
#include "exec/select-node-ir.cc"
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 77c6e15..1753cb0 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -25,6 +25,8 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec")
add_library(Exec
+ aggregation-node.cc
+ aggregator.cc
analytic-eval-node.cc
base-sequence-scanner.cc
blocking-join-node.cc
@@ -38,6 +40,9 @@ add_library(Exec
exchange-node.cc
external-data-source-executor.cc
filter-context.cc
+ grouping-aggregator.cc
+ grouping-aggregator-ir.cc
+ grouping-aggregator-partition.cc
hash-table.cc
hbase-table-sink.cc
hbase-table-writer.cc
@@ -66,12 +71,12 @@ add_library(Exec
incr-stats-util.cc
nested-loop-join-builder.cc
nested-loop-join-node.cc
+ non-grouping-aggregator.cc
+ non-grouping-aggregator-ir.cc
parquet-column-readers.cc
parquet-column-stats.cc
parquet-metadata-utils.cc
partial-sort-node.cc
- partitioned-aggregation-node.cc
- partitioned-aggregation-node-ir.cc
partitioned-hash-join-builder.cc
partitioned-hash-join-builder-ir.cc
partitioned-hash-join-node.cc
@@ -91,6 +96,7 @@ add_library(Exec
select-node-ir.cc
singular-row-src-node.cc
sort-node.cc
+ streaming-aggregation-node.cc
subplan-node.cc
text-converter.cc
topn-node.cc
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
new file mode 100644
index 0000000..d25284d
--- /dev/null
+++ b/be/src/exec/aggregation-node.cc
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/aggregation-node.h"
+
+#include <sstream>
+
+#include "exec/grouping-aggregator.h"
+#include "exec/non-grouping-aggregator.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "util/debug-util.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+// Constructs the node; all real setup (including creating the aggregator) happens in
+// Init().
+AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
+    const DescriptorTbl& descs)
+  : ExecNode(pool, tnode, descs) {}
+
+// Initializes the base ExecNode, then creates the aggregator that matches the plan:
+// a GroupingAggregator when the plan node has grouping exprs, otherwise a
+// NonGroupingAggregator. The aggregator's profile is attached as a child of this
+// node's profile.
+Status AggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
+  const bool has_grouping_exprs = !tnode.agg_node.grouping_exprs.empty();
+  if (has_grouping_exprs) {
+    aggregator_.reset(new GroupingAggregator(this, pool_, tnode, state->desc_tbl()));
+  } else {
+    aggregator_.reset(new NonGroupingAggregator(this, pool_, tnode, state->desc_tbl()));
+  }
+  RETURN_IF_ERROR(aggregator_->Init(tnode, state));
+  runtime_profile_->AddChild(aggregator_->runtime_profile());
+  return Status::OK();
+}
+
+// Prepares the base node, forwards this node's debug options to the aggregator and
+// prepares the aggregator. Finally records in the profile whether codegen is disabled.
+Status AggregationNode::Prepare(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecNode::Prepare(state));
+
+  // The aggregator receives the debug options before its own Prepare() runs.
+  aggregator_->SetDebugOptions(debug_options_);
+  RETURN_IF_ERROR(aggregator_->Prepare(state));
+
+  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
+  return Status::OK();
+}
+
+// Runs the base ExecNode codegen and, unless codegen is disabled for this node,
+// codegens the aggregator. Must only be called when the query should be codegen'd.
+void AggregationNode::Codegen(RuntimeState* state) {
+  DCHECK(state->ShouldCodegen());
+  ExecNode::Codegen(state);
+  if (!IsNodeCodegenDisabled()) aggregator_->Codegen(state);
+}
+
+// Blocking phase of the aggregation: opens the child, then drains every batch from
+// child(0) into the aggregator. Returns the status of Aggregator::InputDone() once all
+// input has been consumed.
+Status AggregationNode::Open(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  // Open the child before consuming resources in this node.
+  RETURN_IF_ERROR(child(0)->Open(state));
+  RETURN_IF_ERROR(ExecNode::Open(state));
+  RETURN_IF_ERROR(aggregator_->Open(state));
+
+  // A single batch is reused for every GetNext() call on the child.
+  RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+  bool eos = false;
+  while (!eos) {
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(QueryMaintenance(state));
+    RETURN_IF_ERROR(child(0)->GetNext(state, &batch, &eos));
+    RETURN_IF_ERROR(aggregator_->AddBatch(state, &batch));
+    batch.Reset();
+  }
+
+  // All of the child's input has been consumed and ownership of the resources we need
+  // has been transferred, so the child can usually be closed now. The exception is when
+  // this node is inside a subplan, which may call Open()/GetNext() on the child again.
+  if (!IsInSubplan()) child(0)->Close(state);
+
+  return aggregator_->InputDone();
+}
+
+// Returns aggregated rows produced by the aggregator. Once this node's limit has been
+// reached, reports eos without asking the aggregator for more rows.
+Status AggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+
+  if (!ReachedLimit()) {
+    RETURN_IF_ERROR(aggregator_->GetNext(state, row_batch, eos));
+    num_rows_returned_ += row_batch->num_rows();
+    COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  } else {
+    *eos = true;
+  }
+  return Status::OK();
+}
+
+// Resets the aggregator's state first, then the base ExecNode's state, so the node
+// can be re-opened (e.g. when it is part of a subplan).
+Status AggregationNode::Reset(RuntimeState* state) {
+  RETURN_IF_ERROR(aggregator_->Reset(state));
+  return ExecNode::Reset(state);
+}
+
+// Releases the aggregator's resources and then closes the base node. Idempotent: a
+// second call is a no-op.
+void AggregationNode::Close(RuntimeState* state) {
+  if (is_closed()) return;
+  // 'aggregator_' is only created in Init() after ExecNode::Init() succeeds, so it can
+  // be null here if Init() failed early. NOTE(review): this assumes Close() may be
+  // called on a node whose Init() failed -- confirm against the ExecNode lifecycle.
+  if (aggregator_ != nullptr) aggregator_->Close(state);
+  ExecNode::Close(state);
+}
+
+// Appends "AggregationNode(aggregator=<aggregator debug string>", followed by the base
+// ExecNode debug output and a closing ')', indented by two spaces per level.
+void AggregationNode::DebugString(int indentation_level, stringstream* out) const {
+  const string indent(indentation_level * 2, ' ');
+  *out << indent << "AggregationNode(aggregator=" << aggregator_->DebugString();
+  ExecNode::DebugString(indentation_level, out);
+  *out << ")";
+}
+} // namespace impala
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.h b/be/src/exec/aggregation-node.h
new file mode 100644
index 0000000..527a62d
--- /dev/null
+++ b/be/src/exec/aggregation-node.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_AGGREGATION_NODE_H
+#define IMPALA_EXEC_AGGREGATION_NODE_H
+
+#include <memory>
+
+#include "exec/aggregator.h"
+#include "exec/exec-node.h"
+
+namespace impala {
+
+class RowBatch;
+class RuntimeState;
+
+/// Exec node that performs a blocking aggregation. During Open() it reads all of the
+/// input from child(0) and passes it to an Aggregator, which does the actual work of
+/// aggregating; the aggregated rows are then returned from GetNext(). If the plan node
+/// has grouping exprs the Aggregator is a GroupingAggregator (partitioned hash
+/// aggregation), otherwise it is a NonGroupingAggregator.
+class AggregationNode : public ExecNode {
+ public:
+  AggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+
+  Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  Status Prepare(RuntimeState* state) override;
+  void Codegen(RuntimeState* state) override;
+  Status Open(RuntimeState* state) override;
+  Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+  Status Reset(RuntimeState* state) override;
+  void Close(RuntimeState* state) override;
+
+  void DebugString(int indentation_level, std::stringstream* out) const override;
+
+ private:
+  /////////////////////////////////////////
+  /// BEGIN: Members that must be Reset()
+
+  /// Does the actual work of aggregating input rows. Created in Init(); its state is
+  /// reset through Aggregator::Reset() when this node is Reset().
+  std::unique_ptr<Aggregator> aggregator_;
+
+  /// END: Members that must be Reset()
+  /////////////////////////////////////////
+};
+} // namespace impala
+
+#endif // IMPALA_EXEC_AGGREGATION_NODE_H
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregator.cc b/be/src/exec/aggregator.cc
new file mode 100644
index 0000000..70178cc
--- /dev/null
+++ b/be/src/exec/aggregator.cc
@@ -0,0 +1,609 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/aggregator.h"
+
+#include <sstream>
+
+#include "codegen/codegen-anyval.h"
+#include "codegen/llvm-codegen.h"
+#include "exec/exec-node.h"
+#include "exprs/agg-fn-evaluator.h"
+#include "exprs/expr-value.h"
+#include "exprs/scalar-expr.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/descriptors.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/raw-value.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "runtime/tuple.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+// Name of the struct type that represents Aggregator in LLVM IR. Presumably used to look
+// up the class type in the cross-compiled module during codegen -- TODO confirm callers.
+const char* Aggregator::LLVM_CLASS_NAME = "class.impala::Aggregator";
+
+// Captures the plan-level configuration of the aggregation from 'tnode' and 'descs'.
+// The output row layout comes from 'exec_node' and the input row layout from its first
+// child. The runtime profile, named 'name', is allocated from 'pool'.
+//
+// The initializers deliberately read only constructor arguments, never other members.
+// This keeps them correct regardless of the member declaration order in aggregator.h.
+Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode,
+    const DescriptorTbl& descs, const std::string& name)
+  : id_(exec_node->id()),
+    pool_(pool),
+    intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
+    intermediate_tuple_desc_(
+        descs.GetTupleDescriptor(tnode.agg_node.intermediate_tuple_id)),
+    output_tuple_id_(tnode.agg_node.output_tuple_id),
+    output_tuple_desc_(descs.GetTupleDescriptor(tnode.agg_node.output_tuple_id)),
+    row_desc_(*exec_node->row_desc()),
+    input_row_desc_(*exec_node->child(0)->row_desc()),
+    needs_finalize_(tnode.agg_node.need_finalize),
+    runtime_profile_(RuntimeProfile::Create(pool, name)),
+    num_rows_returned_(0),
+    rows_returned_counter_(nullptr),
+    build_timer_(nullptr) {}
+
+// Nothing to release explicitly; resources are freed in Close() and by member dtors.
+Aggregator::~Aggregator() = default;
+
+// Creates one AggFn per aggregate function in the plan and the conjunct exprs.
+// In both the intermediate and the output tuple, the aggregate slots follow the
+// num_grouping_exprs() grouping slots, so aggregate function 'k' maps to slot
+// num_grouping_exprs() + k.
+Status Aggregator::Init(const TPlanNode& tnode, RuntimeState* state) {
+  DCHECK(intermediate_tuple_desc_ != nullptr);
+  DCHECK(output_tuple_desc_ != nullptr);
+  DCHECK_EQ(intermediate_tuple_desc_->slots().size(), output_tuple_desc_->slots().size());
+
+  const int first_agg_slot = num_grouping_exprs();
+  for (int fn_idx = 0; fn_idx < tnode.agg_node.aggregate_functions.size(); ++fn_idx) {
+    const int slot_idx = first_agg_slot + fn_idx;
+    SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[slot_idx];
+    SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[slot_idx];
+    AggFn* agg_fn;
+    RETURN_IF_ERROR(AggFn::Create(tnode.agg_node.aggregate_functions[fn_idx],
+        input_row_desc_, *intermediate_slot_desc, *output_slot_desc, state, &agg_fn));
+    agg_fns_.push_back(agg_fn);
+  }
+
+  RETURN_IF_ERROR(ScalarExpr::Create(tnode.conjuncts, row_desc_, state, &conjuncts_));
+  return Status::OK();
+}
+
+// Sets up memory tracking, then creates the evaluators and profile counters.
+// The aggregator's MemTracker hangs off the instance tracker, and an "Exprs" child
+// tracker backs the permanent and per-result expr pools. The evaluators for the
+// aggregate functions and conjuncts allocate from those pools.
+Status Aggregator::Prepare(RuntimeState* state) {
+  // Memory tracking and expr memory pools.
+  mem_tracker_.reset(new MemTracker(
+      runtime_profile_, -1, runtime_profile_->name(), state->instance_mem_tracker()));
+  expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false));
+  expr_perm_pool_.reset(new MemPool(expr_mem_tracker_.get()));
+  expr_results_pool_.reset(new MemPool(expr_mem_tracker_.get()));
+
+  // Evaluators.
+  RETURN_IF_ERROR(AggFnEvaluator::Create(agg_fns_, state, pool_, expr_perm_pool_.get(),
+      expr_results_pool_.get(), &agg_fn_evals_));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(conjuncts_, state, pool_,
+      expr_perm_pool_.get(), expr_results_pool_.get(), &conjunct_evals_));
+  DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
+
+  // Profile counters.
+  rows_returned_counter_ = ADD_COUNTER(runtime_profile_, "RowsReturned", TUnit::UNIT);
+  build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime");
+  return Status::OK();
+}
+
+// Opens the aggregate-function evaluators, then the conjunct evaluators; returns the
+// first error encountered.
+Status Aggregator::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state));
+  return ScalarExprEvaluator::Open(conjunct_evals_, state);
+}
+
+// Tears down everything created in Init()/Prepare(). Safe to call when Prepare() did
+// not run: each pool and tracker is null-checked before use.
+void Aggregator::Close(RuntimeState* state) {
+  // Evaluators are closed before the exprs they evaluate.
+  AggFnEvaluator::Close(agg_fn_evals_, state);
+  AggFn::Close(agg_fns_);
+  ScalarExprEvaluator::Close(conjunct_evals_, state);
+  ScalarExpr::Close(conjuncts_);
+
+  // Free the expr pools, then close the trackers from the innermost outwards.
+  if (expr_perm_pool_.get() != nullptr) expr_perm_pool_->FreeAll();
+  if (expr_results_pool_.get() != nullptr) expr_results_pool_->FreeAll();
+  if (expr_mem_tracker_.get() != nullptr) expr_mem_tracker_->Close();
+  if (mem_tracker_.get() != nullptr) mem_tracker_->Close();
+}
+
+// Initializes the aggregate slots of 'intermediate_tuple' (the slots after the grouping
+// slots) by calling Init() on each evaluator in 'agg_fn_evals'.
+//
+// The goal is to minimize branching on the UpdateTuple path. The result value is
+// initialized so that the Add() UDA function can ignore the NULL bit of its destination
+// value. E.g. for SUM(), if the destination value is initialized to 0 (with the NULL
+// bit set), Add() can just start adding to it rather than repeatedly checking the
+// destination NULL bit. The codegen'd version of UpdateSlot() exploits this to
+// eliminate a branch per value.
+//
+// For boolean and numeric types the default values are false/0, so the nullable
+// aggregate functions SUM() and AVG() already produce the correct result. MIN()/MAX()
+// over non-string, non-timestamp types additionally get their slot set to the max/min
+// possible value for the same effect.
+// TODO: codegen this function.
+void Aggregator::InitAggSlots(
+    const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple) {
+  const int first_agg_slot = num_grouping_exprs();
+  for (int i = 0; i < agg_fn_evals.size(); ++i) {
+    SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[first_agg_slot + i];
+    AggFnEvaluator* eval = agg_fn_evals[i];
+    eval->Init(intermediate_tuple);
+
+    DCHECK(agg_fns_[i] == &(eval->agg_fn()));
+    const AggFn* agg_fn = agg_fns_[i];
+    const AggFn::AggregationOp agg_op = agg_fn->agg_op();
+    const bool is_min_or_max = agg_op == AggFn::MIN || agg_op == AggFn::MAX;
+    const ColumnType& intermediate_type = agg_fn->intermediate_type();
+    if (!is_min_or_max || intermediate_type.IsStringType()
+        || intermediate_type.IsTimestampType()) {
+      continue;
+    }
+
+    // MIN starts from the largest representable value, MAX from the smallest.
+    ExprValue default_value;
+    void* default_value_ptr = (agg_op == AggFn::MIN) ?
+        default_value.SetToMax(slot_desc->type()) :
+        default_value.SetToMin(slot_desc->type());
+    RawValue::Write(default_value_ptr, intermediate_tuple, slot_desc, nullptr);
+  }
+}
+
+// Updates every aggregate slot of 'tuple' with 'row'. When 'is_merge' is true, each
+// evaluator merges tuple 0 of 'row' into 'tuple'; otherwise each evaluator adds 'row'
+// to 'tuple'. 'tuple' may only be null when there are no aggregate functions.
+void Aggregator::UpdateTuple(
+    AggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row, bool is_merge) noexcept {
+  DCHECK(tuple != nullptr || agg_fns_.empty());
+  const int num_agg_fns = agg_fns_.size();
+  if (is_merge) {
+    for (int i = 0; i < num_agg_fns; ++i) agg_fn_evals[i]->Merge(row->GetTuple(0), tuple);
+  } else {
+    for (int i = 0; i < num_agg_fns; ++i) agg_fn_evals[i]->Add(row, tuple);
+  }
+}
+
+// Produces the output tuple for the intermediate 'tuple'.
+// - If no finalization is needed, the aggregate values are serialized in place and
+//   'tuple' itself is returned.
+// - Otherwise the values are finalized. When the intermediate and output tuple ids are
+//   the same, this happens in place. When they differ, it goes into a new tuple
+//   allocated from 'pool', and the grouping slots are then copied over.
+Tuple* Aggregator::GetOutputTuple(
+    const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool) {
+  DCHECK(tuple != nullptr || agg_fn_evals.empty()) << tuple;
+  if (!needs_finalize_) {
+    AggFnEvaluator::Serialize(agg_fn_evals, tuple);
+    return tuple;
+  }
+
+  Tuple* dst = (intermediate_tuple_id_ == output_tuple_id_) ?
+      tuple :
+      Tuple::Create(output_tuple_desc_->byte_size(), pool);
+  AggFnEvaluator::Finalize(agg_fn_evals, tuple, dst);
+  if (dst == tuple) return dst;
+
+  // Copy the leading grouping slots from 'tuple' into 'dst', preserving NULLs.
+  // TODO: Codegen this.
+  const int num_grouping_slots = num_grouping_exprs();
+  for (int i = 0; i < num_grouping_slots; ++i) {
+    SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i];
+    SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i];
+    void* src_slot = tuple->IsNull(src_slot_desc->null_indicator_offset()) ?
+        nullptr :
+        tuple->GetSlot(src_slot_desc->tuple_offset());
+    RawValue::Write(src_slot, dst, dst_slot_desc, nullptr);
+  }
+  return dst;
+}
+
+// IR Generation for updating a single aggregation slot. Signature is:
+// void UpdateSlot(AggFnEvaluator* agg_expr_eval, AggTuple* agg_tuple, char** row)
+//
+// The IR for sum(double_col), which is constructed directly with the IRBuilder, is:
+//
+// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
+// <{ double, i8 }>* %agg_tuple, %"class.impala::TupleRow"* %row) #33 {
+// entry:
+// %input_evals_vector = call %"class.impala::ScalarExprEvaluator"**
+// @_ZNK6impala14AggFnEvaluator11input_evalsEv(
+// %"class.impala::AggFnEvaluator"* %agg_fn_eval)
+// %0 = getelementptr %"class.impala::ScalarExprEvaluator"*,
+// %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0
+// %input_eval = load %"class.impala::ScalarExprEvaluator"*,
+// %"class.impala::ScalarExprEvaluator"** %0
+// %input0 = call { i8, double } @GetSlotRef(%"class.impala::ScalarExprEvaluator"*
+// %input_eval, %"class.impala::TupleRow"* %row)
+// %dst_slot_ptr = getelementptr inbounds <{ double, i8 }>,
+// <{ double, i8 }>* %agg_tuple, i32 0, i32 0
+// %dst_val = load double, double* %dst_slot_ptr
+// %1 = extractvalue { i8, double } %input0, 0
+// %is_null = trunc i8 %1 to i1
+// br i1 %is_null, label %ret, label %not_null
+//
+// ret: ; preds = %not_null, %entry
+// ret void
+//
+// not_null: ; preds = %entry
+// %val = extractvalue { i8, double } %input0, 1
+// %2 = fadd double %dst_val, %val
+// %3 = bitcast <{ double, i8 }>* %agg_tuple to i8*
+// %null_byte_ptr = getelementptr inbounds i8, i8* %3, i32 8
+// %null_byte = load i8, i8* %null_byte_ptr
+// %null_bit_cleared = and i8 %null_byte, -2
+// store i8 %null_bit_cleared, i8* %null_byte_ptr
+// store double %2, double* %dst_slot_ptr
+// br label %ret
+// }
+//
+// The IR for ndv(timestamp_col), which uses the UDA interface, is:
+//
+// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
+// <{ [1024 x i8] }>* %agg_tuple,
+// %"class.impala::TupleRow"* %row) #39 {
+// entry:
+// %dst_lowered_ptr = alloca { i64, i8* }
+// %0 = alloca { i64, i64 }
+// %input_evals_vector = call %"class.impala::ScalarExprEvaluator"**
+// @_ZNK6impala14AggFnEvaluator11input_evalsEv(
+// %"class.impala::AggFnEvaluator"* %agg_fn_eval)
+// %1 = getelementptr %"class.impala::ScalarExprEvaluator"*,
+// %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0
+// %input_eval = load %"class.impala::ScalarExprEvaluator"*,
+// %"class.impala::ScalarExprEvaluator"** %1
+// %input0 = call { i64, i64 } @GetSlotRef(
+// %"class.impala::ScalarExprEvaluator"* %input_eval,
+// %"class.impala::TupleRow"* %row)
+// %dst_slot_ptr = getelementptr inbounds <{ [1024 x i8] }>,
+// <{ [1024 x i8] }>* %agg_tuple, i32 0, i32 0
+// %2 = bitcast [1024 x i8]* %dst_slot_ptr to i8*
+// %dst = insertvalue { i64, i8* } zeroinitializer, i8* %2, 1
+// %3 = extractvalue { i64, i8* } %dst, 0
+// %4 = and i64 %3, 4294967295
+// %5 = or i64 %4, 4398046511104
+// %dst1 = insertvalue { i64, i8* } %dst, i64 %5, 0
+// %agg_fn_ctx = call %"class.impala_udf::FunctionContext"*
+// @_ZNK6impala14AggFnEvaluator10agg_fn_ctxEv(
+// %"class.impala::AggFnEvaluator"* %agg_fn_eval)
+// store { i64, i64 } %input0, { i64, i64 }* %0
+// %input_unlowered_ptr =
+// bitcast { i64, i64 }* %0 to %"struct.impala_udf::TimestampVal"*
+// store { i64, i8* } %dst1, { i64, i8* }* %dst_lowered_ptr
+// %dst_unlowered_ptr =
+// bitcast { i64, i8* }* %dst_lowered_ptr to %"struct.impala_udf::StringVal"*
+// call void @"void impala::AggregateFunctions::HllUpdate<impala_udf::TimestampVal>"(
+// %"class.impala_udf::FunctionContext"* %agg_fn_ctx,
+// %"struct.impala_udf::TimestampVal"* %input_unlowered_ptr,
+// %"struct.impala_udf::StringVal"* %dst_unlowered_ptr)
+// %anyval_result = load { i64, i8* }, { i64, i8* }* %dst_lowered_ptr
+// br label %ret
+//
+// ret: ; preds = %entry
+// ret void
+// }
+//
+Status Aggregator::CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
+ SlotDescriptor* slot_desc, llvm::Function** fn) {
+ llvm::PointerType* agg_fn_eval_type = codegen->GetStructPtrType<AggFnEvaluator>();
+ llvm::StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
+ if (tuple_struct == nullptr) {
+ return Status("Aggregator::CodegenUpdateSlot(): failed to generate "
+ "intermediate tuple desc");
+ }
+ llvm::PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_struct);
+ llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
+
+ LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type());
+ prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_eval", agg_fn_eval_type));
+ prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type));
+ prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
+
+ LlvmBuilder builder(codegen->context());
+ llvm::Value* args[3];
+ *fn = prototype.GeneratePrototype(&builder, &args[0]);
+ llvm::Value* agg_fn_eval_arg = args[0];
+ llvm::Value* agg_tuple_arg = args[1];
+ llvm::Value* row_arg = args[2];
+
+ // Get the vector of input expressions' evaluators.
+ llvm::Value* input_evals_vector = codegen->CodegenCallFunction(&builder,
+ IRFunction::AGG_FN_EVALUATOR_INPUT_EVALUATORS, agg_fn_eval_arg,
+ "input_evals_vector");
+
+ AggFn* agg_fn = agg_fns_[agg_fn_idx];
+ const int num_inputs = agg_fn->GetNumChildren();
+ DCHECK_GE(num_inputs, 1);
+ vector<CodegenAnyVal> input_vals;
+ for (int i = 0; i < num_inputs; ++i) {
+ ScalarExpr* input_expr = agg_fn->GetChild(i);
+ llvm::Function* input_expr_fn;
+ RETURN_IF_ERROR(input_expr->GetCodegendComputeFn(codegen, &input_expr_fn));
+ DCHECK(input_expr_fn != nullptr);
+
+ // Call input expr function with the matching evaluator to get src slot value.
+ llvm::Value* input_eval =
+ codegen->CodegenArrayAt(&builder, input_evals_vector, i, "input_eval");
+ string input_name = Substitute("input$0", i);
+ CodegenAnyVal input_val = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
+ input_expr->type(), input_expr_fn,
+ llvm::ArrayRef<llvm::Value*>({input_eval, row_arg}), input_name.c_str());
+ input_vals.push_back(input_val);
+ }
+
+ AggFn::AggregationOp agg_op = agg_fn->agg_op();
+ const ColumnType& dst_type = agg_fn->intermediate_type();
+ bool dst_is_int_or_float_or_bool = dst_type.IsIntegerType()
+ || dst_type.IsFloatingPointType() || dst_type.IsBooleanType();
+ bool dst_is_numeric_or_bool = dst_is_int_or_float_or_bool || dst_type.IsDecimalType();
+
+ llvm::BasicBlock* ret_block = llvm::BasicBlock::Create(codegen->context(), "ret", *fn);
+
+ // Emit the code to compute 'result' and set the NULL indicator if needed. First check
+ // for special cases where we can emit a very simple instruction sequence, then fall
+ // back to the general-purpose approach of calling the cross-compiled builtin UDA.
+ CodegenAnyVal& src = input_vals[0];
+
+ // 'dst_slot_ptr' points to the slot in the aggregate tuple to update.
+ llvm::Value* dst_slot_ptr = builder.CreateStructGEP(
+ nullptr, agg_tuple_arg, slot_desc->llvm_field_idx(), "dst_slot_ptr");
+ // TODO: consider moving the following codegen logic to AggFn.
+ if (agg_op == AggFn::COUNT) {
+ src.CodegenBranchIfNull(&builder, ret_block);
+ llvm::Value* dst_value = builder.CreateLoad(dst_slot_ptr, "dst_val");
+ llvm::Value* result = agg_fn->is_merge() ?
+ builder.CreateAdd(dst_value, src.GetVal(), "count_sum") :
+ builder.CreateAdd(dst_value, codegen->GetI64Constant(1), "count_inc");
+ builder.CreateStore(result, dst_slot_ptr);
+ DCHECK(!slot_desc->is_nullable());
+ } else if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX) && dst_is_numeric_or_bool) {
+ bool is_min = agg_op == AggFn::MIN;
+ src.CodegenBranchIfNull(&builder, ret_block);
+ codegen->CodegenMinMax(
+ &builder, slot_desc->type(), src.GetVal(), dst_slot_ptr, is_min, *fn);
+
+ // Dst may have been NULL, make sure to unset the NULL bit.
+ DCHECK(slot_desc->is_nullable());
+ slot_desc->CodegenSetNullIndicator(
+ codegen, &builder, agg_tuple_arg, codegen->false_value());
+ } else if (agg_op == AggFn::SUM && dst_is_int_or_float_or_bool) {
+ src.CodegenBranchIfNull(&builder, ret_block);
+ llvm::Value* dst_value = builder.CreateLoad(dst_slot_ptr, "dst_val");
+ llvm::Value* result = dst_type.IsFloatingPointType() ?
+ builder.CreateFAdd(dst_value, src.GetVal()) :
+ builder.CreateAdd(dst_value, src.GetVal());
+ builder.CreateStore(result, dst_slot_ptr);
+
+ if (slot_desc->is_nullable()) {
+ slot_desc->CodegenSetNullIndicator(
+ codegen, &builder, agg_tuple_arg, codegen->false_value());
+ } else {
+ // 'slot_desc' is not nullable if the aggregate function is sum_init_zero(),
+ // because the slot is initialized to be zero and the null bit is nonexistent.
+ DCHECK_EQ(agg_fn->fn_name(), "sum_init_zero");
+ }
+ } else {
+ // The remaining cases are implemented using the UDA interface.
+ // Create intermediate argument 'dst' from 'dst_value'
+ CodegenAnyVal dst = CodegenAnyVal::GetNonNullVal(codegen, &builder, dst_type, "dst");
+
+ // For a subset of builtins we generate a different code sequence that exploits two
+ // properties of the builtins. First, NULL input values can be skipped. Second, the
+ // value of the slot was initialized in the right way in InitAggSlots() (e.g. 0 for
+ // SUM) that we get the right result if UpdateSlot() pretends that the NULL bit of
+ // 'dst' is unset. Empirically this optimisation makes TPC-H Q1 5-10% faster.
+ bool special_null_handling = !agg_fn->intermediate_type().IsStringType()
+ && !agg_fn->intermediate_type().IsTimestampType()
+ && (agg_op == AggFn::MIN || agg_op == AggFn::MAX || agg_op == AggFn::SUM
+ || agg_op == AggFn::AVG || agg_op == AggFn::NDV);
+ if (slot_desc->is_nullable()) {
+ if (special_null_handling) {
+ src.CodegenBranchIfNull(&builder, ret_block);
+ slot_desc->CodegenSetNullIndicator(
+ codegen, &builder, agg_tuple_arg, codegen->false_value());
+ } else {
+ dst.SetIsNull(slot_desc->CodegenIsNull(codegen, &builder, agg_tuple_arg));
+ }
+ }
+ dst.LoadFromNativePtr(dst_slot_ptr);
+
+ // Get the FunctionContext object for the AggFnEvaluator.
+ llvm::Function* get_agg_fn_ctx_fn =
+ codegen->GetFunction(IRFunction::AGG_FN_EVALUATOR_AGG_FN_CTX, false);
+ DCHECK(get_agg_fn_ctx_fn != nullptr);
+ llvm::Value* agg_fn_ctx_val =
+ builder.CreateCall(get_agg_fn_ctx_fn, {agg_fn_eval_arg}, "agg_fn_ctx");
+
+ // Call the UDA to update/merge 'src' into 'dst', with the result stored in
+ // 'updated_dst_val'.
+ CodegenAnyVal updated_dst_val;
+ RETURN_IF_ERROR(CodegenCallUda(
+ codegen, &builder, agg_fn, agg_fn_ctx_val, input_vals, dst, &updated_dst_val));
+ // Copy the value back to the slot. In the FIXED_UDA_INTERMEDIATE case, the
+ // UDA function writes directly to the slot so there is nothing to copy.
+ if (dst_type.type != TYPE_FIXED_UDA_INTERMEDIATE) {
+ updated_dst_val.StoreToNativePtr(dst_slot_ptr);
+ }
+
+ if (slot_desc->is_nullable() && !special_null_handling) {
+ // Set NULL bit in the slot based on the return value.
+ llvm::Value* result_is_null = updated_dst_val.GetIsNull("result_is_null");
+ slot_desc->CodegenSetNullIndicator(
+ codegen, &builder, agg_tuple_arg, result_is_null);
+ }
+ }
+ builder.CreateBr(ret_block);
+
+ builder.SetInsertPoint(ret_block);
+ builder.CreateRetVoid();
+
+ // Avoid producing huge UpdateTuple() function after inlining - LLVM's optimiser
+ // memory/CPU usage scales super-linearly with function size.
+ // E.g. compute stats on all columns of a 1000-column table previously took 4 minutes to
+ // codegen because all the UpdateSlot() functions were inlined.
+ if (agg_fn_idx >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
+ codegen->SetNoInline(*fn);
+ }
+
+ *fn = codegen->FinalizeFunction(*fn);
+ if (*fn == nullptr) {
+ return Status("Aggregator::CodegenUpdateSlot(): codegen'd "
+ "UpdateSlot() function failed verification, see log");
+ }
+ return Status::OK();
+}
+
+Status Aggregator::CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder,
+ AggFn* agg_fn, llvm::Value* agg_fn_ctx_val, const vector<CodegenAnyVal>& input_vals,
+ const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) {
+ llvm::Function* uda_fn; // Codegen'd Update() or Merge() function of 'agg_fn'.
+ RETURN_IF_ERROR(agg_fn->CodegenUpdateOrMergeFunction(codegen, &uda_fn));
+
+ // Set up arguments for call to UDA, which are the FunctionContext*, followed by
+ // pointers to all input values, followed by a pointer to the destination value.
+ vector<llvm::Value*> uda_fn_args;
+ uda_fn_args.push_back(agg_fn_ctx_val);
+
+ // Create pointers to input args to pass to uda_fn. We must use the unlowered type,
+ // e.g. IntVal, because the UDA interface expects the values to be passed as const
+ // references to the classes.
+ DCHECK_EQ(agg_fn->GetNumChildren(), input_vals.size()); // One input per child expr.
+ for (int i = 0; i < input_vals.size(); ++i) {
+ uda_fn_args.push_back(input_vals[i].GetUnloweredPtr("input_unlowered_ptr"));
+ }
+
+ // Create pointer to dst to pass to uda_fn. We must use the unlowered type for the
+ // same reason as above.
+ llvm::Value* dst_lowered_ptr = dst_val.GetLoweredPtr("dst_lowered_ptr");
+ const ColumnType& dst_type = agg_fn->intermediate_type();
+ llvm::Type* dst_unlowered_ptr_type =
+ CodegenAnyVal::GetUnloweredPtrType(codegen, dst_type);
+ llvm::Value* dst_unlowered_ptr = builder->CreateBitCast(
+ dst_lowered_ptr, dst_unlowered_ptr_type, "dst_unlowered_ptr");
+ uda_fn_args.push_back(dst_unlowered_ptr);
+
+ // Call 'uda_fn'. Its result is read back below through 'dst_lowered_ptr', which the bitcast above aliases as 'dst_unlowered_ptr'.
+ builder->CreateCall(uda_fn, uda_fn_args);
+
+ // Load the updated intermediate value from the storage of 'dst_val' and wrap it in 'updated_dst_val'.
+ llvm::Value* anyval_result = builder->CreateLoad(dst_lowered_ptr, "anyval_result");
+
+ *updated_dst_val = CodegenAnyVal(codegen, builder, dst_type, anyval_result);
+ return Status::OK();
+}
+
+// IR codegen for the UpdateTuple loop. This loop is query specific and based on the
+// aggregate functions. The function signature must match the non-codegen'd UpdateTuple
+// exactly. The generated body never reads the 'is_merge' argument (args[4] is unused).
+// For the query:
+// select count(*), count(int_col), sum(double_col) the IR looks like:
+//
+// define void @UpdateTuple(%"class.impala::Aggregator"* %this_ptr,
+// %"class.impala::AggFnEvaluator"** %agg_fn_evals, %"class.impala::Tuple"* %tuple,
+// %"class.impala::TupleRow"* %row, i1 %is_merge) #33 {
+// entry:
+// %tuple1 = bitcast %"class.impala::Tuple"* %tuple to <{ i64, i64, double, i8 }>*
+// %src_slot = getelementptr inbounds <{ i64, i64, double, i8 }>,
+// <{ i64, i64, double, i8 }>* %tuple1, i32 0, i32 0
+// %count_star_val = load i64, i64* %src_slot
+// %count_star_inc = add i64 %count_star_val, 1
+// store i64 %count_star_inc, i64* %src_slot
+// %0 = getelementptr %"class.impala::AggFnEvaluator"*,
+// %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 1
+// %agg_fn_eval =
+// load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %0
+// call void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
+// <{ i64, i64, double, i8 }>* %tuple1, %"class.impala::TupleRow"* %row)
+// %1 = getelementptr %"class.impala::AggFnEvaluator"*,
+// %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 2
+// %agg_fn_eval2 =
+// load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %1
+// call void @UpdateSlot.2(%"class.impala::AggFnEvaluator"* %agg_fn_eval2,
+// <{ i64, i64, double, i8 }>* %tuple1, %"class.impala::TupleRow"* %row)
+// ret void
+// }
+//
+Status Aggregator::CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) {
+ for (const SlotDescriptor* slot_desc : intermediate_tuple_desc_->slots()) { // CHAR slots cannot be codegen'd.
+ if (slot_desc->type().type == TYPE_CHAR) {
+ return Status::Expected("Aggregator::CodegenUpdateTuple(): cannot "
+ "codegen CHAR in aggregations");
+ }
+ }
+
+ if (intermediate_tuple_desc_->GetLlvmStruct(codegen) == nullptr) {
+ return Status::Expected("Aggregator::CodegenUpdateTuple(): failed to"
+ " generate intermediate tuple desc");
+ }
+
+ // Get the LLVM types needed to match the UpdateTuple() signature.
+ llvm::PointerType* agg_node_ptr_type = codegen->GetStructPtrType<Aggregator>();
+ llvm::PointerType* evals_type = codegen->GetStructPtrPtrType<AggFnEvaluator>();
+ llvm::PointerType* tuple_ptr_type = codegen->GetStructPtrType<Tuple>();
+ llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
+
+ llvm::StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
+ llvm::PointerType* tuple_ptr = codegen->GetPtrType(tuple_struct);
+ LlvmCodeGen::FnPrototype prototype(codegen, "UpdateTuple", codegen->void_type());
+ prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", agg_node_ptr_type));
+ prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_evals", evals_type));
+ prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_ptr_type));
+ prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
+ prototype.AddArgument(LlvmCodeGen::NamedVariable("is_merge", codegen->bool_type()));
+
+ LlvmBuilder builder(codegen->context());
+ llvm::Value* args[5];
+ *fn = prototype.GeneratePrototype(&builder, &args[0]);
+ llvm::Value* agg_fn_evals_arg = args[1];
+ llvm::Value* tuple_arg = args[2];
+ llvm::Value* row_arg = args[3];
+
+ // Cast the parameter types to the internal llvm runtime types.
+ // TODO: get rid of this by using right type in function signature
+ tuple_arg = builder.CreateBitCast(tuple_arg, tuple_ptr, "tuple");
+
+ // Loop over each agg fn and generate the IR for its slot; agg fn slots follow the
+ // grouping-expr slots, hence 'j'. count(*) is inlined, others call a helper UpdateSlot().
+ int j = num_grouping_exprs();
+ for (int i = 0; i < agg_fns_.size(); ++i, ++j) {
+ SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
+ AggFn* agg_fn = agg_fns_[i];
+ if (agg_fn->is_count_star()) {
+ // TODO: we should be able to hoist this up to the loop over the batch and just
+ // increment the slot by the number of rows in the batch.
+ int field_idx = slot_desc->llvm_field_idx();
+ llvm::Value* const_one = codegen->GetI64Constant(1);
+ llvm::Value* slot_ptr =
+ builder.CreateStructGEP(nullptr, tuple_arg, field_idx, "src_slot");
+ llvm::Value* slot_loaded = builder.CreateLoad(slot_ptr, "count_star_val");
+ llvm::Value* count_inc =
+ builder.CreateAdd(slot_loaded, const_one, "count_star_inc");
+ builder.CreateStore(count_inc, slot_ptr);
+ } else {
+ llvm::Function* update_slot_fn;
+ RETURN_IF_ERROR(CodegenUpdateSlot(codegen, i, slot_desc, &update_slot_fn));
+
+ // Load agg_fn_evals[i] from the 'agg_fn_evals' argument (not the member agg_fn_evals_).
+ llvm::Value* agg_fn_eval_val =
+ codegen->CodegenArrayAt(&builder, agg_fn_evals_arg, i, "agg_fn_eval");
+
+ // Call UpdateSlot(agg_fn_evals[i], tuple, row);
+ llvm::Value* update_slot_args[] = {agg_fn_eval_val, tuple_arg, row_arg};
+ builder.CreateCall(update_slot_fn, update_slot_args);
+ }
+ }
+ builder.CreateRetVoid();
+
+ // Avoid inlining big UpdateTuple function into outer loop - we're unlikely to get
+ // any benefit from it since the function call overhead will be amortized.
+ if (agg_fns_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
+ codegen->SetNoInline(*fn);
+ }
+
+ // The caller does the final optimizations (originally CodegenProcessBatch(); NOTE(review): confirm the caller's name after this refactor).
+ *fn = codegen->FinalizeFunction(*fn);
+ if (*fn == nullptr) {
+ return Status("Aggregator::CodegenUpdateTuple(): codegen'd "
+ "UpdateTuple() function failed verification, see log");
+ }
+ return Status::OK();
+}
+} // namespace impala
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/aggregator.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
new file mode 100644
index 0000000..ab13d45
--- /dev/null
+++ b/be/src/exec/aggregator.h
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_AGGREGATOR_H
+#define IMPALA_EXEC_AGGREGATOR_H
+
+#include <vector>
+
+#include "common/global-types.h"
+#include "common/status.h"
+#include "gen-cpp/Types_types.h"
+#include "util/runtime-profile.h"
+
+namespace llvm {
+class Function;
+class Value;
+} // namespace llvm
+
+namespace impala {
+
+class AggFn;
+class AggFnEvaluator;
+class CodegenAnyVal;
+class DescriptorTbl;
+class ExecNode;
+class LlvmBuilder;
+class LlvmCodeGen;
+class MemPool;
+class MemTracker;
+class ObjectPool;
+class RowBatch;
+class RowDescriptor;
+class RuntimeState;
+class ScalarExpr;
+class ScalarExprEvaluator;
+class SlotDescriptor;
+class TPlanNode;
+class Tuple;
+class TupleDescriptor;
+class TupleRow;
+
+/// Base class for aggregating rows. Used in the AggregationNode and
+/// StreamingAggregationNode.
+///
+/// Rows are added by calling AddBatch(). Once all rows have been added, InputDone() must
+/// be called and the results can be fetched with GetNext().
+class Aggregator {
+ public:
+ Aggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode,
+ const DescriptorTbl& descs, const std::string& name);
+ virtual ~Aggregator();
+
+ /// Aggregators follow the same lifecycle as ExecNodes, except that after Open() and
+ /// before GetNext() rows should be added with AddBatch(), followed by InputDone().
+ virtual Status Init(const TPlanNode& tnode, RuntimeState* state) WARN_UNUSED_RESULT;
+ virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
+ virtual void Codegen(RuntimeState* state) = 0;
+ virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
+ virtual Status GetNext(
+ RuntimeState* state, RowBatch* row_batch, bool* eos) WARN_UNUSED_RESULT = 0;
+ virtual Status Reset(RuntimeState* state) WARN_UNUSED_RESULT = 0;
+ virtual void Close(RuntimeState* state);
+
+ /// Adds all of the rows in 'batch' to the aggregation.
+ virtual Status AddBatch(RuntimeState* state, RowBatch* batch) = 0;
+ /// Indicates that all batches have been added. Must be called before GetNext().
+ virtual Status InputDone() = 0;
+
+ virtual int num_grouping_exprs() = 0; // Agg fn slots in the intermediate tuple start at this index.
+ RuntimeProfile* runtime_profile() { return runtime_profile_; }
+
+ virtual void SetDebugOptions(const TDebugOptions& debug_options) = 0;
+
+ virtual std::string DebugString(int indentation_level = 0) const = 0;
+ virtual void DebugString(int indentation_level, std::stringstream* out) const = 0;
+
+ static const char* LLVM_CLASS_NAME;
+
+ protected:
+ /// The id of the ExecNode this Aggregator corresponds to.
+ int id_;
+ ObjectPool* pool_;
+
+ /// Account for peak memory used by this aggregator.
+ std::unique_ptr<MemTracker> mem_tracker_;
+
+ /// MemTracker used by 'expr_perm_pool_' and 'expr_results_pool_'.
+ std::unique_ptr<MemTracker> expr_mem_tracker_;
+
+ /// MemPool for allocations made by expression evaluators in this aggregator that are
+ /// "permanent" and live until Close() is called. Created in Prepare().
+ std::unique_ptr<MemPool> expr_perm_pool_;
+
+ /// MemPool for allocations made by expression evaluators in this aggregator that hold
+ /// intermediate or final results of expression evaluation. Should be cleared
+ /// periodically to free accumulated memory. QueryMaintenance() clears this pool, but
+ /// it may be appropriate for Aggregator implementation to clear it at other points in
+ /// execution where the memory is not needed.
+ std::unique_ptr<MemPool> expr_results_pool_;
+
+ /// Tuple into which Update()/Merge()/Serialize() results are stored.
+ TupleId intermediate_tuple_id_;
+ TupleDescriptor* intermediate_tuple_desc_;
+
+ /// Tuple into which Finalize() results are stored. Possibly the same as
+ /// the intermediate tuple.
+ TupleId output_tuple_id_;
+ TupleDescriptor* output_tuple_desc_;
+
+ /// The RowDescriptor for the exec node this aggregator corresponds to.
+ const RowDescriptor& row_desc_;
+ /// The RowDescriptor for the child of the exec node this aggregator corresponds to.
+ const RowDescriptor& input_row_desc_;
+
+ /// Certain aggregates require a finalize step, which is the final step of the
+ /// aggregate after consuming all input rows. The finalize step converts the aggregate
+ /// value into its final form. This is true if this aggregator contains aggregate that
+ /// requires a finalize step.
+ const bool needs_finalize_;
+
+ /// The list of all aggregate operations for this aggregator.
+ std::vector<AggFn*> agg_fns_;
+
+ /// Evaluators for each aggregate function. If this is a grouping aggregation, these
+ /// evaluators are only used to create cloned per-partition evaluators. The cloned
+ /// evaluators are then used to evaluate the functions. If this is a non-grouping
+ /// aggregation these evaluators are used directly to evaluate the functions.
+ ///
+ /// Permanent and result allocations for these evaluators are allocated from
+ /// 'expr_perm_pool_' and 'expr_results_pool_' respectively.
+ std::vector<AggFnEvaluator*> agg_fn_evals_;
+
+ /// Conjuncts and their evaluators in this aggregator. 'conjuncts_' live in the
+ /// query-state's object pool while the evaluators live in this aggregator's
+ /// object pool.
+ std::vector<ScalarExpr*> conjuncts_;
+ std::vector<ScalarExprEvaluator*> conjunct_evals_;
+
+ /// Runtime profile for this aggregator. Owned by 'pool_'.
+ RuntimeProfile* const runtime_profile_;
+
+ int64_t num_rows_returned_;
+ RuntimeProfile::Counter* rows_returned_counter_;
+
+ /// Time spent processing the child rows
+ RuntimeProfile::Counter* build_timer_;
+
+ /// Initializes the aggregate function slots of an intermediate tuple.
+ /// Any var-len data is allocated from the FunctionContexts.
+ void InitAggSlots(
+ const std::vector<AggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple);
+
+ /// Updates the given aggregation intermediate tuple with aggregation values computed
+ /// over 'row' using 'agg_fn_evals'. Whether the agg fn evaluator calls Update() or
+ /// Merge() is controlled by the evaluator itself, unless enforced explicitly by passing
+ /// in is_merge == true. The override is needed to merge spilled and non-spilled rows
+ /// belonging to the same partition independent of whether the agg fn evaluators have
+ /// is_merge() == true.
+ /// This function is replaced by codegen (which is why we don't use a vector argument
+ /// for agg_fn_evals). Any var-len data is allocated from the FunctionContexts.
+ /// TODO: Fix the arguments order. Need to update CodegenUpdateTuple() too.
+ void UpdateTuple(AggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row,
+ bool is_merge = false) noexcept;
+
+ /// Called on the intermediate tuple of each group after all input rows have been
+ /// consumed and aggregated. Computes the final aggregate values to be returned in
+ /// GetNext() using the agg fn evaluators' Serialize() or Finalize().
+ /// For the Finalize() case if the output tuple is different from the intermediate
+ /// tuple, then a new tuple is allocated from 'pool' to hold the final result.
+ /// Grouping values are copied into the output tuple and the output tuple holding
+ /// the finalized/serialized aggregate values is returned.
+ /// TODO: Coordinate the allocation of new tuples with the release of memory
+ /// so as not to make memory consumption blow up.
+ Tuple* GetOutputTuple(
+ const std::vector<AggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool);
+
+ /// Codegen for updating aggregate expressions agg_fns_[agg_fn_idx]
+ /// and returns the IR function in 'fn'. Returns non-OK status if codegen
+ /// is unsuccessful.
+ Status CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
+ SlotDescriptor* slot_desc, llvm::Function** fn) WARN_UNUSED_RESULT;
+
+ /// Codegen a call to a function implementing the UDA interface with input values
+ /// from 'input_vals'. 'dst_val' should contain the previous value of the aggregate
+ /// function, and 'updated_dst_val' is set to the new value after the Update or Merge
+ /// operation is applied. The instruction sequence for the UDA call is inserted at
+ /// the insert position of 'builder'. 'agg_fn_ctx_arg' is passed as the UDA's first (FunctionContext*) argument.
+ Status CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder, AggFn* agg_fn,
+ llvm::Value* agg_fn_ctx_arg, const std::vector<CodegenAnyVal>& input_vals,
+ const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) WARN_UNUSED_RESULT;
+
+ /// Codegen UpdateTuple(). Returns non-OK status if codegen is unsuccessful.
+ Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) WARN_UNUSED_RESULT;
+};
+} // namespace impala
+
+#endif // IMPALA_EXEC_AGGREGATOR_H
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index eeefaed..384d65d 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -29,6 +29,7 @@
#include "common/status.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
+#include "exec/aggregation-node.h"
#include "exec/analytic-eval-node.h"
#include "exec/cardinality-check-node.h"
#include "exec/data-source-scan-node.h"
@@ -42,11 +43,11 @@
#include "exec/kudu-util.h"
#include "exec/nested-loop-join-node.h"
#include "exec/partial-sort-node.h"
-#include "exec/partitioned-aggregation-node.h"
#include "exec/partitioned-hash-join-node.h"
#include "exec/select-node.h"
#include "exec/singular-row-src-node.h"
#include "exec/sort-node.h"
+#include "exec/streaming-aggregation-node.h"
#include "exec/subplan-node.h"
#include "exec/topn-node.h"
#include "exec/union-node.h"
@@ -303,7 +304,11 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
}
break;
case TPlanNodeType::AGGREGATION_NODE:
- *node = pool->Add(new PartitionedAggregationNode(pool, tnode, descs));
+ if (tnode.agg_node.use_streaming_preaggregation) {
+ *node = pool->Add(new StreamingAggregationNode(pool, tnode, descs));
+ } else {
+ *node = pool->Add(new AggregationNode(pool, tnode, descs));
+ }
break;
case TPlanNodeType::HASH_JOIN_NODE:
*node = pool->Add(new PartitionedHashJoinNode(pool, tnode, descs));
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index ad9ae10..9a87a56 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -211,6 +211,7 @@ class ExecNode {
MemTracker* expr_mem_tracker() { return expr_mem_tracker_.get(); }
MemPool* expr_perm_pool() { return expr_perm_pool_.get(); }
MemPool* expr_results_pool() { return expr_results_pool_.get(); }
+ const TBackendResourceProfile& resource_profile() { return resource_profile_; }
bool is_closed() const { return is_closed_; }
/// Return true if codegen was disabled by the planner for this ExecNode. Does not
@@ -220,6 +221,10 @@ class ExecNode {
/// Extract node id from p->name().
static int GetNodeIdFromProfile(RuntimeProfile* p);
+ /// Returns true if this node is inside the right-hand side plan tree of a SubplanNode.
+ /// Valid to call in or after Prepare().
+ bool IsInSubplan() const { return containing_subplan_ != NULL; }
+
/// Names of counters shared by all exec nodes
static const std::string ROW_THROUGHPUT_COUNTER;
@@ -322,10 +327,6 @@ class ExecNode {
/// Set by SubplanNode::Init(). Not owned.
SubplanNode* containing_subplan_;
- /// Returns true if this node is inside the right-hand side plan tree of a SubplanNode.
- /// Valid to call in or after Prepare().
- bool IsInSubplan() const { return containing_subplan_ != NULL; }
-
/// If true, codegen should be disabled for this exec node.
const bool disable_codegen_;
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/grouping-aggregator-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator-ir.cc b/be/src/exec/grouping-aggregator-ir.cc
new file mode 100644
index 0000000..d3dbf17
--- /dev/null
+++ b/be/src/exec/grouping-aggregator-ir.cc
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/grouping-aggregator.h"
+
+#include "exec/hash-table.inline.h"
+#include "exprs/agg-fn-evaluator.h"
+#include "runtime/row-batch.h"
+#include "runtime/tuple-row.h"
+
+using namespace impala;
+
+template <bool AGGREGATED_ROWS>
+Status GroupingAggregator::AddBatchImpl(RowBatch* batch,
+    TPrefetchMode::type prefetch_mode, HashTableCtx* __restrict__ ht_ctx) {
+  DCHECK(!hash_partitions_.empty());
+  DCHECK(!is_streaming_preagg_);
+  const int total_rows = batch->num_rows();
+  // Resize every partition's hash table up front, pessimistically assuming all rows of
+  // the batch land in the same partition, so that no resize can happen while rows are
+  // inserted one at a time below.
+  // TODO: Once we have a histogram with the number of rows per partition, we will have
+  // accurate resize calls.
+  RETURN_IF_ERROR(CheckAndResizeHashPartitions(AGGREGATED_ROWS, total_rows, ht_ctx));
+  // Rows are handled in groups as large as the expr values cache: each group is first
+  // evaluated, hashed and prefetched as a whole, then aggregated row by row.
+  HashTableCtx::ExprValuesCache* vals_cache = ht_ctx->expr_values_cache();
+  const int rows_per_group = vals_cache->capacity();
+  int first_row = 0;
+  while (first_row < total_rows) {
+    EvalAndHashPrefetchGroup<AGGREGATED_ROWS>(batch, first_row, prefetch_mode, ht_ctx);
+    FOREACH_ROW_LIMIT(batch, first_row, rows_per_group, row_iter) {
+      RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(row_iter.Get(), ht_ctx));
+      vals_cache->NextRow();
+    }
+    DCHECK(vals_cache->AtEnd());
+    first_row += rows_per_group;
+  }
+  return Status::OK();
+}
+
+template <bool AGGREGATED_ROWS>
+void IR_ALWAYS_INLINE GroupingAggregator::EvalAndHashPrefetchGroup(RowBatch* batch,
+    int start_row_idx, TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx) {
+  // Fills the expr values cache for up to capacity() rows starting at 'start_row_idx':
+  // each row is evaluated and hashed, marked NULL if evaluation says so, and otherwise
+  // (unless prefetching is disabled) its hash table bucket is prefetched.
+  HashTableCtx::ExprValuesCache* cache = ht_ctx->expr_values_cache();
+  const int group_size = cache->capacity();
+  cache->Reset();
+
+  FOREACH_ROW_LIMIT(batch, start_row_idx, group_size, row_iter) {
+    TupleRow* cur_row = row_iter.Get();
+    const bool evaluated = AGGREGATED_ROWS ? ht_ctx->EvalAndHashBuild(cur_row)
+                                           : ht_ctx->EvalAndHashProbe(cur_row);
+    // Read the hash and look up the table unconditionally: hoisting these out of the
+    // non-NULL branch keeps the common (non-NULL) path fast.
+    const uint32_t row_hash = cache->CurExprValuesHash();
+    HashTable* table = GetHashTable(row_hash >> (32 - NUM_PARTITIONING_BITS));
+    if (!evaluated) {
+      cache->SetRowNull();
+    } else if (prefetch_mode != TPrefetchMode::NONE && LIKELY(table != nullptr)) {
+      table->PrefetchBucket<false>(row_hash);
+    }
+    cache->NextRow();
+  }
+
+  // Rewind the cache so the caller can read back the rows just filled in.
+  cache->ResetForRead();
+}
+
+template <bool AGGREGATED_ROWS>
+Status GroupingAggregator::ProcessRow(
+    TupleRow* __restrict__ row, HashTableCtx* __restrict__ ht_ctx) {
+  // Aggregates the current row of the expr values cache into its partition: updates
+  // an existing group, starts a new one, or appends the row to a spilled partition.
+  HashTableCtx::ExprValuesCache* cache = ht_ctx->expr_values_cache();
+  // The hash and partition index are read before the NULL check on purpose: hoisting
+  // them out of the non-NULL branch speeds up the common non-NULL case.
+  const uint32_t row_hash = cache->CurExprValuesHash();
+  const uint32_t part_idx = row_hash >> (32 - NUM_PARTITIONING_BITS);
+  // Rows marked NULL in the cache are skipped.
+  if (cache->IsRowNull()) return Status::OK();
+
+  HashTable* table = GetHashTable(part_idx);
+  Partition* partition = hash_partitions_[part_idx];
+  DCHECK(partition != nullptr);
+  // A partition has no hash table exactly when it has been spilled.
+  DCHECK_EQ(partition->is_spilled(), table == nullptr);
+  if (table == nullptr) {
+    // Spilled partition: the row is appended to its stream as-is.
+    return AppendSpilledRow<AGGREGATED_ROWS>(partition, row);
+  }
+
+  // In-memory partition: look the row up in its hash table. A free bucket is guaranteed
+  // because the caller resized the hash tables for the whole batch up front.
+  DCHECK(partition->aggregated_row_stream->is_pinned());
+  bool found;
+  HashTable::Iterator bucket = table->FindBuildRowBucket(ht_ctx, &found);
+  DCHECK(!bucket.AtEnd()) << "Hash table had no free buckets";
+
+  // Aggregated rows are processed first and were already aggregated in the initial
+  // pass, so they can never match an existing entry.
+  DCHECK(!(AGGREGATED_ROWS && found));
+  if (!AGGREGATED_ROWS && found) {
+    // Existing group: fold the row into its intermediate tuple.
+    UpdateTuple(partition->agg_fn_evals.data(), bucket.GetTuple(), row);
+    return Status::OK();
+  }
+
+  // First time this group is seen: build and initialize a new intermediate tuple.
+  return AddIntermediateTuple<AGGREGATED_ROWS>(partition, row, row_hash, bucket);
+}
+
+template <bool AGGREGATED_ROWS>
+Status GroupingAggregator::AddIntermediateTuple(Partition* __restrict__ partition,
+    TupleRow* __restrict__ row, uint32_t hash, HashTable::Iterator insert_it) {
+  // Retries until the new tuple fits in the stream of 'partition', spilling a partition
+  // after each failed attempt. If 'partition' itself ends up spilled, the row is
+  // appended to its spilled stream instead of being inserted into the hash table.
+  for (;;) {
+    DCHECK(partition->aggregated_row_stream->is_pinned());
+    Tuple* new_tuple = ConstructIntermediateTuple(partition->agg_fn_evals,
+        partition->aggregated_row_stream.get(), &add_batch_status_);
+
+    if (UNLIKELY(new_tuple == nullptr)) {
+      // A non-OK status is a hard error; an OK status means memory ran out.
+      if (!add_batch_status_.ok()) return std::move(add_batch_status_);
+      RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
+      if (partition->is_spilled()) {
+        return AppendSpilledRow<AGGREGATED_ROWS>(partition, row);
+      }
+      continue;
+    }
+    // Initialize the new group's aggregates from 'row' and publish it in the hash table.
+    UpdateTuple(partition->agg_fn_evals.data(), new_tuple, row, AGGREGATED_ROWS);
+    insert_it.SetTuple(new_tuple, hash);
+    return Status::OK();
+  }
+}
+
+Status GroupingAggregator::AddBatchStreamingImpl(bool needs_serialize, // Streaming preagg: aggregate in place or pass rows through to 'out_batch'.
+ TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* out_batch,
+ HashTableCtx* __restrict__ ht_ctx, int remaining_capacity[PARTITION_FANOUT]) {
+ DCHECK(is_streaming_preagg_);
+ DCHECK_EQ(out_batch->num_rows(), 0);
+ DCHECK_LE(in_batch->num_rows(), out_batch->capacity());
+
+ RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
+ HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+ const int num_rows = in_batch->num_rows();
+ const int cache_size = expr_vals_cache->capacity();
+ for (int group_start = 0; group_start < num_rows; group_start += cache_size) {
+ EvalAndHashPrefetchGroup<false>(in_batch, group_start, prefetch_mode, ht_ctx);
+
+ FOREACH_ROW_LIMIT(in_batch, group_start, cache_size, in_batch_iter) {
+ // Hoist lookups out of non-null branch to speed up non-null case.
+ TupleRow* in_row = in_batch_iter.Get();
+ const uint32_t hash = expr_vals_cache->CurExprValuesHash();
+ const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+ if (!expr_vals_cache->IsRowNull() // Rows marked NULL are neither aggregated nor passed through.
+ && !TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx],
+ GetHashTable(partition_idx), in_row, hash,
+ &remaining_capacity[partition_idx], &add_batch_status_)) {
+ RETURN_IF_ERROR(std::move(add_batch_status_));
+ // The hash table could not absorb the row (no capacity left, or tuple construction failed without an error), so pass it through as its own intermediate tuple in 'out_batch'.
+ Tuple* intermediate_tuple = ConstructIntermediateTuple(
+ agg_fn_evals_, out_batch->tuple_data_pool(), &add_batch_status_);
+ if (UNLIKELY(intermediate_tuple == nullptr)) {
+ DCHECK(!add_batch_status_.ok());
+ return std::move(add_batch_status_);
+ }
+ UpdateTuple(agg_fn_evals_.data(), intermediate_tuple, in_row);
+ out_batch_iterator.Get()->SetTuple(0, intermediate_tuple);
+ out_batch_iterator.Next();
+ out_batch->CommitLastRow();
+ }
+ DCHECK(add_batch_status_.ok()); // Any error was returned above.
+ expr_vals_cache->NextRow();
+ }
+ DCHECK(expr_vals_cache->AtEnd());
+ }
+ if (needs_serialize) { // Serialize the passed-through tuples in 'out_batch'.
+ FOREACH_ROW(out_batch, 0, out_batch_iter) {
+ AggFnEvaluator::Serialize(agg_fn_evals_, out_batch_iter.Get()->GetTuple(0));
+ }
+ }
+
+ return Status::OK();
+}
+
+bool GroupingAggregator::TryAddToHashTable(HashTableCtx* __restrict__ ht_ctx,
+    Partition* __restrict__ partition, HashTable* __restrict__ hash_tbl,
+    TupleRow* __restrict__ in_row, uint32_t hash, int* __restrict__ remaining_capacity,
+    Status* status) {
+  // Adds 'in_row' to its group in 'hash_tbl', creating the group if there is capacity.
+  // Returns false if no group could be created: either '*remaining_capacity' is 0, or
+  // tuple construction failed ('*status' as set by it; '*remaining_capacity' zeroed).
+  DCHECK(remaining_capacity != nullptr);
+  DCHECK_EQ(hash_tbl, partition->hash_tbl.get());
+  DCHECK_GE(*remaining_capacity, 0);
+  // Called from AddBatchStreamingImpl(), so 'in_row' is never an aggregated row.
+  bool found;
+  HashTable::Iterator bucket = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
+  Tuple* group_tuple = nullptr;
+  if (found) {
+    group_tuple = bucket.GetTuple();
+  } else {
+    if (*remaining_capacity == 0) return false;
+    group_tuple = ConstructIntermediateTuple(
+        partition->agg_fn_evals, partition->aggregated_row_stream.get(), status);
+    if (UNLIKELY(group_tuple == nullptr)) {
+      // Stop trying to grow this hash table while under memory pressure.
+      *remaining_capacity = 0;
+      return false;
+    }
+    bucket.SetTuple(group_tuple, hash);
+    --*remaining_capacity;
+  }
+  UpdateTuple(partition->agg_fn_evals.data(), group_tuple, in_row);
+  return true;
+}
+
+// Explicitly instantiate AddBatchImpl() for aggregated (true) and raw (false) rows.
+template Status GroupingAggregator::AddBatchImpl<true>(
+    RowBatch*, TPrefetchMode::type, HashTableCtx*);
+template Status GroupingAggregator::AddBatchImpl<false>(
+    RowBatch*, TPrefetchMode::type, HashTableCtx*);
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/grouping-aggregator-partition.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator-partition.cc b/be/src/exec/grouping-aggregator-partition.cc
new file mode 100644
index 0000000..8fe08f4
--- /dev/null
+++ b/be/src/exec/grouping-aggregator-partition.cc
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/grouping-aggregator.h"
+
+#include <set>
+#include <sstream>
+
+#include "exec/exec-node.h"
+#include "exec/hash-table.inline.h"
+#include "exprs/agg-fn-evaluator.h"
+#include "runtime/descriptors.h"
+#include "runtime/mem-pool.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+namespace impala {
+
+// A Partition must be released with Close(), which sets 'is_closed', before it is
+// destroyed. The destructor itself frees nothing; it only DCHECKs that Close() ran.
+GroupingAggregator::Partition::~Partition() {
+ DCHECK(is_closed);
+}
+
+// Sets up the per-partition state used to aggregate rows:
+//  - a shallow clone of the parent's aggregate function evaluators, backed by a fresh
+//    'agg_fn_perm_pool';
+//  - 'aggregated_row_stream' (pinned), with its write buffer acquired immediately;
+//  - unless the parent is a streaming pre-aggregation, an unpinned
+//    'unaggregated_row_stream' whose write buffer is deferred until the partition
+//    spills.
+// Returns an error if either stream fails to initialize or prepare for writing.
+Status GroupingAggregator::Partition::InitStreams() {
+  agg_fn_perm_pool.reset(new MemPool(parent->expr_mem_tracker_.get()));
+  DCHECK_EQ(agg_fn_evals.size(), 0);
+  AggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_perm_pool.get(),
+      parent->expr_results_pool_.get(), parent->agg_fn_evals_, &agg_fn_evals);
+
+  // The aggregate function slots come after the grouping slots in the intermediate
+  // tuple. Their varlen results are stored outside of aggregated_row_stream because
+  // BufferedTupleStream doesn't support relocating varlen data stored in the stream.
+  const auto& intermediate_slots = parent->intermediate_tuple_desc_->slots();
+  std::set<SlotId> external_varlen_slots;
+  for (size_t i = parent->grouping_exprs_.size(); i < intermediate_slots.size(); ++i) {
+    auto slot = intermediate_slots[i];
+    if (slot->type().IsVarLenStringType()) external_varlen_slots.insert(slot->id());
+  }
+
+  aggregated_row_stream.reset(
+      new BufferedTupleStream(parent->state_, &parent->intermediate_row_desc_,
+          parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size,
+          parent->resource_profile_.max_row_buffer_size, external_varlen_slots));
+  RETURN_IF_ERROR(aggregated_row_stream->Init(parent->id_, true));
+  bool write_buffer_acquired;
+  RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&write_buffer_acquired));
+  DCHECK(write_buffer_acquired) << "Buffer included in reservation " << parent->id_
+                                << "\n" << parent->buffer_pool_client()->DebugString()
+                                << "\n" << parent->DebugString(2);
+
+  // Only non-streaming aggregations spill (Spill() DCHECKs this), so streaming
+  // pre-aggregations have no use for an unaggregated row stream.
+  if (parent->is_streaming_preagg_) return Status::OK();
+
+  unaggregated_row_stream.reset(
+      new BufferedTupleStream(parent->state_, &parent->input_row_desc_,
+          parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size,
+          parent->resource_profile_.max_row_buffer_size));
+  // This stream is only used to spill, no need to ever have this pinned.
+  RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id_, false));
+  // Save memory by waiting until we spill to allocate the write buffer for the
+  // unaggregated row stream.
+  DCHECK(!unaggregated_row_stream->has_write_iterator());
+  return Status::OK();
+}
+
+// Creates this partition's hash table (sized to start small) and initializes it.
+// 'got_memory' is forwarded to HashTable::Init(); presumably it is set to false when
+// the initial buckets could not be allocated (TODO confirm against HashTable::Init()).
+// Requires 'aggregated_row_stream' to be set up already and no existing hash table.
+Status GroupingAggregator::Partition::InitHashTable(bool* got_memory) {
+  DCHECK(aggregated_row_stream != nullptr);
+  DCHECK(hash_tbl == nullptr);
+  // The upper NUM_PARTITIONING_BITS bits of the 32-bit hash pick the partition, so only
+  // the remaining 32 - NUM_PARTITIONING_BITS bits can be used within the hash table;
+  // that is why its maximum size is 1 << (32 - NUM_PARTITIONING_BITS).
+  // TODO: we could switch to 64 bit hashes and then we don't need a max size.
+  // It might be reasonable to limit individual hash table size for other reasons
+  // though. Always start with small buffers.
+  hash_tbl.reset(HashTable::Create(parent->ht_allocator_.get(), false, 1, nullptr,
+      1L << (32 - NUM_PARTITIONING_BITS), PAGG_DEFAULT_HASH_TABLE_SZ));
+  // Please update the error message in CreateHashPartitions() if initial size of
+  // hash table changes.
+  return hash_tbl->Init(got_memory);
+}
+
+// Prepares this partition's aggregated rows for spilling. It does nothing unless the
+// parent's aggregate functions need a serialize step. When they do:
+//  - every intermediate tuple in the hash table is serialized and appended to the
+//    parent's unpinned 'serialize_stream_';
+//  - that stream replaces 'aggregated_row_stream';
+//  - a fresh 'serialize_stream_' with a write buffer is created for the next spill.
+// On any error the hash table is closed and reset, and the error is returned.
+Status GroupingAggregator::Partition::SerializeStreamForSpilling() {
+  DCHECK(!parent->is_streaming_preagg_);
+  // Nothing to do unless the aggregate functions have a serialize step.
+  if (!parent->needs_serialize_) return Status::OK();
+
+  // We need to do a lot more work in this case. This step effectively does a merge
+  // aggregation in this node. We need to serialize the intermediates, spill the
+  // intermediates and then feed them into the aggregate function's merge step.
+  // This is often used when the intermediate is a string type, meaning the current
+  // (before serialization) in-memory layout is not the on-disk block layout.
+  // The disk layout does not support mutable rows. We need to rewrite the stream
+  // into the on disk format.
+  // TODO: if it happens to not be a string, we could serialize in place. This is
+  // a future optimization since it is very unlikely to have a serialize phase
+  // for those UDAs.
+  DCHECK(parent->serialize_stream_.get() != nullptr);
+  DCHECK(!parent->serialize_stream_->is_pinned());
+
+  // Serialize and copy the spilled partition's stream into the new stream.
+  Status status;
+  BufferedTupleStream* dst_stream = parent->serialize_stream_.get();
+  for (HashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get()); !it.AtEnd();) {
+    Tuple* tuple = it.GetTuple();
+    // 'it' is advanced before the tuple is serialized, so on failure below
+    // CleanupHashTbl() starts from the first tuple that was not serialized.
+    it.Next();
+    AggFnEvaluator::Serialize(agg_fn_evals, tuple);
+    if (UNLIKELY(!dst_stream->AddRow(reinterpret_cast<TupleRow*>(&tuple), &status))) {
+      DCHECK(!status.ok()) << "Stream was unpinned - AddRow() only fails on error";
+      // Even if we can't add to dst_stream, finish up processing this agg stream to
+      // make clean up easier (someone has to finalize this stream and we don't want to
+      // remember where we are).
+      parent->CleanupHashTbl(agg_fn_evals, it);
+      hash_tbl->Close();
+      hash_tbl.reset();
+      aggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+      return status;
+    }
+  }
+
+  aggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  aggregated_row_stream.swap(parent->serialize_stream_);
+  // Recreate the serialize_stream (and reserve 1 buffer) now in preparation for
+  // when we need to spill again. We need to have this available before we need
+  // to spill to make sure it is available. This should be acquirable since we just
+  // freed at least one buffer from this partition's (old) aggregated_row_stream.
+  parent->serialize_stream_.reset(
+      new BufferedTupleStream(parent->state_, &parent->intermediate_row_desc_,
+          parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size,
+          parent->resource_profile_.max_row_buffer_size));
+  status = parent->serialize_stream_->Init(parent->id_, false);
+  if (status.ok()) {
+    bool got_buffer;
+    status = parent->serialize_stream_->PrepareForWrite(&got_buffer);
+    DCHECK(!status.ok() || got_buffer) << "Accounted in min reservation";
+  }
+  if (!status.ok()) {
+    hash_tbl->Close();
+    hash_tbl.reset();
+    return status;
+  }
+  DCHECK(parent->serialize_stream_->has_write_iterator());
+  return Status::OK();
+}
+
+// Spills this open, not-yet-spilled partition of a non-streaming aggregation. It:
+//  - serializes the rows if needed;
+//  - releases the in-memory aggregation state (evaluators, their MemPool and the
+//    hash table);
+//  - unpins 'aggregated_row_stream'.
+// If 'more_aggregate_rows' is true, the aggregated stream's current write buffer stays
+// pinned. Otherwise the whole aggregated stream is unpinned and the write buffer of
+// 'unaggregated_row_stream' is acquired instead. Bumps the parent's spilled-partition
+// counter and tags the profile with "Spilled" the first time any partition spills.
+Status GroupingAggregator::Partition::Spill(bool more_aggregate_rows) {
+  DCHECK(!parent->is_streaming_preagg_);
+  DCHECK(!is_closed);
+  DCHECK(!is_spilled());
+  RETURN_IF_ERROR(parent->state_->StartSpilling(parent->mem_tracker_.get()));
+  RETURN_IF_ERROR(SerializeStreamForSpilling());
+
+  // Free the in-memory result data: evaluators, their pool, then the hash table.
+  AggFnEvaluator::Close(agg_fn_evals, parent->state_);
+  agg_fn_evals.clear();
+  if (agg_fn_perm_pool.get() != nullptr) {
+    agg_fn_perm_pool->FreeAll();
+    agg_fn_perm_pool.reset();
+  }
+  hash_tbl->Close();
+  hash_tbl.reset();
+
+  // Unpin the stream to free memory, but leave a write buffer in place so we can
+  // continue appending rows to one of the streams in the partition.
+  DCHECK(aggregated_row_stream->has_write_iterator());
+  DCHECK(!unaggregated_row_stream->has_write_iterator());
+  const auto unpin_mode = more_aggregate_rows
+      ? BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT
+      : BufferedTupleStream::UNPIN_ALL;
+  aggregated_row_stream->UnpinStream(unpin_mode);
+  if (!more_aggregate_rows) {
+    // Acquire the unaggregated stream's write buffer, which InitStreams() deferred.
+    bool got_buffer;
+    RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
+    DCHECK(got_buffer) << "Accounted in min reservation"
+                       << parent->buffer_pool_client()->DebugString();
+  }
+
+  COUNTER_ADD(parent->num_spilled_partitions_, 1);
+  // Tag the profile only once, when the first partition spills.
+  if (parent->num_spilled_partitions_->value() == 1) {
+    parent->runtime_profile()->AppendExecOption("Spilled");
+  }
+  return Status::OK();
+}
+
+// Releases the partition's streams, hash table, evaluators and evaluator pool. Only the
+// first call has an effect; later calls return immediately. When 'finalize_rows' is
+// true and both the aggregated stream and the hash table still exist, CleanupHashTbl()
+// is first run over every row so the UDAs get a chance to clean up.
+void GroupingAggregator::Partition::Close(bool finalize_rows) {
+  if (is_closed) return;
+  is_closed = true;
+
+  if (aggregated_row_stream.get() != nullptr) {
+    // If the hash table is gone (meaning this was spilled), the rows should have been
+    // finalized/serialized in Spill(); otherwise walk all the rows and finalize them.
+    const bool cleanup_rows = finalize_rows && hash_tbl.get() != nullptr;
+    if (cleanup_rows) {
+      HashTable::Iterator first_row = hash_tbl->Begin(parent->ht_ctx_.get());
+      parent->CleanupHashTbl(agg_fn_evals, first_row);
+    }
+    aggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  }
+  if (hash_tbl.get() != nullptr) hash_tbl->Close();
+  if (unaggregated_row_stream.get() != nullptr) {
+    unaggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  }
+  for (AggFnEvaluator* evaluator : agg_fn_evals) evaluator->Close(parent->state_);
+  if (agg_fn_perm_pool.get() != nullptr) agg_fn_perm_pool->FreeAll();
+}
+} // namespace impala