Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/08 03:19:54 UTC

[doris] branch dev-1.0.1-v20220707 updated (08a0042afc -> 300f5cdbeb)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch dev-1.0.1-v20220707
in repository https://gitbox.apache.org/repos/asf/doris.git


 discard 08a0042afc [tmpfix] rf
 discard 8190c78e53 [tmpfix] runtime filter and slot id
    omit 0150fabb7b [FIX] bottom line solution for vec outer join (#10645)
    omit 8c5e2b7328 fix_runtime_filter_outer_join (#10654)
    omit 7571c2bc8c [FIX] fix colocate join bug (#10651)
    omit e5a15b43b1 [refactor] if pending bytes exceeded, vtableSink wait until pending bytes consumed or task was cancelled (#10644)
     new 46efc851fd [hotfix](dev-1.0.1) if pending bytes exceeded, vtableSink wait until pending bytes consumed or task was cancelled (#10644)
     new 2895d248f4 [hotfix](dev-1.0.1) fix colocate join bug in vec engine after introducing output tuple (#10651)
     new 270daccfba [hotfix](dev-1.0.1) bottom line solution for vec outer join (#10645)
     new 199bb616c6 [fix][vectorized] Fix bug of VInPredicate on date type (#10663)
     new 62605a8f30 [hotfix](dev-1.0.1) Avoid VecNotImplementException for create view operation (#10676)
     new 644dec1bc5 [hotfix](dev-1.0.1) fix planner bug after introducing output tuple for join node
     new 300f5cdbeb [hotfix](dev-1.0.1) Fix Runtime Filter support equivalent slot of outer join (#10669)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (08a0042afc)
            \
             N -- N -- N   refs/heads/dev-1.0.1-v20220707 (300f5cdbeb)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exprs/create_predicate_function.h           | 25 ++++++++++----
 be/src/runtime/primitive_type.h                    | 16 +++++++++
 be/src/vec/functions/in.cpp                        |  8 ++---
 .../org/apache/doris/analysis/CreateViewStmt.java  | 18 +++++++---
 .../date/test_date_in_predicate.out}               |  6 +++-
 .../datatype/date/test_date_in_predicate.groovy    | 40 +++++++++++-----------
 6 files changed, 78 insertions(+), 35 deletions(-)
 copy regression-test/data/{correctness/test_bitmap_serialize.out => datatype/date/test_date_in_predicate.out} (54%)
 copy be/src/exprs/array_functions.h => regression-test/suites/datatype/date/test_date_in_predicate.groovy (54%)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 02/07: [hotfix](dev-1.0.1) fix colocate join bug in vec engine after introducing output tuple (#10651)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1-v20220707
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2895d248f4d398f2daf1d9ce6f2eb269d37b15a7
Author: starocean999 <40...@users.noreply.github.com>
AuthorDate: Thu Jul 7 11:19:36 2022 +0800

    [hotfix](dev-1.0.1) fix colocate join bug in vec engine after introducing output tuple (#10651)
    
    To support vectorized outer join, we introduced an output tuple for the hash join node,
    but this broke the colocate join check.
    To solve this, we map each output slot id back to the slot id of the hash join node's children,
    so that colocate join can be checked correctly.
    
    * fix colocate join bug
    
    * fix non vec colocate join issue
    
    Co-authored-by: lichi <li...@rateup.com.cn>
---
 .../org/apache/doris/planner/DistributedPlanner.java  |  2 +-
 .../java/org/apache/doris/planner/HashJoinNode.java   | 13 +++++++++++++
 .../main/java/org/apache/doris/planner/PlanNode.java  | 19 +++++++++++++++++--
 3 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 5d7d30aec2..d205e077a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -478,7 +478,7 @@ public class DistributedPlanner {
             return null;
         }
         ScanNode scanNode = planFragment.getPlanRoot()
-                .getScanNodeInOneFragmentByTupleId(slotRef.getDesc().getParent().getId());
+                .getScanNodeInOneFragmentBySlotRef(slotRef);
         if (scanNode == null) {
             cannotReason.add(DistributedPlanColocateRule.REDISTRIBUTED_SRC_DATA);
             return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 52c623f3e8..25df202fde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -1037,4 +1037,17 @@ public class HashJoinNode extends PlanNode {
         }
         return true;
     }
+
+    SlotRef getMappedInputSlotRef(SlotRef slotRef) {
+        if (vSrcToOutputSMap != null) {
+            Expr mappedExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef);
+            if (mappedExpr != null && mappedExpr instanceof SlotRef) {
+                return (SlotRef) mappedExpr;
+            } else {
+                return null;
+            }
+        } else {
+            return slotRef;
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 4e57955813..63f28db103 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.ExprId;
 import org.apache.doris.analysis.ExprSubstitutionMap;
 import org.apache.doris.analysis.FunctionName;
 import org.apache.doris.analysis.SlotId;
+import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.Function;
@@ -832,12 +833,26 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
         return sb.toString();
     }
 
-    public ScanNode getScanNodeInOneFragmentByTupleId(TupleId tupleId) {
+    public ScanNode getScanNodeInOneFragmentBySlotRef(SlotRef slotRef) {
+        TupleId tupleId = slotRef.getDesc().getParent().getId();
         if (this instanceof ScanNode && tupleIds.contains(tupleId)) {
             return (ScanNode) this;
+        } else if (this instanceof HashJoinNode) {
+            HashJoinNode hashJoinNode = (HashJoinNode) this;
+            SlotRef inputSlotRef = hashJoinNode.getMappedInputSlotRef(slotRef);
+            if (inputSlotRef != null) {
+                for (PlanNode planNode : children) {
+                    ScanNode scanNode = planNode.getScanNodeInOneFragmentBySlotRef(inputSlotRef);
+                    if (scanNode != null) {
+                        return scanNode;
+                    }
+                }
+            } else {
+                return null;
+            }
         } else if (!(this instanceof ExchangeNode)) {
             for (PlanNode planNode : children) {
-                ScanNode scanNode = planNode.getScanNodeInOneFragmentByTupleId(tupleId);
+                ScanNode scanNode = planNode.getScanNodeInOneFragmentBySlotRef(slotRef);
                 if (scanNode != null) {
                     return scanNode;
                 }
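
The lookup this patch introduces can be illustrated with a minimal sketch. It is
written in C++ purely for illustration (the patch itself is Java), and Node, Slot
and find_scan are hypothetical names, not Doris classes. The assumption is that a
hash join with an output tuple carries a map from output slot id to the child
slot it was built from, and that the search translates the slot through that map
before descending.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <optional>
#include <vector>

// Hypothetical, simplified stand-ins; not the Doris planner classes.
struct Slot {
    int slot_id;
    int tuple_id;  // id of the tuple that owns this slot
};

struct Node {
    enum Kind { SCAN, HASH_JOIN, EXCHANGE, OTHER } kind = OTHER;
    int tuple_id = -1;  // SCAN only: the tuple produced by the scan
    // HASH_JOIN only: output slot id -> input slot of a child.
    // std::nullopt models a join that has no output tuple.
    std::optional<std::map<int, Slot>> output_to_input;
    std::vector<std::unique_ptr<Node>> children;
};

// Finds the scan node inside the current fragment that produces `slot`.
const Node* find_scan(const Node& node, Slot slot) {
    if (node.kind == Node::SCAN) {
        return node.tuple_id == slot.tuple_id ? &node : nullptr;
    }
    if (node.kind == Node::EXCHANGE) {
        return nullptr;  // an exchange marks the fragment boundary
    }
    if (node.kind == Node::HASH_JOIN && node.output_to_input) {
        // Translate the join's output slot back to the child slot.
        auto it = node.output_to_input->find(slot.slot_id);
        if (it == node.output_to_input->end()) {
            return nullptr;  // slot does not come directly from a child slot
        }
        slot = it->second;
    }
    for (const auto& child : node.children) {
        if (const Node* scan = find_scan(*child, slot)) {
            return scan;
        }
    }
    return nullptr;
}
```

In this sketch, searching by the output slot's own tuple id would never match a
scan node below the join, which is the failure mode the commit message describes.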




[doris] 04/07: [fix][vectorized] Fix bug of VInPredicate on date type (#10663)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1-v20220707
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 199bb616c6d7b0bc0a953c2799527d757dc3df51
Author: Xin Liao <li...@126.com>
AuthorDate: Thu Jul 7 22:15:33 2022 +0800

    [fix][vectorized] Fix bug of VInPredicate on date type (#10663)
---
 be/src/exprs/create_predicate_function.h           | 25 ++++++++++----
 be/src/runtime/primitive_type.h                    | 16 +++++++++
 be/src/vec/functions/in.cpp                        |  8 ++---
 .../data/datatype/date/test_date_in_predicate.out  |  8 +++++
 .../datatype/date/test_date_in_predicate.groovy    | 38 ++++++++++++++++++++++
 5 files changed, 85 insertions(+), 10 deletions(-)

diff --git a/be/src/exprs/create_predicate_function.h b/be/src/exprs/create_predicate_function.h
index 27aef88c92..bd7c8bc6e0 100644
--- a/be/src/exprs/create_predicate_function.h
+++ b/be/src/exprs/create_predicate_function.h
@@ -33,15 +33,23 @@ public:
     };
 };
 
+template <bool is_vec>
 class HybridSetTraits {
 public:
     using BasePtr = HybridSetBase*;
     template <PrimitiveType type>
     static BasePtr get_function([[maybe_unused]] MemTracker* tracker) {
-        using CppType = typename PrimitiveTypeTraits<type>::CppType;
-        using Set = std::conditional_t<std::is_same_v<CppType, StringValue>, StringValueSet,
-                                       HybridSet<CppType>>;
-        return new (std::nothrow) Set();
+        if constexpr (is_vec) {
+            using CppType = typename VecPrimitiveTypeTraits<type>::CppType;
+            using Set = std::conditional_t<std::is_same_v<CppType, StringValue>, StringValueSet,
+                                           HybridSet<CppType>>;
+            return new (std::nothrow) Set();
+        } else {
+            using CppType = typename PrimitiveTypeTraits<type>::CppType;
+            using Set = std::conditional_t<std::is_same_v<CppType, StringValue>, StringValueSet,
+                                           HybridSet<CppType>>;
+            return new (std::nothrow) Set();
+        }
     };
 };
 
@@ -114,11 +122,16 @@ inline auto create_minmax_filter(PrimitiveType type) {
 }
 
 inline auto create_set(PrimitiveType type) {
-    return create_predicate_function<HybridSetTraits>(type);
+    return create_predicate_function<HybridSetTraits<false>>(type);
+}
+
+// used for VInPredicate
+inline auto vec_create_set(PrimitiveType type) {
+    return create_predicate_function<HybridSetTraits<true>>(type);
 }
 
 inline auto create_bloom_filter(MemTracker* tracker, PrimitiveType type) {
     return create_predicate_function<BloomFilterTraits>(type, tracker);
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/runtime/primitive_type.h b/be/src/runtime/primitive_type.h
index 03d13b2069..7b4789b0b4 100644
--- a/be/src/runtime/primitive_type.h
+++ b/be/src/runtime/primitive_type.h
@@ -372,6 +372,22 @@ struct PredicatePrimitiveTypeTraits<TYPE_DATETIME> {
     using PredicateFieldType = uint64_t;
 };
 
+// used for VInPredicate. VInPredicate should use vectorized data type
+template <PrimitiveType type>
+struct VecPrimitiveTypeTraits {
+    using CppType = typename PrimitiveTypeTraits<type>::CppType;
+};
+
+template <>
+struct VecPrimitiveTypeTraits<TYPE_DATE> {
+    using CppType = vectorized::VecDateTimeValue;
+};
+
+template <>
+struct VecPrimitiveTypeTraits<TYPE_DATETIME> {
+    using CppType = vectorized::VecDateTimeValue;
+};
+
 } // namespace doris
 
 #endif
diff --git a/be/src/vec/functions/in.cpp b/be/src/vec/functions/in.cpp
index 65232c442e..216165b826 100644
--- a/be/src/vec/functions/in.cpp
+++ b/be/src/vec/functions/in.cpp
@@ -67,8 +67,8 @@ public:
         }
         auto* state = new InState();
         context->set_function_state(scope, state);
-        state->hybrid_set.reset(create_set(convert_type_to_primitive(
-                context->get_arg_type(0)->type)));
+        state->hybrid_set.reset(
+                vec_create_set(convert_type_to_primitive(context->get_arg_type(0)->type)));
 
         DCHECK(context->get_num_args() > 1);
         for (int i = 1; i < context->get_num_args(); ++i) {
@@ -135,8 +135,8 @@ public:
                     continue;
                 }
 
-                std::unique_ptr<HybridSetBase> hybrid_set(create_set(convert_type_to_primitive(
-                context->get_arg_type(0)->type)));
+                std::unique_ptr<HybridSetBase> hybrid_set(
+                        vec_create_set(convert_type_to_primitive(context->get_arg_type(0)->type)));
                 bool null_in_set = false;
 
                 for (const auto& set_column : set_columns) {
diff --git a/regression-test/data/datatype/date/test_date_in_predicate.out b/regression-test/data/datatype/date/test_date_in_predicate.out
new file mode 100644
index 0000000000..db4eb25976
--- /dev/null
+++ b/regression-test/data/datatype/date/test_date_in_predicate.out
@@ -0,0 +1,8 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql1 --
+2	test2	2000-02-02
+
+-- !sql2 --
+1	test1	2000-01-01
+3	test3	2000-03-02
+
diff --git a/regression-test/suites/datatype/date/test_date_in_predicate.groovy b/regression-test/suites/datatype/date/test_date_in_predicate.groovy
new file mode 100644
index 0000000000..6fc5c1dbac
--- /dev/null
+++ b/regression-test/suites/datatype/date/test_date_in_predicate.groovy
@@ -0,0 +1,38 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_date_in_predicate", "datatype") {
+    def tbName = "test_date_in_predicate"
+    sql "DROP TABLE IF EXISTS ${tbName}"
+    sql """
+            CREATE TABLE IF NOT EXISTS ${tbName} (
+                c0 int,
+                c1 char(10),
+                c2 date
+            )
+            UNIQUE KEY(c0)
+            DISTRIBUTED BY HASH(c0) BUCKETS 5 properties("replication_num" = "1");
+        """
+    sql "insert into ${tbName} values(1, 'test1', '2000-01-01')"
+    sql "insert into ${tbName} values(2, 'test2', '2000-02-02')"
+    sql "insert into ${tbName} values(3, 'test3', '2000-03-02')"
+
+    qt_sql1 "select * from ${tbName} where c2 in ('2000-02-02')"
+    qt_sql2 "select * from ${tbName} where c2 not in ('2000-02-02')"
+    sql "DROP TABLE ${tbName}"
+}
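
The trait pattern used in this patch can be shown in isolation. The following is
a minimal sketch, assuming C++17; DateTimeValue, VecDateTimeValue and
uses_vec_date are simplified hypothetical stand-ins, not the Doris definitions.
The primary VecPrimitiveTypeTraits template forwards to the row-engine mapping,
and only the types whose vectorized representation differs are specialized.

```cpp
#include <cassert>
#include <cstdint>
#include <type_traits>

// Hypothetical stand-ins for the types named in the diff.
enum PrimitiveType { TYPE_INT, TYPE_DATE };
struct DateTimeValue {};     // assumed row-engine date representation
struct VecDateTimeValue {};  // assumed vectorized date representation

// Row-engine mapping from primitive type to C++ type.
template <PrimitiveType type>
struct PrimitiveTypeTraits;
template <>
struct PrimitiveTypeTraits<TYPE_INT> {
    using CppType = int32_t;
};
template <>
struct PrimitiveTypeTraits<TYPE_DATE> {
    using CppType = DateTimeValue;
};

// Vectorized mapping: same as the row engine unless specialized.
template <PrimitiveType type>
struct VecPrimitiveTypeTraits {
    using CppType = typename PrimitiveTypeTraits<type>::CppType;
};
template <>
struct VecPrimitiveTypeTraits<TYPE_DATE> {
    using CppType = VecDateTimeValue;
};

// Reports whether the element type selected for (is_vec, type)
// is the vectorized date value.
template <bool is_vec, PrimitiveType type>
constexpr bool uses_vec_date() {
    if constexpr (is_vec) {
        using CppType = typename VecPrimitiveTypeTraits<type>::CppType;
        return std::is_same_v<CppType, VecDateTimeValue>;
    } else {
        using CppType = typename PrimitiveTypeTraits<type>::CppType;
        return std::is_same_v<CppType, VecDateTimeValue>;
    }
}

// Only the vectorized path over a date type picks the vectorized value.
static_assert(uses_vec_date<true, TYPE_DATE>());
static_assert(!uses_vec_date<false, TYPE_DATE>());
static_assert(!uses_vec_date<true, TYPE_INT>());
```

Because the selection happens at compile time, the non-vectorized callers keep
their existing element types and pay no runtime cost for the extra branch.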




[doris] 03/07: [hotfix](dev-1.0.1) bottom line solution for vec outer join (#10645)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1-v20220707
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 270daccfba38a341a0d8f758dcacbe5048a495e2
Author: starocean999 <40...@users.noreply.github.com>
AuthorDate: Thu Jul 7 11:20:16 2022 +0800

    [hotfix](dev-1.0.1) bottom line solution for vec outer join (#10645)
    
    The agg and hash join nodes should produce nullable columns correctly, as specified by the FE planner.
---
 be/src/vec/exec/join/vhash_join_node.cpp           | 43 ++++++++++++-----
 be/src/vec/exec/vaggregation_node.cpp              | 54 ++++++++++++++++------
 be/src/vec/exec/vaggregation_node.h                |  7 +--
 be/src/vec/exec/vanalytic_eval_node.cpp            | 11 ++++-
 be/src/vec/functions/function_case.h               |  4 +-
 .../org/apache/doris/analysis/AggregateInfo.java   | 12 +++++
 .../org/apache/doris/planner/AggregationNode.java  |  9 ++--
 .../java/org/apache/doris/planner/Planner.java     | 13 +++---
 gensrc/thrift/PlanNodes.thrift                     |  4 ++
 9 files changed, 115 insertions(+), 42 deletions(-)

diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 596bef712b..25be6c80a9 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -189,10 +189,15 @@ struct ProcessHashTableProbe {
         if constexpr (!is_semi_anti_join || have_other_join_conjunct) {
             if (_build_blocks.size() == 1) {
                 for (int i = 0; i < column_length; i++) {
-                    auto& column = *_build_blocks[0].get_by_position(i).column;
                     if (output_slot_flags[i]) {
+                        auto column = _build_blocks[0].get_by_position(i).column;
+                        if (mcol[i + column_offset]->is_nullable() xor column->is_nullable()) {
+                            DCHECK(mcol[i + column_offset]->is_nullable() &&
+                                   !column->is_nullable());
+                            column = make_nullable(column);
+                        }
                         mcol[i + column_offset]->insert_indices_from(
-                                column, _build_block_rows.data(), _build_block_rows.data() + size);
+                                *column, _build_block_rows.data(), _build_block_rows.data() + size);
                     } else {
                         mcol[i + column_offset]->resize(size);
                     }
@@ -207,17 +212,29 @@ struct ProcessHashTableProbe {
                                     assert_cast<ColumnNullable*>(mcol[i + column_offset].get())
                                             ->insert_join_null_data();
                                 } else {
-                                    auto& column = *_build_blocks[_build_block_offsets[j]]
-                                                            .get_by_position(i)
-                                                            .column;
-                                    mcol[i + column_offset]->insert_from(column,
+                                    auto column = _build_blocks[_build_block_offsets[j]]
+                                                          .get_by_position(i)
+                                                          .column;
+                                    if (mcol[i + column_offset]->is_nullable() xor
+                                        column->is_nullable()) {
+                                        DCHECK(mcol[i + column_offset]->is_nullable() &&
+                                               !column->is_nullable());
+                                        column = make_nullable(column);
+                                    }
+                                    mcol[i + column_offset]->insert_from(*column,
                                                                          _build_block_rows[j]);
                                 }
                             } else {
-                                auto& column = *_build_blocks[_build_block_offsets[j]]
-                                                        .get_by_position(i)
-                                                        .column;
-                                mcol[i + column_offset]->insert_from(column, _build_block_rows[j]);
+                                auto column = _build_blocks[_build_block_offsets[j]]
+                                                      .get_by_position(i)
+                                                      .column;
+                                if (mcol[i + column_offset]->is_nullable() xor
+                                    column->is_nullable()) {
+                                    DCHECK(mcol[i + column_offset]->is_nullable() &&
+                                           !column->is_nullable());
+                                    column = make_nullable(column);
+                                }
+                                mcol[i + column_offset]->insert_from(*column, _build_block_rows[j]);
                             }
                         }
                     } else {
@@ -233,7 +250,11 @@ struct ProcessHashTableProbe {
                                   int size) {
         for (int i = 0; i < output_slot_flags.size(); ++i) {
             if (output_slot_flags[i]) {
-                auto& column = _probe_block.get_by_position(i).column;
+                auto column = _probe_block.get_by_position(i).column;
+                if (mcol[i]->is_nullable() xor column->is_nullable()) {
+                    DCHECK(mcol[i]->is_nullable() && !column->is_nullable());
+                    column = make_nullable(column);
+                }
                 column->replicate(&_items_counts[0], size, *mcol[i]);
             } else {
                 mcol[i]->resize(size);
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 29983cf374..4dd89499a1 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -77,6 +77,7 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
 AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
                                  const DescriptorTbl& descs)
         : ExecNode(pool, tnode, descs),
+          _aggregate_evaluators_changed_flags(tnode.agg_node.aggregate_function_changed_flags),
           _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
           _intermediate_tuple_desc(NULL),
           _output_tuple_id(tnode.agg_node.output_tuple_id),
@@ -225,19 +226,26 @@ Status AggregationNode::prepare(RuntimeState* state) {
 
     int j = _probe_expr_ctxs.size();
     for (int i = 0; i < j; ++i) {
-        auto nullable_output = _output_tuple_desc->slots()[i]->is_nullable();
+        auto nullable_output = _needs_finalize ? _output_tuple_desc->slots()[i]->is_nullable() : _intermediate_tuple_desc->slots()[i]->is_nullable();
         auto nullable_input = _probe_expr_ctxs[i]->root()->is_nullable();
         if (nullable_output != nullable_input) {
             DCHECK(nullable_output);
-            _make_nullable_keys.emplace_back(i);
+            _make_nullable_output_column_pos.emplace_back(i);
         }
     }
+    int probe_expr_count = _probe_expr_ctxs.size();
     for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) {
         SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
         SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
         RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(state, child(0)->row_desc(),
                                                           _mem_pool.get(), intermediate_slot_desc,
                                                           output_slot_desc, mem_tracker()));
+        auto nullable_output = _needs_finalize ? output_slot_desc->is_nullable() : intermediate_slot_desc->is_nullable();
+        auto nullable_agg_output = _aggregate_evaluators[i]->data_type()->is_nullable();
+        if ( nullable_output != nullable_agg_output) {
+            DCHECK(nullable_output);
+            _make_nullable_output_column_pos.emplace_back(i + probe_expr_count);
+        }
     }
 
     // set profile timer to evaluators
@@ -389,11 +397,11 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
         }
         // pre stream agg need use _num_row_return to decide whether to do pre stream agg
         _num_rows_returned += block->rows();
-        _make_nullable_output_key(block);
+        _make_nullable_output_column(block);
         COUNTER_SET(_rows_returned_counter, _num_rows_returned);
     } else {
         RETURN_IF_ERROR(_executor.get_result(state, block, eos));
-        _make_nullable_output_key(block);
+        _make_nullable_output_column(block);
         // dispose the having clause, should not be execute in prestreaming agg
         RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, block->columns()));
         reached_limit(block, eos);
@@ -497,6 +505,9 @@ Status AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
     }
 
     for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        if (_aggregate_evaluators_changed_flags[i]) {
+            write_binary(true, value_buffer_writers[i]);
+        }
         _aggregate_evaluators[i]->function()->serialize(
                 _agg_data.without_key + _offsets_of_aggregate_states[i], value_buffer_writers[i]);
         value_buffer_writers[i].commit();
@@ -576,13 +587,16 @@ void AggregationNode::_close_without_key() {
     release_tracker();
 }
 
-void AggregationNode::_make_nullable_output_key(Block* block) {
+void AggregationNode::_make_nullable_output_column(Block* block) {
     if (block->rows() != 0) {
-        for (auto cid : _make_nullable_keys) {
-            block->get_by_position(cid).column =
-                    make_nullable(block->get_by_position(cid).column);
-            block->get_by_position(cid).type =
-                    make_nullable(block->get_by_position(cid).type);
+        for (auto cid : _make_nullable_output_column_pos) {
+            if (!block->get_by_position(cid).column->is_nullable()) {
+                block->get_by_position(cid).column =
+                        make_nullable(block->get_by_position(cid).column);
+            }
+            if (!block->get_by_position(cid).type->is_nullable()) {
+                block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type);
+            }
         }
     }
 }
@@ -695,7 +709,7 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
 
                         // will serialize value data to string column
                         std::vector<VectorBufferWriter> value_buffer_writers;
-                        bool mem_reuse = out_block->mem_reuse();
+                        bool mem_reuse = out_block->mem_reuse() && _make_nullable_output_column_pos.empty();
                         auto serialize_string_type = std::make_shared<DataTypeString>();
                         MutableColumns value_columns;
                         for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
@@ -713,6 +727,9 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
 
                         for (size_t j = 0; j < rows; ++j) {
                             for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+                                if (_aggregate_evaluators_changed_flags[i]) {
+                                    write_binary(true, value_buffer_writers[i]);
+                                }
                                 _aggregate_evaluators[i]->function()->serialize(
                                         _streaming_pre_places[j] + _offsets_of_aggregate_states[i],
                                         value_buffer_writers[i]);
@@ -850,14 +867,14 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) {
 
 Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Block* block,
                                                         bool* eos) {
-    bool mem_reuse = block->mem_reuse();
+    bool mem_reuse = block->mem_reuse() && _make_nullable_output_column_pos.empty();
     auto column_withschema = VectorizedUtils::create_columns_with_type_and_name(row_desc());
     int key_size = _probe_expr_ctxs.size();
 
     MutableColumns key_columns;
     for (int i = 0; i < key_size; ++i) {
         if (!mem_reuse) {
-            key_columns.emplace_back(column_withschema[i].type->create_column());
+            key_columns.emplace_back(_probe_expr_ctxs[i]->root()->data_type()->create_column());
         } else {
             key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
         }
@@ -865,7 +882,8 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
     MutableColumns value_columns;
     for (int i = key_size; i < column_withschema.size(); ++i) {
         if (!mem_reuse) {
-            value_columns.emplace_back(column_withschema[i].type->create_column());
+            value_columns.emplace_back(
+                    _aggregate_evaluators[i - key_size]->data_type()->create_column());
         } else {
             value_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
         }
@@ -932,7 +950,7 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
     MutableColumns value_columns(agg_size);
     DataTypes value_data_types(agg_size);
 
-    bool mem_reuse = block->mem_reuse();
+    bool mem_reuse = block->mem_reuse() && _make_nullable_output_column_pos.empty();
 
     MutableColumns key_columns;
     for (int i = 0; i < key_size; ++i) {
@@ -969,6 +987,9 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
 
                     // serialize values
                     for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+                        if (_aggregate_evaluators_changed_flags[i]) {
+                            write_binary(true, value_buffer_writers[i]);
+                        }
                         _aggregate_evaluators[i]->function()->serialize(
                                 mapped + _offsets_of_aggregate_states[i], value_buffer_writers[i]);
                         value_buffer_writers[i].commit();
@@ -984,6 +1005,9 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
                             key_columns[0]->insert_data(nullptr, 0);
                             auto mapped = agg_method.data.get_null_key_data();
                             for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+                                if (_aggregate_evaluators_changed_flags[i]) {
+                                    write_binary(true, value_buffer_writers[i]);
+                                }
                                 _aggregate_evaluators[i]->function()->serialize(
                                         mapped + _offsets_of_aggregate_states[i],
                                         value_buffer_writers[i]);
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index c6d6c34b1c..758d11fbdb 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -417,12 +417,13 @@ public:
 private:
     // group by k1,k2
     std::vector<VExprContext*> _probe_expr_ctxs;
-    // left / full join will change the key nullable make output/input solt
+    // left / full join will change the output nullable make output/input solt
     // nullable diff. so we need make nullable of it.
-    std::vector<size_t> _make_nullable_keys;
+    std::vector<size_t> _make_nullable_output_column_pos;
     std::vector<size_t> _probe_key_sz;
 
     std::vector<AggFnEvaluator*> _aggregate_evaluators;
+    std::vector<bool> _aggregate_evaluators_changed_flags;
 
     // may be we don't have to know the tuple id
     TupleId _intermediate_tuple_id;
@@ -462,7 +463,7 @@ private:
     /// the preagg should pass through any rows it can't fit in its tables.
     bool _should_expand_preagg_hash_tables();
 
-    void _make_nullable_output_key(Block* block);
+    void _make_nullable_output_column(Block* block);
 
     Status _create_agg_status(AggregateDataPtr data);
     Status _destroy_agg_status(AggregateDataPtr data);
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index 65765d15d3..858e330efe 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -544,7 +544,16 @@ Status VAnalyticEvalNode::_output_current_block(Block* block) {
     }
 
     for (size_t i = 0; i < _result_window_columns.size(); ++i) {
-        block->insert({std::move(_result_window_columns[i]), _agg_functions[i]->data_type(), ""});
+        SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
+        if (output_slot_desc->is_nullable() xor _agg_functions[i]->data_type()->is_nullable()) {
+            DCHECK(output_slot_desc->is_nullable() &&
+                   !_agg_functions[i]->data_type()->is_nullable());
+            block->insert({make_nullable(std::move(_result_window_columns[i])),
+                           make_nullable(_agg_functions[i]->data_type()), ""});
+        } else {
+            block->insert(
+                    {std::move(_result_window_columns[i]), _agg_functions[i]->data_type(), ""});
+        }
     }
 
     _output_block_index++;
diff --git a/be/src/vec/functions/function_case.h b/be/src/vec/functions/function_case.h
index 47e33f58ff..0b0772583d 100644
--- a/be/src/vec/functions/function_case.h
+++ b/be/src/vec/functions/function_case.h
@@ -172,9 +172,11 @@ public:
                                     .data();
 
                     // simd automatically
+                    // we have to use (bool)cond_raw_data[row_idx] to force the output to be 0 or 1,
+                    // because in some cases non-zero values such as 1 or 2 may be used to indicate that the value is null.
                     for (int row_idx = 0; row_idx < rows_count; row_idx++) {
                         then_idx_ptr[row_idx] |=
-                                (!then_idx_ptr[row_idx]) * cond_raw_data[row_idx] * i;
+                                (!then_idx_ptr[row_idx]) * (bool)cond_raw_data[row_idx] * i;
                     }
                 }
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
index 74facd0a55..b599f36909 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
@@ -333,6 +333,18 @@ public final class AggregateInfo extends AggregateInfoBase {
         return result;
     }
 
+    public ArrayList<Boolean> getMaterializedAggregateExprChangedFlags() {
+        ArrayList<Boolean> result = Lists.newArrayList();
+        for (Integer i : materializedSlots_) {
+            if (mergeAggInfo_ != null) {
+                result.add(aggregateExprs_.get(i).isNullable() != mergeAggInfo_.aggregateExprs_.get(i).isNullable());
+            } else {
+                result.add(false);
+            }
+        }
+        return result;
+    }
+
     public AggregateInfo getMergeAggInfo() {
         return mergeAggInfo_;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index e7dee2651c..c19058f1cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -268,16 +268,15 @@ public class AggregationNode extends PlanNode {
         msg.node_type = TPlanNodeType.AGGREGATION_NODE;
         List<TExpr> aggregateFunctions = Lists.newArrayList();
         // only serialize agg exprs that are being materialized
-        for (FunctionCallExpr e: aggInfo.getMaterializedAggregateExprs()) {
+        for (FunctionCallExpr e : aggInfo.getMaterializedAggregateExprs()) {
             aggregateFunctions.add(e.treeToThrift());
         }
         msg.agg_node =
-          new TAggregationNode(
-                  aggregateFunctions,
-                  aggInfo.getIntermediateTupleId().asInt(),
-                  aggInfo.getOutputTupleId().asInt(), needsFinalize);
+                new TAggregationNode(aggregateFunctions, aggInfo.getIntermediateTupleId().asInt(),
+                        aggInfo.getOutputTupleId().asInt(), needsFinalize);
         msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg);
         msg.agg_node.setIsUpdateStage(!aggInfo.isMerge());
+        msg.agg_node.setAggregateFunctionChangedFlags(aggInfo.getMaterializedAggregateExprChangedFlags());
         List<Expr> groupingExprs = aggInfo.getGroupingExprs();
         if (groupingExprs != null) {
             msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index 96a3f4f0eb..fda1f82e1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -172,12 +172,13 @@ public class Planner {
             singleNodePlan.convertToVectoriezd();
         }
 
-        if (analyzer.getContext() != null
-                && analyzer.getContext().getSessionVariable().isEnableProjection()
-                && statement instanceof SelectStmt) {
-            ProjectPlanner projectPlanner = new ProjectPlanner(analyzer);
-            projectPlanner.projectSingleNodePlan(queryStmt.getResultExprs(), singleNodePlan);
-        }
+        // disable ProjectPlanner for now because there is some bug to be fixed
+        // if (analyzer.getContext() != null
+        //         && analyzer.getContext().getSessionVariable().isEnableProjection()
+        //         && statement instanceof SelectStmt) {
+        //     ProjectPlanner projectPlanner = new ProjectPlanner(analyzer);
+        //     projectPlanner.projectSingleNodePlan(queryStmt.getResultExprs(), singleNodePlan);
+        // }
 
         if (statement instanceof InsertStmt) {
             InsertStmt insertStmt = (InsertStmt) statement;
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index dfe90968cb..8ca90ec29c 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -465,6 +465,10 @@ struct TAggregationNode {
   5: required bool need_finalize
   6: optional bool use_streaming_preaggregation
   7: optional bool is_update_stage
+
+  // to support vec outer join: in some cases the agg function has a different nullable property in the serialize and merge phases,
+  // so we need to pass this info to BE so that the agg function serializes and deserializes correctly
+  8: optional list<bool> aggregate_function_changed_flags
 }
 
 struct TRepeatNode {
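
Note on aggregate_function_changed_flags: the flag list sent to BE holds one boolean per materialized aggregate expression, set when the expression's nullability differs between the update-phase and merge-phase aggregate info. A minimal sketch of that comparison follows, assuming plain boolean lists in place of the real expression objects; the class and method names here are hypothetical and not part of Doris.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the nullability comparison done in
// AggregateInfo.getMaterializedAggregateExprChangedFlags().
final class ChangedFlagsSketch {
    // updateNullable: nullability of each aggregate expression in the update phase.
    // mergeNullable: nullability in the merge phase, or null when there is no merge info.
    static List<Boolean> changedFlags(List<Boolean> updateNullable, List<Boolean> mergeNullable) {
        List<Boolean> result = new ArrayList<>();
        for (int i = 0; i < updateNullable.size(); i++) {
            if (mergeNullable != null) {
                // Flag is set only when the two phases disagree on nullability.
                result.add(!updateNullable.get(i).equals(mergeNullable.get(i)));
            } else {
                // Without merge info nothing can have changed.
                result.add(false);
            }
        }
        return result;
    }
}
```

The sketch indexes the lists directly, whereas the patch iterates over the materialized slot indices; that simplification is an assumption made for brevity.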


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 01/07: [hotfix](dev-1.0.1) if pending bytes exceeded, vtableSink wait until pending bytes consumed or task was cancelled (#10644)

Posted by mo...@apache.org.

morningman pushed a commit to branch dev-1.0.1-v20220707
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 46efc851fd1113207a8fb0985888cd96972dbac4
Author: minghong <mi...@163.com>
AuthorDate: Wed Jul 6 17:52:10 2022 +0800

    [hotfix](dev-1.0.1) if pending bytes exceeded, vtableSink wait until pending bytes consumed or task was cancelled (#10644)
    
    To avoid some OOM problems in load operations.
---
 be/src/common/config.h           |  4 ----
 be/src/exec/tablet_sink.cpp      |  6 ++----
 be/src/vec/sink/vtablet_sink.cpp | 24 +++++++-----------------
 3 files changed, 9 insertions(+), 25 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 3174b445c2..2f956e3b8d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -720,10 +720,6 @@ CONF_Int32(quick_compaction_max_rows, "1000");
 CONF_Int32(quick_compaction_batch_size, "10");
 // do compaction min rowsets
 CONF_Int32(quick_compaction_min_rowsets, "10");
-
-//memory limitation for batches in pending queue, default 500M
-CONF_Int64(table_sink_pending_bytes_limitation, "524288000");
-
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index adb44a5843..2d9733e9bf 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -260,8 +260,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
     // But there is still some unfinished things, we do mem limit here temporarily.
     // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
     // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
-    while (!_cancelled && (_pending_batches_bytes > _max_pending_batches_bytes || _parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD)) &&
-           _pending_batches_num > 0) {
+    while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes) {
         SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
         SleepFor(MonoDelta::FromMilliseconds(10));
     }
@@ -310,8 +309,7 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
     // But there is still some unfinished things, we do mem limit here temporarily.
     // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
     // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
-    while (!_cancelled && (_pending_batches_bytes > _max_pending_batches_bytes || _parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD)) &&
-           _pending_batches_num > 0) {
+    while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes) {
         SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
         SleepFor(MonoDelta::FromMilliseconds(10));
     }
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index c90e3b65b7..f0239b23c6 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -58,9 +58,8 @@ Status VOlapTableSink::open(RuntimeState* state) {
 
 size_t VOlapTableSink::get_pending_bytes() const {
     size_t mem_consumption = 0;
-    for (auto& indexChannel : _channels){
+    for (auto& indexChannel : _channels) {
         mem_consumption += indexChannel->get_pending_bytes();
-        
     }
     return mem_consumption;
 }
@@ -116,20 +115,10 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
     if (findTabletMode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {
         _partition_to_tablet_map.clear();
     }
-    
-    //if pending bytes is more than table_sink_pending_bytes_limitation, wait at most 1 min
-    size_t MAX_PENDING_BYTES = config::table_sink_pending_bytes_limitation;
-    constexpr int max_retry = 120;
-    int retry = 0;
-    while (get_pending_bytes() > MAX_PENDING_BYTES && retry++ < max_retry) {
-        std::this_thread::sleep_for(std::chrono::microseconds(500));
-    }
-    if (get_pending_bytes() > MAX_PENDING_BYTES) {
-        std::stringstream str;
-        str << "Load task " << _load_id
-            << ": pending bytes exceed limit (config::table_sink_pending_bytes_limitation):"
-            << MAX_PENDING_BYTES;
-        return Status::MemoryLimitExceeded(str.str());
+
+    size_t MAX_PENDING_BYTES = _load_mem_limit / 3;
+    while (get_pending_bytes() > MAX_PENDING_BYTES && !state->is_cancelled()) {
+        std::this_thread::sleep_for(std::chrono::microseconds(100));
     }
 
     for (int i = 0; i < num_rows; ++i) {
@@ -140,7 +129,8 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
         uint32_t tablet_index = 0;
         block_row = {&block, i};
         if (!_vpartition->find_partition(&block_row, &partition)) {
-            RETURN_IF_ERROR(state->append_error_msg_to_file([]() -> std::string { return ""; },
+            RETURN_IF_ERROR(state->append_error_msg_to_file(
+                    []() -> std::string { return ""; },
                     [&]() -> std::string {
                         fmt::memory_buffer buf;
                         fmt::format_to(buf, "no partition for this tuple. tuple=[]");
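
Note on the new wait loop in VOlapTableSink::send: instead of failing after a fixed number of retries, the sender now blocks until the pending bytes drop below one third of the load memory limit or the task is cancelled. The same idea is sketched below in Java, purely as an illustration; PendingBytesGate and its methods are hypothetical names, and the counters stand in for the channel bookkeeping that the real sink performs.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical back-pressure gate modelled on the loop in VOlapTableSink::send.
final class PendingBytesGate {
    private final AtomicLong pendingBytes = new AtomicLong();
    private final AtomicBoolean cancelled = new AtomicBoolean();
    private final long maxPendingBytes;

    PendingBytesGate(long loadMemLimit) {
        // Same ratio as the patch: a third of the load memory limit.
        this.maxPendingBytes = loadMemLimit / 3;
    }

    void add(long bytes) { pendingBytes.addAndGet(bytes); }

    void consume(long bytes) { pendingBytes.addAndGet(-bytes); }

    void cancel() { cancelled.set(true); }

    // Blocks while too many bytes are pending; returns false if the task
    // was cancelled or the waiting thread was interrupted.
    boolean awaitCapacity() {
        while (pendingBytes.get() > maxPendingBytes && !cancelled.get()) {
            try {
                Thread.sleep(0, 100_000); // 100 microseconds, as in the patch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return !cancelled.get();
    }
}
```

Unlike the earlier retry-limited version, this loop has no timeout, so it relies on the consumer side draining the queue or on cancellation to make progress.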




[doris] 07/07: [hotfix](dev-1.0.1) Fix Runtime Filter support equivalent slot of outer join (#10669)

Posted by mo...@apache.org.

morningman pushed a commit to branch dev-1.0.1-v20220707
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 300f5cdbeba0258e103f45172f87c3a047c324f7
Author: Kidd <10...@users.noreply.github.com>
AuthorDate: Thu Jul 7 11:19:51 2022 +0800

    [hotfix](dev-1.0.1) Fix Runtime Filter support equivalent slot of outer join (#10669)
    
    This bug may cause some runtime filters not to be generated.
---
 .../src/main/java/org/apache/doris/analysis/Analyzer.java | 15 ++++++++++++++-
 .../org/apache/doris/analysis/ExprSubstitutionMap.java    |  9 ++++++++-
 .../main/java/org/apache/doris/planner/HashJoinNode.java  |  2 +-
 3 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index 26bf81e5b2..74b8b674e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -181,6 +181,12 @@ public class Analyzer {
 
     public String getTimezone() { return timezone; }
 
+    public void putEquivalentSlot(SlotId src_sid, SlotId target_sid) { globalState.equivalentSlots.put(src_sid, target_sid); }
+
+    public SlotId getEquivalentSlot(SlotId src_sid) { return globalState.equivalentSlots.get(src_sid); }
+
+    public boolean containEquivalentSlot(SlotId src_sid) { return globalState.equivalentSlots.containsKey(src_sid); }
+
     public void putAssignedRuntimeFilter(RuntimeFilter rf) { assignedRuntimeFilters.add(rf); }
 
     public List<RuntimeFilter> getAssignedRuntimeFilter() { return assignedRuntimeFilters; }
@@ -313,6 +319,8 @@ public class Analyzer {
 
         private final long autoBroadcastJoinThreshold;
 
+        private final Map<SlotId, SlotId> equivalentSlots = Maps.newHashMap();
+
         public GlobalState(Catalog catalog, ConnectContext context) {
             this.catalog = catalog;
             this.context = context;
@@ -2141,7 +2149,7 @@ public class Analyzer {
      * TODO(zxy) Use value-transfer graph to check
      */
     public boolean hasValueTransfer(SlotId a, SlotId b) {
-        return a.equals(b);
+        return getValueTransferTargets(a).contains(b);
     }
 
     /**
@@ -2153,6 +2161,11 @@ public class Analyzer {
     public List<SlotId> getValueTransferTargets(SlotId srcSid) {
         List<SlotId> result = new ArrayList<>();
         result.add(srcSid);
+        SlotId equalSlot = srcSid;
+        while(containEquivalentSlot(equalSlot)) {
+            result.add(getEquivalentSlot(equalSlot));
+            equalSlot = getEquivalentSlot(equalSlot);
+        }
         return result;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java
index aee8142908..e0df525ce5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java
@@ -18,6 +18,7 @@
 package org.apache.doris.analysis;
 
 import java.util.List;
+import java.util.Objects;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -156,7 +157,7 @@ public final class ExprSubstitutionMap {
      * f [A.id, B.id] g [A.id, C.id]
      * return: g-f [B,id, C,id]
      */
-    public static ExprSubstitutionMap subtraction(ExprSubstitutionMap f, ExprSubstitutionMap g) {
+    public static ExprSubstitutionMap subtraction(ExprSubstitutionMap f, ExprSubstitutionMap g, Analyzer analyzer) {
         if (f == null && g == null) {
             return new ExprSubstitutionMap();
         }
@@ -170,8 +171,14 @@ public final class ExprSubstitutionMap {
         for (int i = 0; i < g.size(); i++) {
             if (f.containsMappingFor(g.lhs_.get(i))) {
                 result.put(f.get(g.lhs_.get(i)), g.rhs_.get(i));
+                if (f.get(g.lhs_.get(i)) instanceof SlotRef && g.rhs_.get(i) instanceof SlotRef) {
+                    analyzer.putEquivalentSlot(((SlotRef) g.rhs_.get(i)).getSlotId(), ((SlotRef) Objects.requireNonNull(f.get(g.lhs_.get(i)))).getSlotId());
+                }
             } else {
                 result.put(g.lhs_.get(i), g.rhs_.get(i));
+                if (g.lhs_.get(i) instanceof SlotRef && g.rhs_.get(i) instanceof SlotRef) {
+                    analyzer.putEquivalentSlot(((SlotRef) g.rhs_.get(i)).getSlotId(), ((SlotRef) g.lhs_.get(i)).getSlotId());
+                }
             }
         }
         return result;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 9c1436af90..16e82b644f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -398,7 +398,7 @@ public class HashJoinNode extends PlanNode {
             }
         }
         // 2. compute srcToOutputMap
-        vSrcToOutputSMap = ExprSubstitutionMap.subtraction(outputSmap, srcTblRefToOutputTupleSmap);
+        vSrcToOutputSMap = ExprSubstitutionMap.subtraction(outputSmap, srcTblRefToOutputTupleSmap, analyzer);
         for (int i = 0; i < vSrcToOutputSMap.size(); i++) {
             Preconditions.checkState(vSrcToOutputSMap.getRhs().get(i) instanceof SlotRef);
             SlotRef rSlotRef = (SlotRef) vSrcToOutputSMap.getRhs().get(i);
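
Note on the equivalent-slot lookup: getValueTransferTargets now follows the chain of equivalent slots recorded by putEquivalentSlot, and hasValueTransfer checks membership in that chain. A minimal sketch of the traversal follows, assuming integer slot ids in place of SlotId. The class name is hypothetical, and the cycle guard is an addition of this sketch that is not present in the patch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the equivalentSlots map kept in Analyzer.GlobalState.
final class EquivalentSlots {
    private final Map<Integer, Integer> equivalentSlots = new HashMap<>();

    void put(int src, int target) {
        equivalentSlots.put(src, target);
    }

    // Returns src followed by every slot reachable through the equivalence chain.
    List<Integer> valueTransferTargets(int src) {
        List<Integer> result = new ArrayList<>();
        result.add(src);
        Integer current = src;
        while (equivalentSlots.containsKey(current)) {
            Integer next = equivalentSlots.get(current);
            if (result.contains(next)) {
                break; // cycle guard: an assumption of this sketch, not in the patch
            }
            result.add(next);
            current = next;
        }
        return result;
    }

    boolean hasValueTransfer(int a, int b) {
        return valueTransferTargets(a).contains(b);
    }
}
```

The relation is directional: with 1 -> 2 -> 3 recorded, slot 1 transfers to slot 3, but slot 3 does not transfer back to slot 1.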




[doris] 05/07: [hotfix](dev-1.0.1) Avoid VecNotImplementException for create view operation (#10676)

Posted by mo...@apache.org.

morningman pushed a commit to branch dev-1.0.1-v20220707
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 62605a8f30c7c7ef34b369fec7e5552b5e532e4a
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu Jul 7 18:51:12 2022 +0800

    [hotfix](dev-1.0.1) Avoid VecNotImplementException for create view operation (#10676)
---
 .../java/org/apache/doris/analysis/CreateViewStmt.java | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateViewStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateViewStmt.java
index cc32ab7dec..26faab87b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateViewStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateViewStmt.java
@@ -23,9 +23,9 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 
 import com.google.common.base.Strings;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -58,14 +58,22 @@ public class CreateViewStmt extends BaseViewStmt {
         viewDefStmt.setNeedToSql(true);
 
         // check privilege
-        if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), tableName.getDb(),
-                tableName.getTbl(), PrivPredicate.CREATE)) {
+        if (!Catalog.getCurrentCatalog().getAuth()
+                .checkTblPriv(ConnectContext.get(), tableName.getDb(), tableName.getTbl(), PrivPredicate.CREATE)) {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CREATE");
         }
 
-        // Do not rewrite nondeterministic functions to constant in create view's def stmt
+        boolean originEnableVec = true;
         if (ConnectContext.get() != null) {
+            // Do not rewrite nondeterministic functions to constant in create view's def stmt
             ConnectContext.get().setNotEvalNondeterministicFunction(true);
+            // In the current v1.1, the vec engine does not support some outer join SQL,
+            // so if we set enable_vectorized_engine = true, it may throw VecNotImplementException.
+            // That is unnecessary here because we only need to pass the analysis phase,
+            // so we temporarily set enable_vectorized_engine = false to avoid this exception.
+            SessionVariable sv = ConnectContext.get().getSessionVariable();
+            originEnableVec = sv.enableVectorizedEngine;
+            sv.setEnableVectorizedEngine(false);
         }
         try {
             if (cols != null) {
@@ -82,6 +90,8 @@ public class CreateViewStmt extends BaseViewStmt {
             // will not do constant fold for nondeterministic functions.
             if (ConnectContext.get() != null) {
                 ConnectContext.get().setNotEvalNondeterministicFunction(false);
+                SessionVariable sv = ConnectContext.get().getSessionVariable();
+                sv.setEnableVectorizedEngine(originEnableVec);
             }
         }
     }
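
Note on the save-and-restore pattern in CreateViewStmt: the patch records the original value of enable_vectorized_engine, forces it to false during analysis, and restores it in the finally block. The pattern can be sketched in isolation as below. Session and withVectorizedDisabled are hypothetical names used for illustration only and do not correspond to the Doris SessionVariable API.

```java
import java.util.function.Supplier;

// Hypothetical illustration of temporarily overriding a session flag.
final class SessionFlagScope {
    // Stand-in for a session variable holder.
    static final class Session {
        boolean enableVectorizedEngine = true;
    }

    // Runs body with the flag forced to false and restores the original
    // value afterwards, even when body throws.
    static <T> T withVectorizedDisabled(Session session, Supplier<T> body) {
        boolean origin = session.enableVectorizedEngine;
        session.enableVectorizedEngine = false;
        try {
            return body.get();
        } finally {
            session.enableVectorizedEngine = origin;
        }
    }
}
```

Restoring in finally matters because analysis can throw; without it, a failed CREATE VIEW would leave the vectorized engine disabled for the rest of the session.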




[doris] 06/07: [hotfix](dev-1.0.1) fix planner bug after introducing output tuple for join node

Posted by mo...@apache.org.

morningman pushed a commit to branch dev-1.0.1-v20220707
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 644dec1bc5b2e6914e6298c66afe831307105908
Author: morningman <mo...@163.com>
AuthorDate: Fri Jul 8 11:16:48 2022 +0800

    [hotfix](dev-1.0.1) fix planner bug after introducing output tuple for join node
    
    Authored by EmmyMiao87
---
 .../main/java/org/apache/doris/analysis/ExprSubstitutionMap.java | 9 ++++++++-
 .../src/main/java/org/apache/doris/planner/HashJoinNode.java     | 5 ++++-
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java
index 062eef4df8..aee8142908 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java
@@ -182,7 +182,7 @@ public final class ExprSubstitutionMap {
      * f [A.id, B.id] [A.name, B.name] g [A.id, C.id] [A.age, C.age]
      * return: [A.id, C,id] [A.name, B.name] [A.age, C.age]
      */
-    public static ExprSubstitutionMap combineAndReplace(ExprSubstitutionMap f, ExprSubstitutionMap g) {
+    public static ExprSubstitutionMap composeAndReplace(ExprSubstitutionMap f, ExprSubstitutionMap g) {
         if (f == null && g == null) {
             return new ExprSubstitutionMap();
         }
@@ -194,6 +194,13 @@ public final class ExprSubstitutionMap {
         }
         ExprSubstitutionMap result = new ExprSubstitutionMap();
         result = ExprSubstitutionMap.combine(result, g);
+        for (int i = 0; i < g.size(); i++) {
+            // case a->b, b->c => a->c
+            if (f.mappingForRhsExpr(g.getLhs().get(i)) != null) {
+                result.getLhs().set(i, f.mappingForRhsExpr(g.getLhs().get(i)));
+            }
+        }
+        // add remaining f
         for (int i = 0; i < f.size(); i++) {
             if (!result.containsMappingFor(f.lhs_.get(i))) {
                 result.put(f.lhs_.get(i), f.rhs_.get(i));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 25df202fde..9c1436af90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -441,7 +441,7 @@ public class HashJoinNode extends PlanNode {
             }
         }
         // 4. change the outputSmap
-        outputSmap = ExprSubstitutionMap.combineAndReplace(outputSmap, srcTblRefToOutputTupleSmap);
+        outputSmap = ExprSubstitutionMap.composeAndReplace(outputSmap, srcTblRefToOutputTupleSmap);
     }
 
     private void replaceOutputSmapForOuterJoin() {
@@ -989,6 +989,9 @@ public class HashJoinNode extends PlanNode {
 
     @Override
     public ArrayList<TupleId> getOutputTblRefIds() {
+        if (vOutputTupleDesc != null) {
+            return Lists.newArrayList(vOutputTupleDesc.getId());
+        }
         switch (joinOp) {
             case LEFT_SEMI_JOIN:
             case LEFT_ANTI_JOIN:
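
Note on composeAndReplace: the added loop handles the transitive case, where f maps a -> b and g maps b -> c, by rewriting the entry copied from g to a -> c before the remaining entries of f are appended. A minimal sketch over string keys follows. SubstitutionMapSketch is a hypothetical class, mappingForRhs is assumed to be a reverse lookup like mappingForRhsExpr, and the null-argument handling of the real method is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical string-based stand-in for ExprSubstitutionMap.
final class SubstitutionMapSketch {
    final List<String> lhs = new ArrayList<>();
    final List<String> rhs = new ArrayList<>();

    SubstitutionMapSketch put(String l, String r) {
        lhs.add(l);
        rhs.add(r);
        return this;
    }

    // Assumed reverse lookup: the lhs whose rhs equals r, or null if none.
    String mappingForRhs(String r) {
        int idx = rhs.indexOf(r);
        return idx < 0 ? null : lhs.get(idx);
    }

    static SubstitutionMapSketch composeAndReplace(SubstitutionMapSketch f, SubstitutionMapSketch g) {
        SubstitutionMapSketch result = new SubstitutionMapSketch();
        // Start from a copy of g.
        for (int i = 0; i < g.lhs.size(); i++) {
            result.put(g.lhs.get(i), g.rhs.get(i));
        }
        // Case a->b in f and b->c in g: rewrite the entry to a->c.
        for (int i = 0; i < g.lhs.size(); i++) {
            String src = f.mappingForRhs(g.lhs.get(i));
            if (src != null) {
                result.lhs.set(i, src);
            }
        }
        // Add the remaining entries of f whose lhs is not yet mapped.
        for (int i = 0; i < f.lhs.size(); i++) {
            if (!result.lhs.contains(f.lhs.get(i))) {
                result.put(f.lhs.get(i), f.rhs.get(i));
            }
        }
        return result;
    }
}
```

With f = {a -> b, x -> y} and g = {b -> c}, the sketch yields a -> c followed by x -> y: the first entry comes from composing the two maps, the second from the remaining entries of f.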

