You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/04/18 01:55:38 UTC

[incubator-doris] branch master updated: [Forbidden](Vec) Switch to non-vec engine when outer join + not null column (#8979)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 04287cabb2 [Forbidden](Vec) Switch to non-vec engine when outer join + not null column (#8979)
04287cabb2 is described below

commit 04287cabb26bc0c8f1ef14f8e972c57aa61f201a
Author: EmmyMiao87 <52...@qq.com>
AuthorDate: Mon Apr 18 09:55:33 2022 +0800

    [Forbidden](Vec) Switch to non-vec engine when outer join + not null column (#8979)
    
    * [Forbidden](Vec) Switch to non-vec engine when outer join + not null column
    
    The vectorized engine crashes (core dump) in the case of `outer join + not null column`, see issue #7901.
    So we need to fall back from vectorized mode to non-vectorized mode when we encounter this situation.
    
    If a column on the nullable side of the outer join must always return non-null, like count(*),
    then there is no way to force that column to be nullable.
    The vectorized engine cannot support this situation,
    so the query has to fall back to the non-vectorized engine for processing.
    For example:
      Query: set enable_vectorized_engine=true
      Query: select * from t1 left join (select k1, count(k2) as count_k2 from t2 group by k1) tmp on t1.k1=tmp.k1
      Result: The query is executed by the non-vectorized engine
---
 .../java/org/apache/doris/analysis/Analyzer.java   | 81 ++++++++++++++++++++--
 .../java/org/apache/doris/analysis/SelectStmt.java |  5 +-
 ...ectorizedUtil.java => VecNotImplException.java} | 14 ++--
 .../apache/doris/common/util/VectorizedUtil.java   | 42 ++++++++++-
 .../main/java/org/apache/doris/qe/Coordinator.java |  8 ++-
 .../java/org/apache/doris/qe/StmtExecutor.java     | 59 +++++++++++-----
 6 files changed, 170 insertions(+), 39 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index 693a90d958..4be8fa5308 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -33,6 +33,7 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.IdGenerator;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.VecNotImplException;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.RuntimeFilter;
@@ -297,6 +298,17 @@ public class Analyzer {
 
         private final long autoBroadcastJoinThreshold;
 
+        /**
+         * This property is mainly used to store the vectorized switch of the current query.
+         * true: the vectorization of the current query is turned on
+         * false: the vectorization of the current query is turned off.
+         * It is different from the vectorized switch`enableVectorizedEngine` of the session.
+         * It is only valid for a single query, while the session switch is valid for all queries in the session.
+         * It cannot be set directly by the user, only by inheritance from session`enableVectorizedEngine`
+         * or internal adjustment of the system.
+         */
+        private boolean enableQueryVec;
+
         public GlobalState(Catalog catalog, ConnectContext context) {
             this.catalog = catalog;
             this.context = context;
@@ -347,6 +359,9 @@ public class Analyzer {
                 // autoBroadcastJoinThreshold is a "final" field, must set an initial value for it
                 autoBroadcastJoinThreshold = 0;
             }
+            if (context != null) {
+                enableQueryVec = context.getSessionVariable().enableVectorizedEngine();
+            }
         }
     }
 
@@ -643,9 +658,34 @@ public class Analyzer {
         return db.getTableOrAnalysisException(tblName.getTbl());
     }
 
-    public ExprRewriter getExprRewriter() { return globalState.exprRewriter_; }
+    public ExprRewriter getExprRewriter() {
+        return globalState.exprRewriter_;
+    }
+
+    public ExprRewriter getMVExprRewriter() {
+        return globalState.mvExprRewriter;
+    }
 
-    public ExprRewriter getMVExprRewriter() { return globalState.mvExprRewriter; }
+    /**
+     * Only the top-level `query vec` value of the query analyzer represents the value of the entire query.
+     * Other sub-analyzers cannot represent the value of `query vec`.
+     * @return
+     */
+    public boolean enableQueryVec() {
+        if (ancestors.isEmpty()) {
+            return globalState.enableQueryVec;
+        } else {
+            return ancestors.get(ancestors.size() - 1).enableQueryVec();
+        }
+    }
+
+    /**
+     * Since an analyzer cannot reach its sub-analyzers from top to bottom,
+     * the only option is to set the `query vec` variable of the top-level analyzer of the query to false.
+     */
+    public void disableQueryVec() {
+        globalState.enableQueryVec = false;
+    }
 
     /**
      * Return descriptor of registered table/alias.
@@ -890,18 +930,47 @@ public class Analyzer {
     }
 
     /**
-     * All tuple of outer join tuple should be null in slot desc
+     * The main function of this method is to set the column property on the nullable side of the outer join
+     * to nullable in the case of vectorization.
+     * For example:
+     * Query: select * from t1 left join t2 on t1.k1=t2.k1
+     * Origin: t2.k1 not null
+     * Result: t2.k1 is nullable
+     *
+     * @throws VecNotImplException In some cases, it is not possible to directly modify the column property to nullable.
+     *     It will report an error and fall back from vectorized mode to non-vectorized mode for execution.
+     *     If the nullside column of the outer join is a column that must return non-null like count(*)
+     *     then there is no way to force the column to be nullable.
+     *     At this time, vectorization cannot support this situation,
+     *     so it is necessary to fall back to non-vectorization for processing.
+     *     For example:
+     *       Query: select * from t1 left join (select k1, count(k2) as count_k2 from t2 group by k1) tmp on t1.k1=tmp.k1
+     *       Origin: tmp.k1 not null, tmp.count_k2 not null
+     *       Result: throw VecNotImplException
      */
-    public void changeAllOuterJoinTupleToNull() {
+    public void changeAllOuterJoinTupleToNull() throws VecNotImplException {
         for (TupleId tid : globalState.outerJoinedTupleIds.keySet()) {
             for (SlotDescriptor slotDescriptor : getTupleDesc(tid).getSlots()) {
-                slotDescriptor.setIsNullable(true);
+                changeSlotToNull(slotDescriptor);
             }
         }
 
         for (TupleId tid : globalState.outerJoinedMaterializedTupleIds) {
             for (SlotDescriptor slotDescriptor : getTupleDesc(tid).getSlots()) {
-                slotDescriptor.setIsNullable(true);
+                changeSlotToNull(slotDescriptor);
+            }
+        }
+    }
+
+    private void changeSlotToNull(SlotDescriptor slotDescriptor) throws VecNotImplException {
+        if (slotDescriptor.getSourceExprs().isEmpty()) {
+            slotDescriptor.setIsNullable(true);
+            return;
+        }
+        for (Expr sourceExpr : slotDescriptor.getSourceExprs()) {
+            if (!sourceExpr.isNullable()) {
+                throw new VecNotImplException("The slot (" + slotDescriptor.toString()
+                        + ") could not be changed to nullable");
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index 376fefb781..502fae51aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -37,6 +37,7 @@ import org.apache.doris.common.TableAliasGenerator;
 import org.apache.doris.common.TreeNode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.SqlUtils;
+import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rewrite.ExprRewriter;
@@ -514,7 +515,9 @@ public class SelectStmt extends QueryStmt {
         // Change all outer join tuple to null here after analyze where and from clause
         // all solt desc of join tuple is ready. Before analyze sort info/agg info/analytic info
         // the solt desc nullable mark must be corrected to make sure BE exec query right.
-        analyzer.changeAllOuterJoinTupleToNull();
+        if (VectorizedUtil.isVectorized()) {
+            analyzer.changeAllOuterJoinTupleToNull();
+        }
 
         createSortInfo(analyzer);
         if (sortInfo != null && CollectionUtils.isNotEmpty(sortInfo.getOrderingExprs())) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/VecNotImplException.java
similarity index 71%
copy from fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
copy to fe/fe-core/src/main/java/org/apache/doris/common/VecNotImplException.java
index 349cf1e236..2c5d12e7d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/VecNotImplException.java
@@ -15,16 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.common.util;
+package org.apache.doris.common;
 
-import org.apache.doris.qe.ConnectContext;
-
-public class VectorizedUtil {
-    public static boolean isVectorized() {
-        if (ConnectContext.get() == null) {
-            return false;
-        }
-        return ConnectContext.get().getSessionVariable().enableVectorizedEngine();
+public class VecNotImplException extends UserException {
+    public VecNotImplException(String msg) {
+        super(msg);
     }
 }
-
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
index 349cf1e236..b094389db9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
@@ -17,14 +17,52 @@
 
 package org.apache.doris.common.util;
 
+import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.qe.ConnectContext;
 
 public class VectorizedUtil {
+    /**
+     * 1. Return false if there is no current connection (Rule1 to be changed)
+     * 2. Returns the vectorized switch value of the query 'globalState.enableQueryVec'
+     * 3. If it is not currently a query, return the vectorized switch value of the session 'enableVectorizedEngine'
+     * @return true: vec. false: non-vec
+     */
     public static boolean isVectorized() {
-        if (ConnectContext.get() == null) {
+        ConnectContext connectContext = ConnectContext.get();
+        if (connectContext == null) {
             return false;
         }
-        return ConnectContext.get().getSessionVariable().enableVectorizedEngine();
+        Analyzer analyzer = connectContext.getExecutor().getAnalyzer();
+        if (analyzer == null) {
+            return connectContext.getSessionVariable().enableVectorizedEngine();
+        }
+        return analyzer.enableQueryVec();
+    }
+
+    /**
+     * The purpose of this function is to turn off the vectorization switch for the current query.
+     * When the vectorization engine cannot meet the requirements of the current query,
+     * it will convert the current query into a non-vectorized query.
+     * Note that this will only change the **vectorization switch for a single query**,
+     * and will not affect other queries in the same session.
+     * Therefore, even if the vectorization switch of the current query is turned off,
+     * the vectorization properties of subsequent queries will not be affected.
+     *
+     * Session: set enable_vectorized_engine=true;
+     * Query1: select * from table (vec)
+     * Query2: select * from t1 left join (select count(*) as count from t2) t3 on t1.k1=t3.count (switch to non-vec)
+     * Query3: select * from table (still vec)
+     */
+    public static void switchToQueryNonVec() {
+        ConnectContext connectContext = ConnectContext.get();
+        if (connectContext == null) {
+            return;
+        }
+        Analyzer analyzer = connectContext.getExecutor().getAnalyzer();
+        if (analyzer == null) {
+            return;
+        }
+        analyzer.disableQueryVec();
     }
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index d2f6e28b92..458dd99e5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -33,6 +33,7 @@ import org.apache.doris.common.util.ListUtil;
 import org.apache.doris.common.util.ProfileWriter;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.load.LoadErrorHub;
 import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.planner.DataPartition;
@@ -230,7 +231,7 @@ public class Coordinator {
         this.scanNodes = planner.getScanNodes();
         this.descTable = analyzer.getDescTbl().toThrift();
         this.returnedAllResults = false;
-        this.queryOptions = context.getSessionVariable().toThrift();
+        initQueryOptions(context);
 
         setFromUserProperty(analyzer);
 
@@ -300,6 +301,11 @@ public class Coordinator {
         }
     }
 
+    private void initQueryOptions(ConnectContext context) {
+        this.queryOptions = context.getSessionVariable().toThrift();
+        this.queryOptions.setEnableVectorizedEngine(VectorizedUtil.isVectorized());
+    }
+
     public long getJobId() {
         return jobId;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 702c3db6c8..2de84f0705 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -66,6 +66,7 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.VecNotImplException;
 import org.apache.doris.common.Version;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.MetaLockUtils;
@@ -75,6 +76,7 @@ import org.apache.doris.common.util.QueryPlannerProfile;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.MysqlChannel;
@@ -117,7 +119,6 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.protobuf.ByteString;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -137,6 +138,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
+import com.google.protobuf.ByteString;
+
 // Do one COM_QUERY process.
 // first: Parse receive byte array to statement struct.
 // second: Do handle function for statement.
@@ -194,6 +197,10 @@ public class StmtExecutor implements ProfileWriter {
         this.coord = coord;
     }
 
+    public Analyzer getAnalyzer() {
+        return analyzer;
+    }
+
     // At the end of query execution, we begin to add up profile
     private void initProfile(QueryPlannerProfile plannerProfile, boolean waiteBeReport) {
         long currentTimestamp = System.currentTimeMillis();
@@ -566,24 +573,38 @@ public class StmtExecutor implements ProfileWriter {
             }
             // table id in tableList is in ascending order because that table map is a sorted map
             List<Table> tables = Lists.newArrayList(tableMap.values());
-            MetaLockUtils.readLockTables(tables);
-            try {
-                analyzeAndGenerateQueryPlan(tQueryOptions);
-            } catch (MVSelectFailedException e) {
-                /**
-                 * If there is MVSelectFailedException after the first planner, there will be error mv rewritten in query.
-                 * So, the query should be reanalyzed without mv rewritten and planner again.
-                 * Attention: Only error rewritten tuple is forbidden to mv rewrite in the second time.
-                 */
-                resetAnalyzerAndStmt();
-                analyzeAndGenerateQueryPlan(tQueryOptions);
-            } catch (UserException e) {
-                throw e;
-            } catch (Exception e) {
-                LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
-                throw new AnalysisException("Unexpected exception: " + e.getMessage());
-            } finally {
-                MetaLockUtils.readUnlockTables(tables);
+            int analyzeTimes = 2;
+            for (int i = 1; i <= analyzeTimes; i++) {
+                MetaLockUtils.readLockTables(tables);
+                try {
+                    analyzeAndGenerateQueryPlan(tQueryOptions);
+                    break;
+                } catch (MVSelectFailedException e) {
+                    /**
+                     * If there is MVSelectFailedException after the first planner, there will be error mv rewritten in query.
+                     * So, the query should be reanalyzed without mv rewritten and planner again.
+                     * Attention: Only error rewritten tuple is forbidden to mv rewrite in the second time.
+                     */
+                    if (i == analyzeTimes) {
+                        throw e;
+                    } else {
+                        resetAnalyzerAndStmt();
+                    }
+                } catch (VecNotImplException e) {
+                    if (i == analyzeTimes) {
+                        throw e;
+                    } else {
+                        resetAnalyzerAndStmt();
+                        VectorizedUtil.switchToQueryNonVec();
+                    }
+                } catch (UserException e) {
+                    throw e;
+                } catch (Exception e) {
+                    LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
+                    throw new AnalysisException("Unexpected exception: " + e.getMessage());
+                } finally {
+                    MetaLockUtils.readUnlockTables(tables);
+                }
             }
         } else {
             try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org