You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/08 03:19:57 UTC
[doris] 03/07: [hotfix](dev-1.0.1) bottom line solution for vec outer join (#10645)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1-v20220707
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 270daccfba38a341a0d8f758dcacbe5048a495e2
Author: starocean999 <40...@users.noreply.github.com>
AuthorDate: Thu Jul 7 11:20:16 2022 +0800
[hotfix](dev-1.0.1) bottom line solution for vec outer join (#10645)
agg and hash join node should produce nullable type column correctly according to the fe planner
---
be/src/vec/exec/join/vhash_join_node.cpp | 43 ++++++++++++-----
be/src/vec/exec/vaggregation_node.cpp | 54 ++++++++++++++++------
be/src/vec/exec/vaggregation_node.h | 7 +--
be/src/vec/exec/vanalytic_eval_node.cpp | 11 ++++-
be/src/vec/functions/function_case.h | 4 +-
.../org/apache/doris/analysis/AggregateInfo.java | 12 +++++
.../org/apache/doris/planner/AggregationNode.java | 9 ++--
.../java/org/apache/doris/planner/Planner.java | 13 +++---
gensrc/thrift/PlanNodes.thrift | 4 ++
9 files changed, 115 insertions(+), 42 deletions(-)
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 596bef712b..25be6c80a9 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -189,10 +189,15 @@ struct ProcessHashTableProbe {
if constexpr (!is_semi_anti_join || have_other_join_conjunct) {
if (_build_blocks.size() == 1) {
for (int i = 0; i < column_length; i++) {
- auto& column = *_build_blocks[0].get_by_position(i).column;
if (output_slot_flags[i]) {
+ auto column = _build_blocks[0].get_by_position(i).column;
+ if (mcol[i + column_offset]->is_nullable() xor column->is_nullable()) {
+ DCHECK(mcol[i + column_offset]->is_nullable() &&
+ !column->is_nullable());
+ column = make_nullable(column);
+ }
mcol[i + column_offset]->insert_indices_from(
- column, _build_block_rows.data(), _build_block_rows.data() + size);
+ *column, _build_block_rows.data(), _build_block_rows.data() + size);
} else {
mcol[i + column_offset]->resize(size);
}
@@ -207,17 +212,29 @@ struct ProcessHashTableProbe {
assert_cast<ColumnNullable*>(mcol[i + column_offset].get())
->insert_join_null_data();
} else {
- auto& column = *_build_blocks[_build_block_offsets[j]]
- .get_by_position(i)
- .column;
- mcol[i + column_offset]->insert_from(column,
+ auto column = _build_blocks[_build_block_offsets[j]]
+ .get_by_position(i)
+ .column;
+ if (mcol[i + column_offset]->is_nullable() xor
+ column->is_nullable()) {
+ DCHECK(mcol[i + column_offset]->is_nullable() &&
+ !column->is_nullable());
+ column = make_nullable(column);
+ }
+ mcol[i + column_offset]->insert_from(*column,
_build_block_rows[j]);
}
} else {
- auto& column = *_build_blocks[_build_block_offsets[j]]
- .get_by_position(i)
- .column;
- mcol[i + column_offset]->insert_from(column, _build_block_rows[j]);
+ auto column = _build_blocks[_build_block_offsets[j]]
+ .get_by_position(i)
+ .column;
+ if (mcol[i + column_offset]->is_nullable() xor
+ column->is_nullable()) {
+ DCHECK(mcol[i + column_offset]->is_nullable() &&
+ !column->is_nullable());
+ column = make_nullable(column);
+ }
+ mcol[i + column_offset]->insert_from(*column, _build_block_rows[j]);
}
}
} else {
@@ -233,7 +250,11 @@ struct ProcessHashTableProbe {
int size) {
for (int i = 0; i < output_slot_flags.size(); ++i) {
if (output_slot_flags[i]) {
- auto& column = _probe_block.get_by_position(i).column;
+ auto column = _probe_block.get_by_position(i).column;
+ if (mcol[i]->is_nullable() xor column->is_nullable()) {
+ DCHECK(mcol[i]->is_nullable() && !column->is_nullable());
+ column = make_nullable(column);
+ }
column->replicate(&_items_counts[0], size, *mcol[i]);
} else {
mcol[i]->resize(size);
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 29983cf374..4dd89499a1 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -77,6 +77,7 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs),
+ _aggregate_evaluators_changed_flags(tnode.agg_node.aggregate_function_changed_flags),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_intermediate_tuple_desc(NULL),
_output_tuple_id(tnode.agg_node.output_tuple_id),
@@ -225,19 +226,26 @@ Status AggregationNode::prepare(RuntimeState* state) {
int j = _probe_expr_ctxs.size();
for (int i = 0; i < j; ++i) {
- auto nullable_output = _output_tuple_desc->slots()[i]->is_nullable();
+ auto nullable_output = _needs_finalize ? _output_tuple_desc->slots()[i]->is_nullable() : _intermediate_tuple_desc->slots()[i]->is_nullable();
auto nullable_input = _probe_expr_ctxs[i]->root()->is_nullable();
if (nullable_output != nullable_input) {
DCHECK(nullable_output);
- _make_nullable_keys.emplace_back(i);
+ _make_nullable_output_column_pos.emplace_back(i);
}
}
+ int probe_expr_count = _probe_expr_ctxs.size();
for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(state, child(0)->row_desc(),
_mem_pool.get(), intermediate_slot_desc,
output_slot_desc, mem_tracker()));
+ auto nullable_output = _needs_finalize ? output_slot_desc->is_nullable() : intermediate_slot_desc->is_nullable();
+ auto nullable_agg_output = _aggregate_evaluators[i]->data_type()->is_nullable();
+ if ( nullable_output != nullable_agg_output) {
+ DCHECK(nullable_output);
+ _make_nullable_output_column_pos.emplace_back(i + probe_expr_count);
+ }
}
// set profile timer to evaluators
@@ -389,11 +397,11 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
}
// pre stream agg need use _num_row_return to decide whether to do pre stream agg
_num_rows_returned += block->rows();
- _make_nullable_output_key(block);
+ _make_nullable_output_column(block);
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
} else {
RETURN_IF_ERROR(_executor.get_result(state, block, eos));
- _make_nullable_output_key(block);
+ _make_nullable_output_column(block);
// dispose the having clause, should not be execute in prestreaming agg
RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, block->columns()));
reached_limit(block, eos);
@@ -497,6 +505,9 @@ Status AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
}
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ if (_aggregate_evaluators_changed_flags[i]) {
+ write_binary(true, value_buffer_writers[i]);
+ }
_aggregate_evaluators[i]->function()->serialize(
_agg_data.without_key + _offsets_of_aggregate_states[i], value_buffer_writers[i]);
value_buffer_writers[i].commit();
@@ -576,13 +587,16 @@ void AggregationNode::_close_without_key() {
release_tracker();
}
-void AggregationNode::_make_nullable_output_key(Block* block) {
+void AggregationNode::_make_nullable_output_column(Block* block) {
if (block->rows() != 0) {
- for (auto cid : _make_nullable_keys) {
- block->get_by_position(cid).column =
- make_nullable(block->get_by_position(cid).column);
- block->get_by_position(cid).type =
- make_nullable(block->get_by_position(cid).type);
+ for (auto cid : _make_nullable_output_column_pos) {
+ if (!block->get_by_position(cid).column->is_nullable()) {
+ block->get_by_position(cid).column =
+ make_nullable(block->get_by_position(cid).column);
+ }
+ if (!block->get_by_position(cid).type->is_nullable()) {
+ block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type);
+ }
}
}
}
@@ -695,7 +709,7 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
// will serialize value data to string column
std::vector<VectorBufferWriter> value_buffer_writers;
- bool mem_reuse = out_block->mem_reuse();
+ bool mem_reuse = out_block->mem_reuse() && _make_nullable_output_column_pos.empty();
auto serialize_string_type = std::make_shared<DataTypeString>();
MutableColumns value_columns;
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
@@ -713,6 +727,9 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
for (size_t j = 0; j < rows; ++j) {
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+ if (_aggregate_evaluators_changed_flags[i]) {
+ write_binary(true, value_buffer_writers[i]);
+ }
_aggregate_evaluators[i]->function()->serialize(
_streaming_pre_places[j] + _offsets_of_aggregate_states[i],
value_buffer_writers[i]);
@@ -850,14 +867,14 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) {
Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Block* block,
bool* eos) {
- bool mem_reuse = block->mem_reuse();
+ bool mem_reuse = block->mem_reuse() && _make_nullable_output_column_pos.empty();
auto column_withschema = VectorizedUtils::create_columns_with_type_and_name(row_desc());
int key_size = _probe_expr_ctxs.size();
MutableColumns key_columns;
for (int i = 0; i < key_size; ++i) {
if (!mem_reuse) {
- key_columns.emplace_back(column_withschema[i].type->create_column());
+ key_columns.emplace_back(_probe_expr_ctxs[i]->root()->data_type()->create_column());
} else {
key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
}
@@ -865,7 +882,8 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
MutableColumns value_columns;
for (int i = key_size; i < column_withschema.size(); ++i) {
if (!mem_reuse) {
- value_columns.emplace_back(column_withschema[i].type->create_column());
+ value_columns.emplace_back(
+ _aggregate_evaluators[i - key_size]->data_type()->create_column());
} else {
value_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
}
@@ -932,7 +950,7 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
MutableColumns value_columns(agg_size);
DataTypes value_data_types(agg_size);
- bool mem_reuse = block->mem_reuse();
+ bool mem_reuse = block->mem_reuse() && _make_nullable_output_column_pos.empty();
MutableColumns key_columns;
for (int i = 0; i < key_size; ++i) {
@@ -969,6 +987,9 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
// serialize values
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+ if (_aggregate_evaluators_changed_flags[i]) {
+ write_binary(true, value_buffer_writers[i]);
+ }
_aggregate_evaluators[i]->function()->serialize(
mapped + _offsets_of_aggregate_states[i], value_buffer_writers[i]);
value_buffer_writers[i].commit();
@@ -984,6 +1005,9 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
key_columns[0]->insert_data(nullptr, 0);
auto mapped = agg_method.data.get_null_key_data();
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+ if (_aggregate_evaluators_changed_flags[i]) {
+ write_binary(true, value_buffer_writers[i]);
+ }
_aggregate_evaluators[i]->function()->serialize(
mapped + _offsets_of_aggregate_states[i],
value_buffer_writers[i]);
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index c6d6c34b1c..758d11fbdb 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -417,12 +417,13 @@ public:
private:
// group by k1,k2
std::vector<VExprContext*> _probe_expr_ctxs;
- // left / full join will change the key nullable make output/input solt
+ // left / full join will change the output nullability and make the output/input slot
// nullable diff. so we need make nullable of it.
- std::vector<size_t> _make_nullable_keys;
+ std::vector<size_t> _make_nullable_output_column_pos;
std::vector<size_t> _probe_key_sz;
std::vector<AggFnEvaluator*> _aggregate_evaluators;
+ std::vector<bool> _aggregate_evaluators_changed_flags;
// may be we don't have to know the tuple id
TupleId _intermediate_tuple_id;
@@ -462,7 +463,7 @@ private:
/// the preagg should pass through any rows it can't fit in its tables.
bool _should_expand_preagg_hash_tables();
- void _make_nullable_output_key(Block* block);
+ void _make_nullable_output_column(Block* block);
Status _create_agg_status(AggregateDataPtr data);
Status _destroy_agg_status(AggregateDataPtr data);
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index 65765d15d3..858e330efe 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -544,7 +544,16 @@ Status VAnalyticEvalNode::_output_current_block(Block* block) {
}
for (size_t i = 0; i < _result_window_columns.size(); ++i) {
- block->insert({std::move(_result_window_columns[i]), _agg_functions[i]->data_type(), ""});
+ SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
+ if (output_slot_desc->is_nullable() xor _agg_functions[i]->data_type()->is_nullable()) {
+ DCHECK(output_slot_desc->is_nullable() &&
+ !_agg_functions[i]->data_type()->is_nullable());
+ block->insert({make_nullable(std::move(_result_window_columns[i])),
+ make_nullable(_agg_functions[i]->data_type()), ""});
+ } else {
+ block->insert(
+ {std::move(_result_window_columns[i]), _agg_functions[i]->data_type(), ""});
+ }
}
_output_block_index++;
diff --git a/be/src/vec/functions/function_case.h b/be/src/vec/functions/function_case.h
index 47e33f58ff..0b0772583d 100644
--- a/be/src/vec/functions/function_case.h
+++ b/be/src/vec/functions/function_case.h
@@ -172,9 +172,11 @@ public:
.data();
// simd automatically
+ // we have to use (bool)cond_raw_data[row_idx] to force the output to be 0 or 1,
+ // because in some cases we might use non-zero values such as 1 or 2 to indicate that the value is null.
for (int row_idx = 0; row_idx < rows_count; row_idx++) {
then_idx_ptr[row_idx] |=
- (!then_idx_ptr[row_idx]) * cond_raw_data[row_idx] * i;
+ (!then_idx_ptr[row_idx]) * (bool)cond_raw_data[row_idx] * i;
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
index 74facd0a55..b599f36909 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
@@ -333,6 +333,18 @@ public final class AggregateInfo extends AggregateInfoBase {
return result;
}
+ public ArrayList<Boolean> getMaterializedAggregateExprChangedFlags() {
+ ArrayList<Boolean> result = Lists.newArrayList();
+ for (Integer i : materializedSlots_) {
+ if (mergeAggInfo_ != null) {
+ result.add(aggregateExprs_.get(i).isNullable() != mergeAggInfo_.aggregateExprs_.get(i).isNullable());
+ } else {
+ result.add(false);
+ }
+ }
+ return result;
+ }
+
public AggregateInfo getMergeAggInfo() {
return mergeAggInfo_;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index e7dee2651c..c19058f1cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -268,16 +268,15 @@ public class AggregationNode extends PlanNode {
msg.node_type = TPlanNodeType.AGGREGATION_NODE;
List<TExpr> aggregateFunctions = Lists.newArrayList();
// only serialize agg exprs that are being materialized
- for (FunctionCallExpr e: aggInfo.getMaterializedAggregateExprs()) {
+ for (FunctionCallExpr e : aggInfo.getMaterializedAggregateExprs()) {
aggregateFunctions.add(e.treeToThrift());
}
msg.agg_node =
- new TAggregationNode(
- aggregateFunctions,
- aggInfo.getIntermediateTupleId().asInt(),
- aggInfo.getOutputTupleId().asInt(), needsFinalize);
+ new TAggregationNode(aggregateFunctions, aggInfo.getIntermediateTupleId().asInt(),
+ aggInfo.getOutputTupleId().asInt(), needsFinalize);
msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg);
msg.agg_node.setIsUpdateStage(!aggInfo.isMerge());
+ msg.agg_node.setAggregateFunctionChangedFlags(aggInfo.getMaterializedAggregateExprChangedFlags());
List<Expr> groupingExprs = aggInfo.getGroupingExprs();
if (groupingExprs != null) {
msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index 96a3f4f0eb..fda1f82e1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -172,12 +172,13 @@ public class Planner {
singleNodePlan.convertToVectoriezd();
}
- if (analyzer.getContext() != null
- && analyzer.getContext().getSessionVariable().isEnableProjection()
- && statement instanceof SelectStmt) {
- ProjectPlanner projectPlanner = new ProjectPlanner(analyzer);
- projectPlanner.projectSingleNodePlan(queryStmt.getResultExprs(), singleNodePlan);
- }
+ // disable ProjectPlanner for now because there is some bug to be fixed
+ // if (analyzer.getContext() != null
+ // && analyzer.getContext().getSessionVariable().isEnableProjection()
+ // && statement instanceof SelectStmt) {
+ // ProjectPlanner projectPlanner = new ProjectPlanner(analyzer);
+ // projectPlanner.projectSingleNodePlan(queryStmt.getResultExprs(), singleNodePlan);
+ // }
if (statement instanceof InsertStmt) {
InsertStmt insertStmt = (InsertStmt) statement;
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index dfe90968cb..8ca90ec29c 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -465,6 +465,10 @@ struct TAggregationNode {
5: required bool need_finalize
6: optional bool use_streaming_preaggregation
7: optional bool is_update_stage
+
+ // to support vec outer join: in some cases the agg function has a different nullable property in the serialize and merge phases
+ // we need to pass this info to the BE so that the agg function serializes and deserializes correctly
+ 8: optional list<bool> aggregate_function_changed_flags
}
struct TRepeatNode {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org