You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/07 03:20:21 UTC

[doris] branch dev-1.0.1 updated: [FIX] bottom line solution for vec outer join (#10645)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.0.1 by this push:
     new 0150fabb7b [FIX] bottom line solution for vec outer join (#10645)
0150fabb7b is described below

commit 0150fabb7b8487dbcfb1f56d77444df43a901b5e
Author: starocean999 <40...@users.noreply.github.com>
AuthorDate: Thu Jul 7 11:20:16 2022 +0800

    [FIX] bottom line solution for vec outer join (#10645)
    
    * bottom line solution for vec outer join
    
    * change aggregate_function_changed_flags to optional and add some comments
    
    * add missing changed file
    
    * fix error of probe_side_output_column introduced by code merge
    
    Co-authored-by: lichi <li...@rateup.com.cn>
---
 be/src/vec/exec/join/vhash_join_node.cpp           | 43 ++++++++++++-----
 be/src/vec/exec/vaggregation_node.cpp              | 54 ++++++++++++++++------
 be/src/vec/exec/vaggregation_node.h                |  7 +--
 be/src/vec/exec/vanalytic_eval_node.cpp            | 11 ++++-
 be/src/vec/functions/function_case.h               |  4 +-
 .../org/apache/doris/analysis/AggregateInfo.java   | 12 +++++
 .../org/apache/doris/planner/AggregationNode.java  |  9 ++--
 .../java/org/apache/doris/planner/Planner.java     | 13 +++---
 gensrc/thrift/PlanNodes.thrift                     |  4 ++
 9 files changed, 115 insertions(+), 42 deletions(-)

diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 596bef712b..25be6c80a9 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -189,10 +189,15 @@ struct ProcessHashTableProbe {
         if constexpr (!is_semi_anti_join || have_other_join_conjunct) {
             if (_build_blocks.size() == 1) {
                 for (int i = 0; i < column_length; i++) {
-                    auto& column = *_build_blocks[0].get_by_position(i).column;
                     if (output_slot_flags[i]) {
+                        auto column = _build_blocks[0].get_by_position(i).column;
+                        if (mcol[i + column_offset]->is_nullable() xor column->is_nullable()) {
+                            DCHECK(mcol[i + column_offset]->is_nullable() &&
+                                   !column->is_nullable());
+                            column = make_nullable(column);
+                        }
                         mcol[i + column_offset]->insert_indices_from(
-                                column, _build_block_rows.data(), _build_block_rows.data() + size);
+                                *column, _build_block_rows.data(), _build_block_rows.data() + size);
                     } else {
                         mcol[i + column_offset]->resize(size);
                     }
@@ -207,17 +212,29 @@ struct ProcessHashTableProbe {
                                     assert_cast<ColumnNullable*>(mcol[i + column_offset].get())
                                             ->insert_join_null_data();
                                 } else {
-                                    auto& column = *_build_blocks[_build_block_offsets[j]]
-                                                            .get_by_position(i)
-                                                            .column;
-                                    mcol[i + column_offset]->insert_from(column,
+                                    auto column = _build_blocks[_build_block_offsets[j]]
+                                                          .get_by_position(i)
+                                                          .column;
+                                    if (mcol[i + column_offset]->is_nullable() xor
+                                        column->is_nullable()) {
+                                        DCHECK(mcol[i + column_offset]->is_nullable() &&
+                                               !column->is_nullable());
+                                        column = make_nullable(column);
+                                    }
+                                    mcol[i + column_offset]->insert_from(*column,
                                                                          _build_block_rows[j]);
                                 }
                             } else {
-                                auto& column = *_build_blocks[_build_block_offsets[j]]
-                                                        .get_by_position(i)
-                                                        .column;
-                                mcol[i + column_offset]->insert_from(column, _build_block_rows[j]);
+                                auto column = _build_blocks[_build_block_offsets[j]]
+                                                      .get_by_position(i)
+                                                      .column;
+                                if (mcol[i + column_offset]->is_nullable() xor
+                                    column->is_nullable()) {
+                                    DCHECK(mcol[i + column_offset]->is_nullable() &&
+                                           !column->is_nullable());
+                                    column = make_nullable(column);
+                                }
+                                mcol[i + column_offset]->insert_from(*column, _build_block_rows[j]);
                             }
                         }
                     } else {
@@ -233,7 +250,11 @@ struct ProcessHashTableProbe {
                                   int size) {
         for (int i = 0; i < output_slot_flags.size(); ++i) {
             if (output_slot_flags[i]) {
-                auto& column = _probe_block.get_by_position(i).column;
+                auto column = _probe_block.get_by_position(i).column;
+                if (mcol[i]->is_nullable() xor column->is_nullable()) {
+                    DCHECK(mcol[i]->is_nullable() && !column->is_nullable());
+                    column = make_nullable(column);
+                }
                 column->replicate(&_items_counts[0], size, *mcol[i]);
             } else {
                 mcol[i]->resize(size);
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 29983cf374..4dd89499a1 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -77,6 +77,7 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
 AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
                                  const DescriptorTbl& descs)
         : ExecNode(pool, tnode, descs),
+          _aggregate_evaluators_changed_flags(tnode.agg_node.aggregate_function_changed_flags),
           _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
           _intermediate_tuple_desc(NULL),
           _output_tuple_id(tnode.agg_node.output_tuple_id),
@@ -225,19 +226,26 @@ Status AggregationNode::prepare(RuntimeState* state) {
 
     int j = _probe_expr_ctxs.size();
     for (int i = 0; i < j; ++i) {
-        auto nullable_output = _output_tuple_desc->slots()[i]->is_nullable();
+        auto nullable_output = _needs_finalize ? _output_tuple_desc->slots()[i]->is_nullable() : _intermediate_tuple_desc->slots()[i]->is_nullable();
         auto nullable_input = _probe_expr_ctxs[i]->root()->is_nullable();
         if (nullable_output != nullable_input) {
             DCHECK(nullable_output);
-            _make_nullable_keys.emplace_back(i);
+            _make_nullable_output_column_pos.emplace_back(i);
         }
     }
+    int probe_expr_count = _probe_expr_ctxs.size();
     for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) {
         SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
         SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
         RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(state, child(0)->row_desc(),
                                                           _mem_pool.get(), intermediate_slot_desc,
                                                           output_slot_desc, mem_tracker()));
+        auto nullable_output = _needs_finalize ? output_slot_desc->is_nullable() : intermediate_slot_desc->is_nullable();
+        auto nullable_agg_output = _aggregate_evaluators[i]->data_type()->is_nullable();
+        if ( nullable_output != nullable_agg_output) {
+            DCHECK(nullable_output);
+            _make_nullable_output_column_pos.emplace_back(i + probe_expr_count);
+        }
     }
 
     // set profile timer to evaluators
@@ -389,11 +397,11 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
         }
         // pre stream agg need use _num_row_return to decide whether to do pre stream agg
         _num_rows_returned += block->rows();
-        _make_nullable_output_key(block);
+        _make_nullable_output_column(block);
         COUNTER_SET(_rows_returned_counter, _num_rows_returned);
     } else {
         RETURN_IF_ERROR(_executor.get_result(state, block, eos));
-        _make_nullable_output_key(block);
+        _make_nullable_output_column(block);
         // dispose the having clause, should not be execute in prestreaming agg
         RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, block->columns()));
         reached_limit(block, eos);
@@ -497,6 +505,9 @@ Status AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
     }
 
     for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        if (_aggregate_evaluators_changed_flags[i]) {
+            write_binary(true, value_buffer_writers[i]);
+        }
         _aggregate_evaluators[i]->function()->serialize(
                 _agg_data.without_key + _offsets_of_aggregate_states[i], value_buffer_writers[i]);
         value_buffer_writers[i].commit();
@@ -576,13 +587,16 @@ void AggregationNode::_close_without_key() {
     release_tracker();
 }
 
-void AggregationNode::_make_nullable_output_key(Block* block) {
+void AggregationNode::_make_nullable_output_column(Block* block) {
     if (block->rows() != 0) {
-        for (auto cid : _make_nullable_keys) {
-            block->get_by_position(cid).column =
-                    make_nullable(block->get_by_position(cid).column);
-            block->get_by_position(cid).type =
-                    make_nullable(block->get_by_position(cid).type);
+        for (auto cid : _make_nullable_output_column_pos) {
+            if (!block->get_by_position(cid).column->is_nullable()) {
+                block->get_by_position(cid).column =
+                        make_nullable(block->get_by_position(cid).column);
+            }
+            if (!block->get_by_position(cid).type->is_nullable()) {
+                block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type);
+            }
         }
     }
 }
@@ -695,7 +709,7 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
 
                         // will serialize value data to string column
                         std::vector<VectorBufferWriter> value_buffer_writers;
-                        bool mem_reuse = out_block->mem_reuse();
+                        bool mem_reuse = out_block->mem_reuse() && _make_nullable_output_column_pos.empty();
                         auto serialize_string_type = std::make_shared<DataTypeString>();
                         MutableColumns value_columns;
                         for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
@@ -713,6 +727,9 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
 
                         for (size_t j = 0; j < rows; ++j) {
                             for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+                                if (_aggregate_evaluators_changed_flags[i]) {
+                                    write_binary(true, value_buffer_writers[i]);
+                                }
                                 _aggregate_evaluators[i]->function()->serialize(
                                         _streaming_pre_places[j] + _offsets_of_aggregate_states[i],
                                         value_buffer_writers[i]);
@@ -850,14 +867,14 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) {
 
 Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Block* block,
                                                         bool* eos) {
-    bool mem_reuse = block->mem_reuse();
+    bool mem_reuse = block->mem_reuse() && _make_nullable_output_column_pos.empty();
     auto column_withschema = VectorizedUtils::create_columns_with_type_and_name(row_desc());
     int key_size = _probe_expr_ctxs.size();
 
     MutableColumns key_columns;
     for (int i = 0; i < key_size; ++i) {
         if (!mem_reuse) {
-            key_columns.emplace_back(column_withschema[i].type->create_column());
+            key_columns.emplace_back(_probe_expr_ctxs[i]->root()->data_type()->create_column());
         } else {
             key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
         }
@@ -865,7 +882,8 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
     MutableColumns value_columns;
     for (int i = key_size; i < column_withschema.size(); ++i) {
         if (!mem_reuse) {
-            value_columns.emplace_back(column_withschema[i].type->create_column());
+            value_columns.emplace_back(
+                    _aggregate_evaluators[i - key_size]->data_type()->create_column());
         } else {
             value_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
         }
@@ -932,7 +950,7 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
     MutableColumns value_columns(agg_size);
     DataTypes value_data_types(agg_size);
 
-    bool mem_reuse = block->mem_reuse();
+    bool mem_reuse = block->mem_reuse() && _make_nullable_output_column_pos.empty();
 
     MutableColumns key_columns;
     for (int i = 0; i < key_size; ++i) {
@@ -969,6 +987,9 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
 
                     // serialize values
                     for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+                        if (_aggregate_evaluators_changed_flags[i]) {
+                            write_binary(true, value_buffer_writers[i]);
+                        }
                         _aggregate_evaluators[i]->function()->serialize(
                                 mapped + _offsets_of_aggregate_states[i], value_buffer_writers[i]);
                         value_buffer_writers[i].commit();
@@ -984,6 +1005,9 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
                             key_columns[0]->insert_data(nullptr, 0);
                             auto mapped = agg_method.data.get_null_key_data();
                             for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+                                if (_aggregate_evaluators_changed_flags[i]) {
+                                    write_binary(true, value_buffer_writers[i]);
+                                }
                                 _aggregate_evaluators[i]->function()->serialize(
                                         mapped + _offsets_of_aggregate_states[i],
                                         value_buffer_writers[i]);
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index c6d6c34b1c..758d11fbdb 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -417,12 +417,13 @@ public:
 private:
     // group by k1,k2
     std::vector<VExprContext*> _probe_expr_ctxs;
-    // left / full join will change the key nullable make output/input solt
+    // left / full join will change the output nullable make output/input solt
     // nullable diff. so we need make nullable of it.
-    std::vector<size_t> _make_nullable_keys;
+    std::vector<size_t> _make_nullable_output_column_pos;
     std::vector<size_t> _probe_key_sz;
 
     std::vector<AggFnEvaluator*> _aggregate_evaluators;
+    std::vector<bool> _aggregate_evaluators_changed_flags;
 
     // may be we don't have to know the tuple id
     TupleId _intermediate_tuple_id;
@@ -462,7 +463,7 @@ private:
     /// the preagg should pass through any rows it can't fit in its tables.
     bool _should_expand_preagg_hash_tables();
 
-    void _make_nullable_output_key(Block* block);
+    void _make_nullable_output_column(Block* block);
 
     Status _create_agg_status(AggregateDataPtr data);
     Status _destroy_agg_status(AggregateDataPtr data);
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index 65765d15d3..858e330efe 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -544,7 +544,16 @@ Status VAnalyticEvalNode::_output_current_block(Block* block) {
     }
 
     for (size_t i = 0; i < _result_window_columns.size(); ++i) {
-        block->insert({std::move(_result_window_columns[i]), _agg_functions[i]->data_type(), ""});
+        SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
+        if (output_slot_desc->is_nullable() xor _agg_functions[i]->data_type()->is_nullable()) {
+            DCHECK(output_slot_desc->is_nullable() &&
+                   !_agg_functions[i]->data_type()->is_nullable());
+            block->insert({make_nullable(std::move(_result_window_columns[i])),
+                           make_nullable(_agg_functions[i]->data_type()), ""});
+        } else {
+            block->insert(
+                    {std::move(_result_window_columns[i]), _agg_functions[i]->data_type(), ""});
+        }
     }
 
     _output_block_index++;
diff --git a/be/src/vec/functions/function_case.h b/be/src/vec/functions/function_case.h
index 47e33f58ff..0b0772583d 100644
--- a/be/src/vec/functions/function_case.h
+++ b/be/src/vec/functions/function_case.h
@@ -172,9 +172,11 @@ public:
                                     .data();
 
                     // simd automatically
+                    // we have to use (bool)cond_raw_data[row_idx] to force the output is 0 or 1
+                    // because in some cases, we might use none-zero values 1 or 2 to indicate the value is null.
                     for (int row_idx = 0; row_idx < rows_count; row_idx++) {
                         then_idx_ptr[row_idx] |=
-                                (!then_idx_ptr[row_idx]) * cond_raw_data[row_idx] * i;
+                                (!then_idx_ptr[row_idx]) * (bool)cond_raw_data[row_idx] * i;
                     }
                 }
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
index 74facd0a55..b599f36909 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
@@ -333,6 +333,18 @@ public final class AggregateInfo extends AggregateInfoBase {
         return result;
     }
 
+    public ArrayList<Boolean> getMaterializedAggregateExprChangedFlags() {
+        ArrayList<Boolean> result = Lists.newArrayList();
+        for (Integer i : materializedSlots_) {
+            if (mergeAggInfo_ != null) {
+                result.add(aggregateExprs_.get(i).isNullable() != mergeAggInfo_.aggregateExprs_.get(i).isNullable());
+            } else {
+                result.add(false);
+            }
+        }
+        return result;
+    }
+
     public AggregateInfo getMergeAggInfo() {
         return mergeAggInfo_;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index e7dee2651c..c19058f1cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -268,16 +268,15 @@ public class AggregationNode extends PlanNode {
         msg.node_type = TPlanNodeType.AGGREGATION_NODE;
         List<TExpr> aggregateFunctions = Lists.newArrayList();
         // only serialize agg exprs that are being materialized
-        for (FunctionCallExpr e: aggInfo.getMaterializedAggregateExprs()) {
+        for (FunctionCallExpr e : aggInfo.getMaterializedAggregateExprs()) {
             aggregateFunctions.add(e.treeToThrift());
         }
         msg.agg_node =
-          new TAggregationNode(
-                  aggregateFunctions,
-                  aggInfo.getIntermediateTupleId().asInt(),
-                  aggInfo.getOutputTupleId().asInt(), needsFinalize);
+                new TAggregationNode(aggregateFunctions, aggInfo.getIntermediateTupleId().asInt(),
+                        aggInfo.getOutputTupleId().asInt(), needsFinalize);
         msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg);
         msg.agg_node.setIsUpdateStage(!aggInfo.isMerge());
+        msg.agg_node.setAggregateFunctionChangedFlags(aggInfo.getMaterializedAggregateExprChangedFlags());
         List<Expr> groupingExprs = aggInfo.getGroupingExprs();
         if (groupingExprs != null) {
             msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index 96a3f4f0eb..fda1f82e1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -172,12 +172,13 @@ public class Planner {
             singleNodePlan.convertToVectoriezd();
         }
 
-        if (analyzer.getContext() != null
-                && analyzer.getContext().getSessionVariable().isEnableProjection()
-                && statement instanceof SelectStmt) {
-            ProjectPlanner projectPlanner = new ProjectPlanner(analyzer);
-            projectPlanner.projectSingleNodePlan(queryStmt.getResultExprs(), singleNodePlan);
-        }
+        // disable ProjectPlanner for now because there is some bug to be fixed
+        // if (analyzer.getContext() != null
+        //         && analyzer.getContext().getSessionVariable().isEnableProjection()
+        //         && statement instanceof SelectStmt) {
+        //     ProjectPlanner projectPlanner = new ProjectPlanner(analyzer);
+        //     projectPlanner.projectSingleNodePlan(queryStmt.getResultExprs(), singleNodePlan);
+        // }
 
         if (statement instanceof InsertStmt) {
             InsertStmt insertStmt = (InsertStmt) statement;
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index dfe90968cb..8ca90ec29c 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -465,6 +465,10 @@ struct TAggregationNode {
   5: required bool need_finalize
   6: optional bool use_streaming_preaggregation
   7: optional bool is_update_stage
+
+  // to support vec outer join, in some case the agg function has different nullable property in serialize and merge phase
+  // we need pass this info to be to make the agg function serialize and deserialize correctly
+  8: optional list<bool> aggregate_function_changed_flags
 }
 
 struct TRepeatNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org