You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/07/04 14:43:56 UTC
[doris] 08/15: [Chore](agg-state) add sessionvariable enable_agg_state (#21373)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3a8dcd21226a41e9f2550f49bf5d125bda3f5ee2
Author: Pxl <px...@qq.com>
AuthorDate: Tue Jul 4 14:25:21 2023 +0800
[Chore](agg-state) add sessionvariable enable_agg_state (#21373)
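Add the experimental session variable enable_agg_state (default: false). Defining a column with
aggregate type GENERIC_AGGREGATION now fails analysis unless enable_agg_state=true is set.
Also make SessionVariable.enablePipelineEngine private, expose it through getEnablePipelineEngine()
and a static, null-safe SessionVariable.enablePipelineEngine(), and remove VectorizedUtil.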
---
.../java/org/apache/doris/analysis/Analyzer.java | 14 ----------
.../java/org/apache/doris/analysis/ColumnDef.java | 6 ++++
.../apache/doris/common/util/VectorizedUtil.java | 30 --------------------
.../glue/translator/RuntimeFilterTranslator.java | 5 ++--
.../processor/post/RuntimeFilterGenerator.java | 6 ++--
.../rewrite/BuildCTEAnchorAndCTEProducer.java | 2 +-
.../doris/nereids/rules/rewrite/InlineCTE.java | 2 +-
.../apache/doris/planner/DistributedPlanner.java | 3 +-
.../org/apache/doris/planner/OlapScanNode.java | 4 +--
.../main/java/org/apache/doris/qe/Coordinator.java | 5 ++--
.../java/org/apache/doris/qe/SessionVariable.java | 32 ++++++++++++++++++----
.../java/org/apache/doris/qe/StmtExecutor.java | 6 ++--
.../test_agg_state_group_concat.groovy | 1 +
.../agg_state/max/test_agg_state_max.groovy | 1 +
.../nereids/test_agg_state_nereids.groovy | 1 +
.../datatype_p0/agg_state/test_agg_state.groovy | 1 +
16 files changed, 53 insertions(+), 66 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index d229d4b246..0faa84d2e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -2294,20 +2294,6 @@ public class Analyzer {
return globalState.context.getSessionVariable().isEnableFoldConstantByBe();
}
- /**
- * Returns true if predicate 'e' can be correctly evaluated by a tree materializing
- * 'tupleIds', otherwise false:
- * - the predicate needs to be bound by tupleIds
- * - a Where clause predicate can only be correctly evaluated if for all outer-joined
- * referenced tids the last join to outer-join this tid has been materialized
- * - an On clause predicate against the non-nullable side of an Outer Join clause
- * can only be correctly evaluated by the join node that materializes the
- * Outer Join clause
- */
- private boolean canEvalPredicate(PlanNode node, Expr e) {
- return canEvalPredicate(node.getTblRefIds(), e);
- }
-
/**
* Returns true if predicate 'e' can be correctly evaluated by a tree materializing
* 'tupleIds', otherwise false:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
index 9833236a89..ab1a1bed6a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.qe.SessionVariable;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
@@ -303,6 +304,11 @@ public class ColumnDef {
throw new AnalysisException(String.format("Aggregate type %s is not compatible with primitive type %s",
toString(), type.toSql()));
}
+ if (aggregateType == AggregateType.GENERIC_AGGREGATION) {
+ if (!SessionVariable.enableAggState()) {
+ throw new AnalysisException("agg state not enable, need set enable_agg_state=true");
+ }
+ }
}
if (type.getPrimitiveType() == PrimitiveType.FLOAT || type.getPrimitiveType() == PrimitiveType.DOUBLE) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
deleted file mode 100644
index f45bffcfa5..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
+++ /dev/null
@@ -1,30 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.util;
-
-import org.apache.doris.qe.ConnectContext;
-
-public class VectorizedUtil {
- public static boolean isPipeline() {
- ConnectContext connectContext = ConnectContext.get();
- if (connectContext == null) {
- return false;
- }
- return connectContext.getSessionVariable().enablePipelineEngine();
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 3308ed3580..bbb9997f2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -36,7 +36,7 @@ import org.apache.doris.planner.HashJoinNode.DistributionMode;
import org.apache.doris.planner.JoinNodeBase;
import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget;
import org.apache.doris.planner.ScanNode;
-import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.collect.ImmutableList;
@@ -175,8 +175,7 @@ public class RuntimeFilterTranslator {
origFilter.extractTargetsPosition();
// Number of parallel instances are large for pipeline engine, so we prefer bloom filter.
if (origFilter.hasRemoteTargets() && origFilter.getType() == TRuntimeFilterType.IN_OR_BLOOM
- && ConnectContext.get() != null
- && ConnectContext.get().getSessionVariable().enablePipelineEngine()) {
+ && SessionVariable.enablePipelineEngine()) {
origFilter.setType(TRuntimeFilterType.BLOOM);
}
return origFilter;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index 524ba17ba0..5b4a1b6ade 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -313,7 +313,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
} else {
// in-filter is not friendly to pipeline
if (type == TRuntimeFilterType.IN_OR_BLOOM
- && ctx.getSessionVariable().enablePipelineEngine()
+ && ctx.getSessionVariable().getEnablePipelineEngine()
&& hasRemoteTarget(join, scan)) {
type = TRuntimeFilterType.BLOOM;
}
@@ -363,7 +363,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
}
PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
if (type == TRuntimeFilterType.IN_OR_BLOOM
- && ctx.getSessionVariable().enablePipelineEngine()
+ && ctx.getSessionVariable().getEnablePipelineEngine()
&& hasRemoteTarget(join, scan)) {
type = TRuntimeFilterType.BLOOM;
}
@@ -563,7 +563,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
PhysicalHashJoin join = innerEntry.getValue();
Preconditions.checkState(join != null);
TRuntimeFilterType type = TRuntimeFilterType.IN_OR_BLOOM;
- if (ctx.getSessionVariable().enablePipelineEngine()) {
+ if (ctx.getSessionVariable().getEnablePipelineEngine()) {
type = TRuntimeFilterType.BLOOM;
}
EqualTo newEqualTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/BuildCTEAnchorAndCTEProducer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/BuildCTEAnchorAndCTEProducer.java
index b171d44931..dab88e686e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/BuildCTEAnchorAndCTEProducer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/BuildCTEAnchorAndCTEProducer.java
@@ -52,7 +52,7 @@ public class BuildCTEAnchorAndCTEProducer extends OneRewriteRuleFactory {
CTEId id = logicalCTE.findCTEId(s.getAlias());
if (cascadesContext.cteReferencedCount(id)
<= ConnectContext.get().getSessionVariable().inlineCTEReferencedThreshold
- || !ConnectContext.get().getSessionVariable().enablePipelineEngine) {
+ || !ConnectContext.get().getSessionVariable().getEnablePipelineEngine()) {
continue;
}
LogicalCTEProducer logicalCTEProducer = new LogicalCTEProducer(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InlineCTE.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InlineCTE.java
index 4c9e8d904d..a50f42a2c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InlineCTE.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InlineCTE.java
@@ -47,7 +47,7 @@ public class InlineCTE extends OneRewriteRuleFactory {
* Current we only implement CTE Materialize on pipeline engine and only materialize those CTE whose
* refCount > NereidsRewriter.INLINE_CTE_REFERENCED_THRESHOLD.
*/
- if (ConnectContext.get().getSessionVariable().enablePipelineEngine
+ if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
&& ConnectContext.get().getSessionVariable().enableCTEMaterialize
&& refCount > INLINE_CTE_REFERENCED_THRESHOLD) {
return cteConsumer;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 561a771e3c..6a73952e20 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -39,6 +39,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TPartitionType;
import com.google.common.base.Preconditions;
@@ -935,7 +936,7 @@ public class DistributedPlanner {
childFragment.addPlanRoot(node);
childFragment.setHasColocatePlanNode(true);
return childFragment;
- } else if (ConnectContext.get().getSessionVariable().enablePipelineEngine()
+ } else if (SessionVariable.enablePipelineEngine()
&& childFragment.getPlanRoot().shouldColoAgg(node.getAggInfo())
&& childFragment.getPlanRoot() instanceof OlapScanNode) {
childFragment.getPlanRoot().setShouldColoScan();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 6fa4955a31..b520e058e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -1229,7 +1229,7 @@ public class OlapScanNode extends ScanNode {
public int getNumInstances() {
// In pipeline exec engine, the instance num equals be_num * parallel instance.
// so here we need count distinct be_num to do the work. make sure get right instance
- if (ConnectContext.get().getSessionVariable().enablePipelineEngine()) {
+ if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()) {
int parallelInstance = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
long numBackend = scanRangeLocations.stream().flatMap(rangeLoc -> rangeLoc.getLocations().stream())
.map(loc -> loc.backend_id).distinct().count();
@@ -1241,7 +1241,7 @@ public class OlapScanNode extends ScanNode {
@Override
public boolean shouldColoAgg(AggregateInfo aggregateInfo) {
distributionColumnIds.clear();
- if (ConnectContext.get().getSessionVariable().enablePipelineEngine()
+ if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
&& ConnectContext.get().getSessionVariable().enableColocateScan()) {
List<Expr> aggPartitionExprs = aggregateInfo.getInputPartitionExprs();
List<SlotDescriptor> slots = desc.getSlots();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index d4f78b0d91..a18248a017 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -35,7 +35,6 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListUtil;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
@@ -311,7 +310,7 @@ public class Coordinator {
this.returnedAllResults = false;
this.enableShareHashTableForBroadcastJoin = context.getSessionVariable().enableShareHashTableForBroadcastJoin;
- this.enablePipelineEngine = context.getSessionVariable().enablePipelineEngine;
+ this.enablePipelineEngine = context.getSessionVariable().getEnablePipelineEngine();
initQueryOptions(context);
setFromUserProperty(context);
@@ -379,7 +378,7 @@ public class Coordinator {
private void initQueryOptions(ConnectContext context) {
this.queryOptions = context.getSessionVariable().toThrift();
- this.queryOptions.setEnablePipelineEngine(VectorizedUtil.isPipeline());
+ this.queryOptions.setEnablePipelineEngine(SessionVariable.enablePipelineEngine());
this.queryOptions.setBeExecVersion(Config.be_exec_version);
this.queryOptions.setQueryTimeout(context.getExecTimeout());
this.queryOptions.setExecutionTimeout(context.getExecTimeout());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index bb72439de8..d18834ab8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -191,6 +191,9 @@ public class SessionVariable implements Serializable, Writable {
public static final long MIN_INSERT_VISIBLE_TIMEOUT_MS = 1000;
public static final String ENABLE_PIPELINE_ENGINE = "enable_pipeline_engine";
+
+ public static final String ENABLE_AGG_STATE = "enable_agg_state";
+
public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline";
public static final String ENABLE_SINGLE_DISTINCT_COLUMN_OPT = "enable_single_distinct_column_opt";
@@ -607,7 +610,10 @@ public class SessionVariable implements Serializable, Writable {
public boolean enableVectorizedEngine = true;
@VariableMgr.VarAttr(name = ENABLE_PIPELINE_ENGINE, fuzzy = true, expType = ExperimentalType.EXPERIMENTAL)
- public boolean enablePipelineEngine = true;
+ private boolean enablePipelineEngine = true;
+
+ @VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, expType = ExperimentalType.EXPERIMENTAL)
+ public boolean enableAggState = false;
@VariableMgr.VarAttr(name = ENABLE_PARALLEL_OUTFILE)
public boolean enableParallelOutfile = false;
@@ -1595,10 +1601,6 @@ public class SessionVariable implements Serializable, Writable {
this.runtimeFilterMaxInNum = runtimeFilterMaxInNum;
}
- public boolean enablePipelineEngine() {
- return enablePipelineEngine;
- }
-
public void setEnablePipelineEngine(boolean enablePipelineEngine) {
this.enablePipelineEngine = enablePipelineEngine;
}
@@ -2297,4 +2299,24 @@ public class SessionVariable implements Serializable, Writable {
public boolean isEnableUnifiedLoad() {
return enableUnifiedLoad;
}
+
+ public boolean getEnablePipelineEngine() {
+ return enablePipelineEngine;
+ }
+
+ public static boolean enablePipelineEngine() {
+ ConnectContext connectContext = ConnectContext.get();
+ if (connectContext == null) {
+ return false;
+ }
+ return connectContext.getSessionVariable().enablePipelineEngine;
+ }
+
+ public static boolean enableAggState() {
+ ConnectContext connectContext = ConnectContext.get();
+ if (connectContext == null) {
+ return true;
+ }
+ return connectContext.getSessionVariable().enableAggState;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 04ed3340c7..c86da0daed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -315,7 +315,7 @@ public class StmtExecutor {
builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.getParallelExecInstanceNum()));
builder.traceId(context.getSessionVariable().getTraceId());
builder.isNereids(context.getState().isNereids ? "Yes" : "No");
- builder.isPipeline(context.getSessionVariable().enablePipelineEngine ? "Yes" : "No");
+ builder.isPipeline(context.getSessionVariable().getEnablePipelineEngine() ? "Yes" : "No");
return builder.build();
}
@@ -563,7 +563,7 @@ public class StmtExecutor {
private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
// queue query here
if (!parsedStmt.isExplain() && Config.enable_workload_group && Config.enable_query_queue
- && context.getSessionVariable().enablePipelineEngine()) {
+ && context.getSessionVariable().getEnablePipelineEngine()) {
this.queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
try {
this.offerRet = queryQueue.offer();
@@ -1357,7 +1357,7 @@ public class StmtExecutor {
// 2. If this is a query, send the result expr fields first, and send result data back to client.
RowBatch batch;
coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator());
- if (Config.enable_workload_group && context.sessionVariable.enablePipelineEngine()) {
+ if (Config.enable_workload_group && context.sessionVariable.getEnablePipelineEngine()) {
coord.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
}
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
diff --git a/regression-test/suites/datatype_p0/agg_state/group_concat/test_agg_state_group_concat.groovy b/regression-test/suites/datatype_p0/agg_state/group_concat/test_agg_state_group_concat.groovy
index 6926a16aba..9dc1e86009 100644
--- a/regression-test/suites/datatype_p0/agg_state/group_concat/test_agg_state_group_concat.groovy
+++ b/regression-test/suites/datatype_p0/agg_state/group_concat/test_agg_state_group_concat.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("test_agg_state_group_concat") {
+ sql "set enable_agg_state=true"
sql """ DROP TABLE IF EXISTS a_table; """
sql """
create table a_table(
diff --git a/regression-test/suites/datatype_p0/agg_state/max/test_agg_state_max.groovy b/regression-test/suites/datatype_p0/agg_state/max/test_agg_state_max.groovy
index c022b829e9..1ec3e354de 100644
--- a/regression-test/suites/datatype_p0/agg_state/max/test_agg_state_max.groovy
+++ b/regression-test/suites/datatype_p0/agg_state/max/test_agg_state_max.groovy
@@ -19,6 +19,7 @@ suite("test_agg_state_max") {
// todo: will core dump now, need fix.
sql"set enable_nereids_planner=false;"
+ sql "set enable_agg_state=true"
sql """ DROP TABLE IF EXISTS a_table; """
sql """
create table a_table(
diff --git a/regression-test/suites/datatype_p0/agg_state/nereids/test_agg_state_nereids.groovy b/regression-test/suites/datatype_p0/agg_state/nereids/test_agg_state_nereids.groovy
index 001b890647..1cadc48e2b 100644
--- a/regression-test/suites/datatype_p0/agg_state/nereids/test_agg_state_nereids.groovy
+++ b/regression-test/suites/datatype_p0/agg_state/nereids/test_agg_state_nereids.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("test_agg_state_nereids") {
+ sql "set enable_agg_state=true"
sql "set enable_nereids_planner=true;"
sql "set enable_fallback_to_original_planner=false;"
diff --git a/regression-test/suites/datatype_p0/agg_state/test_agg_state.groovy b/regression-test/suites/datatype_p0/agg_state/test_agg_state.groovy
index 142432083e..3c035982f5 100644
--- a/regression-test/suites/datatype_p0/agg_state/test_agg_state.groovy
+++ b/regression-test/suites/datatype_p0/agg_state/test_agg_state.groovy
@@ -16,6 +16,7 @@
// under the License.
suite("test_agg_state") {
+ sql "set enable_agg_state=true"
sql """ DROP TABLE IF EXISTS d_table; """
sql """
create table d_table(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org