You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/19 00:55:19 UTC

[doris] branch branch-1.1-lts updated: [fix](agg)the intermediate slots should be materialized as output slots (#13424)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new f4a408874f [fix](agg)the intermediate slots should be materialized as output slots (#13424)
f4a408874f is described below

commit f4a408874fcf3002cd7cb12256d7b157efe590c9
Author: starocean999 <40...@users.noreply.github.com>
AuthorDate: Wed Oct 19 08:55:12 2022 +0800

    [fix](agg)the intermediate slots should be materialized as output slots (#13424)
    
    * [fix](agg)the intermediate slots should be materialized as output slots (#12441)
    
    In some cases, the output slots of an agg info may be materialized through the materializeSrcExpr method (called from SlotDescriptor), while the corresponding intermediate slots are not. This PR copies the materialized flags of the output slots to the intermediate slots so that the two stay consistent.
    
    * fix the output and intermediate tuple sizes bug of the agg node
---
 .../org/apache/doris/analysis/AggregateInfo.java   | 24 ++++++++
 .../apache/doris/analysis/AggregateInfoBase.java   | 11 ++++
 .../org/apache/doris/analysis/DescriptorTable.java | 13 ++++
 .../main/java/org/apache/doris/analysis/Expr.java  | 27 +++++++++
 .../org/apache/doris/analysis/SlotDescriptor.java  |  1 +
 .../org/apache/doris/planner/AggregationNode.java  |  4 +-
 .../apache/doris/planner/SingleNodePlanner.java    |  1 +
 .../data/correctness/test_subquery_with_agg.out    |  6 ++
 .../correctness/test_subquery_with_agg.groovy      | 70 ++++++++++++++++++++++
 9 files changed, 156 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
index 2db459d65c..612801f38c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
@@ -738,6 +738,30 @@ public final class AggregateInfo extends AggregateInfoBase {
         }
     }
 
+    public void updateMaterializedSlots() {
+        // Output and intermediate tuples can disagree on which slots are materialized:
+        // Expr.materializeSrcExpr marks slots directly, and in that case only the
+        // output slots get marked. Treat the output tuple as the source of truth and
+        // rebuild materializedSlots_ (indices counted from the first non-grouping slot)
+        // and the intermediate tuple's materialized flags from it.
+        materializedSlots_.clear();
+        ArrayList<SlotDescriptor> outputSlots = outputTupleDesc_.getSlots();
+        int groupingExprNum = groupingExprs_ != null ? groupingExprs_.size() : 0;
+        Preconditions.checkState(groupingExprNum <= outputSlots.size());
+        for (int i = groupingExprNum; i < outputSlots.size(); ++i) {
+            if (outputSlots.get(i).isMaterialized()) {
+                materializedSlots_.add(i - groupingExprNum);
+            }
+        }
+        // Copy flags position by position; this assumes both tuples list slots in the same order.
+        ArrayList<SlotDescriptor> intermediateSlots = intermediateTupleDesc_.getSlots();
+        Preconditions.checkState(intermediateSlots.size() == outputSlots.size());
+        for (int i = 0; i < outputSlots.size(); ++i) {
+            intermediateSlots.get(i).setIsMaterialized(outputSlots.get(i).isMaterialized());
+        }
+        intermediateTupleDesc_.computeStatAndMemLayout(); // flags changed: recompute stats/layout
+    }
+
     /**
      * Mark slots required for this aggregation as materialized:
      * - all grouping output slots as well as grouping exprs
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java
index ad4dedd591..2f0c035daf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 /**
@@ -104,6 +105,16 @@ public abstract class AggregateInfoBase {
         intermediateTupleDesc_ = createTupleDesc(analyzer, false);
         if (requiresIntermediateTuple(aggregateExprs_, groupingExprs_.size() == 0)) {
             outputTupleDesc_ = createTupleDesc(analyzer, true);
+            // save the output and intermediate slots info into global desc table
+            // after creaing the plan, we can call materializeIntermediateSlots method
+            // to set the materialized info to intermediate slots based on output slots.
+            ArrayList<SlotDescriptor> outputSlots = outputTupleDesc_.getSlots();
+            ArrayList<SlotDescriptor> intermediateSlots = intermediateTupleDesc_.getSlots();
+            HashMap<SlotDescriptor, SlotDescriptor> mapping = new HashMap<>();
+            for (int i = 0; i < outputSlots.size(); ++i) {
+                mapping.put(outputSlots.get(i), intermediateSlots.get(i));
+            }
+            analyzer.getDescTbl().addSlotMappingInfo(mapping);
         } else {
             outputTupleDesc_ = intermediateTupleDesc_;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
index 2d179758d5..f4ffb194a3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
@@ -34,6 +34,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Repository for tuple (and slot) descriptors.
@@ -51,6 +52,8 @@ public class DescriptorTable {
     private final IdGenerator<SlotId> slotIdGenerator_ = SlotId.createGenerator();
     private final HashMap<SlotId, SlotDescriptor> slotDescs = Maps.newHashMap();
 
+    private final HashMap<SlotDescriptor, SlotDescriptor> outToIntermediateSlots = new HashMap<>();
+
     public DescriptorTable() {
     }
 
@@ -154,6 +157,16 @@ public class DescriptorTable {
         }
     }
 
+    public void addSlotMappingInfo(Map<SlotDescriptor, SlotDescriptor> mapping) {
+        outToIntermediateSlots.putAll(mapping); // output -> intermediate; re-adding a key overwrites
+    }
+
+    public void materializeIntermediateSlots() { // intermediate flag := output flag
+        for (Map.Entry<SlotDescriptor, SlotDescriptor> entry : outToIntermediateSlots.entrySet()) {
+            entry.getValue().setIsMaterialized(entry.getKey().isMaterialized()); // can also unset it
+        }
+    }
+
     public TDescriptorTable toThrift() {
         TDescriptorTable result = new TDescriptorTable();
         HashSet<Table> referencedTbls = Sets.newHashSet();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
index 850bf8be43..af40e9c414 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
@@ -1912,4 +1912,31 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
         }
         return true;
     }
+
+    public final void finalizeForNereids() throws AnalysisException { // finalize this expr tree
+        if (isAnalyzed()) { // skip if already analyzed
+            return;
+        }
+        for (Expr child : children) { // children first (post-order)
+            child.finalizeForNereids();
+        }
+        finalizeImplForNereids(); // node-specific step; base version throws
+        analysisDone();
+    }
+
+    public void finalizeImplForNereids() throws AnalysisException { // unsupported unless overridden
+        throw new AnalysisException("analyze for Nereids do not implementation.");
+    }
+
+    public void materializeSrcExpr() { // mark referenced slots and their sources materialized
+        if (this instanceof SlotRef) {
+            SlotRef thisRef = (SlotRef) this;
+            SlotDescriptor slotDesc = thisRef.getDesc(); // assumed non-null here
+            slotDesc.setIsMaterialized(true);
+            slotDesc.getSourceExprs().forEach(Expr::materializeSrcExpr); // NOTE(review): no visited guard
+        }
+        for (Expr child : children) { // then recurse into sub-expressions
+            child.materializeSrcExpr();
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
index 939aa05874..4b48941c71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
@@ -163,6 +163,7 @@ public class SlotDescriptor {
         }
         for (Expr expr : sourceExprs_) {
             if (!(expr instanceof SlotRef)) {
+                expr.materializeSrcExpr();
                 continue;
             }
             SlotRef slotRef = (SlotRef) expr;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index c19058f1cf..942ce1477c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -265,6 +265,7 @@ public class AggregationNode extends PlanNode {
 
     @Override
     protected void toThrift(TPlanNode msg) {
+        aggInfo.updateMaterializedSlots();
         msg.node_type = TPlanNodeType.AGGREGATION_NODE;
         List<TExpr> aggregateFunctions = Lists.newArrayList();
         // only serialize agg exprs that are being materialized
@@ -292,6 +293,7 @@ public class AggregationNode extends PlanNode {
 
     @Override
     public String getNodeExplainString(String detailPrefix, TExplainLevel detailLevel) {
+        aggInfo.updateMaterializedSlots();
         StringBuilder output = new StringBuilder();
         String nameDetail = getDisplayLabelDetail();
         if (nameDetail != null) {
@@ -304,7 +306,7 @@ public class AggregationNode extends PlanNode {
 
         if (aggInfo.getAggregateExprs() != null && aggInfo.getMaterializedAggregateExprs().size() > 0) {
             output.append(detailPrefix + "output: ").append(
-                    getExplainString(aggInfo.getAggregateExprs()) + "\n");
+                    getExplainString(aggInfo.getMaterializedAggregateExprs()) + "\n");
         }
         // TODO: group by can be very long. Break it into multiple lines
         output.append(detailPrefix + "group by: ").append(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 008cb7df44..b233d9ce9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -164,6 +164,7 @@ public class SingleNodePlanner {
         PlanNode singleNodePlan = createQueryPlan(queryStmt, analyzer,
                 ctx.getQueryOptions().getDefaultOrderByLimit());
         Preconditions.checkNotNull(singleNodePlan);
+        analyzer.getDescTbl().materializeIntermediateSlots();
         return singleNodePlan;
     }
 
diff --git a/regression-test/data/correctness/test_subquery_with_agg.out b/regression-test/data/correctness/test_subquery_with_agg.out
new file mode 100644
index 0000000000..4c443b8493
--- /dev/null
+++ b/regression-test/data/correctness/test_subquery_with_agg.out
@@ -0,0 +1,6 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+1
+2
+3
+
diff --git a/regression-test/suites/correctness/test_subquery_with_agg.groovy b/regression-test/suites/correctness/test_subquery_with_agg.groovy
new file mode 100644
index 0000000000..49362ba86e
--- /dev/null
+++ b/regression-test/suites/correctness/test_subquery_with_agg.groovy
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_subquery_with_agg") {
+    sql """
+        drop table if exists agg_subquery_table;
+    """
+    // Regression test for the agg slot-materialization fix (#12441).
+    sql """
+        CREATE TABLE agg_subquery_table
+        (
+            gid       varchar(50)  NOT NULL,
+            num       int(11) SUM NOT NULL DEFAULT "0",
+            id_bitmap bitmap BITMAP_UNION NOT NULL
+        ) ENGINE = OLAP 
+        AGGREGATE KEY(gid)
+        DISTRIBUTED BY HASH(gid) BUCKETS 4
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    sql """
+        INSERT INTO agg_subquery_table VALUES
+        ('1',4,to_bitmap(7)),
+        ('2',5,to_bitmap(8)),
+        ('3',6,to_bitmap(9));
+    """
+    // Only ref_2.amt is selected; ref_2.unt (bitmap_union_count) is never referenced.
+    qt_select """
+        SELECT
+        subq_1.gid AS c0
+        FROM
+        agg_subquery_table AS subq_1
+        WHERE
+        EXISTS (
+            SELECT
+            ref_2.amt AS c2
+            FROM
+            (
+                SELECT
+                bitmap_union_count(id_bitmap) AS unt,
+                sum(num) AS amt
+                FROM
+                agg_subquery_table
+            ) AS ref_2
+        )
+        order by
+        subq_1.gid;
+    """
+
+    sql """
+        drop table if exists agg_subquery_table;
+    """
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org