Posted to commits@doris.apache.org by zh...@apache.org on 2020/03/20 06:47:33 UTC
[incubator-doris] branch master updated: Revert "[CodeStyle] Del obsolete code of partition_aggregation_node (#3154)" (#3160)
This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2d3dbc2 Revert "[CodeStyle] Del obsolete code of partition_aggregation_node (#3154)" (#3160)
2d3dbc2 is described below
commit 2d3dbc2c4250499b9787bcc21f74d4060b573ce5
Author: lichaoyong <li...@baidu.com>
AuthorDate: Fri Mar 20 14:47:25 2020 +0800
Revert "[CodeStyle] Del obsolete code of partition_aggregation_node (#3154)" (#3160)
This reverts commit dae013d797c1c2c9e54246d5ace4bdd90b297d43.
---
be/src/common/config.h | 3 +-
be/src/exec/CMakeLists.txt | 6 +-
be/src/exec/exec_node.cpp | 3 +
...node.cc => new_partitioned_aggregation_node.cc} | 94 +-
...n_node.h => new_partitioned_aggregation_node.h} | 14 +-
...r.cc => new_partitioned_aggregation_node_ir.cc} | 20 +-
be/src/exec/partitioned_aggregation_node.cc | 2218 ++++++++------------
be/src/exec/partitioned_aggregation_node.h | 1085 ++++------
be/src/exec/partitioned_aggregation_node_ir.cc | 300 +--
be/src/runtime/row_batch.cpp | 12 +-
10 files changed, 1523 insertions(+), 2232 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f722ff8..d1faee8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -337,7 +337,8 @@ namespace config {
// for partition
CONF_Bool(enable_partitioned_hash_join, "false")
- CONF_Bool(enable_partitioned_aggregation, "true")
+ CONF_Bool(enable_partitioned_aggregation, "false")
+ CONF_Bool(enable_new_partitioned_aggregation, "true")
// for kudu
// "The maximum size of the row batch queue, for Kudu scanners."
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 20e00ef..705c439 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -85,10 +85,12 @@ set(EXEC_FILES
schema_scanner/schema_helper.cpp
partitioned_hash_table.cc
partitioned_hash_table_ir.cc
- new_partitioned_hash_table.cc
- new_partitioned_hash_table_ir.cc
partitioned_aggregation_node.cc
partitioned_aggregation_node_ir.cc
+ new_partitioned_hash_table.cc
+ new_partitioned_hash_table_ir.cc
+ new_partitioned_aggregation_node.cc
+ new_partitioned_aggregation_node_ir.cc
local_file_writer.cpp
broker_writer.cpp
parquet_scanner.cpp
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index c236401..62df715 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -26,6 +26,7 @@
#include "exprs/expr_context.h"
#include "exec/aggregation_node.h"
#include "exec/partitioned_aggregation_node.h"
+#include "exec/new_partitioned_aggregation_node.h"
#include "exec/csv_scan_node.h"
#include "exec/es_scan_node.h"
#include "exec/es_http_scan_node.h"
@@ -382,6 +383,8 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
case TPlanNodeType::AGGREGATION_NODE:
if (config::enable_partitioned_aggregation) {
*node = pool->add(new PartitionedAggregationNode(pool, tnode, descs));
+ } else if (config::enable_new_partitioned_aggregation) {
+ *node = pool->add(new NewPartitionedAggregationNode(pool, tnode, descs));
} else {
*node = pool->add(new AggregationNode(pool, tnode, descs));
}
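
A minimal standalone sketch of the flag precedence restored by the hunk above, assuming the CONF_Bool declarations in config.h surface as config::<name> booleans (the helper below is illustrative scaffolding, not Doris code). Because the legacy enable_partitioned_aggregation flag is tested first, the reverted defaults (false / true) make the dispatch select NewPartitionedAggregationNode:

#include <iostream>

namespace config {
// Defaults as restored by this revert (see be/src/common/config.h above).
bool enable_partitioned_aggregation = false;
bool enable_new_partitioned_aggregation = true;
}

// Same check order as ExecNode::create_node: the legacy flag wins when set.
const char* pick_agg_node() {
    if (config::enable_partitioned_aggregation) return "PartitionedAggregationNode";
    if (config::enable_new_partitioned_aggregation) return "NewPartitionedAggregationNode";
    return "AggregationNode";
}

int main() {
    std::cout << pick_agg_node() << std::endl;  // NewPartitionedAggregationNode
}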
diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/new_partitioned_aggregation_node.cc
similarity index 94%
copy from be/src/exec/partitioned_aggregation_node.cc
copy to be/src/exec/new_partitioned_aggregation_node.cc
index 4c68cdb..2468803 100644
--- a/be/src/exec/partitioned_aggregation_node.cc
+++ b/be/src/exec/new_partitioned_aggregation_node.cc
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "exec/partitioned_aggregation_node.h"
+#include "exec/new_partitioned_aggregation_node.h"
#include <math.h>
#include <algorithm>
@@ -93,7 +93,7 @@ static const StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
static const int STREAMING_HT_MIN_REDUCTION_SIZE =
sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
-PartitionedAggregationNode::PartitionedAggregationNode(
+NewPartitionedAggregationNode::NewPartitionedAggregationNode(
ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs),
intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
@@ -142,7 +142,7 @@ PartitionedAggregationNode::PartitionedAggregationNode(
}
}
-Status PartitionedAggregationNode::init(const TPlanNode& tnode, RuntimeState* state) {
+Status NewPartitionedAggregationNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode));
DCHECK(intermediate_tuple_desc_ != nullptr);
DCHECK(output_tuple_desc_ != nullptr);
@@ -177,7 +177,7 @@ Status PartitionedAggregationNode::init(const TPlanNode& tnode, RuntimeState* st
return Status::OK();
}
-Status PartitionedAggregationNode::prepare(RuntimeState* state) {
+Status NewPartitionedAggregationNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
@@ -231,7 +231,7 @@ Status PartitionedAggregationNode::prepare(RuntimeState* state) {
return Status::OK();
}
-Status PartitionedAggregationNode::open(RuntimeState* state) {
+Status NewPartitionedAggregationNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
// Open the child before consuming resources in this node.
RETURN_IF_ERROR(child(0)->open(state));
@@ -329,7 +329,7 @@ Status PartitionedAggregationNode::open(RuntimeState* state) {
return Status::OK();
}
-Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch,
+Status NewPartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch,
bool* eos) {
int first_row_idx = row_batch->num_rows();
RETURN_IF_ERROR(GetNextInternal(state, row_batch, eos));
@@ -337,7 +337,7 @@ Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_b
return Status::OK();
}
-Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
+Status NewPartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
int first_row_idx) {
if (!needs_finalize_ && !needs_serialize_) return Status::OK();
// String data returned by Serialize() or Finalize() is from local expr allocations in
@@ -362,7 +362,7 @@ Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
return Status::OK();
}
-Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_desc,
+Status NewPartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_desc,
RowBatch* row_batch, int first_row_idx, MemPool* pool) {
DCHECK(slot_desc.type().is_var_len_string_type());
DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1);
@@ -383,7 +383,7 @@ Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_des
return Status::OK();
}
-Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state,
+Status NewPartitionedAggregationNode::GetNextInternal(RuntimeState* state,
RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
@@ -419,7 +419,7 @@ Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state,
return Status::OK();
}
-void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) {
+void NewPartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) {
DCHECK(grouping_exprs_.empty());
int row_idx = row_batch->add_row();
TupleRow* row = row_batch->get_row(row_idx);
@@ -439,7 +439,7 @@ void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) {
singleton_output_tuple_ = NULL;
}
-Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
+Status NewPartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
RowBatch* row_batch) {
DCHECK(!row_batch->at_capacity());
if (output_iterator_.AtEnd()) {
@@ -496,7 +496,7 @@ Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
return Status::OK();
}
-Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
+Status NewPartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
RowBatch* out_batch) {
DCHECK(!child_eos_);
DCHECK(is_streaming_preagg_);
@@ -565,7 +565,7 @@ Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
return Status::OK();
}
-bool PartitionedAggregationNode::ShouldExpandPreaggHashTables() const {
+bool NewPartitionedAggregationNode::ShouldExpandPreaggHashTables() const {
int64_t ht_mem = 0;
int64_t ht_rows = 0;
for (int i = 0; i < PARTITION_FANOUT; ++i) {
@@ -615,7 +615,7 @@ bool PartitionedAggregationNode::ShouldExpandPreaggHashTables() const {
return current_reduction > min_reduction;
}
-void PartitionedAggregationNode::CleanupHashTbl(
+void NewPartitionedAggregationNode::CleanupHashTbl(
const vector<NewAggFnEvaluator*>& agg_fn_evals, NewPartitionedHashTable::Iterator it) {
if (!needs_finalize_ && !needs_serialize_) return;
@@ -640,7 +640,7 @@ void PartitionedAggregationNode::CleanupHashTbl(
}
}
-Status PartitionedAggregationNode::reset(RuntimeState* state) {
+Status NewPartitionedAggregationNode::reset(RuntimeState* state) {
DCHECK(!is_streaming_preagg_) << "Cannot reset preaggregation";
if (!grouping_exprs_.empty()) {
child_eos_ = false;
@@ -652,7 +652,7 @@ Status PartitionedAggregationNode::reset(RuntimeState* state) {
return ExecNode::reset(state);
}
-Status PartitionedAggregationNode::close(RuntimeState* state) {
+Status NewPartitionedAggregationNode::close(RuntimeState* state) {
if (is_closed()) return Status::OK();
if (!singleton_output_tuple_returned_) {
@@ -688,11 +688,11 @@ Status PartitionedAggregationNode::close(RuntimeState* state) {
return ExecNode::close(state);
}
-PartitionedAggregationNode::Partition::~Partition() {
+NewPartitionedAggregationNode::Partition::~Partition() {
DCHECK(is_closed);
}
-Status PartitionedAggregationNode::Partition::InitStreams() {
+Status NewPartitionedAggregationNode::Partition::InitStreams() {
agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker()));
DCHECK_EQ(agg_fn_evals.size(), 0);
NewAggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_pool.get(),
@@ -735,7 +735,7 @@ Status PartitionedAggregationNode::Partition::InitStreams() {
return Status::OK();
}
-Status PartitionedAggregationNode::Partition::InitHashTable(bool* got_memory) {
+Status NewPartitionedAggregationNode::Partition::InitHashTable(bool* got_memory) {
DCHECK(aggregated_row_stream != nullptr);
DCHECK(hash_tbl == nullptr);
// We use the upper PARTITION_FANOUT num bits to pick the partition so only the
@@ -750,7 +750,7 @@ Status PartitionedAggregationNode::Partition::InitHashTable(bool* got_memory) {
return hash_tbl->Init(got_memory);
}
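
The truncated comment in InitHashTable above refers to picking the partition from the upper bits of the hash so the lower bits remain usable by the per-partition hash table. A small illustrative sketch of that split; the 4-bit fanout is an assumption for the example, since the code itself only guarantees PARTITION_FANOUT == 1 << NUM_PARTITIONING_BITS:

#include <cstdint>
#include <cstdio>

constexpr int NUM_PARTITIONING_BITS = 4;                     // assumed for illustration
constexpr int PARTITION_FANOUT = 1 << NUM_PARTITIONING_BITS; // 16

int partition_of(uint32_t hash) {
    // Upper bits select the partition; the lower bits stay available to the
    // hash table inside each partition, avoiding correlation between the two.
    return hash >> (32 - NUM_PARTITIONING_BITS);
}

int main() {
    std::printf("%d\n", partition_of(0xF0000000u));  // 15: top nibble selects
}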
-Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
+Status NewPartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
DCHECK(!parent->is_streaming_preagg_);
if (parent->needs_serialize_) {
// We need to do a lot more work in this case. This step effectively does a merge
@@ -813,7 +813,7 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
return Status::OK();
}
-Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
+Status NewPartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
DCHECK(!parent->is_streaming_preagg_);
DCHECK(!is_closed);
DCHECK(!is_spilled());
@@ -858,7 +858,7 @@ Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
return Status::OK();
}
-void PartitionedAggregationNode::Partition::Close(bool finalize_rows) {
+void NewPartitionedAggregationNode::Partition::Close(bool finalize_rows) {
if (is_closed) return;
is_closed = true;
if (aggregated_row_stream.get() != NULL) {
@@ -879,7 +879,7 @@ void PartitionedAggregationNode::Partition::Close(bool finalize_rows) {
if (agg_fn_pool.get() != NULL) agg_fn_pool->free_all();
}
-Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple(
+Tuple* NewPartitionedAggregationNode::ConstructSingletonOutputTuple(
const vector<NewAggFnEvaluator*>& agg_fn_evals, MemPool* pool) {
DCHECK(grouping_exprs_.empty());
Tuple* output_tuple = Tuple::create(intermediate_tuple_desc_->byte_size(), pool);
@@ -887,7 +887,7 @@ Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple(
return output_tuple;
}
-Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
+Tuple* NewPartitionedAggregationNode::ConstructIntermediateTuple(
const vector<NewAggFnEvaluator*>& agg_fn_evals, MemPool* pool, Status* status) {
const int fixed_size = intermediate_tuple_desc_->byte_size();
const int varlen_size = GroupingExprsVarlenSize();
@@ -907,7 +907,7 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
return intermediate_tuple;
}
-Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
+Tuple* NewPartitionedAggregationNode::ConstructIntermediateTuple(
const vector<NewAggFnEvaluator*>& agg_fn_evals, BufferedTupleStream3* stream,
Status* status) {
DCHECK(stream != NULL && status != NULL);
@@ -931,7 +931,7 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
return tuple;
}
-int PartitionedAggregationNode::GroupingExprsVarlenSize() {
+int NewPartitionedAggregationNode::GroupingExprsVarlenSize() {
int varlen_size = 0;
// TODO: The hash table could compute this as it hashes.
for (int expr_idx: string_grouping_exprs_) {
@@ -943,7 +943,7 @@ int PartitionedAggregationNode::GroupingExprsVarlenSize() {
}
// TODO: codegen this function.
-void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple,
+void NewPartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple,
uint8_t* buffer, int varlen_size) {
// Copy over all grouping slots (the variable length data is copied below).
for (int i = 0; i < grouping_exprs_.size(); ++i) {
@@ -971,7 +971,7 @@ void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple,
}
// TODO: codegen this function.
-void PartitionedAggregationNode::InitAggSlots(
+void NewPartitionedAggregationNode::InitAggSlots(
const vector<NewAggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple) {
vector<SlotDescriptor*>::const_iterator slot_desc =
intermediate_tuple_desc_->slots().begin() + grouping_exprs_.size();
@@ -1008,7 +1008,7 @@ void PartitionedAggregationNode::InitAggSlots(
}
}
-void PartitionedAggregationNode::UpdateTuple(NewAggFnEvaluator** agg_fn_evals,
+void NewPartitionedAggregationNode::UpdateTuple(NewAggFnEvaluator** agg_fn_evals,
Tuple* tuple, TupleRow* row, bool is_merge) {
DCHECK(tuple != NULL || agg_fns_.empty());
for (int i = 0; i < agg_fns_.size(); ++i) {
@@ -1020,7 +1020,7 @@ void PartitionedAggregationNode::UpdateTuple(NewAggFnEvaluator** agg_fn_evals,
}
}
-Tuple* PartitionedAggregationNode::GetOutputTuple(
+Tuple* NewPartitionedAggregationNode::GetOutputTuple(
const vector<NewAggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool) {
DCHECK(tuple != NULL || agg_fn_evals.empty()) << tuple;
Tuple* dst = tuple;
@@ -1049,7 +1049,7 @@ Tuple* PartitionedAggregationNode::GetOutputTuple(
}
template <bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::AppendSpilledRow(
+Status NewPartitionedAggregationNode::AppendSpilledRow(
Partition* partition, TupleRow* row) {
DCHECK(!is_streaming_preagg_);
DCHECK(partition->is_spilled());
@@ -1070,16 +1070,16 @@ Status PartitionedAggregationNode::AppendSpilledRow(
}
}
-string PartitionedAggregationNode::DebugString(int indentation_level) const {
+string NewPartitionedAggregationNode::DebugString(int indentation_level) const {
stringstream ss;
DebugString(indentation_level, &ss);
return ss.str();
}
-void PartitionedAggregationNode::DebugString(int indentation_level,
+void NewPartitionedAggregationNode::DebugString(int indentation_level,
stringstream* out) const {
*out << string(indentation_level * 2, ' ');
- *out << "PartitionedAggregationNode("
+ *out << "NewPartitionedAggregationNode("
<< "intermediate_tuple_id=" << intermediate_tuple_id_
<< " output_tuple_id=" << output_tuple_id_
<< " needs_finalize=" << needs_finalize_
@@ -1089,7 +1089,7 @@ void PartitionedAggregationNode::DebugString(int indentation_level,
*out << ")";
}
-Status PartitionedAggregationNode::CreateHashPartitions(
+Status NewPartitionedAggregationNode::CreateHashPartitions(
int level, int single_partition_idx) {
if (is_streaming_preagg_) DCHECK_EQ(level, 0);
if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) {
@@ -1158,7 +1158,7 @@ Status PartitionedAggregationNode::CreateHashPartitions(
return Status::OK();
}
-Status PartitionedAggregationNode::CheckAndResizeHashPartitions(
+Status NewPartitionedAggregationNode::CheckAndResizeHashPartitions(
bool partitioning_aggregated_rows, int num_rows, const NewPartitionedHashTableCtx* ht_ctx) {
DCHECK(!is_streaming_preagg_);
for (int i = 0; i < PARTITION_FANOUT; ++i) {
@@ -1177,7 +1177,7 @@ Status PartitionedAggregationNode::CheckAndResizeHashPartitions(
return Status::OK();
}
-Status PartitionedAggregationNode::NextPartition() {
+Status NewPartitionedAggregationNode::NextPartition() {
DCHECK(output_partition_ == nullptr);
if (!is_in_subplan() && spilled_partitions_.empty()) {
@@ -1226,7 +1226,7 @@ Status PartitionedAggregationNode::NextPartition() {
return Status::OK();
}
-Status PartitionedAggregationNode::BuildSpilledPartition(Partition** built_partition) {
+Status NewPartitionedAggregationNode::BuildSpilledPartition(Partition** built_partition) {
DCHECK(!spilled_partitions_.empty());
DCHECK(!is_streaming_preagg_);
// Leave the partition in 'spilled_partitions_' to be closed if we hit an error.
@@ -1267,7 +1267,7 @@ Status PartitionedAggregationNode::BuildSpilledPartition(Partition** built_parti
return Status::OK();
}
-Status PartitionedAggregationNode::RepartitionSpilledPartition() {
+Status NewPartitionedAggregationNode::RepartitionSpilledPartition() {
DCHECK(!spilled_partitions_.empty());
DCHECK(!is_streaming_preagg_);
// Leave the partition in 'spilled_partitions_' to be closed if we hit an error.
@@ -1313,7 +1313,7 @@ Status PartitionedAggregationNode::RepartitionSpilledPartition() {
}
template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream3* input_stream) {
+Status NewPartitionedAggregationNode::ProcessStream(BufferedTupleStream3* input_stream) {
DCHECK(!is_streaming_preagg_);
if (input_stream->num_rows() > 0) {
while (true) {
@@ -1340,7 +1340,7 @@ Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream3* input_str
return Status::OK();
}
-Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) {
+Status NewPartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) {
int64_t max_freed_mem = 0;
int partition_idx = -1;
@@ -1371,7 +1371,7 @@ Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) {
return hash_partitions_[partition_idx]->Spill(more_aggregate_rows);
}
-Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
+Status NewPartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
DCHECK(!hash_partitions_.empty());
std::stringstream ss;
ss << "PA(node_id=" << id() << ") partitioned(level=" << hash_partitions_[0]->level
@@ -1415,7 +1415,7 @@ Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
return Status::OK();
}
-void PartitionedAggregationNode::PushSpilledPartition(Partition* partition) {
+void NewPartitionedAggregationNode::PushSpilledPartition(Partition* partition) {
DCHECK(partition->is_spilled());
DCHECK(partition->hash_tbl == nullptr);
// Ensure all pages in the spilled partition's streams are unpinned by invalidating
@@ -1426,7 +1426,7 @@ void PartitionedAggregationNode::PushSpilledPartition(Partition* partition) {
spilled_partitions_.push_front(partition);
}
-void PartitionedAggregationNode::ClosePartitions() {
+void NewPartitionedAggregationNode::ClosePartitions() {
for (Partition* partition : hash_partitions_) {
if (partition != nullptr) partition->Close(true);
}
@@ -1439,7 +1439,7 @@ void PartitionedAggregationNode::ClosePartitions() {
partition_pool_->clear();
}
-//Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
+//Status NewPartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
// NewAggFnEvaluator::FreeLocalAllocations(agg_fn_evals_);
// for (Partition* partition : hash_partitions_) {
// if (partition != nullptr) {
@@ -1451,9 +1451,9 @@ void PartitionedAggregationNode::ClosePartitions() {
//}
// Instantiate required templates.
-template Status PartitionedAggregationNode::AppendSpilledRow<false>(
+template Status NewPartitionedAggregationNode::AppendSpilledRow<false>(
Partition*, TupleRow*);
-template Status PartitionedAggregationNode::AppendSpilledRow<true>(Partition*, TupleRow*);
+template Status NewPartitionedAggregationNode::AppendSpilledRow<true>(Partition*, TupleRow*);
}
diff --git a/be/src/exec/partitioned_aggregation_node.h b/be/src/exec/new_partitioned_aggregation_node.h
similarity index 98%
copy from be/src/exec/partitioned_aggregation_node.h
copy to be/src/exec/new_partitioned_aggregation_node.h
index 4a2f156..62a3da4 100644
--- a/be/src/exec/partitioned_aggregation_node.h
+++ b/be/src/exec/new_partitioned_aggregation_node.h
@@ -117,10 +117,10 @@ class SlotDescriptor;
/// There are so many contexts in use that a plain "ctx" variable should never be used.
/// Likewise, it's easy to mixup the agg fn ctxs, there should be a way to simplify this.
/// TODO: support an Init() method with an initial value in the UDAF interface.
-class PartitionedAggregationNode : public ExecNode {
+class NewPartitionedAggregationNode : public ExecNode {
public:
- PartitionedAggregationNode(ObjectPool* pool,
+ NewPartitionedAggregationNode(ObjectPool* pool,
const TPlanNode& tnode, const DescriptorTbl& descs);
virtual Status init(const TPlanNode& tnode, RuntimeState* state);
@@ -245,16 +245,16 @@ class PartitionedAggregationNode : public ExecNode {
Partition* output_partition_;
NewPartitionedHashTable::Iterator output_iterator_;
- typedef Status (*ProcessBatchNoGroupingFn)(PartitionedAggregationNode*, RowBatch*);
+ typedef Status (*ProcessBatchNoGroupingFn)(NewPartitionedAggregationNode*, RowBatch*);
/// Jitted ProcessBatchNoGrouping function pointer. Null if codegen is disabled.
ProcessBatchNoGroupingFn process_batch_no_grouping_fn_;
typedef Status (*ProcessBatchFn)(
- PartitionedAggregationNode*, RowBatch*, NewPartitionedHashTableCtx*);
+ NewPartitionedAggregationNode*, RowBatch*, NewPartitionedHashTableCtx*);
/// Jitted ProcessBatch function pointer. Null if codegen is disabled.
ProcessBatchFn process_batch_fn_;
- typedef Status (*ProcessBatchStreamingFn)(PartitionedAggregationNode*, bool,
+ typedef Status (*ProcessBatchStreamingFn)(NewPartitionedAggregationNode*, bool,
RowBatch*, RowBatch*, NewPartitionedHashTableCtx*, int[PARTITION_FANOUT]);
/// Jitted ProcessBatchStreaming function pointer. Null if codegen is disabled.
ProcessBatchStreamingFn process_batch_streaming_fn_;
@@ -358,7 +358,7 @@ class PartitionedAggregationNode : public ExecNode {
/// initially use small buffers. Streaming pre-aggregations do not spill and do not
/// require an unaggregated stream.
struct Partition {
- Partition(PartitionedAggregationNode* parent, int level, int idx)
+ Partition(NewPartitionedAggregationNode* parent, int level, int idx)
: parent(parent), is_closed(false), level(level), idx(idx) {}
~Partition();
@@ -392,7 +392,7 @@ class PartitionedAggregationNode : public ExecNode {
bool is_spilled() const { return hash_tbl.get() == NULL; }
- PartitionedAggregationNode* parent;
+ NewPartitionedAggregationNode* parent;
/// If true, this partition is closed and there is nothing left to do.
bool is_closed;
diff --git a/be/src/exec/partitioned_aggregation_node_ir.cc b/be/src/exec/new_partitioned_aggregation_node_ir.cc
similarity index 93%
copy from be/src/exec/partitioned_aggregation_node_ir.cc
copy to be/src/exec/new_partitioned_aggregation_node_ir.cc
index ad5a52e..6674bbd 100644
--- a/be/src/exec/partitioned_aggregation_node_ir.cc
+++ b/be/src/exec/new_partitioned_aggregation_node_ir.cc
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "exec/partitioned_aggregation_node.h"
+#include "exec/new_partitioned_aggregation_node.h"
#include "exec/new_partitioned_hash_table.inline.h"
#include "exprs/new_agg_fn_evaluator.h"
@@ -27,7 +27,7 @@
using namespace doris;
-Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) {
+Status NewPartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) {
Tuple* output_tuple = singleton_output_tuple_;
FOREACH_ROW(batch, 0, batch_iter) {
UpdateTuple(agg_fn_evals_.data(), output_tuple, batch_iter.get());
@@ -36,7 +36,7 @@ Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) {
}
template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
+Status NewPartitionedAggregationNode::ProcessBatch(RowBatch* batch,
NewPartitionedHashTableCtx* ht_ctx) {
DCHECK(!hash_partitions_.empty());
DCHECK(!is_streaming_preagg_);
@@ -64,7 +64,7 @@ Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
}
template<bool AGGREGATED_ROWS>
-void IR_ALWAYS_INLINE PartitionedAggregationNode::EvalAndHashPrefetchGroup(
+void IR_ALWAYS_INLINE NewPartitionedAggregationNode::EvalAndHashPrefetchGroup(
RowBatch* batch, int start_row_idx,
NewPartitionedHashTableCtx* ht_ctx) {
NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
@@ -95,7 +95,7 @@ void IR_ALWAYS_INLINE PartitionedAggregationNode::EvalAndHashPrefetchGroup(
}
template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessRow(TupleRow* row,
+Status NewPartitionedAggregationNode::ProcessRow(TupleRow* row,
NewPartitionedHashTableCtx* ht_ctx) {
NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
// Hoist lookups out of non-null branch to speed up non-null case.
@@ -138,7 +138,7 @@ Status PartitionedAggregationNode::ProcessRow(TupleRow* row,
}
template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::AddIntermediateTuple(Partition* partition,
+Status NewPartitionedAggregationNode::AddIntermediateTuple(Partition* partition,
TupleRow* row, uint32_t hash, NewPartitionedHashTable::Iterator insert_it) {
while (true) {
DCHECK(partition->aggregated_row_stream->is_pinned());
@@ -162,7 +162,7 @@ Status PartitionedAggregationNode::AddIntermediateTuple(Partition* partition,
}
}
-Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
+Status NewPartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
RowBatch* in_batch, RowBatch* out_batch,
NewPartitionedHashTableCtx* ht_ctx, int remaining_capacity[PARTITION_FANOUT]) {
DCHECK(is_streaming_preagg_);
@@ -212,7 +212,7 @@ Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
return Status::OK();
}
-bool PartitionedAggregationNode::TryAddToHashTable(
+bool NewPartitionedAggregationNode::TryAddToHashTable(
NewPartitionedHashTableCtx* ht_ctx, Partition* partition,
NewPartitionedHashTable* hash_tbl, TupleRow* in_row,
uint32_t hash, int* remaining_capacity, Status* status) {
@@ -244,8 +244,8 @@ bool PartitionedAggregationNode::TryAddToHashTable(
}
// Instantiate required templates.
-template Status PartitionedAggregationNode::ProcessBatch<false>(RowBatch*,
+template Status NewPartitionedAggregationNode::ProcessBatch<false>(RowBatch*,
NewPartitionedHashTableCtx*);
-template Status PartitionedAggregationNode::ProcessBatch<true>(RowBatch*,
+template Status NewPartitionedAggregationNode::ProcessBatch<true>(RowBatch*,
NewPartitionedHashTableCtx*);
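
The trailing "Instantiate required templates" lines use the standard explicit-instantiation idiom: ProcessBatch's template definition lives in this .cc file rather than the header, so every specialization that other translation units link against must be instantiated here by hand. A minimal sketch of the idiom, with illustrative names rather than Doris code:

#include <iostream>

struct Node {
    template <bool AGGREGATED_ROWS> int process();
};

// Definition lives in the .cc file, invisible to other translation units...
template <bool AGGREGATED_ROWS>
int Node::process() { return AGGREGATED_ROWS ? 1 : 0; }

// ...so the needed specializations are instantiated explicitly, mirroring
// ProcessBatch<false> / ProcessBatch<true> above.
template int Node::process<false>();
template int Node::process<true>();

int main() {
    Node n;
    std::cout << n.process<true>() << std::endl;  // 1
}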
diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc
index 4c68cdb..8d31a90 100644
--- a/be/src/exec/partitioned_aggregation_node.cc
+++ b/be/src/exec/partitioned_aggregation_node.cc
@@ -18,1081 +18,795 @@
#include "exec/partitioned_aggregation_node.h"
#include <math.h>
-#include <algorithm>
-#include <set>
#include <sstream>
+#include <thrift/protocol/TDebugProtocol.h>
-#include "exec/new_partitioned_hash_table.h"
-#include "exec/new_partitioned_hash_table.inline.h"
-#include "exprs/new_agg_fn_evaluator.h"
-#include "exprs/anyval_util.h"
+#include "exec/partitioned_hash_table.inline.h"
+#include "exprs/agg_fn_evaluator.h"
+#include "exprs/expr.h"
#include "exprs/expr_context.h"
-// #include "exprs/scalar_expr_evaluator.h"
#include "exprs/slot_ref.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/buffered_tuple_stream3.inline.h"
+#include "runtime/buffered_tuple_stream2.inline.h"
#include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
#include "runtime/mem_pool.h"
-#include "runtime/mem_tracker.h"
#include "runtime/raw_value.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
-#include "runtime/string_value.h"
-#include "runtime/tuple_row.h"
#include "runtime/tuple.h"
+#include "runtime/tuple_row.h"
#include "udf/udf_internal.h"
+#include "util/runtime_profile.h"
+#include "util/stack_util.h"
#include "gen_cpp/Exprs_types.h"
#include "gen_cpp/PlanNodes_types.h"
-#include "common/names.h"
-
-using namespace strings;
+using std::list;
namespace doris {
-/// The minimum reduction factor (input rows divided by output rows) to grow hash tables
-/// in a streaming preaggregation, given that the hash tables are currently the given
-/// size or above. The sizes roughly correspond to hash table sizes where the bucket
-/// arrays will fit in a cache level. Intuitively, we don't want the working set of the
-/// aggregation to expand to the next level of cache unless we're reducing the input
-/// enough to outweigh the increased memory latency we'll incur for each hash table
-/// lookup.
-///
-/// Note that the current reduction achieved is not always a good estimate of the
-/// final reduction. It may be biased either way depending on the ordering of the
-/// input. If the input order is random, we will underestimate the final reduction
-/// factor because the probability of a row having the same key as a previous row
-/// increases as more input is processed. If the input order is correlated with the
-/// key, skew may bias the estimate. If high cardinality keys appear first, we
-/// may overestimate and if low cardinality keys appear first, we underestimate.
-/// To estimate the eventual reduction achieved, we estimate the final reduction
-/// using the planner's estimated input cardinality and the assumption that input
-/// is in a random order. This means that we assume that the reduction factor will
-/// increase over time.
-struct StreamingHtMinReductionEntry {
- // Use 'streaming_ht_min_reduction' if the total size of hash table bucket directories in
- // bytes is greater than this threshold.
- int min_ht_mem;
- // The minimum reduction factor to expand the hash tables.
- double streaming_ht_min_reduction;
-};
-
-// TODO: experimentally tune these values and also programmatically get the cache size
-// of the machine that we're running on.
-static const StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
- // Expand up to L2 cache always.
- {0, 0.0},
- // Expand into L3 cache if we look like we're getting some reduction.
- {256 * 1024, 1.1},
- // Expand into main memory if we're getting a significant reduction.
- {2 * 1024 * 1024, 2.0},
-};
-
-static const int STREAMING_HT_MIN_REDUCTION_SIZE =
- sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
-
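
The table removed above encodes the streaming preaggregation's grow-or-stop rule: expand the hash tables past a cache-level-sized bucket directory only if the observed reduction factor (input rows divided by output rows) clears that level's threshold. A worked sketch under those assumptions; the thresholds are copied from the table, but the lookup is paraphrased from ShouldExpandPreaggHashTables rather than a verbatim port:

#include <cstdint>
#include <cstdio>

struct Entry { int64_t min_ht_mem; double min_reduction; };
// Thresholds from STREAMING_HT_MIN_REDUCTION above.
static const Entry kTable[] = {{0, 0.0}, {256 * 1024, 1.1}, {2 * 1024 * 1024, 2.0}};

bool should_expand(int64_t ht_bucket_bytes, int64_t rows_in, int64_t rows_out) {
    // Pick the strictest threshold whose memory bound the tables already exceed.
    double min_reduction = 0.0;
    for (const Entry& e : kTable) {
        if (ht_bucket_bytes >= e.min_ht_mem) min_reduction = e.min_reduction;
    }
    // Expand only if the aggregation reduces the input enough to pay for the
    // extra memory latency of a larger working set.
    return rows_out == 0 || (double)rows_in / rows_out > min_reduction;
}

int main() {
    // ~1 MiB of bucket directories: need > 1.1x reduction to keep growing.
    std::printf("%d\n", should_expand(1 << 20, 1000, 800));  // 1.25x -> 1
}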
PartitionedAggregationNode::PartitionedAggregationNode(
- ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
- : ExecNode(pool, tnode, descs),
- intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
- intermediate_tuple_desc_(descs.get_tuple_descriptor(intermediate_tuple_id_)),
- intermediate_row_desc_(intermediate_tuple_desc_, false),
- output_tuple_id_(tnode.agg_node.output_tuple_id),
- output_tuple_desc_(descs.get_tuple_descriptor(output_tuple_id_)),
- needs_finalize_(tnode.agg_node.need_finalize),
- needs_serialize_(false),
- output_partition_(NULL),
- process_batch_no_grouping_fn_(NULL),
- process_batch_fn_(NULL),
- process_batch_streaming_fn_(NULL),
- build_timer_(NULL),
- ht_resize_timer_(NULL),
- get_results_timer_(NULL),
- num_hash_buckets_(NULL),
- partitions_created_(NULL),
- max_partition_level_(NULL),
- num_row_repartitioned_(NULL),
- num_repartitions_(NULL),
- num_spilled_partitions_(NULL),
- largest_partition_percent_(NULL),
- streaming_timer_(NULL),
- num_passthrough_rows_(NULL),
- preagg_estimated_reduction_(NULL),
- preagg_streaming_ht_min_reduction_(NULL),
-// estimated_input_cardinality_(tnode.agg_node.estimated_input_cardinality),
- singleton_output_tuple_(NULL),
- singleton_output_tuple_returned_(true),
- partition_eos_(false),
- child_eos_(false),
- partition_pool_(new ObjectPool()) {
-
- DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
-
- if (tnode.agg_node.__isset.use_streaming_preaggregation) {
- is_streaming_preagg_ = tnode.agg_node.use_streaming_preaggregation;
- if (is_streaming_preagg_) {
- DCHECK(_conjunct_ctxs.empty()) << "Preaggs have no conjuncts";
- DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping";
- DCHECK(_limit == -1) << "Preaggs have no limits";
- }
- } else {
- is_streaming_preagg_ = false;
- }
+ ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) :
+ ExecNode(pool, tnode, descs),
+ _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
+ _intermediate_tuple_desc(NULL),
+ _output_tuple_id(tnode.agg_node.output_tuple_id),
+ _output_tuple_desc(NULL),
+ _needs_finalize(tnode.agg_node.need_finalize),
+ _needs_serialize(false),
+ _block_mgr_client(NULL),
+ _output_partition(NULL),
+ _process_row_batch_fn(NULL),
+ _build_timer(NULL),
+ _ht_resize_timer(NULL),
+ _get_results_timer(NULL),
+ _num_hash_buckets(NULL),
+ _partitions_created(NULL),
+ // _max_partition_level(NULL),
+ _num_row_repartitioned(NULL),
+ _num_repartitions(NULL),
+ _singleton_output_tuple(NULL),
+ _singleton_output_tuple_returned(true),
+ _partition_pool(new ObjectPool()) {
+ DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
}
Status PartitionedAggregationNode::init(const TPlanNode& tnode, RuntimeState* state) {
- RETURN_IF_ERROR(ExecNode::init(tnode));
- DCHECK(intermediate_tuple_desc_ != nullptr);
- DCHECK(output_tuple_desc_ != nullptr);
- DCHECK_EQ(intermediate_tuple_desc_->slots().size(), output_tuple_desc_->slots().size());
-
- const RowDescriptor& row_desc = child(0)->row_desc();
- RETURN_IF_ERROR(Expr::create(tnode.agg_node.grouping_exprs, row_desc,
- state, &grouping_exprs_, mem_tracker()));
- // Construct build exprs from intermediate_row_desc_
- for (int i = 0; i < grouping_exprs_.size(); ++i) {
- SlotDescriptor* desc = intermediate_tuple_desc_->slots()[i];
- //DCHECK(desc->type().type == TYPE_NULL || desc->type() == grouping_exprs_[i]->type());
- // Hack to avoid TYPE_NULL SlotRefs.
- SlotRef* build_expr = _pool->add(desc->type().type != TYPE_NULL ?
- new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN));
- build_exprs_.push_back(build_expr);
- // TODO chenhao
- RETURN_IF_ERROR(build_expr->prepare(state, intermediate_row_desc_, nullptr));
- if (build_expr->type().is_var_len_string_type()) string_grouping_exprs_.push_back(i);
- }
-
- int j = grouping_exprs_.size();
- for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i, ++j) {
- SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j];
- SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j];
- AggFn* agg_fn;
- RETURN_IF_ERROR(AggFn::Create(tnode.agg_node.aggregate_functions[i], row_desc,
- *intermediate_slot_desc, *output_slot_desc, state, &agg_fn));
- agg_fns_.push_back(agg_fn);
- needs_serialize_ |= agg_fn->SupportsSerialize();
- }
- return Status::OK();
+ RETURN_IF_ERROR(ExecNode::init(tnode, state));
+ RETURN_IF_ERROR(
+ Expr::create_expr_trees(_pool, tnode.agg_node.grouping_exprs, &_probe_expr_ctxs));
+ for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
+ AggFnEvaluator* evaluator = NULL;
+ RETURN_IF_ERROR(AggFnEvaluator::create(
+ _pool, tnode.agg_node.aggregate_functions[i], &evaluator));
+ _aggregate_evaluators.push_back(evaluator);
+ }
+ return Status::OK();
}
Status PartitionedAggregationNode::prepare(RuntimeState* state) {
- SCOPED_TIMER(_runtime_profile->total_time_counter());
-
- RETURN_IF_ERROR(ExecNode::prepare(state));
- state_ = state;
-
- mem_pool_.reset(new MemPool(mem_tracker()));
- agg_fn_pool_.reset(new MemPool(expr_mem_tracker()));
-
- ht_resize_timer_ = ADD_TIMER(runtime_profile(), "HTResizeTime");
- get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime");
- num_hash_buckets_ =
- ADD_COUNTER(runtime_profile(), "HashBuckets", TUnit::UNIT);
- partitions_created_ =
- ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT);
- largest_partition_percent_ =
- runtime_profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT);
- if (is_streaming_preagg_) {
- runtime_profile()->append_exec_option("Streaming Preaggregation");
- streaming_timer_ = ADD_TIMER(runtime_profile(), "StreamingTime");
- num_passthrough_rows_ =
- ADD_COUNTER(runtime_profile(), "RowsPassedThrough", TUnit::UNIT);
- preagg_estimated_reduction_ = ADD_COUNTER(
- runtime_profile(), "ReductionFactorEstimate", TUnit::DOUBLE_VALUE);
- preagg_streaming_ht_min_reduction_ = ADD_COUNTER(
- runtime_profile(), "ReductionFactorThresholdToExpand", TUnit::DOUBLE_VALUE);
- } else {
- build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime");
- num_row_repartitioned_ =
- ADD_COUNTER(runtime_profile(), "RowsRepartitioned", TUnit::UNIT);
- num_repartitions_ =
- ADD_COUNTER(runtime_profile(), "NumRepartitions", TUnit::UNIT);
- num_spilled_partitions_ =
- ADD_COUNTER(runtime_profile(), "SpilledPartitions", TUnit::UNIT);
- max_partition_level_ = runtime_profile()->AddHighWaterMarkCounter(
- "MaxPartitionLevel", TUnit::UNIT);
- }
- // TODO chenhao
- const RowDescriptor& row_desc = child(0)->row_desc();
- RETURN_IF_ERROR(NewAggFnEvaluator::Create(agg_fns_, state, _pool, agg_fn_pool_.get(),
- &agg_fn_evals_, expr_mem_tracker(), row_desc));
-
- expr_results_pool_.reset(new MemPool(_expr_mem_tracker.get()));
- if (!grouping_exprs_.empty()) {
- RowDescriptor build_row_desc(intermediate_tuple_desc_, false);
- RETURN_IF_ERROR(NewPartitionedHashTableCtx::Create(_pool, state, build_exprs_,
- grouping_exprs_, true, vector<bool>(build_exprs_.size(), true),
- state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1, expr_mem_pool(),
- expr_results_pool_.get(), expr_mem_tracker(), build_row_desc, row_desc, &ht_ctx_));
- }
- // AddCodegenDisabledMessage(state);
- return Status::OK();
-}
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
+
+ RETURN_IF_ERROR(ExecNode::prepare(state));
+ _state = state;
+
+ _mem_pool.reset(new MemPool(mem_tracker()));
+ _agg_fn_pool.reset(new MemPool(expr_mem_tracker()));
+
+ _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
+ _ht_resize_timer = ADD_TIMER(runtime_profile(), "HTResizeTime");
+ _get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime");
+ _num_hash_buckets = ADD_COUNTER(runtime_profile(), "HashBuckets", TUnit::UNIT);
+ _partitions_created = ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT);
+ // _max_partition_level = runtime_profile()->AddHighWaterMarkCounter(
+ // "MaxPartitionLevel", TUnit::UNIT);
+ _num_row_repartitioned = ADD_COUNTER(
+ runtime_profile(), "RowsRepartitioned", TUnit::UNIT);
+ _num_repartitions = ADD_COUNTER(runtime_profile(), "NumRepartitions", TUnit::UNIT);
+ _num_spilled_partitions = ADD_COUNTER(
+ runtime_profile(), "SpilledPartitions", TUnit::UNIT);
+ // _largest_partition_percent = runtime_profile()->AddHighWaterMarkCounter(
+ // "LargestPartitionPercent", TUnit::UNIT);
+
+ _intermediate_tuple_desc =
+ state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
+ _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+ DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
-Status PartitionedAggregationNode::open(RuntimeState* state) {
- SCOPED_TIMER(_runtime_profile->total_time_counter());
- // Open the child before consuming resources in this node.
- RETURN_IF_ERROR(child(0)->open(state));
- RETURN_IF_ERROR(ExecNode::open(state));
-
- // Claim reservation after the child has been opened to reduce the peak reservation
- // requirement.
- if (!_buffer_pool_client.is_registered() && !grouping_exprs_.empty()) {
- DCHECK_GE(_resource_profile.min_reservation, MinReservation());
- RETURN_IF_ERROR(claim_buffer_reservation(state));
- }
-
- if (ht_ctx_.get() != nullptr) RETURN_IF_ERROR(ht_ctx_->Open(state));
- RETURN_IF_ERROR(NewAggFnEvaluator::Open(agg_fn_evals_, state));
- if (grouping_exprs_.empty()) {
- // Create the single output tuple for this non-grouping agg. This must happen after
- // opening the aggregate evaluators.
- singleton_output_tuple_ =
- ConstructSingletonOutputTuple(agg_fn_evals_, mem_pool_.get());
- // Check for failures during NewAggFnEvaluator::Init().
- RETURN_IF_ERROR(state_->query_status());
- singleton_output_tuple_returned_ = false;
- } else {
- if (ht_allocator_ == nullptr) {
- // Allocate 'serialize_stream_' and 'ht_allocator_' on the first Open() call.
- ht_allocator_.reset(new Suballocator(state_->exec_env()->buffer_pool(),
- &_buffer_pool_client, _resource_profile.spillable_buffer_size));
-
- if (!is_streaming_preagg_ && needs_serialize_) {
- serialize_stream_.reset(new BufferedTupleStream3(state, &intermediate_row_desc_,
- &_buffer_pool_client, _resource_profile.spillable_buffer_size,
- _resource_profile.max_row_buffer_size));
- RETURN_IF_ERROR(serialize_stream_->Init(id(), false));
- bool got_buffer;
- // Reserve the memory for 'serialize_stream_' so we don't need to scrounge up
- // another buffer during spilling.
- RETURN_IF_ERROR(serialize_stream_->PrepareForWrite(&got_buffer));
- DCHECK(got_buffer)
- << "Accounted in min reservation" << _buffer_pool_client.DebugString();
- DCHECK(serialize_stream_->has_write_iterator());
- }
- }
- RETURN_IF_ERROR(CreateHashPartitions(0));
- }
-
- // Streaming preaggregations do all processing in GetNext().
- if (is_streaming_preagg_) return Status::OK();
-
- RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
- // Read all the rows from the child and process them.
- bool eos = false;
- do {
- RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(state->check_query_state(
- "New partitioned aggregation, while getting next from child 0."));
- RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos));
- if (UNLIKELY(VLOG_ROW_IS_ON)) {
- for (int i = 0; i < batch.num_rows(); ++i) {
- TupleRow* row = batch.get_row(i);
- VLOG_ROW << "input row: " << row->to_string(_children[0]->row_desc());
- }
+ RETURN_IF_ERROR(
+ Expr::prepare(_probe_expr_ctxs, state, child(0)->row_desc(), expr_mem_tracker()));
+ // AddExprCtxsToFree(_probe_expr_ctxs);
+
+ _contains_var_len_grouping_exprs = false;
+ // Construct build exprs from _intermediate_agg_tuple_desc
+ for (int i = 0; i < _probe_expr_ctxs.size(); ++i) {
+ SlotDescriptor* desc = _intermediate_tuple_desc->slots()[i];
+ DCHECK(desc->type().type == TYPE_NULL ||
+ desc->type().type == _probe_expr_ctxs[i]->root()->type().type);
+ // Hack to avoid TYPE_NULL SlotRefs.
+ Expr* expr = desc->type().type != TYPE_NULL ?
+ new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN);
+ state->obj_pool()->add(expr);
+ _build_expr_ctxs.push_back(new ExprContext(expr));
+ state->obj_pool()->add(_build_expr_ctxs.back());
+ _contains_var_len_grouping_exprs |= expr->type().is_string_type();
}
-
- SCOPED_TIMER(build_timer_);
- if (grouping_exprs_.empty()) {
- if (process_batch_no_grouping_fn_ != NULL) {
- RETURN_IF_ERROR(process_batch_no_grouping_fn_(this, &batch));
- } else {
- RETURN_IF_ERROR(ProcessBatchNoGrouping(&batch));
- }
- } else {
- // There is grouping, so we will do partitioned aggregation.
- if (process_batch_fn_ != NULL) {
- RETURN_IF_ERROR(process_batch_fn_(this, &batch, ht_ctx_.get()));
- } else {
- RETURN_IF_ERROR(ProcessBatch<false>(&batch, ht_ctx_.get()));
- }
+ // Construct a new row desc for preparing the build exprs because neither the child's
+ // nor this node's output row desc may contain the intermediate tuple, e.g.,
+ // in a single-node plan with an intermediate tuple different from the output tuple.
+ _intermediate_row_desc.reset(new RowDescriptor(_intermediate_tuple_desc, false));
+ RETURN_IF_ERROR(
+ Expr::prepare(_build_expr_ctxs, state, *_intermediate_row_desc, expr_mem_tracker()));
+ // AddExprCtxsToFree(_build_expr_ctxs);
+
+ int j = _probe_expr_ctxs.size();
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) {
+ // Skip non-materialized slots; we don't have evaluators instantiated for those.
+ while (!_intermediate_tuple_desc->slots()[j]->is_materialized()) {
+ DCHECK_LT(j, _intermediate_tuple_desc->slots().size() - 1)
+ << "#eval= " << _aggregate_evaluators.size()
+ << " #probe=" << _probe_expr_ctxs.size();
+ ++j;
+ }
+ // SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
+ SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
+ FunctionContext* agg_fn_ctx = NULL;
+ // RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(state, child(0)->row_desc(),
+ // intermediate_slot_desc, output_slot_desc, _agg_fn_pool.get(), &agg_fn_ctx));
+ RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(state, child(0)->row_desc(),
+ _agg_fn_pool.get(), output_slot_desc, output_slot_desc,
+ expr_mem_tracker(), &agg_fn_ctx));
+ _agg_fn_ctxs.push_back(agg_fn_ctx);
+ state->obj_pool()->add(agg_fn_ctx);
+ _needs_serialize |= _aggregate_evaluators[i]->supports_serialize();
}
- batch.reset();
- } while (!eos);
-
- // The child can be closed at this point in most cases because we have consumed all of
- // the input from the child and transfered ownership of the resources we need. The
- // exception is if we are inside a subplan expecting to call Open()/GetNext() on the
- // child again,
- if (!is_in_subplan()) child(0)->close(state);
- child_eos_ = true;
-
- // Done consuming child(0)'s input. Move all the partitions in hash_partitions_
- // to spilled_partitions_ or aggregated_partitions_. We'll finish the processing in
- // GetNext().
- if (!grouping_exprs_.empty()) {
- RETURN_IF_ERROR(MoveHashPartitions(child(0)->rows_returned()));
- }
- return Status::OK();
-}
-Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch,
- bool* eos) {
- int first_row_idx = row_batch->num_rows();
- RETURN_IF_ERROR(GetNextInternal(state, row_batch, eos));
- RETURN_IF_ERROR(HandleOutputStrings(row_batch, first_row_idx));
- return Status::OK();
-}
-
-Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
- int first_row_idx) {
- if (!needs_finalize_ && !needs_serialize_) return Status::OK();
- // String data returned by Serialize() or Finalize() is from local expr allocations in
- // the agg function contexts, and will be freed on the next GetNext() call by
- // FreeLocalAllocations(). The data either needs to be copied out now or sent up the
- // plan and copied out by a blocking ancestor. (See IMPALA-3311)
- for (const AggFn* agg_fn : agg_fns_) {
- const SlotDescriptor& slot_desc = agg_fn->output_slot_desc();
- DCHECK(!slot_desc.type().is_collection_type()) << "producing collections NYI";
- if (!slot_desc.type().is_var_len_string_type()) continue;
- if (is_in_subplan()) {
- // Copy string data to the row batch's pool. This is more efficient than
- // MarkNeedsDeepCopy() in a subplan since we are likely producing many small
- // batches.
- RETURN_IF_ERROR(CopyStringData(slot_desc, row_batch,
- first_row_idx, row_batch->tuple_data_pool()));
+ if (_probe_expr_ctxs.empty()) {
+ // Create single output tuple now; we need to output something
+ // even if our input is empty.
+ _singleton_output_tuple =
+ construct_intermediate_tuple(_agg_fn_ctxs, _mem_pool.get(), NULL, NULL);
+ // Check for failures during AggFnEvaluator::init().
+ RETURN_IF_ERROR(_state->query_status());
+ _singleton_output_tuple_returned = false;
} else {
- row_batch->mark_needs_deep_copy();
- break;
+ _ht_ctx.reset(new PartitionedHashTableCtx(_build_expr_ctxs, _probe_expr_ctxs, true, true,
+ state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1));
+ RETURN_IF_ERROR(_state->block_mgr2()->register_client(
+ min_required_buffers(), mem_tracker(), state, &_block_mgr_client));
+ RETURN_IF_ERROR(create_hash_partitions(0));
}
- }
- return Status::OK();
-}
-Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_desc,
- RowBatch* row_batch, int first_row_idx, MemPool* pool) {
- DCHECK(slot_desc.type().is_var_len_string_type());
- DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1);
- FOREACH_ROW(row_batch, first_row_idx, batch_iter) {
- Tuple* tuple = batch_iter.get()->get_tuple(0);
- StringValue* sv = reinterpret_cast<StringValue*>(
- tuple->get_slot(slot_desc.tuple_offset()));
- if (sv == NULL || sv->len == 0) continue;
- char* new_ptr = reinterpret_cast<char*>(pool->try_allocate(sv->len));
- if (UNLIKELY(new_ptr == NULL)) {
- string details = Substitute("Cannot perform aggregation at node with id $0."
- " Failed to allocate $1 output bytes.", _id, sv->len);
- return pool->mem_tracker()->MemLimitExceeded(state_, details, sv->len);
+ // TODO: Is there a need to create the stream here? If memory reservations work we may
+ // be able to create this stream lazily and only whenever we need to spill.
+ if (_needs_serialize && _block_mgr_client != NULL) {
+ _serialize_stream.reset(new BufferedTupleStream2(state, *_intermediate_row_desc,
+ state->block_mgr2(), _block_mgr_client, false /* use_initial_small_buffers */,
+ false /* read_write */));
+ RETURN_IF_ERROR(_serialize_stream->init(id(), runtime_profile(), false));
+ DCHECK(_serialize_stream->has_write_block());
}
- memcpy(new_ptr, sv->ptr, sv->len);
- sv->ptr = new_ptr;
- }
- return Status::OK();
-}
-Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state,
- RowBatch* row_batch, bool* eos) {
- SCOPED_TIMER(_runtime_profile->total_time_counter());
- RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
- RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(state->check_query_state("New partitioned aggregation, while getting next."));
- // clear tmp expr result alocations
- expr_results_pool_->clear();
-
- if (reached_limit()) {
- *eos = true;
- return Status::OK();
- }
-
- if (grouping_exprs_.empty()) {
- // There was no grouping, so evaluate the conjuncts and return the single result row.
- // We allow calling GetNext() after eos, so don't return this row again.
- if (!singleton_output_tuple_returned_) GetSingletonOutput(row_batch);
- singleton_output_tuple_returned_ = true;
- *eos = true;
return Status::OK();
- }
-
- if (!child_eos_) {
- // For streaming preaggregations, we process rows from the child as we go.
- DCHECK(is_streaming_preagg_);
- RETURN_IF_ERROR(GetRowsStreaming(state, row_batch));
- } else if (!partition_eos_) {
- RETURN_IF_ERROR(GetRowsFromPartition(state, row_batch));
- }
-
- *eos = partition_eos_ && child_eos_;
- COUNTER_SET(_rows_returned_counter, _num_rows_returned);
- return Status::OK();
}
-void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) {
- DCHECK(grouping_exprs_.empty());
- int row_idx = row_batch->add_row();
- TupleRow* row = row_batch->get_row(row_idx);
- Tuple* output_tuple = GetOutputTuple(agg_fn_evals_,
- singleton_output_tuple_, row_batch->tuple_data_pool());
- row->set_tuple(0, output_tuple);
- if (ExecNode::eval_conjuncts(
- _conjunct_ctxs.data(), _conjunct_ctxs.size(), row)) {
- row_batch->commit_last_row();
- ++_num_rows_returned;
- COUNTER_SET(_rows_returned_counter, _num_rows_returned);
- }
- // Keep the current chunk to amortize the memory allocation over a series
- // of Reset()/Open()/GetNext()* calls.
- row_batch->tuple_data_pool()->acquire_data(mem_pool_.get(), true);
- // This node no longer owns the memory for singleton_output_tuple_.
- singleton_output_tuple_ = NULL;
-}
+Status PartitionedAggregationNode::open(RuntimeState* state) {
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
+ RETURN_IF_ERROR(ExecNode::open(state));
-Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
- RowBatch* row_batch) {
- DCHECK(!row_batch->at_capacity());
- if (output_iterator_.AtEnd()) {
- // Done with this partition, move onto the next one.
- if (output_partition_ != NULL) {
- output_partition_->Close(false);
- output_partition_ = NULL;
- }
- if (aggregated_partitions_.empty() && spilled_partitions_.empty()) {
- // No more partitions, all done.
- partition_eos_ = true;
- return Status::OK();
- }
- // Process next partition.
- RETURN_IF_ERROR(NextPartition());
- DCHECK(output_partition_ != NULL);
- }
-
- SCOPED_TIMER(get_results_timer_);
- int count = 0;
- const int N = BitUtil::next_power_of_two(state->batch_size());
- // Keeping returning rows from the current partition.
- while (!output_iterator_.AtEnd()) {
- // This loop can go on for a long time if the conjuncts are very selective. Do query
- // maintenance every N iterations.
- if ((count++ & (N - 1)) == 0) {
- RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(state->check_query_state(
- "New partitioned aggregation, while getting rows from partition."));
- }
+ RETURN_IF_ERROR(Expr::open(_probe_expr_ctxs, state));
+ RETURN_IF_ERROR(Expr::open(_build_expr_ctxs, state));
- int row_idx = row_batch->add_row();
- TupleRow* row = row_batch->get_row(row_idx);
- Tuple* intermediate_tuple = output_iterator_.GetTuple();
- Tuple* output_tuple = GetOutputTuple(
- output_partition_->agg_fn_evals, intermediate_tuple, row_batch->tuple_data_pool());
- output_iterator_.Next();
- row->set_tuple(0, output_tuple);
- // TODO chenhao
- // DCHECK_EQ(_conjunct_ctxs.size(), _conjuncts.size());
- if (ExecNode::eval_conjuncts(_conjunct_ctxs.data(), _conjunct_ctxs.size(), row)) {
- row_batch->commit_last_row();
- ++_num_rows_returned;
- if (reached_limit() || row_batch->at_capacity()) {
- break;
- }
+ DCHECK_EQ(_aggregate_evaluators.size(), _agg_fn_ctxs.size());
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ RETURN_IF_ERROR(_aggregate_evaluators[i]->open(state, _agg_fn_ctxs[i]));
}
- }
- COUNTER_SET(_rows_returned_counter, _num_rows_returned);
- partition_eos_ = reached_limit();
- if (output_iterator_.AtEnd()) row_batch->mark_needs_deep_copy();
+ // Read all the rows from the child and process them.
+ RETURN_IF_ERROR(_children[0]->open(state));
+ RowBatch batch(_children[0]->row_desc(), state->batch_size(), mem_tracker());
+ bool eos = false;
+ do {
+ RETURN_IF_CANCELLED(state);
+ RETURN_IF_ERROR(state->check_query_state("Partitioned aggregation, while getting next from child 0."));
+ RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos));
+
+ if (UNLIKELY(VLOG_ROW_IS_ON)) {
+ for (int i = 0; i < batch.num_rows(); ++i) {
+ TupleRow* row = batch.get_row(i);
+ VLOG_ROW << "partition-agg-node input row: "
+ << row->to_string(_children[0]->row_desc());
+ }
+ }
- return Status::OK();
-}
+ SCOPED_TIMER(_build_timer);
+ if (_process_row_batch_fn != NULL) {
+ RETURN_IF_ERROR(_process_row_batch_fn(this, &batch, _ht_ctx.get()));
+ } else if (_probe_expr_ctxs.empty()) {
+ RETURN_IF_ERROR(process_batch_no_grouping(&batch));
+ } else {
+ // VLOG_ROW << "partition-agg-node batch: " << batch->to_string();
+ // There is grouping, so we will do partitioned aggregation.
+ RETURN_IF_ERROR(process_batch<false>(&batch, _ht_ctx.get()));
+ }
+ batch.reset();
+ } while (!eos);
-Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
- RowBatch* out_batch) {
- DCHECK(!child_eos_);
- DCHECK(is_streaming_preagg_);
+ // Unless we are inside a subplan expecting to call open()/get_next() on the child
+ // again, the child can be closed at this point. We have consumed all of the input
+ // from the child and transfered ownership of the resources we need.
+ // if (!IsInSubplan()) {
+ child(0)->close(state);
+ // }
- if (child_batch_ == NULL) {
- child_batch_.reset(new RowBatch(child(0)->row_desc(), state->batch_size(),
- mem_tracker()));
- }
+ // Done consuming child(0)'s input. Move all the partitions in _hash_partitions
+ // to _spilled_partitions/_aggregated_partitions. We'll finish the processing in
+ // get_next().
+ if (!_probe_expr_ctxs.empty()) {
+ RETURN_IF_ERROR(move_hash_partitions(child(0)->rows_returned()));
+ }
+ return Status::OK();
+}
- do {
- DCHECK_EQ(out_batch->num_rows(), 0);
+Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
+ RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(state->check_query_state(
- "New partitioned aggregation, while getting rows in streaming."));
-
- RETURN_IF_ERROR(child(0)->get_next(state, child_batch_.get(), &child_eos_));
- SCOPED_TIMER(streaming_timer_);
+ RETURN_IF_ERROR(state->check_query_state("Partitioned aggregation, before evaluating conjuncts."));
- int remaining_capacity[PARTITION_FANOUT];
- bool ht_needs_expansion = false;
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- NewPartitionedHashTable* hash_tbl = GetHashTable(i);
- remaining_capacity[i] = hash_tbl->NumInsertsBeforeResize();
- ht_needs_expansion |= remaining_capacity[i] < child_batch_->num_rows();
+ if (reached_limit()) {
+ *eos = true;
+ return Status::OK();
}
- // Stop expanding hash tables if we're not reducing the input sufficiently. As our
- // hash tables expand out of each level of cache hierarchy, every hash table lookup
- // will take longer. We also may not be able to expand hash tables because of memory
- // pressure. In this case HashTable::CheckAndResize() will fail. In either case we
- // should always use the remaining space in the hash table to avoid wasting memory.
- if (ht_needs_expansion && ShouldExpandPreaggHashTables()) {
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- NewPartitionedHashTable* ht = GetHashTable(i);
- if (remaining_capacity[i] < child_batch_->num_rows()) {
- SCOPED_TIMER(ht_resize_timer_);
- bool resized;
- RETURN_IF_ERROR(
- ht->CheckAndResize(child_batch_->num_rows(), ht_ctx_.get(), &resized));
- if (resized) {
- remaining_capacity[i] = ht->NumInsertsBeforeResize();
- }
+ ExprContext** ctxs = &_conjunct_ctxs[0];
+ int num_ctxs = _conjunct_ctxs.size();
+ if (_probe_expr_ctxs.empty()) {
+ // There was no grouping, so evaluate the conjuncts and return the single result row.
+ // We allow calling get_next() after eos, so don't return this row again.
+ if (!_singleton_output_tuple_returned) {
+ int row_idx = row_batch->add_row();
+ TupleRow* row = row_batch->get_row(row_idx);
+ Tuple* output_tuple = get_output_tuple(
+ _agg_fn_ctxs, _singleton_output_tuple, row_batch->tuple_data_pool());
+ row->set_tuple(0, output_tuple);
+ if (ExecNode::eval_conjuncts(ctxs, num_ctxs, row)) {
+ row_batch->commit_last_row();
+ ++_num_rows_returned;
+ }
+ _singleton_output_tuple_returned = true;
}
- }
+ // Keep the current chunk to amortize the memory allocation over a series
+ // of reset()/open()/get_next()* calls.
+ row_batch->tuple_data_pool()->acquire_data(_mem_pool.get(), true);
+ *eos = true;
+ COUNTER_SET(_rows_returned_counter, _num_rows_returned);
+ return Status::OK();
}
- if (process_batch_streaming_fn_ != NULL) {
- RETURN_IF_ERROR(process_batch_streaming_fn_(this, needs_serialize_,
- child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity));
- } else {
- RETURN_IF_ERROR(ProcessBatchStreaming(needs_serialize_,
- child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity));
+ if (_output_iterator.at_end()) {
+ // Done with this partition, move onto the next one.
+ if (_output_partition != NULL) {
+ _output_partition->close(false);
+ _output_partition = NULL;
+ }
+ if (_aggregated_partitions.empty() && _spilled_partitions.empty()) {
+ // No more partitions, all done.
+ *eos = true;
+ return Status::OK();
+ }
+ // Process next partition.
+ RETURN_IF_ERROR(next_partition());
+ DCHECK(_output_partition != NULL);
}
- child_batch_->reset(); // All rows from child_batch_ were processed.
- } while (out_batch->num_rows() == 0 && !child_eos_);
-
- if (child_eos_) {
- child(0)->close(state);
- child_batch_.reset();
- RETURN_IF_ERROR(MoveHashPartitions(child(0)->rows_returned()));
- }
-
- _num_rows_returned += out_batch->num_rows();
- COUNTER_SET(num_passthrough_rows_, _num_rows_returned);
- return Status::OK();
-}
+ SCOPED_TIMER(_get_results_timer);
+ int count = 0;
+ const int N = BitUtil::next_power_of_two(state->batch_size());
+ // Keep returning rows from the current partition.
+ while (!_output_iterator.at_end() && !row_batch->at_capacity()) {
+ // This loop can go on for a long time if the conjuncts are very selective. Do query
+ // maintenance every N iterations.
+ if ((count++ & (N - 1)) == 0) {
+ RETURN_IF_CANCELLED(state);
+ RETURN_IF_ERROR(state->check_query_state("Partitioned aggregation, while evaluating conjuncts."));
+ }
-bool PartitionedAggregationNode::ShouldExpandPreaggHashTables() const {
- int64_t ht_mem = 0;
- int64_t ht_rows = 0;
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- NewPartitionedHashTable* ht = hash_partitions_[i]->hash_tbl.get();
- ht_mem += ht->CurrentMemSize();
- ht_rows += ht->size();
- }
-
- // Need some rows in tables to have valid statistics.
- if (ht_rows == 0) return true;
-
- // Find the appropriate reduction factor in our table for the current hash table sizes.
- int cache_level = 0;
- while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE &&
- ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
- ++cache_level;
- }
-
- // Compare the number of rows in the hash table with the number of input rows that
- // were aggregated into it. Exclude passed through rows from this calculation since
- // they were not in hash tables.
- const int64_t input_rows = _children[0]->rows_returned();
- const int64_t aggregated_input_rows = input_rows - _num_rows_returned;
- // TODO chenhao
-// const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_;
- double current_reduction = static_cast<double>(aggregated_input_rows) / ht_rows;
-
- // TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be
- // inaccurate, which could lead to a divide by zero below.
- if (aggregated_input_rows <= 0) return true;
-
- // Extrapolate the current reduction factor (r) using the formula
- // R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data
- // set, N is the number of input rows, excluding passed-through rows, and n is the
- // number of rows inserted or merged into the hash tables. This is a very rough
- // approximation but is good enough to be useful.
- // TODO: consider collecting more statistics to better estimate reduction.
-// double estimated_reduction = aggregated_input_rows >= expected_input_rows
-// ? current_reduction
-// : 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1);
- double min_reduction =
- STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
-
-// COUNTER_SET(preagg_estimated_reduction_, estimated_reduction);
- COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction);
-// return estimated_reduction > min_reduction;
- return current_reduction > min_reduction;
+ int row_idx = row_batch->add_row();
+ TupleRow* row = row_batch->get_row(row_idx);
+ Tuple* intermediate_tuple = _output_iterator.get_tuple();
+ Tuple* output_tuple = get_output_tuple(
+ _output_partition->agg_fn_ctxs, intermediate_tuple, row_batch->tuple_data_pool());
+ _output_iterator.next();
+ row->set_tuple(0, output_tuple);
+ if (ExecNode::eval_conjuncts(ctxs, num_ctxs, row)) {
+ row_batch->commit_last_row();
+ ++_num_rows_returned;
+ if (reached_limit()) {
+ break; // TODO: remove this check? is this expensive?
+ }
+ }
+ }
+ COUNTER_SET(_rows_returned_counter, _num_rows_returned);
+ *eos = reached_limit();
+ if (_output_iterator.at_end()) {
+ row_batch->mark_need_to_return();
+ }
+ return Status::OK();
}
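The commented-out estimate above extrapolates the reduction factor observed so far over the whole input via R = 1 + (N / n) * (r - 1), where N is the expected number of input rows (excluding passed-through rows), n is the rows merged into the hash tables so far, and r the current reduction. A minimal standalone sketch of that arithmetic, with illustrative names rather than the node's actual members:

    #include <cstdint>

    // Extrapolated reduction over the full input: R = 1 + (N / n) * (r - 1).
    // Falls back to "keep expanding" (a huge R) while there is no data yet.
    static double extrapolate_reduction(int64_t expected_input_rows,   // N
                                        int64_t aggregated_input_rows, // n
                                        int64_t ht_rows) {
        if (aggregated_input_rows <= 0 || ht_rows <= 0) return 1e30;
        double r = static_cast<double>(aggregated_input_rows) / ht_rows;
        if (aggregated_input_rows >= expected_input_rows) return r;
        return 1.0 + (static_cast<double>(expected_input_rows) / aggregated_input_rows)
                   * (r - 1.0);
    }

    // Example: n = 1000 rows merged into ht_rows = 250 entries gives r = 4;
    // with N = 10000 this extrapolates to R = 1 + 10 * 3 = 31, so the tables
    // would keep expanding as long as the per-cache-level minimum is below 31.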
-void PartitionedAggregationNode::CleanupHashTbl(
- const vector<NewAggFnEvaluator*>& agg_fn_evals, NewPartitionedHashTable::Iterator it) {
- if (!needs_finalize_ && !needs_serialize_) return;
-
- // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
- // them in order to free any memory allocated by UDAs.
- if (needs_finalize_) {
- // Finalize() requires a dst tuple but we don't actually need the result,
- // so allocate a single dummy tuple to avoid accumulating memory.
- Tuple* dummy_dst = NULL;
- dummy_dst = Tuple::create(output_tuple_desc_->byte_size(), mem_pool_.get());
- while (!it.AtEnd()) {
- Tuple* tuple = it.GetTuple();
- NewAggFnEvaluator::Finalize(agg_fn_evals, tuple, dummy_dst);
- it.Next();
+void PartitionedAggregationNode::cleanup_hash_tbl(
+ const vector<FunctionContext*>& agg_fn_ctxs, PartitionedHashTable::Iterator it) {
+ if (!_needs_finalize && !_needs_serialize) {
+ return;
}
- } else {
- while (!it.AtEnd()) {
- Tuple* tuple = it.GetTuple();
- NewAggFnEvaluator::Serialize(agg_fn_evals, tuple);
- it.Next();
+
+ // Iterate through the remaining rows in the hash table and call serialize/finalize on
+ // them in order to free any memory allocated by UDAs.
+ if (_needs_finalize) {
+ // finalize() requires a dst tuple but we don't actually need the result,
+ // so allocate a single dummy tuple to avoid accumulating memory.
+ Tuple* dummy_dst = NULL;
+ dummy_dst = Tuple::create(_output_tuple_desc->byte_size(), _mem_pool.get());
+ while (!it.at_end()) {
+ Tuple* tuple = it.get_tuple();
+ AggFnEvaluator::finalize(_aggregate_evaluators, agg_fn_ctxs, tuple, dummy_dst);
+ it.next();
+ }
+ } else {
+ while (!it.at_end()) {
+ Tuple* tuple = it.get_tuple();
+ AggFnEvaluator::serialize(_aggregate_evaluators, agg_fn_ctxs, tuple);
+ it.next();
+ }
}
- }
}
Status PartitionedAggregationNode::reset(RuntimeState* state) {
- DCHECK(!is_streaming_preagg_) << "Cannot reset preaggregation";
- if (!grouping_exprs_.empty()) {
- child_eos_ = false;
- partition_eos_ = false;
- // Reset the HT and the partitions for this grouping agg.
- ht_ctx_->set_level(0);
- ClosePartitions();
- }
- return ExecNode::reset(state);
+ if (_probe_expr_ctxs.empty()) {
+ // Re-create the single output tuple for this non-grouping agg.
+ _singleton_output_tuple =
+ construct_intermediate_tuple(_agg_fn_ctxs, _mem_pool.get(), NULL, NULL);
+ // Check for failures during AggFnEvaluator::init().
+ RETURN_IF_ERROR(_state->query_status());
+ _singleton_output_tuple_returned = false;
+ } else {
+ // Reset the HT and the partitions for this grouping agg.
+ _ht_ctx->set_level(0);
+ close_partitions();
+ create_hash_partitions(0);
+ }
+ // return ExecNode::reset(state);
+ return Status::OK();
}
Status PartitionedAggregationNode::close(RuntimeState* state) {
- if (is_closed()) return Status::OK();
-
- if (!singleton_output_tuple_returned_) {
- GetOutputTuple(agg_fn_evals_, singleton_output_tuple_, mem_pool_.get());
- }
-
- // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
- // them in order to free any memory allocated by UDAs
- if (output_partition_ != NULL) {
- CleanupHashTbl(output_partition_->agg_fn_evals, output_iterator_);
- output_partition_->Close(false);
- }
-
- ClosePartitions();
- child_batch_.reset();
-
- // Close all the agg-fn-evaluators
- NewAggFnEvaluator::Close(agg_fn_evals_, state);
-
- if (expr_results_pool_.get() != nullptr) {
- expr_results_pool_->free_all();
- }
- if (agg_fn_pool_.get() != nullptr) agg_fn_pool_->free_all();
- if (mem_pool_.get() != nullptr) mem_pool_->free_all();
- if (ht_ctx_.get() != nullptr) ht_ctx_->Close(state);
- ht_ctx_.reset();
- if (serialize_stream_.get() != nullptr) {
- serialize_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
- }
- Expr::close(grouping_exprs_);
- Expr::close(build_exprs_);
- AggFn::Close(agg_fns_);
- return ExecNode::close(state);
-}
+ if (is_closed()) {
+ return Status::OK();
+ }
+
+ if (!_singleton_output_tuple_returned) {
+ DCHECK_EQ(_agg_fn_ctxs.size(), _aggregate_evaluators.size());
+ get_output_tuple(_agg_fn_ctxs, _singleton_output_tuple, _mem_pool.get());
+ }
+
+ // Iterate through the remaining rows in the hash table and call serialize/finalize on
+ // them in order to free any memory allocated by UDAs
+ if (_output_partition != NULL) {
+ cleanup_hash_tbl(_output_partition->agg_fn_ctxs, _output_iterator);
+ _output_partition->close(false);
+ }
+
+ close_partitions();
+
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ _aggregate_evaluators[i]->close(state);
+ }
+ for (int i = 0; i < _agg_fn_ctxs.size(); ++i) {
+ _agg_fn_ctxs[i]->impl()->close();
+ }
+ if (_agg_fn_pool.get() != NULL) {
+ _agg_fn_pool->free_all();
+ }
+ if (_mem_pool.get() != NULL) {
+ _mem_pool->free_all();
+ }
+ if (_ht_ctx.get() != NULL) {
+ _ht_ctx->close();
+ }
+ if (_serialize_stream.get() != NULL) {
+ _serialize_stream->close();
+ }
+
+ if (_block_mgr_client != NULL) {
+ state->block_mgr2()->clear_reservations(_block_mgr_client);
+ }
-PartitionedAggregationNode::Partition::~Partition() {
- DCHECK(is_closed);
+ Expr::close(_probe_expr_ctxs, state);
+ Expr::close(_build_expr_ctxs, state);
+ return ExecNode::close(state);
}
-Status PartitionedAggregationNode::Partition::InitStreams() {
- agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker()));
- DCHECK_EQ(agg_fn_evals.size(), 0);
- NewAggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_pool.get(),
- parent->agg_fn_evals_, &agg_fn_evals);
-
- // Varlen aggregate function results are stored outside of aggregated_row_stream because
- // BufferedTupleStream3 doesn't support relocating varlen data stored in the stream.
- auto agg_slot = parent->intermediate_tuple_desc_->slots().begin() +
- parent->grouping_exprs_.size();
- std::set<SlotId> external_varlen_slots;
- for (; agg_slot != parent->intermediate_tuple_desc_->slots().end(); ++agg_slot) {
- if ((*agg_slot)->type().is_var_len_string_type()) {
- external_varlen_slots.insert((*agg_slot)->id());
+Status PartitionedAggregationNode::Partition::init_streams() {
+ agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker()));
+ DCHECK_EQ(agg_fn_ctxs.size(), 0);
+ for (int i = 0; i < parent->_agg_fn_ctxs.size(); ++i) {
+ agg_fn_ctxs.push_back(parent->_agg_fn_ctxs[i]->impl()->clone(agg_fn_pool.get()));
+ parent->_partition_pool->add(agg_fn_ctxs[i]);
}
- }
-
- aggregated_row_stream.reset(new BufferedTupleStream3(parent->state_,
- &parent->intermediate_row_desc_, &parent->_buffer_pool_client,
- parent->_resource_profile.spillable_buffer_size,
- parent->_resource_profile.max_row_buffer_size, external_varlen_slots));
- RETURN_IF_ERROR(
- aggregated_row_stream->Init(parent->id(), true));
- bool got_buffer;
- RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer));
- DCHECK(got_buffer) << "Buffer included in reservation " << parent->_id << "\n"
- << parent->_buffer_pool_client.DebugString() << "\n"
- << parent->DebugString(2);
-
- if (!parent->is_streaming_preagg_) {
- unaggregated_row_stream.reset(new BufferedTupleStream3(parent->state_,
- &(parent->child(0)->row_desc()), &parent->_buffer_pool_client,
- parent->_resource_profile.spillable_buffer_size,
- parent->_resource_profile.max_row_buffer_size));
+
+ aggregated_row_stream.reset(new BufferedTupleStream2(parent->_state,
+ *parent->_intermediate_row_desc, parent->_state->block_mgr2(),
+ parent->_block_mgr_client, true /* use_initial_small_buffers */,
+ false /* read_write */));
+ RETURN_IF_ERROR(aggregated_row_stream->init(parent->id(), parent->runtime_profile(), true));
+
+ unaggregated_row_stream.reset(new BufferedTupleStream2(parent->_state,
+ parent->child(0)->row_desc(), parent->_state->block_mgr2(),
+ parent->_block_mgr_client, true /* use_initial_small_buffers */,
+ false /* read_write */));
// This stream is only used to spill, no need to ever have this pinned.
- RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id(), false));
- // Save memory by waiting until we spill to allocate the write buffer for the
- // unaggregated row stream.
- DCHECK(!unaggregated_row_stream->has_write_iterator());
- }
- return Status::OK();
+ RETURN_IF_ERROR(unaggregated_row_stream->init(parent->id(), parent->runtime_profile(), false));
+ DCHECK(unaggregated_row_stream->has_write_block());
+ return Status::OK();
}
-Status PartitionedAggregationNode::Partition::InitHashTable(bool* got_memory) {
- DCHECK(aggregated_row_stream != nullptr);
- DCHECK(hash_tbl == nullptr);
- // We use the upper NUM_PARTITIONING_BITS bits of the hash to pick the partition,
- // so only the remaining bits can be used for the hash table.
- // TODO: we could switch to 64 bit hashes and then we don't need a max size.
- // It might be reasonable to limit individual hash table size for other reasons
- // though. Always start with small buffers.
- hash_tbl.reset(NewPartitionedHashTable::Create(parent->ht_allocator_.get(), false, 1, nullptr,
- 1L << (32 - NUM_PARTITIONING_BITS), PAGG_DEFAULT_HASH_TABLE_SZ));
- // Please update the error message in CreateHashPartitions() if initial size of
- // hash table changes.
- return hash_tbl->Init(got_memory);
+bool PartitionedAggregationNode::Partition::init_hash_table() {
+ DCHECK(hash_tbl.get() == NULL);
+ // We use the upper NUM_PARTITIONING_BITS bits of the hash to pick the partition,
+ // so only the remaining bits can be used for the hash table.
+ // TODO: we could switch to 64 bit hashes and then we don't need a max size.
+ // It might be reasonable to limit individual hash table size for other reasons
+ // though. Always start with small buffers.
+ // TODO: How many buckets? We currently use a default value, 1024.
+ static const int64_t PAGG_DEFAULT_HASH_TABLE_SZ = 1024;
+ hash_tbl.reset(PartitionedHashTable::create(parent->_state, parent->_block_mgr_client, 1,
+ NULL, 1 << (32 - NUM_PARTITIONING_BITS), PAGG_DEFAULT_HASH_TABLE_SZ));
+ return hash_tbl->init();
}
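Splitting the hash this way keeps the partition choice and the in-table bucket position independent. A small self-contained sketch of the split, assuming 4 partitioning bits (a fanout of 16), which is what the 1 << (32 - NUM_PARTITIONING_BITS) size cap above suggests:

    #include <cstdint>

    static const int kNumPartitioningBits = 4;                  // assumed value
    static const int kPartitionFanout = 1 << kNumPartitioningBits;

    // The top bits of the 32-bit hash pick one of the fanout partitions.
    static inline uint32_t partition_idx(uint32_t hash) {
        return hash >> (32 - kNumPartitioningBits);
    }

    // Only the remaining low bits vary within a partition, which is why an
    // individual hash table is capped at 1 << (32 - kNumPartitioningBits).
    static inline uint32_t bucket_idx(uint32_t hash, uint32_t num_buckets) {
        return hash & (num_buckets - 1);  // num_buckets is a power of two
    }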
-Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
- DCHECK(!parent->is_streaming_preagg_);
- if (parent->needs_serialize_) {
- // We need to do a lot more work in this case. This step effectively does a merge
- // aggregation in this node. We need to serialize the intermediates, spill the
- // intermediates and then feed them into the aggregate function's merge step.
- // This is often used when the intermediate is a string type, meaning the current
- // (before serialization) in-memory layout is not the on-disk block layout.
- // The disk layout does not support mutable rows. We need to rewrite the stream
- // into the on disk format.
- // TODO: if it happens to not be a string, we could serialize in place. This is
- // a future optimization since it is very unlikely to have a serialize phase
- // for those UDAs.
- DCHECK(parent->serialize_stream_.get() != NULL);
- DCHECK(!parent->serialize_stream_->is_pinned());
-
- // Serialize and copy the spilled partition's stream into the new stream.
- Status status = Status::OK();
- BufferedTupleStream3* new_stream = parent->serialize_stream_.get();
- NewPartitionedHashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get());
- while (!it.AtEnd()) {
- Tuple* tuple = it.GetTuple();
- it.Next();
- NewAggFnEvaluator::Serialize(agg_fn_evals, tuple);
- if (UNLIKELY(!new_stream->AddRow(reinterpret_cast<TupleRow*>(&tuple), &status))) {
- DCHECK(!status.ok()) << "Stream was unpinned - AddRow() only fails on error";
+Status PartitionedAggregationNode::Partition::clean_up() {
+ if (parent->_needs_serialize && aggregated_row_stream->num_rows() != 0) {
+ // We need to do a lot more work in this case. This step effectively does a merge
+ // aggregation in this node. We need to serialize the intermediates, spill the
+ // intermediates and then feed them into the aggregate function's merge step.
+ // This is often used when the intermediate is a string type, meaning the current
+ // (before serialization) in-memory layout is not the on-disk block layout.
+ // The disk layout does not support mutable rows. We need to rewrite the stream
+ // into the on disk format.
+ // TODO: if it happens to not be a string, we could serialize in place. This is
+ // a future optimization since it is very unlikely to have a serialize phase
+ // for those UDAs.
+ DCHECK(parent->_serialize_stream.get() != NULL);
+ DCHECK(!parent->_serialize_stream->is_pinned());
+ DCHECK(parent->_serialize_stream->has_write_block());
+
+ const vector<AggFnEvaluator*>& evaluators = parent->_aggregate_evaluators;
+
+ // Serialize and copy the spilled partition's stream into the new stream.
+ Status status = Status::OK();
+ bool failed_to_add = false;
+ BufferedTupleStream2* new_stream = parent->_serialize_stream.get();
+ PartitionedHashTable::Iterator it = hash_tbl->begin(parent->_ht_ctx.get());
+ while (!it.at_end()) {
+ Tuple* tuple = it.get_tuple();
+ it.next();
+ AggFnEvaluator::serialize(evaluators, agg_fn_ctxs, tuple);
+ if (UNLIKELY(!new_stream->add_row(reinterpret_cast<TupleRow*>(&tuple), &status))) {
+ failed_to_add = true;
+ break;
+ }
+ }
+
// Even if we can't add to new_stream, finish up processing this agg stream to make
// clean up easier (someone has to finalize this stream and we don't want to remember
// where we are).
- parent->CleanupHashTbl(agg_fn_evals, it);
- hash_tbl->Close();
- hash_tbl.reset();
- aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
- return status;
- }
+ if (failed_to_add) {
+ parent->cleanup_hash_tbl(agg_fn_ctxs, it);
+ hash_tbl->close();
+ hash_tbl.reset();
+ aggregated_row_stream->close();
+ RETURN_IF_ERROR(status);
+ return parent->_state->block_mgr2()->mem_limit_too_low_error(parent->_block_mgr_client,
+ parent->id());
+ }
+ DCHECK(status.ok());
+
+ aggregated_row_stream->close();
+ aggregated_row_stream.swap(parent->_serialize_stream);
+ // Recreate the serialize_stream (and reserve 1 buffer) now, in preparation
+ // for the next spill; creating it before it is needed guarantees a buffer
+ // will be available at spill time. This should be acquirable since we just
+ // freed at least one buffer from this partition's (old) aggregated_row_stream.
+ parent->_serialize_stream.reset(new BufferedTupleStream2(parent->_state,
+ *parent->_intermediate_row_desc, parent->_state->block_mgr2(),
+ parent->_block_mgr_client, false /* use_initial_small_buffers */,
+ false /* read_write */));
+ status = parent->_serialize_stream->init(parent->id(), parent->runtime_profile(), false);
+ if (!status.ok()) {
+ hash_tbl->close();
+ hash_tbl.reset();
+ return status;
+ }
+ DCHECK(parent->_serialize_stream->has_write_block());
}
+ return Status::OK();
+}
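The rewrite above matters because a string intermediate is stored as a pointer plus length into the node's memory pools; writing that block to disk verbatim would persist a dangling pointer. Serializing copies the bytes inline so the spilled row is self-contained and can later be fed to the merge step. A toy illustration of the idea, with StringValue as a stand-in for the real struct:

    #include <cstdint>
    #include <cstring>

    struct StringValue {  // stand-in: the in-memory form is (ptr, len)
        char* ptr;
        int len;
    };

    // Pack (len, bytes) contiguously so the on-disk row no longer depends on
    // any in-memory pool; returns the write cursor after the copied value.
    static uint8_t* serialize_string(const StringValue& sv, uint8_t* out) {
        std::memcpy(out, &sv.len, sizeof(sv.len));
        std::memcpy(out + sizeof(sv.len), sv.ptr, sv.len);
        return out + sizeof(sv.len) + sv.len;
    }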
+
+Status PartitionedAggregationNode::Partition::spill() {
+ DCHECK(!is_closed);
+ DCHECK(!is_spilled());
- aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
- aggregated_row_stream.swap(parent->serialize_stream_);
- // Recreate the serialize_stream (and reserve 1 buffer) now, in preparation
- // for the next spill; creating it before it is needed guarantees a buffer
- // will be available at spill time. This should be acquirable since we just
- // freed at least one buffer from this partition's (old) aggregated_row_stream.
- parent->serialize_stream_.reset(new BufferedTupleStream3(parent->state_,
- &parent->intermediate_row_desc_, &parent->_buffer_pool_client,
- parent->_resource_profile.spillable_buffer_size,
- parent->_resource_profile.max_row_buffer_size));
- status = parent->serialize_stream_->Init(parent->id(), false);
- if (status.ok()) {
- bool got_buffer;
- status = parent->serialize_stream_->PrepareForWrite(&got_buffer);
- DCHECK(!status.ok() || got_buffer) << "Accounted in min reservation";
+ RETURN_IF_ERROR(clean_up());
+
+ // Free the in-memory result data.
+ for (int i = 0; i < agg_fn_ctxs.size(); ++i) {
+ agg_fn_ctxs[i]->impl()->close();
}
- if (!status.ok()) {
- hash_tbl->Close();
- hash_tbl.reset();
- return status;
+
+ if (agg_fn_pool.get() != NULL) {
+ agg_fn_pool->free_all();
+ agg_fn_pool.reset();
}
- DCHECK(parent->serialize_stream_->has_write_iterator());
- }
- return Status::OK();
-}
-Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
- DCHECK(!parent->is_streaming_preagg_);
- DCHECK(!is_closed);
- DCHECK(!is_spilled());
- // TODO(ml): enable spill
- std::stringstream msg;
- msg << "New partitioned Aggregation in spill";
- LIMIT_EXCEEDED(parent->mem_tracker(), parent->state_, msg.str());
- // RETURN_IF_ERROR(parent->state_->StartSpilling(parent->mem_tracker()));
-
- RETURN_IF_ERROR(SerializeStreamForSpilling());
-
- // Free the in-memory result data.
- NewAggFnEvaluator::Close(agg_fn_evals, parent->state_);
- agg_fn_evals.clear();
-
- if (agg_fn_pool.get() != NULL) {
- agg_fn_pool->free_all();
- agg_fn_pool.reset();
- }
-
- hash_tbl->Close();
- hash_tbl.reset();
-
- // Unpin the stream to free memory, but leave a write buffer in place so we can
- // continue appending rows to one of the streams in the partition.
- DCHECK(aggregated_row_stream->has_write_iterator());
- DCHECK(!unaggregated_row_stream->has_write_iterator());
- if (more_aggregate_rows) {
-// aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL_EXCEPT_CURRENT);
- } else {
-// aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL);
- bool got_buffer;
- RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
- DCHECK(got_buffer)
- << "Accounted in min reservation" << parent->_buffer_pool_client.DebugString();
- }
-
- COUNTER_UPDATE(parent->num_spilled_partitions_, 1);
- if (parent->num_spilled_partitions_->value() == 1) {
- parent->add_runtime_exec_option("Spilled");
- }
- return Status::OK();
-}
+ hash_tbl->close();
+ hash_tbl.reset();
-void PartitionedAggregationNode::Partition::Close(bool finalize_rows) {
- if (is_closed) return;
- is_closed = true;
- if (aggregated_row_stream.get() != NULL) {
- if (finalize_rows && hash_tbl.get() != NULL) {
- // We need to walk all the rows and Finalize them here so the UDA gets a chance
- // to cleanup. If the hash table is gone (meaning this was spilled), the rows
- // should have been finalized/serialized in Spill().
- parent->CleanupHashTbl(agg_fn_evals, hash_tbl->Begin(parent->ht_ctx_.get()));
+ // Try to switch both streams to IO-sized buffers to avoid allocating small buffers
+ // for the spilled partition.
+ bool got_buffer = true;
+ if (aggregated_row_stream->using_small_buffers()) {
+ RETURN_IF_ERROR(aggregated_row_stream->switch_to_io_buffers(&got_buffer));
+ }
+ // Unpin the stream as soon as possible to increase the chances that the
+ // switch_to_io_buffers() call below will succeed.
+ DCHECK(!got_buffer || aggregated_row_stream->has_write_block())
+ << aggregated_row_stream->debug_string();
+ RETURN_IF_ERROR(aggregated_row_stream->unpin_stream(false));
+
+ if (got_buffer && unaggregated_row_stream->using_small_buffers()) {
+ RETURN_IF_ERROR(unaggregated_row_stream->switch_to_io_buffers(&got_buffer));
+ }
+ if (!got_buffer) {
+ // We'll try again to get the buffers when the stream fills up the small buffers.
+ VLOG_QUERY << "Not enough memory to switch to IO-sized buffer for partition "
+ << this << " of agg=" << parent->_id << " agg small buffers="
+ << aggregated_row_stream->using_small_buffers()
+ << " unagg small buffers="
+ << unaggregated_row_stream->using_small_buffers();
+ VLOG_FILE << get_stack_trace();
}
- aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
- }
- if (hash_tbl.get() != NULL) hash_tbl->Close();
- if (unaggregated_row_stream.get() != NULL) {
- unaggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
- }
-
- for (NewAggFnEvaluator* eval : agg_fn_evals) eval->Close(parent->state_);
- if (agg_fn_pool.get() != NULL) agg_fn_pool->free_all();
-}
-
-Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple(
- const vector<NewAggFnEvaluator*>& agg_fn_evals, MemPool* pool) {
- DCHECK(grouping_exprs_.empty());
- Tuple* output_tuple = Tuple::create(intermediate_tuple_desc_->byte_size(), pool);
- InitAggSlots(agg_fn_evals, output_tuple);
- return output_tuple;
-}
-Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
- const vector<NewAggFnEvaluator*>& agg_fn_evals, MemPool* pool, Status* status) {
- const int fixed_size = intermediate_tuple_desc_->byte_size();
- const int varlen_size = GroupingExprsVarlenSize();
- const int tuple_data_size = fixed_size + varlen_size;
- uint8_t* tuple_data = pool->try_allocate(tuple_data_size);
- if (UNLIKELY(tuple_data == NULL)) {
- string details = Substitute("Cannot perform aggregation at node with id $0. Failed "
- "to allocate $1 bytes for intermediate tuple.", _id, tuple_data_size);
- *status = pool->mem_tracker()->MemLimitExceeded(state_, details, tuple_data_size);
- return NULL;
- }
- memset(tuple_data, 0, fixed_size);
- Tuple* intermediate_tuple = reinterpret_cast<Tuple*>(tuple_data);
- uint8_t* varlen_data = tuple_data + fixed_size;
- CopyGroupingValues(intermediate_tuple, varlen_data, varlen_size);
- InitAggSlots(agg_fn_evals, intermediate_tuple);
- return intermediate_tuple;
+ COUNTER_UPDATE(parent->_num_spilled_partitions, 1);
+ if (parent->_num_spilled_partitions->value() == 1) {
+ parent->add_runtime_exec_option("Spilled");
+ }
+ return Status::OK();
}
-Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
- const vector<NewAggFnEvaluator*>& agg_fn_evals, BufferedTupleStream3* stream,
- Status* status) {
- DCHECK(stream != NULL && status != NULL);
- // Allocate space for the entire tuple in the stream.
- const int fixed_size = intermediate_tuple_desc_->byte_size();
- const int varlen_size = GroupingExprsVarlenSize();
- const int tuple_size = fixed_size + varlen_size;
- uint8_t* tuple_data = stream->AddRowCustomBegin(tuple_size, status);
- if (UNLIKELY(tuple_data == nullptr)) {
- // If we failed to allocate and did not hit an error (indicated by a non-ok status),
- // the caller of this function can try to free some space, e.g. through spilling, and
- // re-attempt to allocate space for this row.
- return nullptr;
- }
- Tuple* tuple = reinterpret_cast<Tuple*>(tuple_data);
- tuple->init(fixed_size);
- uint8_t* varlen_buffer = tuple_data + fixed_size;
- CopyGroupingValues(tuple, varlen_buffer, varlen_size);
- InitAggSlots(agg_fn_evals, tuple);
- stream->AddRowCustomEnd(tuple_size);
- return tuple;
-}
+void PartitionedAggregationNode::Partition::close(bool finalize_rows) {
+ if (is_closed) {
+ return;
+ }
+ is_closed = true;
+ if (aggregated_row_stream.get() != NULL) {
+ if (finalize_rows && hash_tbl.get() != NULL) {
+ // We need to walk all the rows and finalize them here so the UDA gets a chance
+ // to cleanup. If the hash table is gone (meaning this was spilled), the rows
+ // should have been finalized/serialized in spill().
+ parent->cleanup_hash_tbl(agg_fn_ctxs, hash_tbl->begin(parent->_ht_ctx.get()));
+ }
+ aggregated_row_stream->close();
+ }
+ if (hash_tbl.get() != NULL) {
+ hash_tbl->close();
+ }
+ if (unaggregated_row_stream.get() != NULL) {
+ unaggregated_row_stream->close();
+ }
-int PartitionedAggregationNode::GroupingExprsVarlenSize() {
- int varlen_size = 0;
- // TODO: The hash table could compute this as it hashes.
- for (int expr_idx: string_grouping_exprs_) {
- StringValue* sv = reinterpret_cast<StringValue*>(ht_ctx_->ExprValue(expr_idx));
- // Avoid branching by multiplying length by null bit.
- varlen_size += sv->len * !ht_ctx_->ExprValueNull(expr_idx);
- }
- return varlen_size;
+ for (int i = 0; i < agg_fn_ctxs.size(); ++i) {
+ agg_fn_ctxs[i]->impl()->close();
+ }
+ if (agg_fn_pool.get() != NULL) {
+ agg_fn_pool->free_all();
+ }
}
-// TODO: codegen this function.
-void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple,
- uint8_t* buffer, int varlen_size) {
- // Copy over all grouping slots (the variable length data is copied below).
- for (int i = 0; i < grouping_exprs_.size(); ++i) {
- SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[i];
- if (ht_ctx_->ExprValueNull(i)) {
- intermediate_tuple->set_null(slot_desc->null_indicator_offset());
+Tuple* PartitionedAggregationNode::construct_intermediate_tuple(
+ const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool,
+ BufferedTupleStream2* stream, Status* status) {
+ Tuple* intermediate_tuple = NULL;
+ uint8_t* buffer = NULL;
+ if (pool != NULL) {
+ DCHECK(stream == NULL && status == NULL);
+ intermediate_tuple = Tuple::create(_intermediate_tuple_desc->byte_size(), pool);
} else {
- void* src = ht_ctx_->ExprValue(i);
- void* dst = intermediate_tuple->get_slot(slot_desc->tuple_offset());
- memcpy(dst, src, slot_desc->slot_size());
+ DCHECK(stream != NULL && status != NULL);
+ // Figure out how much space it will take to copy the entire tuple. We need
+ // the tuple to end up in one block in the stream.
+ int size = _intermediate_tuple_desc->byte_size();
+ if (_contains_var_len_grouping_exprs) {
+ // TODO: This is likely to be too slow. The hash table could maintain this as
+ // it hashes.
+ for (int i = 0; i < _probe_expr_ctxs.size(); ++i) {
+ if (!_probe_expr_ctxs[i]->root()->type().is_string_type()) {
+ continue;
+ }
+ if (_ht_ctx->last_expr_value_null(i)) {
+ continue;
+ }
+ StringValue* sv = reinterpret_cast<StringValue*>(_ht_ctx->last_expr_value(i));
+ size += sv->len;
+ }
+ }
+
+ // Now that we know the size of the row, allocate space for it in the stream.
+ buffer = stream->allocate_row(size, status);
+ if (buffer == NULL) {
+ if (!status->ok() || !stream->using_small_buffers()) {
+ return NULL;
+ }
+ // IMPALA-2352: Make a best effort to switch to IO buffers and re-allocate.
+ // If switch_to_io_buffers() fails the caller of this function can try to free
+ // some space, e.g. through spilling, and re-attempt to allocate space for
+ // this row.
+ bool got_buffer = false;
+ *status = stream->switch_to_io_buffers(&got_buffer);
+ if (!status->ok() || !got_buffer) {
+ return NULL;
+ }
+ buffer = stream->allocate_row(size, status);
+ if (buffer == NULL) {
+ return NULL;
+ }
+ }
+ intermediate_tuple = reinterpret_cast<Tuple*>(buffer);
+ // TODO: remove this. We shouldn't need to zero the entire tuple.
+ intermediate_tuple->init(size);
+ buffer += _intermediate_tuple_desc->byte_size();
+ }
+
+ // Copy grouping values.
+ vector<SlotDescriptor*>::const_iterator slot_desc = _intermediate_tuple_desc->slots().begin();
+ for (int i = 0; i < _probe_expr_ctxs.size(); ++i, ++slot_desc) {
+ if (_ht_ctx->last_expr_value_null(i)) {
+ intermediate_tuple->set_null((*slot_desc)->null_indicator_offset());
+ } else {
+ void* src = _ht_ctx->last_expr_value(i);
+ void* dst = intermediate_tuple->get_slot((*slot_desc)->tuple_offset());
+ if (stream == NULL) {
+ RawValue::write(src, dst, (*slot_desc)->type(), pool);
+ } else {
+ RawValue::write(src, (*slot_desc)->type(), dst, &buffer);
+ }
+ }
}
- }
-
- for (int expr_idx: string_grouping_exprs_) {
- if (ht_ctx_->ExprValueNull(expr_idx)) continue;
-
- SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[expr_idx];
- // ptr and len were already copied to the fixed-len part of string value
- StringValue* sv = reinterpret_cast<StringValue*>(
- intermediate_tuple->get_slot(slot_desc->tuple_offset()));
- memcpy(buffer, sv->ptr, sv->len);
- sv->ptr = reinterpret_cast<char*>(buffer);
- buffer += sv->len;
- }
+
+ // Initialize aggregate output.
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++slot_desc) {
+ while (!(*slot_desc)->is_materialized()) {
+ ++slot_desc;
+ }
+ AggFnEvaluator* evaluator = _aggregate_evaluators[i];
+ evaluator->init(agg_fn_ctxs[i], intermediate_tuple);
+ // Codegen-specific path for min/max.
+ // To minimize branching on the update_tuple path, initialize the result value
+ // so that update_tuple doesn't have to check if the aggregation
+ // dst slot is null.
+ // TODO: remove when we don't use the irbuilder for codegen here. This optimization
+ // will no longer be necessary when all aggregates are implemented with the UDA
+ // interface.
+ // if ((*slot_desc)->type().type != TYPE_STRING &&
+ // (*slot_desc)->type().type != TYPE_VARCHAR &&
+ // (*slot_desc)->type().type != TYPE_TIMESTAMP &&
+ // (*slot_desc)->type().type != TYPE_CHAR &&
+ // (*slot_desc)->type().type != TYPE_DECIMAL) {
+ if (!(*slot_desc)->type().is_string_type()
+ && !(*slot_desc)->type().is_date_type()) {
+ ExprValue default_value;
+ void* default_value_ptr = NULL;
+ switch (evaluator->agg_op()) {
+ case AggFnEvaluator::MIN:
+ default_value_ptr = default_value.set_to_max((*slot_desc)->type());
+ RawValue::write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
+ break;
+ case AggFnEvaluator::MAX:
+ default_value_ptr = default_value.set_to_min((*slot_desc)->type());
+ RawValue::write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ return intermediate_tuple;
}
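Both branches above produce the same row layout: the fixed-size slots first, then every variable-length grouping value appended in one contiguous region, so the whole row fits in a single allocation (and, for the stream path, in a single block). A self-contained sketch of that layout computation, using hypothetical value descriptors rather than the real slot machinery:

    #include <cstdint>
    #include <cstring>
    #include <vector>

    struct VarlenValue { const char* ptr; int len; bool is_null; };  // hypothetical

    // Layout: [ fixed-size slots | varlen bytes of the string grouping values ].
    static std::vector<uint8_t> build_row(int fixed_size,
                                          const std::vector<VarlenValue>& vals) {
        int varlen_size = 0;
        for (const VarlenValue& v : vals) varlen_size += v.is_null ? 0 : v.len;
        std::vector<uint8_t> row(fixed_size + varlen_size, 0);  // zeroed fixed part
        uint8_t* out = row.data() + fixed_size;                 // varlen cursor
        for (const VarlenValue& v : vals) {
            if (v.is_null) continue;
            std::memcpy(out, v.ptr, v.len);  // fixed part would keep (out, len)
            out += v.len;
        }
        return row;
    }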
-// TODO: codegen this function.
-void PartitionedAggregationNode::InitAggSlots(
- const vector<NewAggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple) {
- vector<SlotDescriptor*>::const_iterator slot_desc =
- intermediate_tuple_desc_->slots().begin() + grouping_exprs_.size();
- for (int i = 0; i < agg_fn_evals.size(); ++i, ++slot_desc) {
- // To minimize branching on the UpdateTuple path, initialize the result value so that
- // the Add() UDA function can ignore the NULL bit of its destination value. E.g. for
- // SUM(), if we initialize the destination value to 0 (with the NULL bit set), we can
- // just start adding to the destination value (rather than repeatedly checking the
- // destination NULL bit. The codegen'd version of UpdateSlot() exploits this to
- // eliminate a branch per value.
- //
- // For boolean and numeric types, the default values are false/0, so the nullable
- // aggregate functions SUM() and AVG() produce the correct result. For MIN()/MAX(),
- // initialize the value to max/min possible value for the same effect.
- NewAggFnEvaluator* eval = agg_fn_evals[i];
- eval->Init(intermediate_tuple);
-
- DCHECK(agg_fns_[i] == &(eval->agg_fn()));
- const AggFn* agg_fn = agg_fns_[i];
- const AggFn::AggregationOp agg_op = agg_fn->agg_op();
- if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX) &&
- !agg_fn->intermediate_type().is_string_type() &&
- !agg_fn->intermediate_type().is_date_type()) {
- ExprValue default_value;
- void* default_value_ptr = NULL;
- if (agg_op == AggFn::MIN) {
- default_value_ptr = default_value.set_to_max((*slot_desc)->type());
- } else {
- DCHECK_EQ(agg_op, AggFn::MAX);
- default_value_ptr = default_value.set_to_min((*slot_desc)->type());
- }
- RawValue::write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
+void PartitionedAggregationNode::update_tuple(FunctionContext** agg_fn_ctxs,
+ Tuple* tuple, TupleRow* row, bool is_merge) {
+ DCHECK(tuple != NULL || _aggregate_evaluators.empty());
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ if (is_merge) {
+ _aggregate_evaluators[i]->merge(agg_fn_ctxs[i], row->get_tuple(0), tuple);
+ } else {
+ _aggregate_evaluators[i]->add(agg_fn_ctxs[i], row, tuple);
+ }
}
- }
}
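The MIN/MAX seeding above exists so the per-row update never has to test whether the destination slot is still NULL: MIN starts at the type's maximum and MAX at its minimum, so the first real value always wins the comparison, while SUM() and AVG() get the same effect for free from a zeroed slot. A standalone sketch of the trick for a numeric MIN slot:

    #include <algorithm>
    #include <cstdint>
    #include <limits>

    struct MinSlot {
        // Seeded with MIN's identity element, so no NULL-bit check is needed.
        int64_t value = std::numeric_limits<int64_t>::max();
    };

    // The hot path is a single comparison per row, with no branch on whether
    // this is the first value the slot has ever seen.
    static inline void update_min(MinSlot* dst, int64_t src) {
        dst->value = std::min(dst->value, src);
    }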
-void PartitionedAggregationNode::UpdateTuple(NewAggFnEvaluator** agg_fn_evals,
- Tuple* tuple, TupleRow* row, bool is_merge) {
- DCHECK(tuple != NULL || agg_fns_.empty());
- for (int i = 0; i < agg_fns_.size(); ++i) {
- if (is_merge) {
- agg_fn_evals[i]->Merge(row->get_tuple(0), tuple);
+Tuple* PartitionedAggregationNode::get_output_tuple(
+ const vector<FunctionContext*>& agg_fn_ctxs, Tuple* tuple, MemPool* pool) {
+ DCHECK(tuple != NULL || _aggregate_evaluators.empty()) << tuple;
+ Tuple* dst = tuple;
+ // if (_needs_finalize && _intermediate_tuple_id != _output_tuple_id) {
+ if (_needs_finalize) {
+ dst = Tuple::create(_output_tuple_desc->byte_size(), pool);
+ }
+ if (_needs_finalize) {
+ AggFnEvaluator::finalize(_aggregate_evaluators, agg_fn_ctxs, tuple, dst);
} else {
- agg_fn_evals[i]->Add(row, tuple);
+ AggFnEvaluator::serialize(_aggregate_evaluators, agg_fn_ctxs, tuple);
+ }
+ // Copy grouping values from tuple to dst.
+ // TODO: Codegen this.
+ if (dst != tuple) {
+ int num_grouping_slots = _probe_expr_ctxs.size();
+ for (int i = 0; i < num_grouping_slots; ++i) {
+ SlotDescriptor* src_slot_desc = _intermediate_tuple_desc->slots()[i];
+ SlotDescriptor* dst_slot_desc = _output_tuple_desc->slots()[i];
+ bool src_slot_null = tuple->is_null(src_slot_desc->null_indicator_offset());
+ void* src_slot = NULL;
+ if (!src_slot_null) {
+ src_slot = tuple->get_slot(src_slot_desc->tuple_offset());
+ }
+ RawValue::write(src_slot, dst, dst_slot_desc, NULL);
+ }
}
- }
+ return dst;
}
-Tuple* PartitionedAggregationNode::GetOutputTuple(
- const vector<NewAggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool) {
- DCHECK(tuple != NULL || agg_fn_evals.empty()) << tuple;
- Tuple* dst = tuple;
- if (needs_finalize_ && intermediate_tuple_id_ != output_tuple_id_) {
- dst = Tuple::create(output_tuple_desc_->byte_size(), pool);
- }
- if (needs_finalize_) {
- NewAggFnEvaluator::Finalize(agg_fn_evals, tuple, dst);
- } else {
- NewAggFnEvaluator::Serialize(agg_fn_evals, tuple);
- }
- // Copy grouping values from tuple to dst.
- // TODO: Codegen this.
- if (dst != tuple) {
- int num_grouping_slots = grouping_exprs_.size();
- for (int i = 0; i < num_grouping_slots; ++i) {
- SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i];
- SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i];
- bool src_slot_null = tuple->is_null(src_slot_desc->null_indicator_offset());
- void* src_slot = NULL;
- if (!src_slot_null) src_slot = tuple->get_slot(src_slot_desc->tuple_offset());
- RawValue::write(src_slot, dst, dst_slot_desc, NULL);
+Status PartitionedAggregationNode::append_spilled_row(BufferedTupleStream2* stream, TupleRow* row) {
+ DCHECK(stream != NULL);
+ DCHECK(!stream->is_pinned());
+ DCHECK(stream->has_write_block());
+ if (LIKELY(stream->add_row(row, &_process_batch_status))) {
+ return Status::OK();
}
- }
- return dst;
-}
-template <bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::AppendSpilledRow(
- Partition* partition, TupleRow* row) {
- DCHECK(!is_streaming_preagg_);
- DCHECK(partition->is_spilled());
- BufferedTupleStream3* stream = AGGREGATED_ROWS ?
- partition->aggregated_row_stream.get() :
- partition->unaggregated_row_stream.get();
- DCHECK(!stream->is_pinned());
- Status status;
- if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
- RETURN_IF_ERROR(status);
-
- // Keep trying to free memory by spilling until we succeed or hit an error.
- // Running out of partitions to spill is treated as an error by SpillPartition().
- while (true) {
- RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
- if (stream->AddRow(row, &status)) return Status::OK();
- RETURN_IF_ERROR(status);
- }
-}
+ // Adding fails iff either we hit an error or haven't switched to I/O buffers.
+ RETURN_IF_ERROR(_process_batch_status);
+ while (true) {
+ bool got_buffer = false;
+ RETURN_IF_ERROR(stream->switch_to_io_buffers(&got_buffer));
+ if (got_buffer) {
+ break;
+ }
+ RETURN_IF_ERROR(spill_partition());
+ }
-string PartitionedAggregationNode::DebugString(int indentation_level) const {
- stringstream ss;
- DebugString(indentation_level, &ss);
- return ss.str();
+ // Adding the row should succeed after the I/O buffer switch.
+ if (stream->add_row(row, &_process_batch_status)) {
+ return Status::OK();
+ }
+ DCHECK(!_process_batch_status.ok());
+ return _process_batch_status;
}
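The function above follows a general back-pressure shape: try the append, and if it fails without an error, reclaim memory (here by switching to I/O buffers or spilling another partition) and retry. A compact sketch of that shape with hypothetical callbacks:

    #include <functional>

    // 'try_add' fails only when no buffer is available; 'reclaim' must either
    // free memory or report failure, which is what keeps the loop finite.
    static bool add_with_backpressure(const std::function<bool()>& try_add,
                                      const std::function<bool()>& reclaim) {
        while (!try_add()) {
            if (!reclaim()) return false;  // nothing left to spill: give up
        }
        return true;
    }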
-void PartitionedAggregationNode::DebugString(int indentation_level,
- stringstream* out) const {
- *out << string(indentation_level * 2, ' ');
- *out << "PartitionedAggregationNode("
- << "intermediate_tuple_id=" << intermediate_tuple_id_
- << " output_tuple_id=" << output_tuple_id_
- << " needs_finalize=" << needs_finalize_
- << " grouping_exprs=" << Expr::debug_string(grouping_exprs_)
- << " agg_exprs=" << AggFn::DebugString(agg_fns_);
- ExecNode::debug_string(indentation_level, out);
- *out << ")";
+void PartitionedAggregationNode::debug_string(int indentation_level, stringstream* out) const {
+ *out << string(indentation_level * 2, ' ');
+ *out << "PartitionedAggregationNode("
+ << "intermediate_tuple_id=" << _intermediate_tuple_id
+ << " output_tuple_id=" << _output_tuple_id
+ << " needs_finalize=" << _needs_finalize
+ << " probe_exprs=" << Expr::debug_string(_probe_expr_ctxs)
+ << " agg_exprs=" << AggFnEvaluator::debug_string(_aggregate_evaluators);
+ ExecNode::debug_string(indentation_level, out);
+ *out << ")";
}
-Status PartitionedAggregationNode::CreateHashPartitions(
- int level, int single_partition_idx) {
- if (is_streaming_preagg_) DCHECK_EQ(level, 0);
- if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) {
+Status PartitionedAggregationNode::create_hash_partitions(int level) {
+ if (level >= MAX_PARTITION_DEPTH) {
stringstream error_msg;
error_msg << "Cannot perform aggregation at hash aggregation node with id "
<< _id << '.'
@@ -1100,360 +814,280 @@ Status PartitionedAggregationNode::CreateHashPartitions(
<< MAX_PARTITION_DEPTH << " times."
<< " This could mean there is significant skew in the data or the memory limit is"
<< " set too low.";
- return state_->set_mem_limit_exceeded(error_msg.str());
- }
- ht_ctx_->set_level(level);
-
- DCHECK(hash_partitions_.empty());
- int num_partitions_created = 0;
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- hash_tbls_[i] = nullptr;
- if (single_partition_idx == -1 || i == single_partition_idx) {
- Partition* new_partition = partition_pool_->add(new Partition(this, level, i));
- ++num_partitions_created;
- hash_partitions_.push_back(new_partition);
- RETURN_IF_ERROR(new_partition->InitStreams());
- } else {
- hash_partitions_.push_back(nullptr);
- }
- }
-
- // Now that all the streams are reserved (meaning we have enough memory to execute
- // the algorithm), allocate the hash tables. These can fail and we can still continue.
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- Partition* partition = hash_partitions_[i];
- if (partition == nullptr) continue;
- if (partition->aggregated_row_stream == nullptr) {
- // Failed to create the aggregated row stream - cannot create a hash table.
- // Just continue with a NULL hash table so rows will be passed through.
- DCHECK(is_streaming_preagg_);
- } else {
- bool got_memory;
- RETURN_IF_ERROR(partition->InitHashTable(&got_memory));
- // Spill the partition if we cannot create a hash table for a merge aggregation.
- if (UNLIKELY(!got_memory)) {
- DCHECK(!is_streaming_preagg_) << "Preagg reserves enough memory for hash tables";
- // If we're repartitioning, we will be writing aggregated rows first.
- RETURN_IF_ERROR(partition->Spill(level > 0));
- }
+ return _state->set_mem_limit_exceeded(error_msg.str());
}
- hash_tbls_[i] = partition->hash_tbl.get();
- }
- // In this case we did not have to repartition, so ensure that while building the hash
- // table all rows will be inserted into the partition at 'single_partition_idx' in case
- // a non-deterministic grouping expression causes a row to hash to a different
- // partition index.
- if (single_partition_idx != -1) {
- Partition* partition = hash_partitions_[single_partition_idx];
+ _ht_ctx->set_level(level);
+
+ DCHECK(_hash_partitions.empty());
for (int i = 0; i < PARTITION_FANOUT; ++i) {
- hash_partitions_[i] = partition;
- hash_tbls_[i] = partition->hash_tbl.get();
+ Partition* new_partition = new Partition(this, level);
+ DCHECK(new_partition != NULL);
+ _hash_partitions.push_back(_partition_pool->add(new_partition));
+ RETURN_IF_ERROR(new_partition->init_streams());
}
- }
-
- COUNTER_UPDATE(partitions_created_, num_partitions_created);
- if (!is_streaming_preagg_) {
- COUNTER_SET(max_partition_level_, level);
- }
- return Status::OK();
-}
+ DCHECK_GT(_state->block_mgr2()->num_reserved_buffers_remaining(_block_mgr_client), 0);
-Status PartitionedAggregationNode::CheckAndResizeHashPartitions(
- bool partitioning_aggregated_rows, int num_rows, const NewPartitionedHashTableCtx* ht_ctx) {
- DCHECK(!is_streaming_preagg_);
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- Partition* partition = hash_partitions_[i];
- if (partition == nullptr) continue;
- while (!partition->is_spilled()) {
- {
- SCOPED_TIMER(ht_resize_timer_);
- bool resized;
- RETURN_IF_ERROR(partition->hash_tbl->CheckAndResize(num_rows, ht_ctx, &resized));
- if (resized) break;
- }
- RETURN_IF_ERROR(SpillPartition(partitioning_aggregated_rows));
+ // Now that all the streams are reserved (meaning we have enough memory to execute
+ // the algorithm), allocate the hash tables. These can fail and we can still continue.
+ for (int i = 0; i < PARTITION_FANOUT; ++i) {
+ if (!_hash_partitions[i]->init_hash_table()) {
+ RETURN_IF_ERROR(_hash_partitions[i]->spill());
+ }
}
- }
- return Status::OK();
+ COUNTER_UPDATE(_partitions_created, PARTITION_FANOUT);
+ // COUNTER_SET(_max_partition_level, level);
+ return Status::OK();
}
-Status PartitionedAggregationNode::NextPartition() {
- DCHECK(output_partition_ == nullptr);
-
- if (!is_in_subplan() && spilled_partitions_.empty()) {
- // All partitions are in memory. Release reservation that was used for previous
- // partitions that is no longer needed. If we have spilled partitions, we want to
- // hold onto all reservation in case it is needed to process the spilled partitions.
- DCHECK(!_buffer_pool_client.has_unpinned_pages());
- Status status = release_unused_reservation();
- DCHECK(status.ok()) << "Should not fail - all partitions are in memory so there are "
- << "no unpinned pages. " << status.get_error_msg();
- }
-
- // Keep looping until we get to a partition that fits in memory.
- Partition* partition = nullptr;
- while (true) {
- // First return partitions that are fully aggregated (and in memory).
- if (!aggregated_partitions_.empty()) {
- partition = aggregated_partitions_.front();
- DCHECK(!partition->is_spilled());
- aggregated_partitions_.pop_front();
- break;
+Status PartitionedAggregationNode::check_and_resize_hash_partitions(int num_rows,
+ PartitionedHashTableCtx* ht_ctx) {
+ for (int i = 0; i < PARTITION_FANOUT; ++i) {
+ Partition* partition = _hash_partitions[i];
+ while (!partition->is_spilled()) {
+ {
+ SCOPED_TIMER(_ht_resize_timer);
+ if (partition->hash_tbl->check_and_resize(num_rows, ht_ctx)) {
+ break;
+ }
+ }
+ // There was not enough memory for the resize. Spill a partition and retry.
+ RETURN_IF_ERROR(spill_partition());
+ }
}
-
- // No aggregated partitions in memory - we should not be using any reservation aside
- // from 'serialize_stream_'.
- DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0,
- _buffer_pool_client.GetUsedReservation()) << _buffer_pool_client.DebugString();
-
- // Try to fit a single spilled partition in memory. We can often do this because
- // we only need to fit 1/PARTITION_FANOUT of the data in memory.
- // TODO: in some cases when the partition probably won't fit in memory it could
- // be better to skip directly to repartitioning.
- RETURN_IF_ERROR(BuildSpilledPartition(&partition));
- if (partition != nullptr) break;
-
- // If we can't fit the partition in memory, repartition it.
- RETURN_IF_ERROR(RepartitionSpilledPartition());
- }
- DCHECK(!partition->is_spilled());
- DCHECK(partition->hash_tbl.get() != nullptr);
- DCHECK(partition->aggregated_row_stream->is_pinned());
-
- output_partition_ = partition;
- output_iterator_ = output_partition_->hash_tbl->Begin(ht_ctx_.get());
- COUNTER_UPDATE(num_hash_buckets_, output_partition_->hash_tbl->num_buckets());
- return Status::OK();
+ return Status::OK();
}
-Status PartitionedAggregationNode::BuildSpilledPartition(Partition** built_partition) {
- DCHECK(!spilled_partitions_.empty());
- DCHECK(!is_streaming_preagg_);
- // Leave the partition in 'spilled_partitions_' to be closed if we hit an error.
- Partition* src_partition = spilled_partitions_.front();
- DCHECK(src_partition->is_spilled());
-
- // Create a new hash partition from the rows of the spilled partition. This is simpler
- // than trying to finish building a partially-built partition in place. We only
- // initialise one hash partition that all rows in 'src_partition' will hash to.
- RETURN_IF_ERROR(CreateHashPartitions(src_partition->level, src_partition->idx));
- Partition* dst_partition = hash_partitions_[src_partition->idx];
- DCHECK(dst_partition != nullptr);
-
- // Rebuild the hash table over spilled aggregate rows then start adding unaggregated
- // rows to the hash table. It's possible the partition will spill at either stage.
- // In that case we need to finish processing 'src_partition' so that all rows are
- // appended to 'dst_partition'.
- // TODO: if the partition spills again but the aggregation reduces the input
- // significantly, we could do better here by keeping the incomplete hash table in
- // memory and only spilling unaggregated rows that didn't fit in the hash table
- // (somewhat similar to the passthrough pre-aggregation).
- RETURN_IF_ERROR(ProcessStream<true>(src_partition->aggregated_row_stream.get()));
- RETURN_IF_ERROR(ProcessStream<false>(src_partition->unaggregated_row_stream.get()));
- src_partition->Close(false);
- spilled_partitions_.pop_front();
- hash_partitions_.clear();
-
- if (dst_partition->is_spilled()) {
- PushSpilledPartition(dst_partition);
- *built_partition = nullptr;
- // Spilled the partition - we should not be using any reservation except from
- // 'serialize_stream_'.
- DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0,
- _buffer_pool_client.GetUsedReservation()) << _buffer_pool_client.DebugString();
- } else {
- *built_partition = dst_partition;
- }
- return Status::OK();
+int64_t PartitionedAggregationNode::largest_spilled_partition() const {
+ int64_t max_rows = 0;
+ for (int i = 0; i < _hash_partitions.size(); ++i) {
+ Partition* partition = _hash_partitions[i];
+ if (partition->is_closed || !partition->is_spilled()) {
+ continue;
+ }
+ int64_t rows = partition->aggregated_row_stream->num_rows() +
+ partition->unaggregated_row_stream->num_rows();
+ if (rows > max_rows) {
+ max_rows = rows;
+ }
+ }
+ return max_rows;
}
-Status PartitionedAggregationNode::RepartitionSpilledPartition() {
- DCHECK(!spilled_partitions_.empty());
- DCHECK(!is_streaming_preagg_);
- // Leave the partition in 'spilled_partitions_' to be closed if we hit an error.
- Partition* partition = spilled_partitions_.front();
- DCHECK(partition->is_spilled());
-
- // Create the new hash partitions to repartition into. This will allocate a
- // write buffer for each partition's aggregated row stream.
- RETURN_IF_ERROR(CreateHashPartitions(partition->level + 1));
- COUNTER_UPDATE(num_repartitions_, 1);
-
- // Rows in this partition could have been spilled into two streams, depending
- // on whether each is an aggregated intermediate or an unaggregated row. Aggregated
- // rows are processed first to save a hash table lookup in ProcessBatch().
- RETURN_IF_ERROR(ProcessStream<true>(partition->aggregated_row_stream.get()));
-
- // Prepare write buffers so we can append spilled rows to unaggregated partitions.
- for (Partition* hash_partition : hash_partitions_) {
- if (!hash_partition->is_spilled()) continue;
- // The aggregated rows have been repartitioned. Free up at least a buffer's worth of
- // reservation and use it to pin the unaggregated write buffer.
-// hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL);
- bool got_buffer;
- RETURN_IF_ERROR(
- hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer));
- DCHECK(got_buffer)
- << "Accounted in min reservation" << _buffer_pool_client.DebugString();
- }
- RETURN_IF_ERROR(ProcessStream<false>(partition->unaggregated_row_stream.get()));
-
- COUNTER_UPDATE(num_row_repartitioned_, partition->aggregated_row_stream->num_rows());
- COUNTER_UPDATE(num_row_repartitioned_, partition->unaggregated_row_stream->num_rows());
-
- partition->Close(false);
- spilled_partitions_.pop_front();
-
- // Done processing this partition. Move the new partitions into
- // spilled_partitions_/aggregated_partitions_.
- int64_t num_input_rows = partition->aggregated_row_stream->num_rows()
- + partition->unaggregated_row_stream->num_rows();
- RETURN_IF_ERROR(MoveHashPartitions(num_input_rows));
- return Status::OK();
-}
+Status PartitionedAggregationNode::next_partition() {
+ DCHECK(_output_partition == NULL);
-template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream3* input_stream) {
- DCHECK(!is_streaming_preagg_);
- if (input_stream->num_rows() > 0) {
+ // Keep looping until we get to a partition that fits in memory.
+ Partition* partition = NULL;
while (true) {
- bool got_buffer = false;
- RETURN_IF_ERROR(input_stream->PrepareForRead(true, &got_buffer));
- if (got_buffer) break;
- // Did not have a buffer to read the input stream. Spill and try again.
- RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
+ partition = NULL;
+ // First return partitions that are fully aggregated (and in memory).
+ if (!_aggregated_partitions.empty()) {
+ partition = _aggregated_partitions.front();
+ DCHECK(!partition->is_spilled());
+ _aggregated_partitions.pop_front();
+ break;
+ }
+
+ if (partition == NULL) {
+ DCHECK(!_spilled_partitions.empty());
+ DCHECK_EQ(_state->block_mgr2()->num_pinned_buffers(_block_mgr_client),
+ _needs_serialize ? 1 : 0);
+
+ // TODO: we can probably do better than just picking the first partition. We
+ // can base this on the amount written to disk, etc.
+ partition = _spilled_partitions.front();
+ DCHECK(partition->is_spilled());
+
+ // Create the new hash partitions to repartition into.
+ // TODO: we don't need to repartition here. We are now working on 1 / FANOUT
+ // of the input so it's reasonably likely it can fit. We should look at this
+      // partition's size and just do the aggregation if it fits in memory.
+ RETURN_IF_ERROR(create_hash_partitions(partition->level + 1));
+ COUNTER_UPDATE(_num_repartitions, 1);
+
+ // Rows in this partition could have been spilled into two streams, depending
+ // on if it is an aggregated intermediate, or an unaggregated row.
+ // Note: we must process the aggregated rows first to save a hash table lookup
+ // in process_batch().
+ RETURN_IF_ERROR(process_stream<true>(partition->aggregated_row_stream.get()));
+ RETURN_IF_ERROR(process_stream<false>(partition->unaggregated_row_stream.get()));
+
+ COUNTER_UPDATE(_num_row_repartitioned, partition->aggregated_row_stream->num_rows());
+ COUNTER_UPDATE(_num_row_repartitioned, partition->unaggregated_row_stream->num_rows());
+
+ partition->close(false);
+ _spilled_partitions.pop_front();
+
+ // Done processing this partition. Move the new partitions into
+ // _spilled_partitions/_aggregated_partitions.
+ int64_t num_input_rows = partition->aggregated_row_stream->num_rows() +
+ partition->unaggregated_row_stream->num_rows();
+
+ // Check if there was any reduction in the size of partitions after repartitioning.
+ int64_t largest_partition = largest_spilled_partition();
+ DCHECK_GE(num_input_rows, largest_partition) << "Cannot have a partition with "
+ "more rows than the input";
+ if (num_input_rows == largest_partition) {
+ // Status status = Status::MemTrackerExceeded();
+ // status.AddDetail(Substitute("Cannot perform aggregation at node with id $0. "
+ // "Repartitioning did not reduce the size of a spilled partition. "
+ // "Repartitioning level $1. Number of rows $2.",
+ // _id, partition->level + 1, num_input_rows));
+ // _state->SetMemTrackerExceeded();
+ stringstream error_msg;
+ error_msg << "Cannot perform aggregation at node with id " << _id << ". "
+ << "Repartitioning did not reduce the size of a spilled partition. "
+ << "Repartitioning level " << partition->level + 1
+ << ". Number of rows " << num_input_rows << " .";
+ return Status::MemoryLimitExceeded(error_msg.str());
+ }
+ RETURN_IF_ERROR(move_hash_partitions(num_input_rows));
+ }
}
- bool eos = false;
- const RowDescriptor* desc =
- AGGREGATED_ROWS ? &intermediate_row_desc_ : &(_children[0]->row_desc());
- RowBatch batch(*desc, state_->batch_size(), const_cast<MemTracker*>(mem_tracker()));
- do {
- RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos));
- RETURN_IF_ERROR(
- ProcessBatch<AGGREGATED_ROWS>(&batch, ht_ctx_.get()));
- RETURN_IF_ERROR(state_->check_query_state("New partitioned aggregation, while processing stream."));
- batch.reset();
- } while (!eos);
- }
- input_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
- return Status::OK();
+ DCHECK(partition->hash_tbl.get() != NULL);
+ DCHECK(partition->aggregated_row_stream->is_pinned());
+
+ _output_partition = partition;
+ _output_iterator = _output_partition->hash_tbl->begin(_ht_ctx.get());
+ COUNTER_UPDATE(_num_hash_buckets, _output_partition->hash_tbl->num_buckets());
+ return Status::OK();
}
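For intuition on the no-reduction check above: if the input has a single hot grouping key, every row carries the same hash, so each repartitioning level funnels all rows into one child partition, and the node must fail rather than recurse to MAX_PARTITION_DEPTH. A minimal standalone sketch of that degenerate case (the function names and the hard-coded fanout are illustrative, not from this patch):

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    // Sketch: the upper hash bits pick the child partition, as with
    // NUM_PARTITIONING_BITS in the node itself.
    static int pick_partition(uint32_t hash) { return hash >> (32 - 4); }

    // True when repartitioning shrank the largest partition, i.e. the condition
    // under which next_partition() keeps recursing instead of giving up.
    static bool repartition_helped(const std::vector<uint32_t>& row_hashes) {
        std::vector<int64_t> child_rows(16, 0);
        for (uint32_t h : row_hashes) ++child_rows[pick_partition(h)];
        int64_t largest = *std::max_element(child_rows.begin(), child_rows.end());
        return largest < static_cast<int64_t>(row_hashes.size());
    }
    // With all hashes equal, largest == row_hashes.size() and this returns false:
    // the MemoryLimitExceeded path above.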
-Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) {
- int64_t max_freed_mem = 0;
- int partition_idx = -1;
-
- // Iterate over the partitions and pick the largest partition that is not spilled.
- for (int i = 0; i < hash_partitions_.size(); ++i) {
- if (hash_partitions_[i] == nullptr) continue;
- if (hash_partitions_[i]->is_closed) continue;
- if (hash_partitions_[i]->is_spilled()) continue;
- // Pass 'true' because we need to keep the write block pinned. See Partition::Spill().
- int64_t mem = hash_partitions_[i]->aggregated_row_stream->BytesPinned(true);
- mem += hash_partitions_[i]->hash_tbl->ByteSize();
- mem += hash_partitions_[i]->agg_fn_pool->total_reserved_bytes();
- DCHECK_GT(mem, 0); // At least the hash table buckets should occupy memory.
- if (mem > max_freed_mem) {
- max_freed_mem = mem;
- partition_idx = i;
+template<bool AGGREGATED_ROWS>
+Status PartitionedAggregationNode::process_stream(BufferedTupleStream2* input_stream) {
+ if (input_stream->num_rows() > 0) {
+ while (true) {
+ bool got_buffer = false;
+ RETURN_IF_ERROR(input_stream->prepare_for_read(true, &got_buffer));
+ if (got_buffer) {
+ break;
+ }
+ // Did not have a buffer to read the input stream. Spill and try again.
+ RETURN_IF_ERROR(spill_partition());
+ }
+
+ bool eos = false;
+ RowBatch batch(AGGREGATED_ROWS ? *_intermediate_row_desc : _children[0]->row_desc(),
+ _state->batch_size(), mem_tracker());
+ do {
+ RETURN_IF_ERROR(input_stream->get_next(&batch, &eos));
+ RETURN_IF_ERROR(process_batch<AGGREGATED_ROWS>(&batch, _ht_ctx.get()));
+ RETURN_IF_ERROR(_state->query_status());
+ // free_local_allocations();
+ batch.reset();
+ } while (!eos);
}
- }
- DCHECK_NE(partition_idx, -1) << "Should have been able to spill a partition to "
- << "reclaim memory: " << _buffer_pool_client.DebugString();
- // Remove references to the destroyed hash table from 'hash_tbls_'.
- // Additionally, we might be dealing with a rebuilt spilled partition, where all
- // partitions point to a single in-memory partition. This also ensures that 'hash_tbls_'
- // remains consistent in that case.
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- if (hash_partitions_[i] == hash_partitions_[partition_idx]) hash_tbls_[i] = nullptr;
- }
- return hash_partitions_[partition_idx]->Spill(more_aggregate_rows);
+ input_stream->close();
+ return Status::OK();
}
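The prepare-or-spill loop at the top of process_stream() is a small reusable pattern: keep evicting the largest in-memory partition until the block manager can hand out a read buffer. A hedged sketch of the same idea against generic callables (all names here are hypothetical, not part of this patch):

    // Sketch: retry an allocation-like call, spilling between attempts to free
    // memory. prepare() stands in for prepare_for_read() and spill() for
    // spill_partition(); spill() returns an error once nothing is left to
    // evict, which bounds the loop.
    template <typename PrepareFn, typename SpillFn>
    Status prepare_with_spill_retries(PrepareFn prepare, SpillFn spill) {
        while (true) {
            bool got_buffer = false;
            RETURN_IF_ERROR(prepare(&got_buffer));
            if (got_buffer) return Status::OK();
            RETURN_IF_ERROR(spill());
        }
    }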
-Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
- DCHECK(!hash_partitions_.empty());
- std::stringstream ss;
- ss << "PA(node_id=" << id() << ") partitioned(level=" << hash_partitions_[0]->level
- << ") " << num_input_rows << " rows into:" << std::endl;
- for (int i = 0; i < hash_partitions_.size(); ++i) {
- Partition* partition = hash_partitions_[i];
- if (partition == nullptr) continue;
- // We might be dealing with a rebuilt spilled partition, where all partitions are
- // pointing to a single in-memory partition, so make sure we only proceed for the
- // right partition.
-    if (i != partition->idx) continue;
- int64_t aggregated_rows = 0;
- if (partition->aggregated_row_stream != nullptr) {
- aggregated_rows = partition->aggregated_row_stream->num_rows();
- }
- int64_t unaggregated_rows = 0;
- if (partition->unaggregated_row_stream != nullptr) {
- unaggregated_rows = partition->unaggregated_row_stream->num_rows();
+Status PartitionedAggregationNode::spill_partition() {
+ int64_t max_freed_mem = 0;
+ int partition_idx = -1;
+
+ // Iterate over the partitions and pick the largest partition that is not spilled.
+ for (int i = 0; i < _hash_partitions.size(); ++i) {
+ if (_hash_partitions[i]->is_closed) {
+ continue;
+ }
+ if (_hash_partitions[i]->is_spilled()) {
+ continue;
+ }
+ // TODO: In PHJ the bytes_in_mem() call also calculates the mem used by the
+ // _write_block, why do we ignore it here?
+ int64_t mem = _hash_partitions[i]->aggregated_row_stream->bytes_in_mem(true);
+ mem += _hash_partitions[i]->hash_tbl->byte_size();
+ mem += _hash_partitions[i]->agg_fn_pool->total_reserved_bytes();
+ if (mem > max_freed_mem) {
+ max_freed_mem = mem;
+ partition_idx = i;
+ }
}
- double total_rows = aggregated_rows + unaggregated_rows;
- double percent = total_rows * 100 / num_input_rows;
- ss << " " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled")
- << " (fraction=" << std::fixed << std::setprecision(2) << percent << "%)" << std::endl
- << " #aggregated rows:" << aggregated_rows << std::endl
- << " #unaggregated rows: " << unaggregated_rows << std::endl;
-
- // TODO: update counters to support doubles.
- COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(percent));
-
- if (total_rows == 0) {
- partition->Close(false);
- } else if (partition->is_spilled()) {
- PushSpilledPartition(partition);
- } else {
- aggregated_partitions_.push_back(partition);
+ if (partition_idx == -1) {
+ // Could not find a partition to spill. This means the mem limit was just too low.
+ return _state->block_mgr2()->mem_limit_too_low_error(_block_mgr_client, id());
}
- }
- VLOG(2) << ss.str();
- hash_partitions_.clear();
- return Status::OK();
+ return _hash_partitions[partition_idx]->spill();
}
-void PartitionedAggregationNode::PushSpilledPartition(Partition* partition) {
- DCHECK(partition->is_spilled());
- DCHECK(partition->hash_tbl == nullptr);
- // Ensure all pages in the spilled partition's streams are unpinned by invalidating
- // the streams' read and write iterators. We may need all the memory to process the
- // next spilled partitions.
-// partition->aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL);
-// partition->unaggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL);
- spilled_partitions_.push_front(partition);
+Status PartitionedAggregationNode::move_hash_partitions(int64_t num_input_rows) {
+ DCHECK(!_hash_partitions.empty());
+ stringstream ss;
+ ss << "PA(node_id=" << id() << ") partitioned(level="
+ << _hash_partitions[0]->level << ") "
+ << num_input_rows << " rows into:" << std::endl;
+ for (int i = 0; i < _hash_partitions.size(); ++i) {
+ Partition* partition = _hash_partitions[i];
+ int64_t aggregated_rows = partition->aggregated_row_stream->num_rows();
+ int64_t unaggregated_rows = partition->unaggregated_row_stream->num_rows();
+ int64_t total_rows = aggregated_rows + unaggregated_rows;
+ double percent = static_cast<double>(total_rows * 100) / num_input_rows;
+ ss << " " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled")
+ << " (fraction=" << std::fixed << std::setprecision(2) << percent << "%)" << std::endl
+ << " #aggregated rows:" << aggregated_rows << std::endl
+ << " #unaggregated rows: " << unaggregated_rows << std::endl;
+
+ // TODO: update counters to support doubles.
+ // COUNTER_SET(_largest_partition_percent, static_cast<int64_t>(percent));
+
+ if (total_rows == 0) {
+ partition->close(false);
+ } else if (partition->is_spilled()) {
+ DCHECK(partition->hash_tbl.get() == NULL);
+ // We need to unpin all the spilled partitions to make room to allocate new
+ // _hash_partitions when we repartition the spilled partitions.
+ // TODO: we only need to do this when we have memory pressure. This might be
+ // okay though since the block mgr should only write these to disk if there
+ // is memory pressure.
+ RETURN_IF_ERROR(partition->aggregated_row_stream->unpin_stream(true));
+ RETURN_IF_ERROR(partition->unaggregated_row_stream->unpin_stream(true));
+
+      // Push newly created partitions at the front. This means a depth-first walk
+ // (more finely partitioned partitions are processed first). This allows us
+ // to delete blocks earlier and bottom out the recursion earlier.
+ _spilled_partitions.push_front(partition);
+ } else {
+ _aggregated_partitions.push_back(partition);
+ }
+
+ }
+ VLOG(2) << ss.str();
+ _hash_partitions.clear();
+ return Status::OK();
}
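A quick sanity reference for the per-partition fraction logged above: with an even hash distribution, each of the PARTITION_FANOUT = 16 partitions should report roughly 100% / 16 = 6.25% of the input rows, so a partition printing a much larger fraction (say 40%) indicates hash skew, which is exactly what the disabled _largest_partition_percent counter was meant to surface.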
-void PartitionedAggregationNode::ClosePartitions() {
- for (Partition* partition : hash_partitions_) {
- if (partition != nullptr) partition->Close(true);
- }
- hash_partitions_.clear();
- for (Partition* partition : aggregated_partitions_) partition->Close(true);
- aggregated_partitions_.clear();
- for (Partition* partition : spilled_partitions_) partition->Close(true);
- spilled_partitions_.clear();
- memset(hash_tbls_, 0, sizeof(hash_tbls_));
- partition_pool_->clear();
+void PartitionedAggregationNode::close_partitions() {
+ for (int i = 0; i < _hash_partitions.size(); ++i) {
+ _hash_partitions[i]->close(true);
+ }
+ for (list<Partition*>::iterator it = _aggregated_partitions.begin();
+ it != _aggregated_partitions.end(); ++it) {
+ (*it)->close(true);
+ }
+ for (list<Partition*>::iterator it = _spilled_partitions.begin();
+ it != _spilled_partitions.end(); ++it) {
+ (*it)->close(true);
+ }
+ _aggregated_partitions.clear();
+ _spilled_partitions.clear();
+ _hash_partitions.clear();
+ _partition_pool->clear();
}
-//Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
-// NewAggFnEvaluator::FreeLocalAllocations(agg_fn_evals_);
-// for (Partition* partition : hash_partitions_) {
-// if (partition != nullptr) {
-// NewAggFnEvaluator::FreeLocalAllocations(partition->agg_fn_evals);
-// }
-// }
-// if (ht_ctx_.get() != nullptr) ht_ctx_->FreeLocalAllocations();
-// return ExecNode::QueryMaintenance(state);
-//}
-
-// Instantiate required templates.
-template Status PartitionedAggregationNode::AppendSpilledRow<false>(
- Partition*, TupleRow*);
-template Status PartitionedAggregationNode::AppendSpilledRow<true>(Partition*, TupleRow*);
+#if 0
+// Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
+// for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+// ExprContext::free_local_allocations(_aggregate_evaluators[i]->input_expr_ctxs());
+// }
+// ExprContext::free_local_allocations(_agg_fn_ctxs);
+// for (int i = 0; i < _hash_partitions.size(); ++i) {
+// ExprContext::free_local_allocations(_hash_partitions[i]->agg_fn_ctxs);
+// }
+// return ExecNode::QueryMaintenance(state);
+// }
+//
+#endif
}
-
diff --git a/be/src/exec/partitioned_aggregation_node.h b/be/src/exec/partitioned_aggregation_node.h
index 4a2f156..bbcc3e0 100644
--- a/be/src/exec/partitioned_aggregation_node.h
+++ b/be/src/exec/partitioned_aggregation_node.h
@@ -15,26 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef DORIS_BE_SRC_EXEC_NEW_PARTITIONED_AGGREGATION_NODE_H
-#define DORIS_BE_SRC_EXEC_NEW_PARTITIONED_AGGREGATION_NODE_H
-
-#include <deque>
+#ifndef DORIS_BE_SRC_EXEC_PARTITIONED_AGGREGATION_NODE_H
+#define DORIS_BE_SRC_EXEC_PARTITIONED_AGGREGATION_NODE_H
+#include <functional>
#include <boost/scoped_ptr.hpp>
#include "exec/exec_node.h"
-#include "exec/new_partitioned_hash_table.h"
-#include "runtime/buffered_tuple_stream3.h"
-#include "runtime/bufferpool/suballocator.h"
+#include "exec/partitioned_hash_table.inline.h"
+#include "runtime/buffered_block_mgr2.h"
+#include "runtime/buffered_tuple_stream2.h"
#include "runtime/descriptors.h" // for TupleId
#include "runtime/mem_pool.h"
#include "runtime/string_value.h"
namespace doris {
-class AggFn;
-class NewAggFnEvaluator;
-class CodegenAnyVal;
+class AggFnEvaluator;
class RowBatch;
class RuntimeState;
struct StringValue;
@@ -42,661 +39,423 @@ class Tuple;
class TupleDescriptor;
class SlotDescriptor;
-/// Node for doing partitioned hash aggregation.
-/// This node consumes the input (which can be from the child(0) or a spilled partition).
-/// 1. Each row is hashed and we pick a dst partition (hash_partitions_).
-/// 2. If the dst partition is not spilled, we probe into the partition's hash table
-/// to aggregate/insert the row.
-/// 3. If the partition is already spilled, the input row is spilled.
-/// 4. When all the input is consumed, we walk hash_partitions_, put the spilled ones
-/// into spilled_partitions_ and the non-spilled ones into aggregated_partitions_.
-/// aggregated_partitions_ contain partitions that are fully processed and the result
-/// can just be returned. Partitions in spilled_partitions_ need to be repartitioned
-/// and we just repeat these steps.
+// Node for doing partitioned hash aggregation.
+// This node consumes the input (which can be from the child(0) or a spilled partition).
+// 1. Each row is hashed and we pick a dst partition (_hash_partitions).
+// 2. If the dst partition is not spilled, we probe into the partition's hash table
+// to aggregate/insert the row.
+// 3. If the partition is already spilled, the input row is spilled.
+// 4. When all the input is consumed, we walk _hash_partitions, put the spilled ones
+// into _spilled_partitions and the non-spilled ones into _aggregated_partitions.
+// _aggregated_partitions contain partitions that are fully processed and the result
+// can just be returned. Partitions in _spilled_partitions need to be repartitioned
+// and we just repeat these steps.
+//
+// Each partition contains these structures:
+// 1) Hash Table for aggregated rows. This contains just the hash table directory
+// structure but not the rows themselves. This is NULL for spilled partitions when
+// we stop maintaining the hash table.
+// 2) MemPool for var-len result data for rows in the hash table. If the aggregate
+// function returns a string, we cannot append it to the tuple stream as that
+// structure is immutable. Instead, when we need to spill, we sweep and copy the
+// rows into a tuple stream.
+// 3) Aggregated tuple stream for rows that are/were in the hash table. This stream
+// contains rows that are aggregated. When the partition is not spilled, this stream
+// is pinned and contains the memory referenced by the hash table.
+// In the case where the aggregate function does not return a string (meaning the
+// size of all the slots is known when the row is constructed), this stream contains
+// all the memory for the result rows and the MemPool (2) is not used.
+// 4) Unaggregated tuple stream. Stream to spill unaggregated rows.
+// Rows in this stream always have child(0)'s layout.
+//
+// Buffering: Each stream and hash table needs to maintain at least one buffer for
+// some duration of the processing. To minimize the memory requirements of small queries
+// (i.e. memory usage is less than one IO-buffer per partition), the streams and hash
+// tables of each partition start using small (less than IO-sized) buffers, regardless
+// of the level.
//
-/// Each partition contains these structures:
-/// 1) Hash Table for aggregated rows. This contains just the hash table directory
-/// structure but not the rows themselves. This is NULL for spilled partitions when
-/// we stop maintaining the hash table.
-/// 2) MemPool for var-len result data for rows in the hash table. If the aggregate
-/// function returns a string, we cannot append it to the tuple stream as that
-/// structure is immutable. Instead, when we need to spill, we sweep and copy the
-/// rows into a tuple stream.
-/// 3) Aggregated tuple stream for rows that are/were in the hash table. This stream
-/// contains rows that are aggregated. When the partition is not spilled, this stream
-/// is pinned and contains the memory referenced by the hash table.
-/// In the case where the aggregate function does not return a string (meaning the
-/// size of all the slots is known when the row is constructed), this stream contains
-/// all the memory for the result rows and the MemPool (2) is not used.
-/// 4) Unaggregated tuple stream. Stream to spill unaggregated rows.
-/// Rows in this stream always have child(0)'s layout.
-///
-/// Buffering: Each stream and hash table needs to maintain at least one buffer for
-/// some duration of the processing. To minimize the memory requirements of small queries
-/// (i.e. memory usage is less than one IO-buffer per partition), the streams and hash
-/// tables of each partition start using small (less than IO-sized) buffers, regardless
-/// of the level.
-///
-/// Two-phase aggregation: we support two-phase distributed aggregations, where
-/// pre-aggregrations attempt to reduce the size of data before shuffling data across the
-/// network to be merged by the merge aggregation node. This exec node supports a
-/// streaming mode for pre-aggregations where it maintains a hash table of aggregated
-/// rows, but can pass through unaggregated rows (after transforming them into the
-/// same tuple format as aggregated rows) when a heuristic determines that it is better
-/// to send rows across the network instead of consuming additional memory and CPU
-/// resources to expand its hash table. The planner decides whether a given
-/// pre-aggregation should use the streaming preaggregation algorithm or the same
-/// blocking aggregation algorithm as used in merge aggregations.
-/// TODO: make this less of a heuristic by factoring in the cost of the exchange vs the
-/// cost of the pre-aggregation.
-///
-/// If there are no grouping expressions, there is only a single output row for both
-/// preaggregations and merge aggregations. This case is handled separately to avoid
-/// building hash tables. There is also no need to do streaming preaggregations.
-///
-/// Handling memory pressure: the node uses two different strategies for responding to
-/// memory pressure, depending on whether it is a streaming pre-aggregation or not. If
-/// the node is a streaming preaggregation, it stops growing its hash table further by
-/// converting unaggregated rows into the aggregated tuple format and passing them
-/// through. If the node is not a streaming pre-aggregation, it responds to memory
-/// pressure by spilling partitions to disk.
-///
-/// TODO: Buffer rows before probing into the hash table?
-/// TODO: After spilling, we can still maintain a very small hash table just to remove
-/// some number of rows (from likely going to disk).
-/// TODO: Consider allowing to spill the hash table structure in addition to the rows.
-/// TODO: Do we want to insert a buffer before probing into the partition's hash table?
-/// TODO: Use a prefetch/batched probe interface.
-/// TODO: Return rows from the aggregated_row_stream rather than the HT.
-/// TODO: Think about spilling heuristic.
-/// TODO: When processing a spilled partition, we have a lot more information and can
-/// size the partitions/hash tables better.
-/// TODO: Start with unpartitioned (single partition) and switch to partitioning and
-/// spilling only if the size gets large, say larger than the LLC.
-/// TODO: Simplify or clean up the various uses of agg_fn_ctx, agg_fn_ctx_, and ctx.
-/// There are so many contexts in use that a plain "ctx" variable should never be used.
-/// Likewise, it's easy to mix up the agg fn ctxs; there should be a way to simplify this.
-/// TODO: support an Init() method with an initial value in the UDAF interface.
+// TODO: Buffer rows before probing into the hash table?
+// TODO: After spilling, we can still maintain a very small hash table just to remove
+// some number of rows (from likely going to disk).
+// TODO: Consider allowing to spill the hash table structure in addition to the rows.
+// TODO: Do we want to insert a buffer before probing into the partition's hash table?
+// TODO: Use a prefetch/batched probe interface.
+// TODO: Return rows from the aggregated_row_stream rather than the HT.
+// TODO: Think about spilling heuristic.
+// TODO: When processing a spilled partition, we have a lot more information and can
+// size the partitions/hash tables better.
+// TODO: Start with unpartitioned (single partition) and switch to partitioning and
+// spilling only if the size gets large, say larger than the LLC.
+// TODO: Simplify or clean up the various uses of agg_fn_ctx, _agg_fn_ctx, and ctx.
+// There are so many contexts in use that a plain "ctx" variable should never be used.
+// Likewise, it's easy to mix up the agg fn ctxs; there should be a way to simplify this.
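As a reading aid for steps 1-3 of the overview above, a minimal sketch of the per-row dispatch (the two helpers are hypothetical stand-ins for the hash-table probe and the spill append, not functions from this file):

    // Sketch: route one input row to its destination partition.
    Status process_row_sketch(TupleRow* row, uint32_t hash) {
        // 1. The upper NUM_PARTITIONING_BITS of the hash pick the partition.
        Partition* dst = _hash_partitions[hash >> (32 - NUM_PARTITIONING_BITS)];
        if (!dst->is_spilled()) {
            // 2. Probe/update the partition's in-memory hash table.
            return aggregate_into_hash_table(dst, row, hash);
        }
        // 3. The partition is already spilled, so the input row is spilled too.
        return append_to_spill_stream(dst, row);
    }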
class PartitionedAggregationNode : public ExecNode {
- public:
-
- PartitionedAggregationNode(ObjectPool* pool,
- const TPlanNode& tnode, const DescriptorTbl& descs);
-
- virtual Status init(const TPlanNode& tnode, RuntimeState* state);
- virtual Status prepare(RuntimeState* state);
-// virtual void Codegen(RuntimeState* state);
- virtual Status open(RuntimeState* state);
- virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos);
- virtual Status reset(RuntimeState* state);
- virtual Status close(RuntimeState* state);
-
- protected:
-  /// Frees local allocations from agg_fn_evals_ and the partitions' agg_fn_evals.
-// virtual Status QueryMaintenance(RuntimeState* state);
- virtual std::string DebugString(int indentation_level) const;
- virtual void DebugString(int indentation_level, std::stringstream* out) const;
-
- private:
- struct Partition;
-
- /// Number of initial partitions to create. Must be a power of 2.
- static const int PARTITION_FANOUT = 16;
-
-  /// Needs to be log2(PARTITION_FANOUT).
- /// We use the upper bits to pick the partition and lower bits in the HT.
- /// TODO: different hash functions here too? We don't need that many bits to pick
- /// the partition so this might be okay.
- static const int NUM_PARTITIONING_BITS = 4;
-
- /// Maximum number of times we will repartition. The maximum build table we can process
- /// (if we have enough scratch disk space) in case there is no skew is:
- /// MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH).
- /// In the case where there is skew, repartitioning is unlikely to help (assuming a
- /// reasonable hash function).
-  /// Note that NewPartitionedHashTableCtx needs at least as many SEED_PRIMES as partitioning levels.
- /// TODO: we can revisit and try harder to explicitly detect skew.
- static const int MAX_PARTITION_DEPTH = 16;
-
- /// Default initial number of buckets in a hash table.
- /// TODO: rethink this ?
- static const int64_t PAGG_DEFAULT_HASH_TABLE_SZ = 1024;
-
- /// Codegen doesn't allow for automatic Status variables because then exception
- /// handling code is needed to destruct the Status, and our function call substitution
-  /// doesn't know how to deal with the LLVM IR 'invoke' instruction. Work around that by
- /// placing the Status here so exceptions won't need to destruct it.
- /// TODO: fix IMPALA-1948 and remove this.
- Status process_batch_status_;
-
- /// Tuple into which Update()/Merge()/Serialize() results are stored.
- TupleId intermediate_tuple_id_;
- TupleDescriptor* intermediate_tuple_desc_;
-
- /// Row with the intermediate tuple as its only tuple.
- /// Construct a new row desc for preparing the build exprs because neither the child's
- /// nor this node's output row desc may contain the intermediate tuple, e.g.,
- /// in a single-node plan with an intermediate tuple different from the output tuple.
- /// Lives in the query state's obj_pool.
- RowDescriptor intermediate_row_desc_;
-
- /// Tuple into which Finalize() results are stored. Possibly the same as
- /// the intermediate tuple.
- TupleId output_tuple_id_;
- TupleDescriptor* output_tuple_desc_;
-
- /// Certain aggregates require a finalize step, which is the final step of the
- /// aggregate after consuming all input rows. The finalize step converts the aggregate
- /// value into its final form. This is true if this node contains aggregate that
- /// requires a finalize step.
- const bool needs_finalize_;
-
- /// True if this is first phase of a two-phase distributed aggregation for which we
- /// are doing a streaming preaggregation.
- bool is_streaming_preagg_;
-
- /// True if any of the evaluators require the serialize step.
- bool needs_serialize_;
-
- /// The list of all aggregate operations for this exec node.
- std::vector<AggFn*> agg_fns_;
-
- /// Evaluators for each aggregate function. If this is a grouping aggregation, these
- /// evaluators are only used to create cloned per-partition evaluators. The cloned
- /// evaluators are then used to evaluate the functions. If this is a non-grouping
- /// aggregation these evaluators are used directly to evaluate the functions.
- ///
- /// Permanent and result allocations for these allocators are allocated from
- /// 'expr_perm_pool_' and 'expr_results_pool_' respectively.
- std::vector<NewAggFnEvaluator*> agg_fn_evals_;
- boost::scoped_ptr<MemPool> agg_fn_pool_;
-
- /// Exprs used to evaluate input rows
- std::vector<Expr*> grouping_exprs_;
-
- /// Exprs used to insert constructed aggregation tuple into the hash table.
- /// All the exprs are simply SlotRefs for the intermediate tuple.
- std::vector<Expr*> build_exprs_;
-
- /// Exprs used to evaluate input rows
- /// TODO (pengyubing) Is this variable useful?
- std::vector<ExprContext*> grouping_expr_ctxs_;
-
- /// Indices of grouping exprs with var-len string types in grouping_expr_ctxs_. We need
- /// to do more work for var-len expressions when allocating and spilling rows. All
- /// var-len grouping exprs have type string.
- std::vector<int> string_grouping_exprs_;
-
- RuntimeState* state_;
- /// Allocator for hash table memory.
- boost::scoped_ptr<Suballocator> ht_allocator_;
-  /// MemPool used to allocate memory when we don't have grouping and don't initialize
- /// the partitioning structures, or during Close() when creating new output tuples.
- /// For non-grouping aggregations, the ownership of the pool's memory is transferred
- /// to the output batch on eos. The pool should not be Reset() to allow amortizing
- /// memory allocation over a series of Reset()/Open()/GetNext()* calls.
- boost::scoped_ptr<MemPool> mem_pool_;
-
- // MemPool for allocations made by copying expr results
- boost::scoped_ptr<MemPool> expr_results_pool_;
-
- /// The current partition and iterator to the next row in its hash table that we need
- /// to return in GetNext()
- Partition* output_partition_;
- NewPartitionedHashTable::Iterator output_iterator_;
-
- typedef Status (*ProcessBatchNoGroupingFn)(PartitionedAggregationNode*, RowBatch*);
- /// Jitted ProcessBatchNoGrouping function pointer. Null if codegen is disabled.
- ProcessBatchNoGroupingFn process_batch_no_grouping_fn_;
-
- typedef Status (*ProcessBatchFn)(
- PartitionedAggregationNode*, RowBatch*, NewPartitionedHashTableCtx*);
- /// Jitted ProcessBatch function pointer. Null if codegen is disabled.
- ProcessBatchFn process_batch_fn_;
-
- typedef Status (*ProcessBatchStreamingFn)(PartitionedAggregationNode*, bool,
- RowBatch*, RowBatch*, NewPartitionedHashTableCtx*, int[PARTITION_FANOUT]);
- /// Jitted ProcessBatchStreaming function pointer. Null if codegen is disabled.
- ProcessBatchStreamingFn process_batch_streaming_fn_;
-
- /// Time spent processing the child rows
- RuntimeProfile::Counter* build_timer_;
-
- /// Total time spent resizing hash tables.
- RuntimeProfile::Counter* ht_resize_timer_;
-
- /// Time spent returning the aggregated rows
- RuntimeProfile::Counter* get_results_timer_;
-
- /// Total number of hash buckets across all partitions.
- RuntimeProfile::Counter* num_hash_buckets_;
-
- /// Total number of partitions created.
- RuntimeProfile::Counter* partitions_created_;
-
- /// Level of max partition (i.e. number of repartitioning steps).
- RuntimeProfile::HighWaterMarkCounter* max_partition_level_;
-
- /// Number of rows that have been repartitioned.
- RuntimeProfile::Counter* num_row_repartitioned_;
-
- /// Number of partitions that have been repartitioned.
- RuntimeProfile::Counter* num_repartitions_;
-
- /// Number of partitions that have been spilled.
- RuntimeProfile::Counter* num_spilled_partitions_;
-
- /// The largest fraction after repartitioning. This is expected to be
- /// 1 / PARTITION_FANOUT. A value much larger indicates skew.
- RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_;
-
- /// Time spent in streaming preagg algorithm.
- RuntimeProfile::Counter* streaming_timer_;
-
- /// The number of rows passed through without aggregation.
- RuntimeProfile::Counter* num_passthrough_rows_;
-
- /// The estimated reduction of the preaggregation.
- RuntimeProfile::Counter* preagg_estimated_reduction_;
-
- /// Expose the minimum reduction factor to continue growing the hash tables.
- RuntimeProfile::Counter* preagg_streaming_ht_min_reduction_;
-
- /// The estimated number of input rows from the planner.
- int64_t estimated_input_cardinality_;
-
- /////////////////////////////////////////
- /// BEGIN: Members that must be Reset()
-
- /// Result of aggregation w/o GROUP BY.
- /// Note: can be NULL even if there is no grouping if the result tuple is 0 width
- /// e.g. select 1 from table group by col.
- Tuple* singleton_output_tuple_;
- bool singleton_output_tuple_returned_;
-
- /// Row batch used as argument to GetNext() for the child node preaggregations. Store
- /// in node to avoid reallocating for every GetNext() call when streaming.
- boost::scoped_ptr<RowBatch> child_batch_;
-
- /// If true, no more rows to output from partitions.
- bool partition_eos_;
-
- /// True if no more rows to process from child.
- bool child_eos_;
-
- /// Used for hash-related functionality, such as evaluating rows and calculating hashes.
- /// It also owns the evaluators for the grouping and build expressions used during hash
- /// table insertion and probing.
- boost::scoped_ptr<NewPartitionedHashTableCtx> ht_ctx_;
-
- /// Object pool that holds the Partition objects in hash_partitions_.
- boost::scoped_ptr<ObjectPool> partition_pool_;
-
- /// Current partitions we are partitioning into. IMPALA-5788: For the case where we
- /// rebuild a spilled partition that fits in memory, all pointers in this vector will
- /// point to a single in-memory partition.
- std::vector<Partition*> hash_partitions_;
-
- /// Cache for hash tables in 'hash_partitions_'. IMPALA-5788: For the case where we
- /// rebuild a spilled partition that fits in memory, all pointers in this array will
- /// point to the hash table that is a part of a single in-memory partition.
- NewPartitionedHashTable* hash_tbls_[PARTITION_FANOUT];
-
- /// All partitions that have been spilled and need further processing.
- std::deque<Partition*> spilled_partitions_;
-
- /// All partitions that are aggregated and can just return the results in GetNext().
- /// After consuming all the input, hash_partitions_ is split into spilled_partitions_
- /// and aggregated_partitions_, depending on if it was spilled or not.
- std::deque<Partition*> aggregated_partitions_;
-
- /// END: Members that must be Reset()
- /////////////////////////////////////////
-
- /// The hash table and streams (aggregated and unaggregated) for an individual
- /// partition. The streams of each partition always (i.e. regardless of level)
- /// initially use small buffers. Streaming pre-aggregations do not spill and do not
- /// require an unaggregated stream.
- struct Partition {
- Partition(PartitionedAggregationNode* parent, int level, int idx)
- : parent(parent), is_closed(false), level(level), idx(idx) {}
-
- ~Partition();
-
- /// Initializes aggregated_row_stream and unaggregated_row_stream (if a spilling
- /// aggregation), allocating one buffer for each. Spilling merge aggregations must
- /// have enough reservation for the initial buffer for the stream, so this should
-    /// not fail due to OOM. Preaggregations do not reserve any buffers: if the node does
-    /// not have enough reservation for the initial buffer, the aggregated row stream is not
- /// created and an OK status is returned.
- Status InitStreams();
-
- /// Initializes the hash table. 'aggregated_row_stream' must be non-NULL.
-    /// Sets 'got_memory' to true if the hash table was initialized or false on OOM.
- Status InitHashTable(bool* got_memory);
-
- /// Called in case we need to serialize aggregated rows. This step effectively does
- /// a merge aggregation in this node.
- Status SerializeStreamForSpilling();
-
- /// Closes this partition. If finalize_rows is true, this iterates over all rows
- /// in aggregated_row_stream and finalizes them (this is only used in the cancellation
- /// path).
- void Close(bool finalize_rows);
-
- /// Spill this partition. 'more_aggregate_rows' = true means that more aggregate rows
-    /// may be appended to the partition before appending unaggregated rows. On
- /// success, one of the streams is left with a write iterator: the aggregated stream
- /// if 'more_aggregate_rows' is true or the unaggregated stream otherwise.
- Status Spill(bool more_aggregate_rows);
-
- bool is_spilled() const { return hash_tbl.get() == NULL; }
-
- PartitionedAggregationNode* parent;
-
- /// If true, this partition is closed and there is nothing left to do.
- bool is_closed;
-
- /// How many times rows in this partition have been repartitioned. Partitions created
-    /// from the node's child's input are level 0, 1 after the first repartitioning,
- /// etc.
- const int level;
-
- /// The index of this partition within 'hash_partitions_' at its level.
- const int idx;
-
- /// Hash table for this partition.
- /// Can be NULL if this partition is no longer maintaining a hash table (i.e.
- /// is spilled or we are passing through all rows for this partition).
- boost::scoped_ptr<NewPartitionedHashTable> hash_tbl;
-
- /// Clone of parent's agg_fn_evals_. Permanent allocations come from
- /// 'agg_fn_perm_pool' and result allocations come from the ExecNode's
- /// 'expr_results_pool_'.
- std::vector<NewAggFnEvaluator*> agg_fn_evals;
- boost::scoped_ptr<MemPool> agg_fn_pool;
-
- /// Tuple stream used to store aggregated rows. When the partition is not spilled,
- /// (meaning the hash table is maintained), this stream is pinned and contains the
- /// memory referenced by the hash table. When it is spilled, this consumes reservation
- /// for a write buffer only during repartitioning of aggregated rows.
- ///
- /// For streaming preaggs, this may be NULL if sufficient memory is not available.
- /// In that case hash_tbl is also NULL and all rows for the partition will be passed
- /// through.
- boost::scoped_ptr<BufferedTupleStream3> aggregated_row_stream;
-
- /// Unaggregated rows that are spilled. Always NULL for streaming pre-aggregations.
- /// Always unpinned. Has a write buffer allocated when the partition is spilled and
- /// unaggregated rows are being processed.
- boost::scoped_ptr<BufferedTupleStream3> unaggregated_row_stream;
- };
-
- /// Stream used to store serialized spilled rows. Only used if needs_serialize_
- /// is set. This stream is never pinned and only used in Partition::Spill as a
-  /// temporary buffer.
- boost::scoped_ptr<BufferedTupleStream3> serialize_stream_;
-
- /// Accessor for 'hash_tbls_' that verifies consistency with the partitions.
- NewPartitionedHashTable* ALWAYS_INLINE GetHashTable(int partition_idx) {
- NewPartitionedHashTable* ht = hash_tbls_[partition_idx];
- DCHECK_EQ(ht, hash_partitions_[partition_idx]->hash_tbl.get());
- return ht;
- }
-
- /// Materializes 'row_batch' in either grouping or non-grouping case.
- Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos);
-
- /// Helper function called by GetNextInternal() to ensure that string data referenced in
- /// 'row_batch' will live as long as 'row_batch's tuples. 'first_row_idx' indexes the
- /// first row that should be processed in 'row_batch'.
- Status HandleOutputStrings(RowBatch* row_batch, int first_row_idx);
-
- /// Copies string data from the specified slot into 'pool', and sets the StringValues'
- /// ptrs to the copied data. Copies data from all tuples in 'row_batch' from
- /// 'first_row_idx' onwards. 'slot_desc' must have a var-len string type.
- Status CopyStringData(const SlotDescriptor& slot_desc, RowBatch* row_batch,
- int first_row_idx, MemPool* pool);
-
- /// Constructs singleton output tuple, allocating memory from pool.
- Tuple* ConstructSingletonOutputTuple(
- const std::vector<NewAggFnEvaluator*>& agg_fn_evals, MemPool* pool);
-
- /// Copies grouping values stored in 'ht_ctx_' that were computed over 'current_row_'
- /// using 'grouping_expr_evals_'. Aggregation expr slots are set to their initial
- /// values. Returns NULL if there was not enough memory to allocate the tuple or errors
-  /// occurred, in which case 'status' is set. Allocates tuple and var-len data for
- /// grouping exprs from stream. Var-len data for aggregate exprs is allocated from the
- /// FunctionContexts, so is stored outside the stream. If stream's small buffers get
- /// full, it will attempt to switch to IO-buffers.
- Tuple* ConstructIntermediateTuple(const std::vector<NewAggFnEvaluator*>& agg_fn_evals,
- BufferedTupleStream3* stream, Status* status);
-
- /// Constructs intermediate tuple, allocating memory from pool instead of the stream.
- /// Returns NULL and sets status if there is not enough memory to allocate the tuple.
- Tuple* ConstructIntermediateTuple(const std::vector<NewAggFnEvaluator*>& agg_fn_evals,
- MemPool* pool, Status* status);
-
- /// Returns the number of bytes of variable-length data for the grouping values stored
- /// in 'ht_ctx_'.
- int GroupingExprsVarlenSize();
-
-  /// Initializes intermediate tuple by copying grouping values stored in 'ht_ctx_'
-  /// that were computed over 'current_row_' using 'grouping_expr_evals_'. Writes the
- /// var-len data into buffer. 'buffer' points to the start of a buffer of at least the
- /// size of the variable-length data: 'varlen_size'.
- void CopyGroupingValues(Tuple* intermediate_tuple, uint8_t* buffer, int varlen_size);
-
- /// Initializes the aggregate function slots of an intermediate tuple.
- /// Any var-len data is allocated from the FunctionContexts.
- void InitAggSlots(const std::vector<NewAggFnEvaluator*>& agg_fn_evals,
- Tuple* intermediate_tuple);
-
- /// Updates the given aggregation intermediate tuple with aggregation values computed
- /// over 'row' using 'agg_fn_evals'. Whether the agg fn evaluator calls Update() or
- /// Merge() is controlled by the evaluator itself, unless enforced explicitly by passing
- /// in is_merge == true. The override is needed to merge spilled and non-spilled rows
- /// belonging to the same partition independent of whether the agg fn evaluators have
- /// is_merge() == true.
- /// This function is replaced by codegen (which is why we don't use a vector argument
-  /// for agg_fn_evals). Any var-len data is allocated from the FunctionContexts.
- void UpdateTuple(NewAggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row,
- bool is_merge = false);
-
- /// Called on the intermediate tuple of each group after all input rows have been
- /// consumed and aggregated. Computes the final aggregate values to be returned in
- /// GetNext() using the agg fn evaluators' Serialize() or Finalize().
- /// For the Finalize() case if the output tuple is different from the intermediate
- /// tuple, then a new tuple is allocated from 'pool' to hold the final result.
-  /// Grouping values are copied into the output tuple, and the output tuple holding
- /// the finalized/serialized aggregate values is returned.
- /// TODO: Coordinate the allocation of new tuples with the release of memory
- /// so as not to make memory consumption blow up.
- Tuple* GetOutputTuple(const std::vector<NewAggFnEvaluator*>& agg_fn_evals,
- Tuple* tuple, MemPool* pool);
-
- /// Do the aggregation for all tuple rows in the batch when there is no grouping.
- /// This function is replaced by codegen.
- Status ProcessBatchNoGrouping(RowBatch* batch);
-
- /// Processes a batch of rows. This is the core function of the algorithm. We partition
- /// the rows into hash_partitions_, spilling as necessary.
- /// If AGGREGATED_ROWS is true, it means that the rows in the batch are already
- /// pre-aggregated.
- /// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE,
- /// hash table buckets will be prefetched based on the hash values computed. Note
- /// that 'prefetch_mode' will be substituted with constants during codegen time.
- //
- /// This function is replaced by codegen. We pass in ht_ctx_.get() as an argument for
- /// performance.
- template<bool AGGREGATED_ROWS>
- Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch, NewPartitionedHashTableCtx* ht_ctx);
-
- /// Evaluates the rows in 'batch' starting at 'start_row_idx' and stores the results in
- /// the expression values cache in 'ht_ctx'. The number of rows evaluated depends on
- /// the capacity of the cache. 'prefetch_mode' specifies the prefetching mode in use.
- /// If it's not PREFETCH_NONE, hash table buckets for the computed hashes will be
- /// prefetched. Note that codegen replaces 'prefetch_mode' with a constant.
- template<bool AGGREGATED_ROWS>
- void EvalAndHashPrefetchGroup(RowBatch* batch, int start_row_idx, NewPartitionedHashTableCtx* ht_ctx);
-
- /// This function processes each individual row in ProcessBatch(). Must be inlined into
- /// ProcessBatch for codegen to substitute function calls with codegen'd versions.
- /// May spill partitions if not enough memory is available.
- template <bool AGGREGATED_ROWS>
- Status IR_ALWAYS_INLINE ProcessRow(TupleRow* row, NewPartitionedHashTableCtx* ht_ctx);
-
- /// Create a new intermediate tuple in partition, initialized with row. ht_ctx is
- /// the context for the partition's hash table and hash is the precomputed hash of
- /// the row. The row can be an unaggregated or aggregated row depending on
- /// AGGREGATED_ROWS. Spills partitions if necessary to append the new intermediate
- /// tuple to the partition's stream. Must be inlined into ProcessBatch for codegen
- /// to substitute function calls with codegen'd versions. insert_it is an iterator
- /// for insertion returned from NewPartitionedHashTable::FindBuildRowBucket().
- template<bool AGGREGATED_ROWS>
- Status IR_ALWAYS_INLINE AddIntermediateTuple(Partition* partition,
- TupleRow* row, uint32_t hash, NewPartitionedHashTable::Iterator insert_it);
-
- /// Append a row to a spilled partition. May spill partitions if needed to switch to
- /// I/O buffers. Selects the correct stream according to the argument. Inlined into
- /// ProcessBatch().
- template<bool AGGREGATED_ROWS>
- Status IR_ALWAYS_INLINE AppendSpilledRow(Partition* partition, TupleRow* row);
-
- /// Reads all the rows from input_stream and process them by calling ProcessBatch().
- template<bool AGGREGATED_ROWS>
- Status ProcessStream(BufferedTupleStream3* input_stream);
-
- /// Output 'singleton_output_tuple_' and transfer memory to 'row_batch'.
- void GetSingletonOutput(RowBatch* row_batch);
-
- /// Get rows for the next rowbatch from the next partition. Sets 'partition_eos_' to
- /// true if all rows from all partitions have been returned or the limit is reached.
- Status GetRowsFromPartition(RuntimeState* state, RowBatch* row_batch);
-
- /// Get output rows from child for streaming pre-aggregation. Aggregates some rows with
- /// hash table and passes through other rows converted into the intermediate
- /// tuple format. Sets 'child_eos_' once all rows from child have been returned.
- Status GetRowsStreaming(RuntimeState* state, RowBatch* row_batch);
-
- /// Return true if we should keep expanding hash tables in the preagg. If false,
- /// the preagg should pass through any rows it can't fit in its tables.
- bool ShouldExpandPreaggHashTables() const;
-
- /// Streaming processing of in_batch from child. Rows from child are either aggregated
- /// into the hash table or added to 'out_batch' in the intermediate tuple format.
- /// 'in_batch' is processed entirely, and 'out_batch' must have enough capacity to
- /// store all of the rows in 'in_batch'.
- /// 'needs_serialize' is an argument so that codegen can replace it with a constant,
- /// rather than using the member variable 'needs_serialize_'.
- /// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE,
- /// hash table buckets will be prefetched based on the hash values computed. Note
- /// that 'prefetch_mode' will be substituted with constants during codegen time.
- /// 'remaining_capacity' is an array with PARTITION_FANOUT entries with the number of
- /// additional rows that can be added to the hash table per partition. It is updated
- /// by ProcessBatchStreaming() when it inserts new rows.
- /// 'ht_ctx' is passed in as a way to avoid aliasing of 'this' confusing the optimiser.
- Status ProcessBatchStreaming(bool needs_serialize,
- RowBatch* in_batch, RowBatch* out_batch, NewPartitionedHashTableCtx* ht_ctx,
- int remaining_capacity[PARTITION_FANOUT]);
-
- /// Tries to add intermediate to the hash table 'hash_tbl' of 'partition' for streaming
- /// aggregation. The input row must have been evaluated with 'ht_ctx', with 'hash' set
- /// to the corresponding hash. If the tuple already exists in the hash table, update
- /// the tuple and return true. Otherwise try to create a new entry in the hash table,
- /// returning true if successful or false if the table is full. 'remaining_capacity'
- /// keeps track of how many more entries can be added to the hash table so we can avoid
- /// retrying inserts. It is decremented if an insert succeeds and set to zero if an
- /// insert fails. If an error occurs, returns false and sets 'status'.
- bool IR_ALWAYS_INLINE TryAddToHashTable(NewPartitionedHashTableCtx* ht_ctx,
- Partition* partition, NewPartitionedHashTable* hash_tbl, TupleRow* in_row, uint32_t hash,
- int* remaining_capacity, Status* status);
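A hedged sketch of the calling convention this contract implies for a streaming caller (the pass-through helper is hypothetical; only TryAddToHashTable comes from the declaration above):

    // Sketch: insert until the partition's capacity is exhausted, then fall back
    // to passing the row through. Per the contract, false with an OK status
    // means the table is full; false with a non-OK status is a real error.
    Status status;
    if (!TryAddToHashTable(ht_ctx, partition, hash_tbl, in_row, hash,
                           &remaining_capacity[partition_idx], &status)) {
        RETURN_IF_ERROR(status);
        add_passthrough_row(out_batch, in_row);
    }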
-
- /// Initializes hash_partitions_. 'level' is the level for the partitions to create.
- /// If 'single_partition_idx' is provided, it must be a number in range
- /// [0, PARTITION_FANOUT), and only that partition is created - all others point to it.
- /// Also sets ht_ctx_'s level to 'level'.
- Status CreateHashPartitions(int level, int single_partition_idx = -1);
-
- /// Ensure that hash tables for all in-memory partitions are large enough to fit
- /// 'num_rows' additional hash table entries. If there is not enough memory to
- /// resize the hash tables, may spill partitions. 'aggregated_rows' is true if
- /// we're currently partitioning aggregated rows.
- Status CheckAndResizeHashPartitions(bool aggregated_rows, int num_rows, const NewPartitionedHashTableCtx* ht_ctx);
-
- /// Prepares the next partition to return results from. On return, this function
- /// initializes output_iterator_ and output_partition_. This either removes
- /// a partition from aggregated_partitions_ (and is done) or removes the next
-  /// partition from spilled_partitions_ and repartitions it.
- Status NextPartition();
-
- /// Tries to build the first partition in 'spilled_partitions_'.
- /// If successful, set *built_partition to the partition. The caller owns the partition
- /// and is responsible for closing it. If unsuccessful because the partition could not
- /// fit in memory, set *built_partition to NULL and append the spilled partition to the
- /// head of 'spilled_partitions_' so it can be processed by
- /// RepartitionSpilledPartition().
- Status BuildSpilledPartition(Partition** built_partition);
-
- /// Repartitions the first partition in 'spilled_partitions_' into PARTITION_FANOUT
- /// output partitions. On success, each output partition is either:
- /// * closed, if no rows were added to the partition.
- /// * in 'spilled_partitions_', if the partition spilled.
- /// * in 'aggregated_partitions_', if the output partition was not spilled.
- Status RepartitionSpilledPartition();
-
- /// Picks a partition from 'hash_partitions_' to spill. 'more_aggregate_rows' is passed
- /// to Partition::Spill() when spilling the partition. See the Partition::Spill()
- /// comment for further explanation.
- Status SpillPartition(bool more_aggregate_rows);
-
- /// Moves the partitions in hash_partitions_ to aggregated_partitions_ or
- /// spilled_partitions_. Partitions moved to spilled_partitions_ are unpinned.
- /// input_rows is the number of input rows that have been repartitioned.
- /// Used for diagnostics.
- Status MoveHashPartitions(int64_t input_rows);
-
- /// Adds a partition to the front of 'spilled_partitions_' for later processing.
-  /// 'spilled_partitions_' uses LIFO so more finely partitioned partitions are processed
-  /// first. This allows us to delete pages earlier and bottom out the recursion
- /// earlier and also improves time locality of access to spilled data on disk.
- void PushSpilledPartition(Partition* partition);
-
- /// Calls Close() on every Partition in 'aggregated_partitions_',
- /// 'spilled_partitions_', and 'hash_partitions_' and then resets the lists,
- /// the vector and the partition pool.
- void ClosePartitions();
-
-  /// Calls finalize on all tuples starting at 'it'.
- void CleanupHashTbl(const std::vector<NewAggFnEvaluator*>& agg_fn_evals,
- NewPartitionedHashTable::Iterator it);
-
- /// Compute minimum buffer reservation for grouping aggregations.
- /// We need one buffer per partition, which is used either as the write buffer for the
- /// aggregated stream or the unaggregated stream. We need an additional buffer to read
- /// the stream we are currently repartitioning. The read buffer needs to be a max-sized
- /// buffer to hold a max-sized row and we need one max-sized write buffer that is used
- /// temporarily to append a row to any stream.
- ///
- /// If we need to serialize, we need an additional buffer while spilling a partition
-  /// as the partition's aggregate stream needs to be serialized and rewritten.
- /// We do not spill streaming preaggregations, so we do not need to reserve any buffers.
- int64_t MinReservation() const {
- //DCHECK(!grouping_exprs_.empty());
- // Must be kept in sync with AggregationNode.computeNodeResourceProfile() in fe.
- //if (is_streaming_preagg_) {
- // Reserve at least one buffer and a 64kb hash table per partition.
- // return (_resource_profile.spillable_buffer_size + 64 * 1024) * PARTITION_FANOUT;
- //}
- //int num_buffers = PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0);
- // Two of the buffers must fit the maximum row.
- //return _resource_profile.spillable_buffer_size * (num_buffers - 2) +
- //_resource_profile.max_row_buffer_size * 2;
- return 0;
- }
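To make the commented-out formula concrete under assumed sizes (say a 2 MB spillable buffer and an 8 MB max row buffer; neither figure comes from this patch): a serializing grouping aggregation needs num_buffers = PARTITION_FANOUT + 1 + 1 = 18 buffers, two of which must fit a maximum-sized row, giving 2 MB * (18 - 2) + 8 MB * 2 = 48 MB of minimum reservation. The port above sidesteps this accounting by returning 0 while the resource-profile integration stays commented out.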
+public:
+ PartitionedAggregationNode(ObjectPool* pool,
+ const TPlanNode& tnode, const DescriptorTbl& descs);
+    // An empty virtual dtor to pass the code style check.
+ virtual ~PartitionedAggregationNode() {}
+
+ virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
+ virtual Status prepare(RuntimeState* state);
+ virtual Status open(RuntimeState* state);
+ virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos);
+ virtual Status reset(RuntimeState* state);
+ // virtual void close(RuntimeState* state);
+ virtual Status close(RuntimeState* state);
+
+protected:
+    // Frees local allocations from _aggregate_evaluators and _agg_fn_ctxs.
+ // virtual Status QueryMaintenance(RuntimeState* state);
+
+ virtual void debug_string(int indentation_level, std::stringstream* out) const;
+
+private:
+ struct Partition;
+
+ // Number of initial partitions to create. Must be a power of 2.
+ static const int PARTITION_FANOUT = 16;
+
+    // Needs to be log2(PARTITION_FANOUT).
+ // We use the upper bits to pick the partition and lower bits in the HT.
+ // TODO: different hash functions here too? We don't need that many bits to pick
+ // the partition so this might be okay.
+ static const int NUM_PARTITIONING_BITS = 4;
+
+ // Maximum number of times we will repartition. The maximum build table we can process
+ // (if we have enough scratch disk space) in case there is no skew is:
+ // MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH).
+ // In the case where there is skew, repartitioning is unlikely to help (assuming a
+ // reasonable hash function).
+    // Note that PartitionedHashTableCtx needs at least as many SEED_PRIMES as partitioning levels.
+ // TODO: we can revisit and try harder to explicitly detect skew.
+ static const int MAX_PARTITION_DEPTH = 16;
+
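For scale: with an assumed 1 GB memory limit (an illustrative figure, not from this patch), two repartitioning levels already cover 1 GB * 16^2 = 256 GB of input, so the depth limit of 16 is effectively reachable only under heavy hash skew.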
+ // Codegen doesn't allow for automatic Status variables because then exception
+ // handling code is needed to destruct the Status, and our function call substitution
+ // doesn't know how to deal with the LLVM IR 'invoke' instruction. Workaround that by
+ // placing the Status here so exceptions won't need to destruct it.
+ // TODO: fix IMPALA-1948 and remove this.
+ Status _process_batch_status;
+
+ // Tuple into which Update()/Merge()/Serialize() results are stored.
+ TupleId _intermediate_tuple_id;
+ TupleDescriptor* _intermediate_tuple_desc;
+
+ // Row with the intermediate tuple as its only tuple.
+ boost::scoped_ptr<RowDescriptor> _intermediate_row_desc;
+
+ // Tuple into which Finalize() results are stored. Possibly the same as
+ // the intermediate tuple.
+ TupleId _output_tuple_id;
+ TupleDescriptor* _output_tuple_desc;
+
+ // Certain aggregates require a finalize step, which is the final step of the
+ // aggregate after consuming all input rows. The finalize step converts the aggregate
+    // value into its final form. This is true if this node contains an aggregate that
+ // requires a finalize step.
+ const bool _needs_finalize;
+
+    // True if any of the evaluators require the serialize step.
+ bool _needs_serialize;
+
+ std::vector<AggFnEvaluator*> _aggregate_evaluators;
+
+ // FunctionContext for each aggregate function and backing MemPool. String data
+ // returned by the aggregate functions is allocated via these contexts.
+ // These contexts are only passed to the evaluators in the non-partitioned
+ // (non-grouping) case. Otherwise they are only used to clone FunctionContexts for the
+ // partitions.
+ // TODO: we really need to plumb through CHAR(N) for intermediate types.
+ std::vector<doris_udf::FunctionContext*> _agg_fn_ctxs;
+ boost::scoped_ptr<MemPool> _agg_fn_pool;
+
+ // Exprs used to evaluate input rows
+ std::vector<ExprContext*> _probe_expr_ctxs;
+
+ // Exprs used to insert constructed aggregation tuple into the hash table.
+ // All the exprs are simply SlotRefs for the intermediate tuple.
+ std::vector<ExprContext*> _build_expr_ctxs;
+
+ // True if the resulting tuple contains var-len agg/grouping values. This
+ // means we need to do more work when allocating and spilling these rows.
+ bool _contains_var_len_grouping_exprs;
+
+ RuntimeState* _state;
+ BufferedBlockMgr2::Client* _block_mgr_client;
+
+    // MemPool used to allocate memory when we don't have grouping and don't initialize
+ // the partitioning structures, or during close() when creating new output tuples.
+ // For non-grouping aggregations, the ownership of the pool's memory is transferred
+ // to the output batch on eos. The pool should not be Reset() to allow amortizing
+ // memory allocation over a series of Reset()/open()/get_next()* calls.
+ boost::scoped_ptr<MemPool> _mem_pool;
+
+ // The current partition and iterator to the next row in its hash table that we need
+ // to return in get_next()
+ Partition* _output_partition;
+ PartitionedHashTable::Iterator _output_iterator;
+
+ typedef Status (*ProcessRowBatchFn)(
+ PartitionedAggregationNode*, RowBatch*, PartitionedHashTableCtx*);
+ // Jitted ProcessRowBatch function pointer. Null if codegen is disabled.
+ ProcessRowBatchFn _process_row_batch_fn;
+
+ // Time spent processing the child rows
+ RuntimeProfile::Counter* _build_timer;
+
+ // Total time spent resizing hash tables.
+ RuntimeProfile::Counter* _ht_resize_timer;
+
+ // Time spent returning the aggregated rows
+ RuntimeProfile::Counter* _get_results_timer;
+
+ // Total number of hash buckets across all partitions.
+ RuntimeProfile::Counter* _num_hash_buckets;
+
+ // Total number of partitions created.
+ RuntimeProfile::Counter* _partitions_created;
+
+ // Level of max partition (i.e. number of repartitioning steps).
+ // RuntimeProfile::HighWaterMarkCounter* _max_partition_level;
+
+ // Number of rows that have been repartitioned.
+ RuntimeProfile::Counter* _num_row_repartitioned;
+
+ // Number of partitions that have been repartitioned.
+ RuntimeProfile::Counter* _num_repartitions;
+
+ // Number of partitions that have been spilled.
+ RuntimeProfile::Counter* _num_spilled_partitions;
+
+ // The largest fraction after repartitioning. This is expected to be
+ // 1 / PARTITION_FANOUT. A value much larger indicates skew.
+ // RuntimeProfile::HighWaterMarkCounter* _largest_partition_percent;
+
+ ////////////////////////////
+ // BEGIN: Members that must be Reset()
+
+ // Result of aggregation w/o GROUP BY.
+    // Note: can be NULL even if there is no grouping, if the result tuple is 0 width,
+ // e.g. select 1 from table group by col.
+ Tuple* _singleton_output_tuple;
+ bool _singleton_output_tuple_returned;
+
+ // Used for hash-related functionality, such as evaluating rows and calculating hashes.
+ // TODO: If we want to multi-thread then this context should be thread-local and not
+ // associated with the node.
+ boost::scoped_ptr<PartitionedHashTableCtx> _ht_ctx;
+
+ // Object pool that holds the Partition objects in _hash_partitions.
+ boost::scoped_ptr<ObjectPool> _partition_pool;
+
+ // Current partitions we are partitioning into.
+ std::vector<Partition*> _hash_partitions;
+
+ // All partitions that have been spilled and need further processing.
+ std::list<Partition*> _spilled_partitions;
+
+ // All partitions that are aggregated and can just return the results in get_next().
+ // After consuming all the input, _hash_partitions is split into _spilled_partitions
+ // and _aggregated_partitions, depending on if it was spilled or not.
+ std::list<Partition*> _aggregated_partitions;
+
+ // END: Members that must be Reset()
+ ////////////////////////////
+
+ // The hash table and streams (aggregated and unaggregated) for an individual
+ // partition. The streams of each partition always (i.e. regardless of level)
+ // initially use small buffers.
+ struct Partition {
+ Partition(PartitionedAggregationNode* parent, int level) :
+ parent(parent), is_closed(false), level(level) {}
+
+ // Initializes aggregated_row_stream and unaggregated_row_stream, reserving
+ // one buffer for each. The buffers backing these streams are reserved, so this
+ // function will not fail with a continuable OOM. If we fail to init these buffers,
+ // the mem limit is too low to run this algorithm.
+ Status init_streams();
+
+ // Initializes the hash table. Returns false on OOM.
+ bool init_hash_table();
+
+ // Called in case we need to serialize aggregated rows. This step effectively does
+ // a merge aggregation in this node.
+ Status clean_up();
+
+ // Closes this partition. If finalize_rows is true, this iterates over all rows
+ // in aggregated_row_stream and finalizes them (this is only used in the cancellation
+ // path).
+ void close(bool finalize_rows);
+
+ // Spills this partition, unpinning streams and cleaning up hash tables as necessary.
+ Status spill();
+
+ bool is_spilled() const {
+ return hash_tbl.get() == NULL;
+ }
+
+ PartitionedAggregationNode* parent;
+
+ // If true, this partition is closed and there is nothing left to do.
+ bool is_closed;
+
+ // How many times rows in this partition have been repartitioned. Partitions created
+    // from the node's children's input are level 0, 1 after the first repartitioning,
+ // etc.
+ const int level;
+
+ // Hash table for this partition.
+ // Can be NULL if this partition is no longer maintaining a hash table (i.e.
+ // is spilled).
+ boost::scoped_ptr<PartitionedHashTable> hash_tbl;
+
+ // Clone of parent's _agg_fn_ctxs and backing MemPool.
+ std::vector<doris_udf::FunctionContext*> agg_fn_ctxs;
+ boost::scoped_ptr<MemPool> agg_fn_pool;
+
+ // Tuple stream used to store aggregated rows. When the partition is not spilled,
+ // (meaning the hash table is maintained), this stream is pinned and contains the
+ // memory referenced by the hash table. When it is spilled, aggregate rows are
+ // just appended to this stream.
+ boost::scoped_ptr<BufferedTupleStream2> aggregated_row_stream;
+
+ // Unaggregated rows that are spilled.
+ boost::scoped_ptr<BufferedTupleStream2> unaggregated_row_stream;
+ };
+
+ // Stream used to store serialized spilled rows. Only used if _needs_serialize
+    // is set. This stream is never pinned and is only used in Partition::spill as
+    // a temporary buffer.
+ boost::scoped_ptr<BufferedTupleStream2> _serialize_stream;
+
+ // Allocates a new aggregation intermediate tuple.
+ // Initialized to grouping values computed over '_current_row' using 'agg_fn_ctxs'.
+ // Aggregation expr slots are set to their initial values.
+ // Pool/Stream specify where the memory (tuple and var len slots) should be allocated
+ // from. Only one can be set.
+ // Returns NULL if there was not enough memory to allocate the tuple or an error
+ // occurred. When returning NULL, sets *status. If 'stream' is set and its small
+ // buffers get full, it will attempt to switch to IO-buffers.
+ Tuple* construct_intermediate_tuple(
+ const std::vector<doris_udf::FunctionContext*>& agg_fn_ctxs,
+ MemPool* pool, BufferedTupleStream2* stream, Status* status);
+
+ // Updates the given aggregation intermediate tuple with aggregation values computed
+ // over 'row' using 'agg_fn_ctxs'. Whether the agg fn evaluator calls Update() or
+ // Merge() is controlled by the evaluator itself, unless enforced explicitly by passing
+ // in is_merge == true. The override is needed to merge spilled and non-spilled rows
+ // belonging to the same partition independent of whether the agg fn evaluators have
+ // is_merge() == true.
+ // This function is replaced by codegen (which is why we don't use a vector argument
+ // for agg_fn_ctxs).
+ void update_tuple(doris_udf::FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row,
+ bool is_merge = false);
+
+ // Called on the intermediate tuple of each group after all input rows have been
+ // consumed and aggregated. Computes the final aggregate values to be returned in
+ // get_next() using the agg fn evaluators' Serialize() or Finalize().
+ // For the Finalize() case if the output tuple is different from the intermediate
+ // tuple, then a new tuple is allocated from 'pool' to hold the final result.
+    // Grouping values are copied into the output tuple, and the output tuple holding
+ // the finalized/serialized aggregate values is returned.
+ // TODO: Coordinate the allocation of new tuples with the release of memory
+ // so as not to make memory consumption blow up.
+ Tuple* get_output_tuple(const std::vector<doris_udf::FunctionContext*>& agg_fn_ctxs,
+ Tuple* tuple, MemPool* pool);
+
+ // Do the aggregation for all tuple rows in the batch when there is no grouping.
+ // The PartitionedHashTableCtx argument is unused, but included so the signature matches that of
+ // process_batch() for codegen. This function is replaced by codegen.
+ Status process_batch_no_grouping(RowBatch* batch, PartitionedHashTableCtx* ht_ctx = NULL);
+
+ // Processes a batch of rows. This is the core function of the algorithm. We partition
+ // the rows into _hash_partitions, spilling as necessary.
+ // If AGGREGATED_ROWS is true, it means that the rows in the batch are already
+ // pre-aggregated.
+ //
+ // This function is replaced by codegen. It's inlined into ProcessBatch_true/false in
+ // the IR module. We pass in _ht_ctx.get() as an argument for performance.
+ template<bool AGGREGATED_ROWS>
+ Status IR_ALWAYS_INLINE process_batch(RowBatch* batch, PartitionedHashTableCtx* ht_ctx);
+
+ // This function processes each individual row in process_batch(). Must be inlined
+ // into process_batch for codegen to substitute function calls with codegen'd versions.
+ template<bool AGGREGATED_ROWS>
+ Status IR_ALWAYS_INLINE process_row(TupleRow* row, PartitionedHashTableCtx* ht_ctx);
+
+ // Create a new intermediate tuple in partition, initialized with row. ht_ctx is
+ // the context for the partition's hash table and hash is the precomputed hash of
+ // the row. The row can be an unaggregated or aggregated row depending on
+ // AGGREGATED_ROWS. Spills partitions if necessary to append the new intermediate
+ // tuple to the partition's stream. Must be inlined into process_batch for codegen to
+ // substitute function calls with codegen'd versions. insert_it is an iterator for
+ // insertion returned from PartitionedHashTable::FindOrInsert().
+ template<bool AGGREGATED_ROWS>
+ Status IR_ALWAYS_INLINE add_intermediate_tuple(Partition* partition,
+ PartitionedHashTableCtx* ht_ctx, TupleRow* row, uint32_t hash, PartitionedHashTable::Iterator insert_it);
+
+ // Append a row to a spilled partition. May spill partitions if needed to switch to
+ // I/O buffers. Selects the correct stream according to the argument. Inlined into
+ // process_batch().
+ template<bool AGGREGATED_ROWS>
+ Status IR_ALWAYS_INLINE append_spilled_row(Partition* partition, TupleRow* row);
+
+ // Append a row to a stream of a spilled partition. May spill partitions if needed
+ // to append the row.
+ Status append_spilled_row(BufferedTupleStream2* stream, TupleRow* row);
+
+    // Reads all the rows from input_stream and processes them by calling process_batch().
+ template<bool AGGREGATED_ROWS>
+ Status process_stream(BufferedTupleStream2* input_stream);
+
+ // Initializes _hash_partitions. 'level' is the level for the partitions to create.
+ // Also sets _ht_ctx's level to 'level'.
+ Status create_hash_partitions(int level);
+
+ // Ensure that hash tables for all in-memory partitions are large enough to fit
+ // num_rows additional rows.
+ Status check_and_resize_hash_partitions(int num_rows, PartitionedHashTableCtx* ht_ctx);
+
+ // Iterates over all the partitions in _hash_partitions and returns the number of rows
+ // of the largest spilled partition (in terms of number of aggregated and unaggregated
+ // rows).
+ int64_t largest_spilled_partition() const;
+
+ // Prepares the next partition to return results from. On return, this function
+ // initializes _output_iterator and _output_partition. This either removes
+ // a partition from _aggregated_partitions (and is done) or removes the next
+    // partition from _spilled_partitions and repartitions it.
+ Status next_partition();
+
+ // Picks a partition from _hash_partitions to spill.
+ Status spill_partition();
+
+ // Moves the partitions in _hash_partitions to _aggregated_partitions or
+ // _spilled_partitions. Partitions moved to _spilled_partitions are unpinned.
+ // input_rows is the number of input rows that have been repartitioned.
+ // Used for diagnostics.
+ Status move_hash_partitions(int64_t input_rows);
+
+ // Calls close() on every Partition in '_aggregated_partitions',
+ // '_spilled_partitions', and '_hash_partitions' and then resets the lists,
+ // the vector and the partition pool.
+ void close_partitions();
+
+    // Calls Finalize() on all tuples starting at 'it'.
+ void cleanup_hash_tbl(const std::vector<doris_udf::FunctionContext*>& agg_fn_ctxs,
+ PartitionedHashTable::Iterator it);
+
+ // We need two buffers per partition, one for the aggregated stream and one
+ // for the unaggregated stream. We need an additional buffer to read the stream
+ // we are currently repartitioning.
+ // If we need to serialize, we need an additional buffer while spilling a partition
+    // as the partition's aggregated stream needs to be serialized and rewritten.
+ int min_required_buffers() const {
+ return 2 * PARTITION_FANOUT + 1 + (_needs_serialize ? 1 : 0);
+ }
};
-}
-
-#endif
+} // end namespace doris
+#endif // DORIS_BE_SRC_EXEC_PARTITIONED_AGGREGATION_NODE_H
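
A side note on the constants in the restored header. The sketch below is illustrative only and not part of the patch: the constant names mirror the header, while the standalone functions and main() are inventions for this note. It shows how the upper hash bits select a partition (the same hash >> (32 - NUM_PARTITIONING_BITS) expression appears in process_row() in the next file) and evaluates the min_required_buffers() formula.

#include <cassert>
#include <cstdint>

// Constants mirroring the restored header.
static const int PARTITION_FANOUT = 16;      // must be a power of 2
static const int NUM_PARTITIONING_BITS = 4;  // log2(PARTITION_FANOUT)

// The upper NUM_PARTITIONING_BITS of the 32-bit hash pick the partition,
// leaving the lower bits for bucket selection inside the partition's
// hash table, so the two decisions stay independent.
inline uint32_t partition_index(uint32_t hash) {
    return hash >> (32 - NUM_PARTITIONING_BITS);
}

// One buffer per partition for each of the aggregated and unaggregated
// streams, one for the stream being repartitioned, and one more when a
// serialize step is required.
inline int min_required_buffers(bool needs_serialize) {
    return 2 * PARTITION_FANOUT + 1 + (needs_serialize ? 1 : 0);
}

int main() {
    assert(partition_index(0xF0000000u) == 15);  // top four bits are 1111
    assert(min_required_buffers(false) == 33);
    assert(min_required_buffers(true) == 34);
    return 0;
}

With PARTITION_FANOUT = 16 this comes to 33 buffers, or 34 when a serialize step is needed, which matches the header's note that a mem limit too low to reserve these buffers cannot run the algorithm at all.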
diff --git a/be/src/exec/partitioned_aggregation_node_ir.cc b/be/src/exec/partitioned_aggregation_node_ir.cc
index ad5a52e..e9837d5 100644
--- a/be/src/exec/partitioned_aggregation_node_ir.cc
+++ b/be/src/exec/partitioned_aggregation_node_ir.cc
@@ -17,235 +17,127 @@
#include "exec/partitioned_aggregation_node.h"
-#include "exec/new_partitioned_hash_table.inline.h"
-#include "exprs/new_agg_fn_evaluator.h"
-#include "exprs/expr_context.h"
-#include "runtime/buffered_tuple_stream3.inline.h"
+#include "exec/partitioned_hash_table.inline.h"
+#include "runtime/buffered_tuple_stream2.inline.h"
#include "runtime/row_batch.h"
#include "runtime/tuple_row.h"
-#include "util/runtime_profile.h"
-using namespace doris;
+namespace doris {
-Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) {
- Tuple* output_tuple = singleton_output_tuple_;
- FOREACH_ROW(batch, 0, batch_iter) {
- UpdateTuple(agg_fn_evals_.data(), output_tuple, batch_iter.get());
- }
- return Status::OK();
+Status PartitionedAggregationNode::process_batch_no_grouping(
+ RowBatch* batch, PartitionedHashTableCtx* ht_ctx) {
+ for (int i = 0; i < batch->num_rows(); ++i) {
+ update_tuple(&_agg_fn_ctxs[0], _singleton_output_tuple, batch->get_row(i));
+ }
+ return Status::OK();
}
template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
- NewPartitionedHashTableCtx* ht_ctx) {
- DCHECK(!hash_partitions_.empty());
- DCHECK(!is_streaming_preagg_);
-
- // Make sure that no resizes will happen when inserting individual rows to the hash
- // table of each partition by pessimistically assuming that all the rows in each batch
- // will end up to the same partition.
- // TODO: Once we have a histogram with the number of rows per partition, we will have
- // accurate resize calls.
- RETURN_IF_ERROR(CheckAndResizeHashPartitions(AGGREGATED_ROWS, batch->num_rows(), ht_ctx));
-
- NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
- const int cache_size = expr_vals_cache->capacity();
- const int num_rows = batch->num_rows();
- for (int group_start = 0; group_start < num_rows; group_start += cache_size) {
- EvalAndHashPrefetchGroup<AGGREGATED_ROWS>(batch, group_start, ht_ctx);
-
- FOREACH_ROW_LIMIT(batch, group_start, cache_size, batch_iter) {
- RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(batch_iter.get(), ht_ctx));
- expr_vals_cache->NextRow();
+Status PartitionedAggregationNode::process_batch(RowBatch* batch, PartitionedHashTableCtx* ht_ctx) {
+ DCHECK(!_hash_partitions.empty());
+
+ // Make sure that no resizes will happen when inserting individual rows to the hash
+ // table of each partition by pessimistically assuming that all the rows in each batch
+    // will end up in the same partition.
+ // TODO: Once we have a histogram with the number of rows per partition, we will have
+ // accurate resize calls.
+ int num_rows = batch->num_rows();
+ RETURN_IF_ERROR(check_and_resize_hash_partitions(num_rows, ht_ctx));
+
+ for (int i = 0; i < num_rows; ++i) {
+ RETURN_IF_ERROR(process_row<AGGREGATED_ROWS>(batch->get_row(i), ht_ctx));
}
- DCHECK(expr_vals_cache->AtEnd());
- }
- return Status::OK();
+
+ return Status::OK();
}
template<bool AGGREGATED_ROWS>
-void IR_ALWAYS_INLINE PartitionedAggregationNode::EvalAndHashPrefetchGroup(
- RowBatch* batch, int start_row_idx,
- NewPartitionedHashTableCtx* ht_ctx) {
- NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
- const int cache_size = expr_vals_cache->capacity();
-
- expr_vals_cache->Reset();
- FOREACH_ROW_LIMIT(batch, start_row_idx, cache_size, batch_iter) {
- TupleRow* row = batch_iter.get();
- bool is_null;
+Status PartitionedAggregationNode::process_row(TupleRow* row, PartitionedHashTableCtx* ht_ctx) {
+ uint32_t hash = 0;
if (AGGREGATED_ROWS) {
- is_null = !ht_ctx->EvalAndHashBuild(row);
+ if (!ht_ctx->eval_and_hash_build(row, &hash)) {
+ return Status::OK();
+ }
} else {
- is_null = !ht_ctx->EvalAndHashProbe(row);
- }
- // Hoist lookups out of non-null branch to speed up non-null case.
- const uint32_t hash = expr_vals_cache->CurExprValuesHash();
- const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
- NewPartitionedHashTable* hash_tbl = GetHashTable(partition_idx);
- if (is_null) {
- expr_vals_cache->SetRowNull();
- } else if (config::enable_prefetch) {
- if (LIKELY(hash_tbl != NULL)) hash_tbl->PrefetchBucket<false>(hash);
+ if (!ht_ctx->eval_and_hash_probe(row, &hash)) {
+ return Status::OK();
+ }
}
- expr_vals_cache->NextRow();
- }
- expr_vals_cache->ResetForRead();
-}
+ // To process this row, we first see if it can be aggregated or inserted into this
+ // partition's hash table. If we need to insert it and that fails, due to OOM, we
+ // spill the partition. The partition to spill is not necessarily dst_partition,
+ // so we can try again to insert the row.
+ Partition* dst_partition = _hash_partitions[hash >> (32 - NUM_PARTITIONING_BITS)];
+ if (dst_partition->is_spilled()) {
+ // This partition is already spilled, just append the row.
+ return append_spilled_row<AGGREGATED_ROWS>(dst_partition, row);
+ }
-template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessRow(TupleRow* row,
- NewPartitionedHashTableCtx* ht_ctx) {
- NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
- // Hoist lookups out of non-null branch to speed up non-null case.
- const uint32_t hash = expr_vals_cache->CurExprValuesHash();
- const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
- if (expr_vals_cache->IsRowNull()) return Status::OK();
- // To process this row, we first see if it can be aggregated or inserted into this
- // partition's hash table. If we need to insert it and that fails, due to OOM, we
- // spill the partition. The partition to spill is not necessarily dst_partition,
- // so we can try again to insert the row.
- NewPartitionedHashTable* hash_tbl = GetHashTable(partition_idx);
- Partition* dst_partition = hash_partitions_[partition_idx];
- DCHECK(dst_partition != nullptr);
- DCHECK_EQ(dst_partition->is_spilled(), hash_tbl == NULL);
- if (hash_tbl == NULL) {
- // This partition is already spilled, just append the row.
- return AppendSpilledRow<AGGREGATED_ROWS>(dst_partition, row);
- }
-
- DCHECK(dst_partition->aggregated_row_stream->is_pinned());
- bool found;
- // Find the appropriate bucket in the hash table. There will always be a free
- // bucket because we checked the size above.
- NewPartitionedHashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
- DCHECK(!it.AtEnd()) << "Hash table had no free buckets";
- if (AGGREGATED_ROWS) {
- // If the row is already an aggregate row, it cannot match anything in the
- // hash table since we process the aggregate rows first. These rows should
- // have been aggregated in the initial pass.
- DCHECK(!found);
- } else if (found) {
- // Row is already in hash table. Do the aggregation and we're done.
- UpdateTuple(dst_partition->agg_fn_evals.data(), it.GetTuple(), row);
- return Status::OK();
- }
+ PartitionedHashTable* ht = dst_partition->hash_tbl.get();
+ DCHECK(ht != NULL);
+ DCHECK(dst_partition->aggregated_row_stream->is_pinned());
+ bool found;
+ // Find the appropriate bucket in the hash table. There will always be a free
+ // bucket because we checked the size above.
+ PartitionedHashTable::Iterator it = ht->find_bucket(ht_ctx, hash, &found);
+ DCHECK(!it.at_end()) << "Hash table had no free buckets";
+ if (AGGREGATED_ROWS) {
+ // If the row is already an aggregate row, it cannot match anything in the
+ // hash table since we process the aggregate rows first. These rows should
+ // have been aggregated in the initial pass.
+ DCHECK(!found);
+ } else if (found) {
+ // Row is already in hash table. Do the aggregation and we're done.
+ update_tuple(&dst_partition->agg_fn_ctxs[0], it.get_tuple(), row);
+ return Status::OK();
+ }
- // If we are seeing this result row for the first time, we need to construct the
- // result row and initialize it.
- return AddIntermediateTuple<AGGREGATED_ROWS>(dst_partition, row, hash, it);
+ // If we are seeing this result row for the first time, we need to construct the
+ // result row and initialize it.
+ return add_intermediate_tuple<AGGREGATED_ROWS>(dst_partition, ht_ctx, row, hash, it);
}
template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::AddIntermediateTuple(Partition* partition,
- TupleRow* row, uint32_t hash, NewPartitionedHashTable::Iterator insert_it) {
- while (true) {
- DCHECK(partition->aggregated_row_stream->is_pinned());
- Tuple* intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals,
- partition->aggregated_row_stream.get(), &process_batch_status_);
-
- if (LIKELY(intermediate_tuple != NULL)) {
- UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple, row, AGGREGATED_ROWS);
- // After copying and initializing the tuple, insert it into the hash table.
- insert_it.SetTuple(intermediate_tuple, hash);
- return Status::OK();
- } else if (!process_batch_status_.ok()) {
- return std::move(process_batch_status_);
- }
-
- // We did not have enough memory to add intermediate_tuple to the stream.
- RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
- if (partition->is_spilled()) {
- return AppendSpilledRow<AGGREGATED_ROWS>(partition, row);
- }
- }
-}
+Status PartitionedAggregationNode::add_intermediate_tuple(
+ Partition* partition,
+ PartitionedHashTableCtx* ht_ctx,
+ TupleRow* row,
+ uint32_t hash,
+ PartitionedHashTable::Iterator insert_it) {
+ while (true) {
+ DCHECK(partition->aggregated_row_stream->is_pinned());
+ Tuple* intermediate_tuple = construct_intermediate_tuple(partition->agg_fn_ctxs,
+ NULL, partition->aggregated_row_stream.get(), &_process_batch_status);
+
+ if (LIKELY(intermediate_tuple != NULL)) {
+ update_tuple(&partition->agg_fn_ctxs[0], intermediate_tuple, row, AGGREGATED_ROWS);
+ // After copying and initializing the tuple, insert it into the hash table.
+ insert_it.set_tuple(intermediate_tuple, hash);
+ return Status::OK();
+ } else if (!_process_batch_status.ok()) {
+ return _process_batch_status;
+ }
-Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
- RowBatch* in_batch, RowBatch* out_batch,
- NewPartitionedHashTableCtx* ht_ctx, int remaining_capacity[PARTITION_FANOUT]) {
- DCHECK(is_streaming_preagg_);
- DCHECK_EQ(out_batch->num_rows(), 0);
- DCHECK_LE(in_batch->num_rows(), out_batch->capacity());
-
- RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
- NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
- const int num_rows = in_batch->num_rows();
- const int cache_size = expr_vals_cache->capacity();
- for (int group_start = 0; group_start < num_rows; group_start += cache_size) {
- EvalAndHashPrefetchGroup<false>(in_batch, group_start, ht_ctx);
-
- FOREACH_ROW_LIMIT(in_batch, group_start, cache_size, in_batch_iter) {
- // Hoist lookups out of non-null branch to speed up non-null case.
- TupleRow* in_row = in_batch_iter.get();
- const uint32_t hash = expr_vals_cache->CurExprValuesHash();
- const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
- if (!expr_vals_cache->IsRowNull() &&
- !TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx],
- GetHashTable(partition_idx), in_row, hash, &remaining_capacity[partition_idx],
- &process_batch_status_)) {
- RETURN_IF_ERROR(std::move(process_batch_status_));
- // Tuple is not going into hash table, add it to the output batch.
- Tuple* intermediate_tuple = ConstructIntermediateTuple(agg_fn_evals_,
- out_batch->tuple_data_pool(), &process_batch_status_);
- if (UNLIKELY(intermediate_tuple == NULL)) {
- DCHECK(!process_batch_status_.ok());
- return std::move(process_batch_status_);
+ // We did not have enough memory to add intermediate_tuple to the stream.
+ RETURN_IF_ERROR(spill_partition());
+ if (partition->is_spilled()) {
+ return append_spilled_row<AGGREGATED_ROWS>(partition, row);
}
- UpdateTuple(agg_fn_evals_.data(), intermediate_tuple, in_row);
- out_batch_iterator.get()->set_tuple(0, intermediate_tuple);
- out_batch_iterator.next();
- out_batch->commit_last_row();
- }
- DCHECK(process_batch_status_.ok());
- expr_vals_cache->NextRow();
}
- DCHECK(expr_vals_cache->AtEnd());
- }
- if (needs_serialize) {
- FOREACH_ROW(out_batch, 0, out_batch_iter) {
- NewAggFnEvaluator::Serialize(agg_fn_evals_, out_batch_iter.get()->get_tuple(0));
- }
- }
-
- return Status::OK();
}
-bool PartitionedAggregationNode::TryAddToHashTable(
- NewPartitionedHashTableCtx* ht_ctx, Partition* partition,
- NewPartitionedHashTable* hash_tbl, TupleRow* in_row,
- uint32_t hash, int* remaining_capacity, Status* status) {
- DCHECK(remaining_capacity != NULL);
- DCHECK_EQ(hash_tbl, partition->hash_tbl.get());
- DCHECK_GE(*remaining_capacity, 0);
- bool found;
- // This is called from ProcessBatchStreaming() so the rows are not aggregated.
- NewPartitionedHashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
- Tuple* intermediate_tuple;
- if (found) {
- intermediate_tuple = it.GetTuple();
- } else if (*remaining_capacity == 0) {
- return false;
- } else {
- intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals,
- partition->aggregated_row_stream.get(), status);
- if (LIKELY(intermediate_tuple != NULL)) {
- it.SetTuple(intermediate_tuple, hash);
- --(*remaining_capacity);
- } else {
- // Avoid repeatedly trying to add tuples when under memory pressure.
- *remaining_capacity = 0;
- return false;
- }
- }
- UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple, in_row);
- return true;
+template<bool AGGREGATED_ROWS>
+Status PartitionedAggregationNode::append_spilled_row(Partition* partition, TupleRow* row) {
+ DCHECK(partition->is_spilled());
+ BufferedTupleStream2* stream = AGGREGATED_ROWS ?
+ partition->aggregated_row_stream.get() : partition->unaggregated_row_stream.get();
+ return append_spilled_row(stream, row);
}
-// Instantiate required templates.
-template Status PartitionedAggregationNode::ProcessBatch<false>(RowBatch*,
- NewPartitionedHashTableCtx*);
-template Status PartitionedAggregationNode::ProcessBatch<true>(RowBatch*,
- NewPartitionedHashTableCtx*);
+template Status PartitionedAggregationNode::process_batch<false>(
+ RowBatch*, PartitionedHashTableCtx*);
+template Status PartitionedAggregationNode::process_batch<true>(
+ RowBatch*, PartitionedHashTableCtx*);
+} // end namespace doris
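
A note on the retry loop restored in add_intermediate_tuple() above: constructing the intermediate tuple can fail with a continuable OOM, in which case the node spills some partition and tries again, falling back to the spilled-row path if its own partition was the one spilled. The following is a compilable toy version of that control flow only; FakePartition and the helper names are stand-ins invented for this note, not the Doris API, and the real code additionally propagates a Status on hard errors.

#include <cstdio>

// Stand-in for a partition: a pretend stream capacity and a spill flag.
struct FakePartition {
    int free_tuple_slots = 0;
    bool spilled = false;
};

// Stand-in for construct_intermediate_tuple(): fails when out of memory.
static bool try_construct_tuple(FakePartition* p) {
    if (p->free_tuple_slots == 0) return false;
    --p->free_tuple_slots;
    return true;
}

// Stand-in for spill_partition(): frees memory somewhere; here we always
// pick the partition we are inserting into.
static void spill_some_partition(FakePartition* p) { p->spilled = true; }

// Mirrors the loop: construct the tuple, and on OOM spill a partition
// and retry until either the tuple fits or our partition is spilled.
static const char* add_intermediate_tuple_sketch(FakePartition* p) {
    while (true) {
        if (try_construct_tuple(p)) return "inserted into hash table";
        spill_some_partition(p);
        if (p->spilled) return "appended as spilled row";
    }
}

int main() {
    FakePartition full;  // zero free slots: the first insert must spill
    printf("%s\n", add_intermediate_tuple_sketch(&full));
    return 0;
}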
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 7f4ce70..3d96636 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -55,7 +55,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
_tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*);
DCHECK_GT(_tuple_ptrs_size, 0);
// TODO: switch to Init() pattern so we can check memory limit and return Status.
- if (config::enable_partitioned_aggregation) {
+ if (config::enable_partitioned_aggregation || config::enable_new_partitioned_aggregation) {
_mem_tracker->consume(_tuple_ptrs_size);
_tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
DCHECK(_tuple_ptrs != NULL);
@@ -89,7 +89,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc,
_tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*);
DCHECK_GT(_tuple_ptrs_size, 0);
// TODO: switch to Init() pattern so we can check memory limit and return Status.
- if (config::enable_partitioned_aggregation) {
+ if (config::enable_partitioned_aggregation || config::enable_new_partitioned_aggregation) {
_mem_tracker->consume(_tuple_ptrs_size);
_tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
DCHECK(_tuple_ptrs != nullptr);
@@ -187,7 +187,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
_tuple_ptrs_size = _num_rows * input_batch.row_tuples.size() * sizeof(Tuple*);
DCHECK_GT(_tuple_ptrs_size, 0);
// TODO: switch to Init() pattern so we can check memory limit and return Status.
- if (config::enable_partitioned_aggregation) {
+ if (config::enable_partitioned_aggregation || config::enable_new_partitioned_aggregation) {
_mem_tracker->consume(_tuple_ptrs_size);
_tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
DCHECK(_tuple_ptrs != NULL);
@@ -288,7 +288,7 @@ void RowBatch::clear() {
for (int i = 0; i < _blocks.size(); ++i) {
_blocks[i]->del();
}
- if (config::enable_partitioned_aggregation ) {
+ if (config::enable_partitioned_aggregation || config::enable_new_partitioned_aggregation) {
DCHECK(_tuple_ptrs != NULL);
free(_tuple_ptrs);
_mem_tracker->release(_tuple_ptrs_size);
@@ -498,7 +498,7 @@ void RowBatch::reset() {
}
_blocks.clear();
_auxiliary_mem_usage = 0;
- if (!config::enable_partitioned_aggregation) {
+ if (!config::enable_partitioned_aggregation && !config::enable_new_partitioned_aggregation) {
_tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
}
_need_to_return = false;
@@ -590,7 +590,7 @@ void RowBatch::acquire_state(RowBatch* src) {
_num_rows = src->_num_rows;
_capacity = src->_capacity;
_need_to_return = src->_need_to_return;
- if (!config::enable_partitioned_aggregation) {
+ if (!config::enable_partitioned_aggregation && !config::enable_new_partitioned_aggregation) {
// Tuple pointers are allocated from tuple_data_pool_ so are transferred.
_tuple_ptrs = src->_tuple_ptrs;
src->_tuple_ptrs = NULL;
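
All six row_batch.cpp hunks widen the same gate: with either partitioned-aggregation flag enabled, the Tuple* array is malloc'd (and charged to the MemTracker) so it can be freed independently of the batch's tuple data pool; otherwise it is carved out of the pool and transferred with it. A reduced sketch of that pattern follows; the flag names and the sizing expression are taken from the diff, while main() and the sample sizes are assumptions for this note.

#include <cstddef>
#include <cstdio>
#include <cstdlib>

namespace config {
// Defaults here match be/src/common/config.h after this revert.
const bool enable_partitioned_aggregation = false;
const bool enable_new_partitioned_aggregation = true;
}

struct Tuple;  // opaque: only Tuple* slots are allocated here

int main() {
    const int capacity = 1024;
    const int num_tuples_per_row = 2;
    // Same sizing as the RowBatch constructors: one Tuple* slot per
    // tuple per row.
    const size_t tuple_ptrs_size =
            capacity * num_tuples_per_row * sizeof(Tuple*);

    Tuple** tuple_ptrs = nullptr;
    if (config::enable_partitioned_aggregation ||
            config::enable_new_partitioned_aggregation) {
        // malloc'd path: clear() frees this array eagerly instead of
        // keeping it alive with the tuple data pool.
        tuple_ptrs = static_cast<Tuple**>(malloc(tuple_ptrs_size));
    }  // else: allocated from _tuple_data_pool, as in reset().
    printf("tuple_ptrs_size = %zu bytes\n", tuple_ptrs_size);
    free(tuple_ptrs);
    return 0;
}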