You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/07/13 03:36:55 UTC
[incubator-doris] branch master updated: [Feature] Runtime
Filtering for Doris (Background, Configuration, FE Implement, Tuning,
Test ) (#6121)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 394a9a1 [Feature] Runtime Filtering for Doris (Background, Configuration, FE Implement, Tuning, Test ) (#6121)
394a9a1 is described below
commit 394a9a1472cb8bfe91f6649afcbeee7f0178dc62
Author: Xinyi Zou <zo...@foxmail.com>
AuthorDate: Tue Jul 13 11:36:01 2021 +0800
[Feature] Runtime Filtering for Doris (Background, Configuration, FE Implement, Tuning, Test ) (#6121)
- `RuntimeFilterGenerator` generates runtime filters and assigns each one to the plan nodes that use it in the query plan.
- `RuntimeFilter` represents a filter in the query plan, including its specific properties and how its exprs are bound to tuple slots.
- `RuntimeFilterTarget` describes the filter information provided to a ScanNode, including the target expr, whether to merge, etc.
---
.../java/org/apache/doris/analysis/Analyzer.java | 46 +++
.../org/apache/doris/analysis/BinaryPredicate.java | 2 +
.../main/java/org/apache/doris/analysis/Expr.java | 37 ++
.../java/org/apache/doris/analysis/Predicate.java | 5 +
.../org/apache/doris/analysis/StringLiteral.java | 30 +-
.../org/apache/doris/analysis/SysVariableDesc.java | 20 +-
.../doris/analysis/TupleIsNullPredicate.java | 20 +
.../util/BitUtil.java} | 23 +-
.../apache/doris/planner/DistributedPlanner.java | 3 -
.../org/apache/doris/planner/HashJoinNode.java | 15 +-
.../org/apache/doris/planner/OlapScanNode.java | 4 +
.../org/apache/doris/planner/PlanFragment.java | 30 ++
.../java/org/apache/doris/planner/PlanNode.java | 36 ++
.../java/org/apache/doris/planner/Planner.java | 10 +-
.../org/apache/doris/planner/RuntimeFilter.java | 457 +++++++++++++++++++++
.../doris/planner/RuntimeFilterGenerator.java | 400 ++++++++++++++++++
.../org/apache/doris/planner/RuntimeFilterId.java | 56 +++
.../java/org/apache/doris/planner/ScanNode.java | 2 +
.../apache/doris/planner/SingleNodePlanner.java | 31 ++
.../main/java/org/apache/doris/qe/Coordinator.java | 74 +++-
.../apache/doris/qe/RuntimeFilterTypeHelper.java | 115 ++++++
.../java/org/apache/doris/qe/SessionVariable.java | 102 +++++
.../main/java/org/apache/doris/qe/VariableMgr.java | 36 +-
.../org/apache/doris/qe/VariableVarConverterI.java | 4 +-
.../org/apache/doris/qe/VariableVarConverters.java | 59 ++-
.../org/apache/doris/planner/QueryPlanTest.java | 79 ++++
.../doris/planner/RuntimeFilterGeneratorTest.java | 426 +++++++++++++++++++
.../doris/qe/RuntimeFilterTypeHelperTest.java | 77 ++++
.../java/org/apache/doris/qe/VariableMgrTest.java | 6 +
gensrc/thrift/PaloInternalService.thrift | 24 +-
gensrc/thrift/PlanNodes.thrift | 4 +-
31 files changed, 2131 insertions(+), 102 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index 3634768..a879f08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -34,6 +34,7 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.RuntimeFilter;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.BetweenToCompoundRule;
import org.apache.doris.rewrite.ExprRewriteRule;
@@ -148,6 +149,9 @@ public class Analyzer {
private boolean isUDFAllowed = true;
// timezone specified for some operation, such as broker load
private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
+
+ // The runtime filter that is expected to be used
+ private final List<RuntimeFilter> assignedRuntimeFilters = new ArrayList<>();
public void setIsSubquery() {
isSubquery = true;
@@ -163,6 +167,10 @@ public class Analyzer {
public void setTimezone(String timezone) { this.timezone = timezone; }
public String getTimezone() { return timezone; }
+ public void putAssignedRuntimeFilter(RuntimeFilter rf) { assignedRuntimeFilters.add(rf); }
+ public List<RuntimeFilter> getAssignedRuntimeFilter() { return assignedRuntimeFilters; }
+ public void clearAssignedRuntimeFilters() { assignedRuntimeFilters.clear(); }
+
// state shared between all objects of an Analyzer tree
// TODO: Many maps here contain properties about tuples, e.g., whether
// a tuple is outer/semi joined, etc. Remove the maps in favor of making
@@ -551,6 +559,10 @@ public class Analyzer {
return globalState.descTbl.getTupleDesc(id);
}
+ public SlotDescriptor getSlotDesc(SlotId id) {
+ return globalState.descTbl.getSlotDesc(id);
+ }
+
/**
* Given a "table alias"."column alias", return the SlotDescriptor
*
@@ -1809,4 +1821,38 @@ public class Analyzer {
}
}
}
+
+ /**
+ * Column conduction, can slot a value-transfer to slot b
+ *
+ * TODO(zxy) Use value-transfer graph to check
+ */
+ public boolean hasValueTransfer(SlotId a, SlotId b) {
+ return a.equals(b);
+ }
+
+ /**
+ * Returns sorted slot IDs with value transfers from 'srcSid'.
+ * Time complexity: O(V) where V = number of slots
+ *
+ * TODO(zxy) Use value-transfer graph to check
+ */
+ public List<SlotId> getValueTransferTargets(SlotId srcSid) {
+ List<SlotId> result = new ArrayList<>();
+ result.add(srcSid);
+ return result;
+ }
+
+ /**
+ * Returns true if any of the given slot ids or their value-transfer targets belong
+ * to an outer-joined tuple.
+ */
+ public boolean hasOuterJoinedValueTransferTarget(List<SlotId> sids) {
+ for (SlotId srcSid: sids) {
+ for (SlotId dstSid: getValueTransferTargets(srcSid)) {
+ if (isOuterJoined(getTupleId(dstSid))) return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java
index 9a98ac5..8c3da5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java
@@ -133,6 +133,8 @@ public class BinaryPredicate extends Predicate implements Writable {
public boolean isEquivalence() { return this == EQ || this == EQ_FOR_NULL; };
+ public boolean isUnNullSafeEquivalence() { return this == EQ; };
+
public boolean isUnequivalence() { return this == NE; }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
index 3f73a31..cafd88f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
@@ -1704,4 +1704,41 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
final Expr newExpr = ExpressionFunctions.INSTANCE.evalExpr(this);
return newExpr != null ? newExpr : this;
}
+
+ public String getStringValue() {
+ if (this instanceof LiteralExpr) {
+ return ((LiteralExpr) this).getStringValue();
+ }
+ return "";
+ }
+
+ public static Expr getFirstBoundChild(Expr expr, List<TupleId> tids) {
+ for (Expr child: expr.getChildren()) {
+ if (child.isBoundByTupleIds(tids)) return child;
+ }
+ return null;
+ }
+
+    /**
+     * Returns true if this expr or any descendant calls 'functionName' (case-insensitive), otherwise false.
+     */
+    public boolean isContainsFunction(String functionName) {
+        // A node whose fn is null can still have children that call the function, so do not stop here.
+        if (fn != null && fn.functionName().equalsIgnoreCase(functionName)) return true;
+        for (Expr child: children) {
+            if (child.isContainsFunction(functionName)) return true;
+        }
+        return false;
+    }
+
+ /**
+ * Returns true if expr contains specify className, otherwise false.
+ */
+ public boolean isContainsClass(String className) {
+ if (this.getClass().getName().equalsIgnoreCase(className)) return true;
+ for (Expr child: children) {
+ if (child.isContainsClass(className)) return true;
+ }
+ return false;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Predicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Predicate.java
index 0f033b4..cc4f4dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Predicate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Predicate.java
@@ -98,6 +98,11 @@ public abstract class Predicate extends Expr {
&& ((BinaryPredicate) expr).getOp().isEquivalence();
}
+ public static boolean isUnNullSafeEquivalencePredicate(Expr expr) {
+ return (expr instanceof BinaryPredicate)
+ && ((BinaryPredicate) expr).getOp().isUnNullSafeEquivalence();
+ }
+
public static boolean canPushDownPredicate(Expr expr) {
if (!(expr instanceof Predicate)) {
return false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java
index 8af05d3..ab202e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java
@@ -24,7 +24,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.io.Text;
-import org.apache.doris.qe.SqlModeHelper;
+import org.apache.doris.qe.VariableVarConverters;
import org.apache.doris.thrift.TExprNode;
import org.apache.doris.thrift.TExprNodeType;
import org.apache.doris.thrift.TStringLiteral;
@@ -43,18 +43,8 @@ import java.util.Objects;
public class StringLiteral extends LiteralExpr {
private static final Logger LOG = LogManager.getLogger(StringLiteral.class);
private String value;
- /**
- * the session variable `sql_mode` is a special kind of variable.
- * it's real type is int, so when querying `select @@sql_mode`, the return column
- * type is "int". but user usually set this variable by string, such as:
- * `set @@sql_mode = 'STRICT_TRANS_TABLES'`
- * or
- * `set @@sql_mode = concat(@@sql_mode, 'STRICT_TRANS_TABLES')'`
- * <p>
- * So when it need to be cast to int, it means "cast 'STRICT_TRANS_TABLES' to Integer".
- * To support this, we set `isSqlMode` to true, so that it can cast sql mode name to integer.
- */
- private boolean isSqlMode = false;
+ // Means the converted session variable need to be cast to int, such as "cast 'STRICT_TRANS_TABLES' to Integer".
+ private String beConverted = "";
public StringLiteral() {
super();
@@ -73,8 +63,8 @@ public class StringLiteral extends LiteralExpr {
value = other.value;
}
- public void setIsSqlMode(boolean val) {
- this.isSqlMode = val;
+ public void setBeConverted(String val) {
+ this.beConverted = val;
}
@Override
@@ -203,20 +193,18 @@ public class StringLiteral extends LiteralExpr {
case SMALLINT:
case INT:
case BIGINT:
- if (isSqlMode) {
+ if (VariableVarConverters.hasConverter(beConverted)) {
try {
- long sqlMode = SqlModeHelper.encode(value);
- return new IntLiteral(sqlMode, targetType);
+ return new IntLiteral(VariableVarConverters.encode(beConverted, value), targetType);
} catch (DdlException e) {
throw new AnalysisException(e.getMessage());
}
}
return new IntLiteral(value, targetType);
case LARGEINT:
- if (isSqlMode) {
+ if (VariableVarConverters.hasConverter(beConverted)) {
try {
- long sqlMode = SqlModeHelper.encode(value);
- return new LargeIntLiteral(String.valueOf(sqlMode));
+ return new LargeIntLiteral(String.valueOf(VariableVarConverters.encode(beConverted, value)));
} catch (DdlException e) {
throw new AnalysisException(e.getMessage());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SysVariableDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SysVariableDesc.java
index 5675f52..c000baf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SysVariableDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SysVariableDesc.java
@@ -21,9 +21,8 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorReport;
-import org.apache.doris.qe.SessionVariable;
-import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.qe.VariableMgr;
+import org.apache.doris.qe.VariableVarConverters;
import org.apache.doris.thrift.TBoolLiteral;
import org.apache.doris.thrift.TExprNode;
import org.apache.doris.thrift.TExprNodeType;
@@ -72,10 +71,10 @@ public class SysVariableDesc extends Expr {
@Override
public void analyzeImpl(Analyzer analyzer) throws AnalysisException {
VariableMgr.fillValue(analyzer.getContext().getSessionVariable(), this);
- if (!Strings.isNullOrEmpty(name) && name.equalsIgnoreCase(SessionVariable.SQL_MODE)) {
+ if (!Strings.isNullOrEmpty(name) && VariableVarConverters.hasConverter(name)) {
setType(Type.VARCHAR);
try {
- setStringValue(SqlModeHelper.decode(intValue));
+ setStringValue(VariableVarConverters.decode(name, intValue));
} catch (DdlException e) {
ErrorReport.reportAnalysisException(e.getMessage());
}
@@ -117,16 +116,13 @@ public class SysVariableDesc extends Expr {
@Override
public Expr getResultValue() throws AnalysisException {
Expr expr = super.getResultValue();
- if (!Strings.isNullOrEmpty(name) && name.equalsIgnoreCase(SessionVariable.SQL_MODE)) {
- // SQL_MODE is a special variable. Its type is int, but it is usually set using a string.
- // Such as `set sql_mode = concat(@@sql_mode, "STRICT_TRANS_TABLES");`
- // So we return the string type here so that it can correctly match the subsequent function signature.
- // We will convert the string to int in VariableMgr.
- // And we also set `isSqlMode` to true in StringLiteral, so that it can be cast back
+ if (!Strings.isNullOrEmpty(name) && VariableVarConverters.hasConverter(name)) {
+ // Return the string type here so that it can correctly match the subsequent function signature.
+ // And we also set `beConverted` to session variable name in StringLiteral, so that it can be cast back
// to Integer when returning value.
try {
- StringLiteral s = new StringLiteral(SqlModeHelper.decode(intValue));
- s.setIsSqlMode(true);
+ StringLiteral s = new StringLiteral(VariableVarConverters.decode(name, intValue));
+ s.setBeConverted(name);
return s;
} catch (DdlException e) {
throw new AnalysisException(e.getMessage());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java
index 872c18c..c4cb13f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java
@@ -153,4 +153,24 @@ public class TupleIsNullPredicate extends Predicate {
public String toSqlImpl() {
return "TupleIsNull(" + Joiner.on(",").join(tupleIds) + ")";
}
+
+ /**
+ * Recursive function that replaces all 'IF(TupleIsNull(), NULL, e)' exprs in
+ * 'expr' with e and returns the modified expr.
+ */
+ public static Expr unwrapExpr(Expr expr) {
+ if (expr instanceof FunctionCallExpr) {
+ FunctionCallExpr fnCallExpr = (FunctionCallExpr) expr;
+ List<Expr> params = fnCallExpr.getParams().exprs();
+ if (fnCallExpr.getFnName().getFunction().equals("if") &&
+ params.get(0) instanceof TupleIsNullPredicate &&
+ Expr.IS_NULL_LITERAL.apply(params.get(1))) {
+ return unwrapExpr(params.get(2));
+ }
+ }
+ for (int i = 0; i < expr.getChildren().size(); ++i) {
+ expr.setChild(i, unwrapExpr(expr.getChild(i)));
+ }
+ return expr;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BitUtil.java
similarity index 53%
copy from fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java
copy to fe/fe-core/src/main/java/org/apache/doris/common/util/BitUtil.java
index 26d7dae..5702755 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BitUtil.java
@@ -15,10 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.qe;
+package org.apache.doris.common.util;
-import org.apache.doris.common.DdlException;
+public class BitUtil {
-public interface VariableVarConverterI {
- public String convert(String value) throws DdlException;
+    // Returns ceil(log2(val)), e.g. 1 -> 0, 5 -> 3, 8 -> 3. 'val' must be > 0.
+    public static int log2Ceiling(long val) {
+        // Formula is based on the Long.numberOfLeadingZeros() javadoc comment.
+        return 64 - Long.numberOfLeadingZeros(val - 1);
+    }
+
+ // Round up 'val' to the nearest power of two. 'val' must be > 0.
+ public static long roundUpToPowerOf2(long val) {
+ return 1L << log2Ceiling(val);
+ }
+
+ // Round up 'val' to the nearest multiple of a power-of-two 'factor'.
+ // 'val' must be > 0.
+ public static long roundUpToPowerOf2Factor(long val, long factor) {
+ return (val + (factor - 1)) & ~(factor - 1);
+ }
}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 677bde5..5aa9bb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -427,7 +427,6 @@ public class DistributedPlanner {
node.setChild(0, leftChildFragment.getPlanRoot());
connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
leftChildFragment.setPlanRoot(node);
-
return leftChildFragment;
} else {
node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);
@@ -472,8 +471,6 @@ public class DistributedPlanner {
rightChildFragment.setDestination(rhsExchange);
rightChildFragment.setOutputPartition(rhsJoinPartition);
- // TODO: Before we support global runtime filter, only shuffle join do not enable local runtime filter
- node.setIsPushDown(false);
return joinFragment;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 2360d5c..6b06de2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -60,7 +60,6 @@ public class HashJoinNode extends PlanNode {
private List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList();
// join conjuncts from the JOIN clause that aren't equi-join predicates
private List<Expr> otherJoinConjuncts;
- private boolean isPushDown = false;
private DistributionMode distrMode;
private boolean isColocate = false; //the flag for colocate join
private String colocateReason = ""; // if can not do colocate join, set reason here
@@ -85,9 +84,6 @@ public class HashJoinNode extends PlanNode {
this.otherJoinConjuncts = otherJoinConjuncts;
children.add(outer);
children.add(inner);
- if (this.joinOp.isInnerJoin() || this.joinOp.isLeftSemiJoin()) {
- this.isPushDown = true;
- }
// Inherits all the nullable tuple from the children
// Mark tuples that form the "nullable" side of the outer join as nullable.
@@ -282,10 +278,6 @@ public class HashJoinNode extends PlanNode {
}
}
- public void setIsPushDown(boolean isPushDown) {
- this.isPushDown = isPushDown;
- }
-
@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.HASH_JOIN_NODE;
@@ -300,7 +292,6 @@ public class HashJoinNode extends PlanNode {
for (Expr e : otherJoinConjuncts) {
msg.hash_join_node.addToOtherJoinConjuncts(e.treeToThrift());
}
- msg.hash_join_node.setIsPushDown(isPushDown);
}
@Override
@@ -326,6 +317,12 @@ public class HashJoinNode extends PlanNode {
if (!conjuncts.isEmpty()) {
output.append(detailPrefix).append("other predicates: ").append(getExplainString(conjuncts)).append("\n");
}
+ if (!runtimeFilters.isEmpty()) {
+ output.append(detailPrefix).append("runtime filters: ");
+ output.append(getRuntimeFilterExplainString(true));
+ }
+ output.append(detailPrefix).append(String.format(
+ "cardinality=%s", cardinality)).append("\n");
return output.toString();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 62948c7..99e158e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -591,6 +591,10 @@ public class OlapScanNode extends ScanNode {
output.append(prefix).append("PREDICATES: ").append(
getExplainString(conjuncts)).append("\n");
}
+ if (!runtimeFilters.isEmpty()) {
+ output.append(prefix).append("runtime filters: ");
+ output.append(getRuntimeFilterExplainString(false));
+ }
output.append(prefix).append(String.format(
"partitions=%s/%s",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index f714132..e865cf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -36,7 +36,9 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -127,6 +129,11 @@ public class PlanFragment extends TreeNode<PlanFragment> {
// default value is 1
private int parallelExecNum = 1;
+ // The runtime filter id that produced
+ private Set<RuntimeFilterId> builderRuntimeFilterIds;
+ // The runtime filter id that is expected to be used
+ private Set<RuntimeFilterId> targetRuntimeFilterIds;
+
/**
* C'tor for fragment with specific partition; the output is by default broadcast.
*/
@@ -136,6 +143,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
this.dataPartition = partition;
this.outputPartition = DataPartition.UNPARTITIONED;
this.transferQueryStatisticsWithEveryBatch = false;
+ this.builderRuntimeFilterIds = new HashSet<>();
+ this.targetRuntimeFilterIds = new HashSet<>();
setParallelExecNumIfExists();
setFragmentInPlanTree(planRoot);
}
@@ -177,6 +186,14 @@ public class PlanFragment extends TreeNode<PlanFragment> {
this.outputExprs = Expr.cloneList(outputExprs, null);
}
+ public void setBuilderRuntimeFilterIds(RuntimeFilterId rid) {
+ this.builderRuntimeFilterIds.add(rid);
+ }
+
+ public void setTargetRuntimeFilterIds(RuntimeFilterId rid) {
+ this.targetRuntimeFilterIds.add(rid);
+ }
+
/**
* Finalize plan tree and create stream sink, if needed.
*/
@@ -344,6 +361,19 @@ public class PlanFragment extends TreeNode<PlanFragment> {
return fragmentId;
}
+ public Set<RuntimeFilterId> getBuilderRuntimeFilterIds() {
+ return builderRuntimeFilterIds;
+ }
+
+ public Set<RuntimeFilterId> getTargetRuntimeFilterIds() {
+ return targetRuntimeFilterIds;
+ }
+
+ public void clearRuntimeFilters() {
+ builderRuntimeFilterIds.clear();
+ targetRuntimeFilterIds.clear();
+ }
+
public void setTransferQueryStatisticsWithEveryBatch(boolean value) {
transferQueryStatisticsWithEveryBatch = value;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index dd7f636..1233f45 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -17,6 +17,7 @@
package org.apache.doris.planner;
+import com.google.common.base.Joiner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
@@ -40,6 +41,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Set;
@@ -117,6 +119,9 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
return planNodeName;
}
+ // Runtime filters assigned to this node.
+ protected List<RuntimeFilter> runtimeFilters = new ArrayList<>();
+
protected PlanNode(PlanNodeId id, ArrayList<TupleId> tupleIds, String planNodeName) {
this.id = id;
this.limit = -1;
@@ -411,6 +416,10 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
for (Expr e : conjuncts) {
msg.addToConjuncts(e.treeToThrift());
}
+ // Serialize any runtime filters
+ for (RuntimeFilter filter : runtimeFilters) {
+ msg.addToRuntimeFilters(filter.toThrift());
+ }
msg.compact_data = compactData;
toThrift(msg);
container.addToNodes(msg);
@@ -659,4 +668,31 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
}
return null;
}
+
+ protected void addRuntimeFilter(RuntimeFilter filter) { runtimeFilters.add(filter); }
+
+ protected Collection<RuntimeFilter> getRuntimeFilters() { return runtimeFilters; }
+
+ public void clearRuntimeFilters() { runtimeFilters.clear(); }
+
+ protected String getRuntimeFilterExplainString(boolean isBuildNode) {
+ if (runtimeFilters.isEmpty()) return "";
+ List<String> filtersStr = new ArrayList<>();
+ for (RuntimeFilter filter: runtimeFilters) {
+ StringBuilder filterStr = new StringBuilder();
+ filterStr.append(filter.getFilterId());
+ filterStr.append("[");
+ filterStr.append(filter.getType().toString().toLowerCase());
+ filterStr.append("]");
+ if (isBuildNode) {
+ filterStr.append(" <- ");
+ filterStr.append(filter.getSrcExpr().toSql());
+ } else {
+ filterStr.append(" -> ");
+ filterStr.append(filter.getTargetExpr(getId()).toSql());
+ }
+ filtersStr.add(filterStr.toString());
+ }
+ return Joiner.on(", ").join(filtersStr) + "\n";
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index fa7a101..16c0a37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.PlanTreeBuilder;
import org.apache.doris.common.profile.PlanTreePrinter;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TQueryOptions;
@@ -38,6 +39,7 @@ import org.apache.doris.thrift.TQueryOptions;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.doris.thrift.TRuntimeFilterMode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -201,7 +203,13 @@ public class Planner {
QueryStatisticsTransferOptimizer queryStatisticTransferOptimizer = new QueryStatisticsTransferOptimizer(rootFragment);
queryStatisticTransferOptimizer.optimizeQueryStatisticsTransfer();
- if (statement instanceof InsertStmt) {
+ // Create runtime filters.
+ if (!ConnectContext.get().getSessionVariable().getRuntimeFilterMode().toUpperCase()
+ .equals(TRuntimeFilterMode.OFF.name())) {
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());
+ }
+
+ if (statement instanceof InsertStmt) {
InsertStmt insertStmt = (InsertStmt) statement;
rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments);
rootFragment.setSink(insertStmt.getDataSink());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
new file mode 100644
index 0000000..a368208
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -0,0 +1,457 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.Predicate;
+import org.apache.doris.analysis.SlotId;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.analysis.TupleIsNullPredicate;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.thrift.TRuntimeFilterDesc;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Representation of a runtime filter. A runtime filter is generated from
+ * an equi-join predicate of the form <lhs_expr> = <rhs_expr>, where lhs_expr is the
+ * expr on which the filter is applied and must be bound by a single tuple id from
+ * the left plan subtree of the associated join node, while rhs_expr is the expr on
+ * which the filter is built and can be bound by any number of tuple ids from the
+ * right plan subtree. Every runtime filter must record the join node that constructs
+ * the filter and the scan nodes that apply the filter (destination nodes).
+ */
+public final class RuntimeFilter {
+ private final static Logger LOG = LogManager.getLogger(RuntimeFilter.class);
+
+ // Identifier of the filter (unique within a query)
+ private final RuntimeFilterId id;
+ // Join node that builds the filter
+ private final HashJoinNode builderNode;
+ // Expr (rhs of join predicate) on which the filter is built
+ private final Expr srcExpr;
+ // The position of expr in the join condition
+ private final int exprOrder;
+ // Expr (lhs of join predicate) from which the targetExprs_ are generated.
+ private final Expr origTargetExpr;
+ // Runtime filter targets
+ private final List<RuntimeFilterTarget> targets = new ArrayList<>();
+ // Slots from base table tuples that have value transfer from the slots
+ // of 'origTargetExpr'. The slots are grouped by tuple id.
+ private final Map<TupleId, List<SlotId>> targetSlotsByTid;
+ // If true, the join node building this filter is executed using a broadcast join;
+ // set in the DistributedPlanner.createHashJoinFragment()
+ private boolean isBroadcastJoin;
+ // Estimate of the number of distinct values that will be inserted into this filter,
+ // globally across all instances of the source node. Used to compute an optimal size
+ // for the filter. A value of -1 means no estimate is available, and default filter
+ // parameters should be used.
+ private long ndvEstimate = -1;
+ // Size of the filter (in Bytes). Should be greater than zero for bloom filters.
+ private long filterSizeBytes = 0;
+ // If true, the filter is produced by a broadcast join and there is at least one
+ // destination scan node which is in the same fragment as the join; set in
+ // DistributedPlanner.createHashJoinFragment().
+ private boolean hasLocalTargets = false;
+ // If true, there is at least one destination scan node which is not in the same
+ // fragment as the join that produced the filter; set in
+ // DistributedPlanner.createHashJoinFragment().
+ private boolean hasRemoteTargets = false;
+ // If set, indicates that the filter can't be assigned to another scan node.
+ // Once set, it can't be unset.
+ private boolean finalized = false;
+ // The type of filter to build.
+ private TRuntimeFilterType runtimeFilterType;
+
+ /**
+ * Internal representation of a runtime filter target.
+ */
+ public static class RuntimeFilterTarget {
+ // Scan node that applies the filter
+ public ScanNode node;
+ // Expr on which the filter is applied
+ public Expr expr;
+ // Indicates if 'expr' is bound only by partition columns
+ public final boolean isBoundByKeyColumns;
+ // Indicates if 'node' is in the same fragment as the join that produces the filter
+ public final boolean isLocalTarget;
+
+ public RuntimeFilterTarget(ScanNode targetNode, Expr targetExpr,
+ boolean isBoundByKeyColumns, boolean isLocalTarget) {
+ Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds()));
+ this.node = targetNode;
+ this.expr = targetExpr;
+ this.isBoundByKeyColumns = isBoundByKeyColumns;
+ this.isLocalTarget = isLocalTarget;
+ }
+
+ @Override
+ public String toString() {
+ return "Target Id: " + node.getId() + " " +
+ "Target expr: " + expr.debugString() + " " +
+ "Is only Bound By Key: " + isBoundByKeyColumns +
+ "Is local: " + isLocalTarget;
+ }
+ }
+
+ private RuntimeFilter(RuntimeFilterId filterId, HashJoinNode filterSrcNode, Expr srcExpr, int exprOrder,
+ Expr origTargetExpr, Map<TupleId, List<SlotId>> targetSlots,
+ TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) {
+ this.id = filterId;
+ this.builderNode = filterSrcNode;
+ this.srcExpr = srcExpr;
+ this.exprOrder = exprOrder;
+ this.origTargetExpr = origTargetExpr;
+ this.targetSlotsByTid = targetSlots;
+ this.runtimeFilterType = type;
+ computeNdvEstimate();
+ calculateFilterSize(filterSizeLimits);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof RuntimeFilter)) return false;
+ return ((RuntimeFilter) obj).id.equals(id);
+ }
+
+ @Override
+ public int hashCode() { return id.hashCode(); }
+
+ public void markFinalized() { finalized = true; }
+ public boolean isFinalized() { return finalized; }
+
+ /**
+ * Serializes a runtime filter to Thrift.
+ */
+ public TRuntimeFilterDesc toThrift() {
+ TRuntimeFilterDesc tFilter = new TRuntimeFilterDesc();
+ tFilter.setFilterId(id.asInt());
+ tFilter.setSrcExpr(srcExpr.treeToThrift());
+ tFilter.setExprOrder(exprOrder);
+ tFilter.setIsBroadcastJoin(isBroadcastJoin);
+ tFilter.setHasLocalTargets(hasLocalTargets);
+ tFilter.setHasRemoteTargets(hasRemoteTargets);
+ for (RuntimeFilterTarget target : targets) {
+ tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), target.expr.treeToThrift());
+ }
+ tFilter.setType(runtimeFilterType);
+ tFilter.setBloomFilterSizeBytes(filterSizeBytes);
+ return tFilter;
+ }
+
+ public List<RuntimeFilterTarget> getTargets() { return targets; }
+ public boolean hasTargets() { return !targets.isEmpty(); }
+ public Expr getSrcExpr() { return srcExpr; }
+ public Expr getOrigTargetExpr() { return origTargetExpr; }
+ public Map<TupleId, List<SlotId>> getTargetSlots() { return targetSlotsByTid; }
+ public RuntimeFilterId getFilterId() { return id; }
+ public TRuntimeFilterType getType() { return runtimeFilterType; }
+ public void setType(TRuntimeFilterType type) { runtimeFilterType = type; }
+ public boolean hasRemoteTargets() { return hasRemoteTargets; }
+ public HashJoinNode getBuilderNode() { return builderNode; }
+
+ /**
+ * Static function to create a RuntimeFilter from 'joinPredicate' that is assigned
+ * to the join node 'filterSrcNode'. Returns an instance of RuntimeFilter
+ * or null if a runtime filter cannot be generated from the specified predicate.
+ */
+ public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen, Analyzer analyzer,
+ Expr joinPredicate, int exprOrder, HashJoinNode filterSrcNode,
+ TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) {
+ Preconditions.checkNotNull(idGen);
+ Preconditions.checkNotNull(joinPredicate);
+ Preconditions.checkNotNull(filterSrcNode);
+ // Only consider binary equality predicates and not contain Null-safe equals.
+ // The predicate could not be pushed down when there is Null-safe equal operator. Because the runtimeFilter
+ // will filter the null value in child[0] while it is needed in the Null-safe equal join.
+ // For example: select * from a join b where a.id<=>b.id
+ // the null value in table a should be return by scan node instead of filtering it by runtimeFilter.
+ if (!Predicate.isUnNullSafeEquivalencePredicate(joinPredicate)) return null;
+
+ BinaryPredicate normalizedJoinConjunct =
+ SingleNodePlanner.getNormalizedEqPred(joinPredicate,
+ filterSrcNode.getChild(0).getTupleIds(),
+ filterSrcNode.getChild(1).getTupleIds(), analyzer);
+ if (normalizedJoinConjunct == null) return null;
+
+ // Ensure that the target expr does not contain TupleIsNull predicates as these
+ // can't be evaluated at a scan node.
+ Expr targetExpr =
+ TupleIsNullPredicate.unwrapExpr(normalizedJoinConjunct.getChild(0).clone());
+ Expr srcExpr = normalizedJoinConjunct.getChild(1);
+
+ if (srcExpr.getType().equals(ScalarType.createHllType())
+ || srcExpr.getType().equals(ScalarType.createType(PrimitiveType.BITMAP))) return null;
+
+ Map<TupleId, List<SlotId>> targetSlots = getTargetSlots(analyzer, targetExpr);
+ Preconditions.checkNotNull(targetSlots);
+ if (targetSlots.isEmpty()) return null;
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Generating runtime filter from predicate " + joinPredicate);
+ }
+ return new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, exprOrder,
+ targetExpr, targetSlots, type, filterSizeLimits);
+ }
+
+ /**
+ * Returns the ids of base table tuple slots on which a runtime filter expr can be
+ * applied. Due to the existence of equivalence classes, a filter expr may be
+ * applicable at multiple scan nodes. The returned slot ids are grouped by tuple id.
+ * Returns an empty collection if the filter expr cannot be applied at a base table
+ * or if applying the filter might lead to incorrect results.
+ * Returns the slot id of the base table expected to use this target expr.
+ */
+ private static Map<TupleId, List<SlotId>> getTargetSlots(Analyzer analyzer, Expr expr) {
+ // 'expr' is not a SlotRef and may contain multiple SlotRefs
+ List<TupleId> tids = new ArrayList<>();
+ List<SlotId> sids = new ArrayList<>();
+ expr.getIds(tids, sids);
+
+ /*
+ If the target expression evaluates to a non-NULL value for outer-join non-matches, then assigning the
+ filter below the nullable side of an outer join may produce incorrect query results.
+ This check is conservative but correct to keep the code simple. In particular, it would otherwise be
+ difficult to identify incorrect runtime filter assignments through outer-joined inline views because
+ the 'expr' has already been fully resolved.
+ TODO(zxy) We rely on the value-transfer graph to check whether 'expr' could potentially be assigned
+ below an outer-joined inline view.
+
+ Queries with the following characteristics may produce wrong results due to an incorrectly assigned
+ runtime filter:
+ 1)The query has an outer join
+ 2)A scan on the nullable side of that outer join has a runtime filter with a NULL-checking
+ expression such as COALESCE/IFNULL/CASE
+ 3)The latter point imples that there is another join above the outer join with a NULL-checking
+ expression in it's join condition
+
+ Reproduction:
+ TPC-DS 1T Benchmarks test
+ "
+ select count(*) from store t1 left outer join store t2 on t1.s_store_sk = t2.s_store_sk
+ where coalesce(t2.s_store_sk + 100, 100) in (select ifnull(100, s_store_sk) from store);
+
+ select count(*) from store t1 left outer join store t2 on t1.s_store_sk = t2.s_store_sk
+ where case when t2.s_store_sk is NULL then 100 else t2.s_store_sk end
+ in (select ifnull(100, s_store_sk) from store limit 10);
+ "
+ We expect a count of 0. A count of 1024 is incorrect.
+ Query plan:
+ | 4:HASH JOIN
+ | | join op: LEFT SEMI JOIN (BROADCAST)
+ | | equal join conjunct: coalesce(`t2`.`s_store_sk` + 100, 100) = ifnull(100, `s_store_sk`)
+ | | runtime filters: RF000[in] <- ifnull(100, `s_store_sk`)
+ | | cardinality=1002
+ | |----7:EXCHANGE
+ | 3:HASH JOIN
+ | | join op: LEFT OUTER JOIN
+ | | equal join conjunct: `t1`.`s_store_sk` = `t2`.`s_store_sk`
+ | |----1:OlapScanNode
+ | | TABLE: store
+ | | runtime filters: RF000[in] -> coalesce(`t2`.`s_store_sk` + 100, 100)
+ | 0:OlapScanNode
+ | TABLE: store
+ Explanation:
+ RF000 filters out all rows in scan 01.
+ In join 03 there are no join matches since the right-hand is empty. All rows from the right-hand
+ side are nulled.
+ The join condition in join 04 now satisfies all input rows because every "t2.id" is NULL,
+ so after the COALESCE() the join condition becomes 100 = 100.
+ */
+ if (analyzer.hasOuterJoinedValueTransferTarget(sids)) {
+ // Do not push down when contains NULL-checking expression COALESCE/IFNULL/CASE
+ // TODO(zxy) Returns true if 'p' evaluates to true when all its referenced slots are NULL, returns false
+ // otherwise. Throws if backend expression evaluation fails.
+ if (expr.isContainsFunction("COALESCE") || expr.isContainsFunction("IFNULL")
+ || expr.isContainsClass("org.apache.doris.analysis.CaseExpr"))
+ return Collections.emptyMap();
+ }
+
+ Map<TupleId, List<SlotId>> slotsByTid = new HashMap<>();
+ // We need to iterate over all the slots of 'expr' and check if they have
+ // equivalent slots that are bound by the same base table tuple(s).
+ for (SlotId slotId: sids) {
+ Map<TupleId, List<SlotId>> currSlotsByTid = getBaseTblEquivSlots(analyzer, slotId);
+ if (currSlotsByTid.isEmpty()) return Collections.emptyMap();
+ if (slotsByTid.isEmpty()) {
+ slotsByTid.putAll(currSlotsByTid);
+ continue;
+ }
+
+ // Compute the intersection between tuple ids from 'slotsByTid' and
+ // 'currSlotsByTid'. If the intersection is empty, an empty collection
+ // is returned.
+ Iterator<Map.Entry<TupleId, List<SlotId>>> iter = slotsByTid.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<TupleId, List<SlotId>> entry = iter.next();
+ List<SlotId> slotIds = currSlotsByTid.get(entry.getKey());
+ // Take the intersection of the tuple ids of all slots in expr to
+ // form <tupleid, slotid> and return.
+ // A.a + B.b = C.c, when the tuple IDs of the two slots A.a and B.b are different, at this
+ // time cannot be pushed down, so remove. If you can get A.a and transferd to B.a, then
+ // the tuple IDs of A.a and B.b have intersection B, So target expr is available, the tuple
+ // ID of this intersection is the scan node that is expected to use this runtime fitler
+ if (slotIds == null) {
+ iter.remove();
+ } else {
+ entry.getValue().addAll(slotIds);
+ }
+ }
+ if (slotsByTid.isEmpty()) return Collections.emptyMap();
+ }
+ return slotsByTid;
+ }
+
+ /**
+ * Static function that returns the ids of slots bound by base table tuples for which
+ * there is a value transfer from 'srcSid'. The slots are grouped by tuple id.
+ * That is, srcSid can be calculated from the <tuple id, slot id> of the base table.
+ */
+ private static Map<TupleId, List<SlotId>> getBaseTblEquivSlots(Analyzer analyzer,
+ SlotId srcSid) {
+ Map<TupleId, List<SlotId>> slotsByTid = new HashMap<>();
+ for (SlotId targetSid: analyzer.getValueTransferTargets(srcSid)) {
+ TupleDescriptor tupleDesc = analyzer.getSlotDesc(targetSid).getParent();
+ if (tupleDesc.getTable() == null) continue;
+ List<SlotId> sids = slotsByTid.computeIfAbsent(tupleDesc.getId(), k -> new ArrayList<>());
+ sids.add(targetSid);
+ }
+ return slotsByTid;
+ }
+
+ public Expr getTargetExpr(PlanNodeId targetPlanNodeId) {
+ for (RuntimeFilterTarget target: targets) {
+ if (target.node.getId() != targetPlanNodeId) continue;
+ return target.expr;
+ }
+ return null;
+ }
+
+ /**
+ * Estimates the selectivity of a runtime filter as the cardinality of the
+ * associated source join node over the cardinality of that join node's left
+ * child.
+ */
+ public double getSelectivity() {
+ if (builderNode.getCardinality() == -1
+ || builderNode.getChild(0).getCardinality() == -1
+ || builderNode.getChild(0).getCardinality() == 0) {
+ return -1;
+ }
+ return builderNode.getCardinality() / (double) builderNode.getChild(0).getCardinality();
+ }
+
+ public void addTarget(RuntimeFilterTarget target) { targets.add(target); }
+
+ public void setIsBroadcast(boolean isBroadcast) { isBroadcastJoin = isBroadcast; }
+
+ public void computeNdvEstimate() { ndvEstimate = builderNode.getChild(1).getCardinality(); }
+
+ public void extractTargetsPosition() {
+ Preconditions.checkNotNull(builderNode.getFragment());
+ Preconditions.checkState(hasTargets());
+ for (RuntimeFilterTarget target: targets) {
+ Preconditions.checkNotNull(target.node.getFragment());
+ hasLocalTargets = hasLocalTargets || target.isLocalTarget;
+ hasRemoteTargets = hasRemoteTargets || !target.isLocalTarget;
+ }
+ }
+
+ /**
+ * Sets the filter size (in bytes) required for a bloom filter to achieve the
+ * configured maximum false-positive rate based on the expected NDV. Also bounds the
+ * filter size between the max and minimum filter sizes supplied to it by
+ * 'filterSizeLimits'.
+ * Considering that the `IN` filter may be converted to the `Bloom FIlter` when crossing fragments,
+ * the bloom filter size is always calculated.
+ */
+ private void calculateFilterSize(RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) {
+ if (ndvEstimate == -1) {
+ filterSizeBytes = filterSizeLimits.defaultVal;
+ return;
+ }
+ double fpp = FeConstants.default_bloom_filter_fpp;
+ int logFilterSize = GetMinLogSpaceForBloomFilter(ndvEstimate, fpp);
+ filterSizeBytes = 1L << logFilterSize;
+ filterSizeBytes = Math.max(filterSizeBytes, filterSizeLimits.minVal);
+ filterSizeBytes = Math.min(filterSizeBytes, filterSizeLimits.maxVal);
+ }
+
+ /**
+ * Returns the log (base 2) of the minimum number of bytes we need for a Bloom
+ * filter with 'ndv' unique elements and a false positive probability of less
+ * than 'fpp'.
+ */
+ public static int GetMinLogSpaceForBloomFilter(long ndv, double fpp) {
+ if (0 == ndv) return 0;
+ double k = 8; // BUCKET_WORDS
+ // m is the number of bits we would need to get the fpp specified
+ double m = -k * ndv / Math.log(1 - Math.pow(fpp, 1.0 / k));
+
+ // Handle case where ndv == 1 => ceil(log2(m/8)) < 0.
+ return Math.max(0, (int)(Math.ceil(Math.log(m / 8)/Math.log(2))));
+ }
+
+ /**
+ * Assigns this runtime filter to the corresponding plan nodes.
+ */
+ public void assignToPlanNodes() {
+ Preconditions.checkState(hasTargets());
+ builderNode.addRuntimeFilter(this);
+ for (RuntimeFilterTarget target: targets) {
+ target.node.addRuntimeFilter(this);
+ // fragment is expected to use this filter id
+ target.node.fragment_.setTargetRuntimeFilterIds(this.id);
+ }
+ }
+
+ public void registerToPlan(Analyzer analyzer) {
+ setIsBroadcast(getBuilderNode().getDistributionMode() == HashJoinNode.DistributionMode.BROADCAST);
+ if (LOG.isTraceEnabled()) LOG.trace("Runtime filter: " + debugString());
+ assignToPlanNodes();
+ analyzer.putAssignedRuntimeFilter(this);
+ getBuilderNode().fragment_.setBuilderRuntimeFilterIds(getFilterId());
+ }
+
+ public String debugString() {
+ return "FilterID: " + id + " " +
+ "Source: " + builderNode.getId() + " " +
+ "SrcExpr: " + getSrcExpr().debugString() + " " +
+ "Target(s): " +
+ Joiner.on(", ").join(targets) + " " +
+ "Selectivity: " + getSelectivity();
+ }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
new file mode 100644
index 0000000..7c53de3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
@@ -0,0 +1,400 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ExprSubstitutionMap;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotId;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.util.BitUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterMode;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class used for generating and assigning runtime filters to a query plan using
+ * runtime filter propagation. Runtime filter propagation is an optimization technique
+ * used to filter scanned tuples or scan ranges based on information collected at
+ * runtime. A runtime filter is constructed during the build phase of a join node, and is
+ * applied at, potentially, multiple scan nodes on the probe side of that join node.
+ * Runtime filters are generated from equal-join predicates but they do not replace the
+ * original predicates.
+ *
+ * MinMax filters are of a fixed size (except for those used for string type) and
+ * therefore only sizes for bloom filters need to be calculated. These calculations are
+ * based on the NDV estimates of the associated table columns, the min buffer size that
+ * can be allocated by the bufferpool, and the query options. Moreover, it is also bound
+ * by the MIN/MAX_BLOOM_FILTER_SIZE limits which are enforced on the query options before
+ * this phase of planning.
+ *
+ * Example: select * from T1, T2 where T1.a = T2.b and T2.c = '1';
+ * Assuming that T1 is a fact table and T2 is a significantly smaller dimension table, a
+ * runtime filter is constructed at the join node between tables T1 and T2 while building
+ * the hash table on the values of T2.b (rhs of the join condition) from the tuples of T2
+ * that satisfy predicate T2.c = '1'. The runtime filter is subsequently sent to the
+ * scan node of table T1 and is applied on the values of T1.a (lhs of the join condition)
+ * to prune tuples of T2 that cannot be part of the join result.
+ */
+public final class RuntimeFilterGenerator {
+    private static final Logger LOG = LogManager.getLogger(RuntimeFilterGenerator.class);
+
+    // Candidate runtime filters keyed by the base-table tuple id they may be applied to.
+    // A filter is registered under every tuple id in its target slots; finalization
+    // removes it from the tuple ids it was not actually assigned to.
+    private final Map<TupleId, List<RuntimeFilter>> runtimeFiltersByTid = new HashMap<>();
+
+    // Source of filter ids for the filters created by this generator.
+    private final IdGenerator<RuntimeFilterId> filterIdGenerator = RuntimeFilterId.createGenerator();
+
+    /**
+     * Holds the maximum, minimum and default bloom filter sizes (in bytes) derived from
+     * the session variables. Each value is passed through BitUtil.roundUpToPowerOf2, and
+     * the inputs are clamped so that, assuming roundUpToPowerOf2 returns the smallest power
+     * of two not below its argument, minVal <= defaultVal <= maxVal.
+     */
+    public static class FilterSizeLimits {
+        // Upper bound on a bloom filter's size, in bytes.
+        public final long maxVal;
+
+        // Lower bound on a bloom filter's size, in bytes; capped by maxVal before rounding.
+        public final long minVal;
+
+        // Size used when no NDV estimate exists, in bytes; clamped into [minVal, maxVal].
+        public final long defaultVal;
+
+        public FilterSizeLimits(SessionVariable sessionVariable) {
+            this.maxVal = BitUtil.roundUpToPowerOf2(sessionVariable.getRuntimeBloomFilterMaxSize());
+
+            // Cap the configured minimum at the maximum before rounding it.
+            long cappedMin = Math.min(sessionVariable.getRuntimeBloomFilterMinSize(), this.maxVal);
+            this.minVal = BitUtil.roundUpToPowerOf2(cappedMin);
+
+            // Raise the configured default to at least minVal, then cap it at maxVal.
+            long raisedDefault = Math.max(sessionVariable.getRuntimeBloomFilterSize(), this.minVal);
+            this.defaultVal = BitUtil.roundUpToPowerOf2(Math.min(raisedDefault, this.maxVal));
+        }
+    }
+
+    // Bloom filter size limits, computed once from the session variables.
+    private final FilterSizeLimits bloomFilterSizeLimits;
+
+    private final Analyzer analyzer;
+    private final SessionVariable sessionVariable;
+
+    /**
+     * Creates a generator bound to 'analyzer' and to the session variables of the
+     * current connection (ConnectContext.get()).
+     */
+    private RuntimeFilterGenerator(Analyzer analyzer) {
+        this.analyzer = analyzer;
+        SessionVariable currentSession = ConnectContext.get().getSessionVariable();
+        this.sessionVariable = Preconditions.checkNotNull(currentSession);
+        this.bloomFilterSizeLimits = new FilterSizeLimits(currentSession);
+    }
+
+    /**
+     * Generates runtime filters for the plan tree rooted at 'plan' and registers the
+     * selected ones with their builder and target nodes.
+     *
+     * At most getRuntimeFiltersMaxNum() bloom filters are registered. When more filters
+     * than that were generated, they are considered from most to least selective, with
+     * filters of unknown selectivity last.
+     */
+    public static void generateRuntimeFilters(Analyzer analyzer, PlanNode plan) {
+        Preconditions.checkNotNull(analyzer);
+        final int bloomFilterLimit = ConnectContext.get().getSessionVariable().getRuntimeFiltersMaxNum();
+        final int enabledTypeMask = ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+        Preconditions.checkState(bloomFilterLimit >= 0);
+        RuntimeFilterGenerator generator = new RuntimeFilterGenerator(analyzer);
+
+        // The enabled-type bit mask must be non-negative and no larger than the sum of
+        // all TRuntimeFilterType values.
+        int allTypesSum = 0;
+        for (TRuntimeFilterType knownType : TRuntimeFilterType.values()) {
+            allTypesSum += knownType.getValue();
+        }
+        Preconditions.checkState(enabledTypeMask >= 0, "runtimeFilterType not expected");
+        Preconditions.checkState(enabledTypeMask <= allTypesSum, "runtimeFilterType not expected");
+
+        generator.generateFilters(plan);
+        List<RuntimeFilter> candidates = generator.getRuntimeFilters();
+        if (candidates.size() > bloomFilterLimit) {
+            // Most selective first; an unknown selectivity (-1) sorts as the least selective.
+            candidates.sort((lhs, rhs) -> Double.compare(
+                    lhs.getSelectivity() == -1 ? Double.MAX_VALUE : lhs.getSelectivity(),
+                    rhs.getSelectivity() == -1 ? Double.MAX_VALUE : rhs.getSelectivity()));
+        }
+
+        // Only bloom filters are limited in number: they are far heavier than the other types.
+        int bloomCount = 0;
+        for (RuntimeFilter candidate : candidates) {
+            candidate.extractTargetsPosition();
+            // A filter with a remote target must be merged across fragments, and IN filters
+            // have no merge logic. Early Doris pushed such IN predicates down to the nearest
+            // ExchangeNode, which could shrink the hash table of a HashJoinNode in extreme
+            // cases and avoid OOM (seen in TPC-DS 1T query 17). To keep that benefit, a remote
+            // IN filter becomes a bloom filter, but only when bloom filters are not enabled;
+            // otherwise a bloom filter was already generated for the same conjunct and the
+            // IN filter is dropped.
+            boolean isRemoteIn = candidate.getType() == TRuntimeFilterType.IN && candidate.hasRemoteTargets();
+            if (isRemoteIn) {
+                boolean bloomEnabled = (enabledTypeMask & TRuntimeFilterType.BLOOM.getValue()) != 0;
+                if (bloomEnabled) {
+                    continue;
+                }
+                candidate.setType(TRuntimeFilterType.BLOOM);
+            }
+            if (candidate.getType() == TRuntimeFilterType.BLOOM) {
+                if (bloomCount >= bloomFilterLimit) {
+                    continue;
+                }
+                bloomCount++;
+            }
+            candidate.registerToPlan(analyzer);
+        }
+    }
+
+    /**
+     * Returns every filter still registered under some tuple id, without duplicates,
+     * in ascending filter id order.
+     */
+    public List<RuntimeFilter> getRuntimeFilters() {
+        Set<RuntimeFilter> distinct = new HashSet<>();
+        runtimeFiltersByTid.values().forEach(distinct::addAll);
+        List<RuntimeFilter> ordered = new ArrayList<>(distinct);
+        ordered.sort((lhs, rhs) -> lhs.getFilterId().compareTo(rhs.getFilterId()));
+        return ordered;
+    }
+
+    /**
+     * Recursively walks the plan tree rooted at 'root'. At each hash join node, candidate
+     * runtime filters are created from its equi-join conjuncts before the left subtree is
+     * visited, so scan nodes in that subtree can become targets. The filters are finalized
+     * before the right subtree is visited. Scan nodes receive their filters via
+     * assignRuntimeFilters(); all other nodes just recurse into their children.
+     */
+    private void generateFilters(PlanNode root) {
+        if (root instanceof ScanNode) {
+            assignRuntimeFilters((ScanNode) root);
+            return;
+        }
+        if (!(root instanceof HashJoinNode)) {
+            for (PlanNode child : root.getChildren()) {
+                generateFilters(child);
+            }
+            return;
+        }
+
+        HashJoinNode joinNode = (HashJoinNode) root;
+        // Pushing a filter from an ON-clause equi-join predicate into the left side of a
+        // left outer, full outer or anti join would be incorrect.
+        boolean canPushToProbeSide = !(joinNode.getJoinOp().isLeftOuterJoin()
+                || joinNode.getJoinOp().isFullOuterJoin()
+                || joinNode.getJoinOp().isAntiJoin());
+        List<Expr> joinConjuncts = new ArrayList<>();
+        if (canPushToProbeSide) {
+            joinConjuncts.addAll(joinNode.getEqJoinConjuncts());
+        }
+
+        // TODO(zxy) support generating runtime filters from PlanNode.conjuncts
+        // (joinNode.getConjuncts()), which differ from HashJoinNode.eqJoinConjuncts and
+        // HashJoinNode.otherJoinConjuncts. Earlier tests showed incorrect results when they
+        // were used: when an `in` subquery is rewritten into a join, the generated join
+        // conjunct is stored in PlanNode.conjuncts, and a runtime filter built from it can
+        // drop rows. Example, with table T holding a single INT column:
+        //   select * from T as a where k1 = (select count(1) from T as b where a.k1 = b.k1);
+        // `a.k1 = b.k1` is in eqJoinConjuncts while `k1` = ifnull(xxx) is in conjuncts; a
+        // filter from the latter empties the result even though rows should be returned.
+
+        // Every enabled filter type gets its own filter per conjunct; apart from the id these
+        // filters share the same properties, so generating them repeatedly could be avoided.
+        List<RuntimeFilter> createdHere = new ArrayList<>();
+        for (TRuntimeFilterType type : TRuntimeFilterType.values()) {
+            boolean typeEnabled = (sessionVariable.getRuntimeFilterType() & type.getValue()) != 0;
+            if (!typeEnabled) {
+                continue;
+            }
+            for (int order = 0; order < joinConjuncts.size(); ++order) {
+                RuntimeFilter candidate = RuntimeFilter.create(filterIdGenerator, analyzer,
+                        joinConjuncts.get(order), order, joinNode, type, bloomFilterSizeLimits);
+                if (candidate != null) {
+                    registerRuntimeFilter(candidate);
+                    createdHere.add(candidate);
+                }
+            }
+        }
+
+        generateFilters(joinNode.getChild(0));
+        // Finalize this join's filters so that a filter with no destination in the left
+        // subtree is never assigned to a scan node in the right subtree or above this join.
+        createdHere.forEach(this::finalizeRuntimeFilter);
+        generateFilters(joinNode.getChild(1));
+    }
+
+ /**
+ * Makes {@code filter} a destination candidate for every tuple that appears in its target
+ * slots, so that a scan node whose tuple id is one of them can later adopt it.
+ *
+ * @param filter a runtime filter that is not finalized yet and has at least one target tuple
+ */
+ private void registerRuntimeFilter(RuntimeFilter filter) {
+ Map<TupleId, List<SlotId>> candidateSlots = filter.getTargetSlots();
+ boolean hasCandidates = candidateSlots != null && !candidateSlots.isEmpty();
+ Preconditions.checkState(hasCandidates);
+ candidateSlots.keySet().forEach(candidateTid -> registerRuntimeFilter(filter, candidateTid));
+ }
+
+ /**
+ * Appends {@code filter} to the candidate list kept for {@code targetTid}, creating that list
+ * on first use.
+ *
+ * @param filter a runtime filter that must not be finalized yet
+ * @param targetTid a tuple id that must be a key of {@code filter.getTargetSlots()}
+ */
+ private void registerRuntimeFilter(RuntimeFilter filter, TupleId targetTid) {
+ boolean isKnownTarget = filter.getTargetSlots().containsKey(targetTid);
+ Preconditions.checkState(isKnownTarget);
+ List<RuntimeFilter> candidates = runtimeFiltersByTid.computeIfAbsent(targetTid, unused -> new ArrayList<>());
+ Preconditions.checkState(!filter.isFinalized());
+ candidates.add(filter);
+ }
+
+ /**
+ * Seals {@code runtimeFilter}. The filter is withdrawn from the candidate list of every target
+ * tuple that did not end up hosting one of its targets. It is then marked finalized, which makes
+ * assignRuntimeFilters skip it for any later scan node.
+ */
+ private void finalizeRuntimeFilter(RuntimeFilter runtimeFilter) {
+ Set<TupleId> usedTupleIds = new HashSet<>();
+ runtimeFilter.getTargets().forEach(target -> usedTupleIds.addAll(target.node.getTupleIds()));
+ for (TupleId candidateTid : runtimeFilter.getTargetSlots().keySet()) {
+ if (usedTupleIds.contains(candidateTid)) {
+ continue;
+ }
+ // Not used as a destination: stop offering this filter to scans of that tuple.
+ runtimeFiltersByTid.get(candidateTid).remove(runtimeFilter);
+ }
+ runtimeFilter.markFinalized();
+ }
+
+ /**
+ * Assigns runtime filters to a specific scan node 'scanNode'.
+ * The assigned filters are the ones for which 'scanNode' can be used as a destination
+ * node. The following constraints are enforced when assigning filters to 'scanNode':
+ * 1. Only olap scan nodes are supported; other scan nodes are left untouched.
+ * 2. Finalized filters, and filters whose target expr cannot be computed for the scan's
+ * tuple, are skipped.
+ * 3. If the runtime_filter_mode session variable is LOCAL, a filter is only assigned if it is
+ * produced within the same fragment that contains the scan node. If it is REMOTE, a filter is
+ * only assigned if it is produced in a different fragment. Any other mode (e.g. the default
+ * GLOBAL) adds no fragment restriction.
+ * The mode is matched case-insensitively. The validity check always was case-insensitive,
+ * so the LOCAL/REMOTE checks must be too.
+ */
+ private void assignRuntimeFilters(ScanNode scanNode) {
+ if (!(scanNode instanceof OlapScanNode)) {
+ return;
+ }
+ TupleId tid = scanNode.getTupleIds().get(0);
+ if (!runtimeFiltersByTid.containsKey(tid)) {
+ return;
+ }
+ // Normalize once. Previously only the validity check upper-cased the value, so e.g.
+ // 'local' passed validation but then behaved like GLOBAL.
+ String runtimeFilterMode = sessionVariable.getRuntimeFilterMode().toUpperCase(java.util.Locale.ROOT);
+ Preconditions.checkState(Arrays.stream(TRuntimeFilterMode.values()).map(Enum::name).anyMatch(
+ p -> p.equals(runtimeFilterMode)), "runtimeFilterMode not expected");
+ boolean localOnly = runtimeFilterMode.equals(TRuntimeFilterMode.LOCAL.name());
+ boolean remoteOnly = runtimeFilterMode.equals(TRuntimeFilterMode.REMOTE.name());
+ for (RuntimeFilter filter : runtimeFiltersByTid.get(tid)) {
+ if (filter.isFinalized()) {
+ continue;
+ }
+ Expr targetExpr = computeTargetExpr(filter, tid);
+ if (targetExpr == null) {
+ continue;
+ }
+ boolean isLocalTarget = isLocalTarget(filter, scanNode);
+ if ((localOnly && !isLocalTarget) || (remoteOnly && isLocalTarget)) {
+ continue;
+ }
+ boolean isBoundByKeyColumns = isBoundByKeyColumns(analyzer, targetExpr, scanNode);
+ RuntimeFilter.RuntimeFilterTarget target = new RuntimeFilter.RuntimeFilterTarget(
+ scanNode, targetExpr, isBoundByKeyColumns, isLocalTarget);
+ filter.addTarget(target);
+ }
+ }
+
+ /**
+ * Returns whether {@code targetNode} sits in the same plan fragment as the builder node of
+ * {@code filter}. The LOCAL runtime filter mode accepts only such targets; REMOTE accepts only
+ * the others.
+ */
+ private static boolean isLocalTarget(RuntimeFilter filter, ScanNode targetNode) {
+ PlanFragment targetFragment = targetNode.getFragment();
+ PlanFragment builderFragment = filter.getBuilderNode().getFragment();
+ return targetFragment.getId().equals(builderFragment.getId());
+ }
+
+ /**
+ * Returns true if every slot referenced by {@code targetExpr} is backed by a key column.
+ * A slot whose descriptor has no column counts as non-key.
+ *
+ * @param analyzer resolves slot ids to their descriptors
+ * @param targetExpr expression that must be bound by the tuple ids of {@code targetNode}
+ * @param targetNode scan node the expression is going to be applied on
+ */
+ private static boolean isBoundByKeyColumns(Analyzer analyzer, Expr targetExpr, ScanNode targetNode) {
+ Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds()));
+ List<SlotId> referencedSlots = new ArrayList<>();
+ targetExpr.getIds(null, referencedSlots);
+ return referencedSlots.stream()
+ .map(analyzer::getSlotDesc)
+ .allMatch(desc -> desc.getColumn() != null && desc.getColumn().isKey());
+ }
+
+ /**
+ * Computes the target expr of runtime filter 'filter' for the scan node whose tuple id is
+ * 'targetTid'.
+ *
+ * Starting from filter.getOrigTargetExpr(), two adjustments may be applied:
+ * 1. If the expr is not bound by 'targetTid', each of its SlotRefs is replaced by a slot of
+ * 'targetTid' (taken from filter.getTargetSlots()) to which the analyzer reports a value
+ * transfer.
+ * 2. If the resulting type differs from the type of filter.getSrcExpr(), the expr is cast to it.
+ *
+ * @return the adjusted target expr, or null if the substitution or the cast throws. The caller
+ * (assignRuntimeFilters) skips the filter for this scan node on null.
+ * @throws IllegalStateException (from Preconditions) if 'targetTid' is not a target tuple of the
+ * filter, or if the number of substituted SlotRefs differs from the number collected
+ */
+ private Expr computeTargetExpr(RuntimeFilter filter, TupleId targetTid) {
+ Expr targetExpr = filter.getOrigTargetExpr();
+ // The original target expr may reference slots of tuples other than 'targetTid', e.g. when the
+ // left side of the join is a subquery and the filter should be pushed to a scan inside it.
+ // If such a slot has a value transfer to a slot of 'targetTid' (i.e. they are equivalent
+ // columns), that slot is swapped in. Example: for A.a + B.b = C.c with targetTid = B, if A.a
+ // transfers to B.a, then A.a is replaced by B.a and the expr becomes B.a + B.b.
+ if (!targetExpr.isBound(targetTid)) {
+ Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid));
+ // Modify the filter target expr using the equivalent slots from the scan node
+ // on which the filter will be applied.
+ ExprSubstitutionMap smap = new ExprSubstitutionMap();
+ List<SlotRef> exprSlots = new ArrayList<>();
+ // Collect the SlotRefs of targetExpr. The original author notes these correspond
+ // (deduplicated) to the slots recorded in filter.getTargetSlots().
+ targetExpr.collect(SlotRef.class, exprSlots);
+ // Slots of 'targetTid' that may replace the collected SlotRefs.
+ List<SlotId> sids = filter.getTargetSlots().get(targetTid);
+ for (SlotRef slotRef: exprSlots) {
+ for (SlotId sid: sids) {
+ // Use the first slot of 'targetTid' that is equivalent to this SlotRef.
+ if (analyzer.hasValueTransfer(slotRef.getSlotId(), sid)) {
+ SlotRef newSlotRef = new SlotRef(analyzer.getSlotDesc(sid));
+ newSlotRef.analyzeNoThrow(analyzer);
+ smap.put(slotRef, newSlotRef);
+ break;
+ }
+ }
+ }
+ // Every collected SlotRef must have received a replacement.
+ Preconditions.checkState(exprSlots.size() == smap.size());
+ try {
+ targetExpr = targetExpr.substitute(smap, analyzer, false);
+ } catch (Exception e) {
+ // Substitution failed: this filter is not applied to this tuple (caller skips null).
+ return null;
+ }
+ }
+ Type srcType = filter.getSrcExpr().getType();
+ // Types of targetExpr and srcExpr must be exactly the same since runtime filters are
+ // based on hashing. If the cast is not possible, this target is skipped.
+ if (!targetExpr.getType().equals(srcType)) {
+ try {
+ targetExpr = targetExpr.castTo(srcType);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+ return targetExpr;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterId.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterId.java
new file mode 100644
index 0000000..c4339b4
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterId.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.common.Id;
+import org.apache.doris.common.IdGenerator;
+
+public class RuntimeFilterId extends Id<RuntimeFilterId> {
+ // Construction only allowed via an IdGenerator.
+ protected RuntimeFilterId(int id) {
+ super(id);
+ }
+
+ public static IdGenerator<RuntimeFilterId> createGenerator() {
+ return new IdGenerator<RuntimeFilterId>() {
+ @Override
+ public RuntimeFilterId getNextId() {
+ return new RuntimeFilterId(nextId_++);
+ }
+
+ @Override
+ public RuntimeFilterId getMaxId() {
+ return new RuntimeFilterId(nextId_ - 1);
+ }
+ };
+ }
+
+ @Override
+ public String toString() {
+ return String.format("RF%03d", id);
+ }
+
+ @Override
+ public int hashCode() {
+ return id;
+ }
+
+ public int compareTo(RuntimeFilterId cmp) {
+ return Integer.compare(id, cmp.id);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 2a71437..e039881 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -55,6 +55,8 @@ abstract public class ScanNode extends PlanNode {
return result;
}
+ public TupleDescriptor getTupleDesc() { return desc; }
+
public void setColumnFilters(Map<String, PartitionColumnFilter> columnFilters) {
this.columnFilters = columnFilters;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index ffd8cc3..9dcadd7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -2202,4 +2202,35 @@ public class SingleNodePlanner {
}
return analyzer.getUnassignedConjuncts(tupleIds);
}
+
+ /**
+ * Returns a normalized copy of the binary equality predicate 'expr'. The copy is a newly built
+ * and analyzed BinaryPredicate with the same operator. Its lhs child is the child of 'expr'
+ * bound by 'lhsTids' and its rhs child is the child bound by 'rhsTids' (each picked with
+ * Expr.getFirstBoundChild). For example, for `A join B on B.k = A.k` the result is `A.k = B.k`.
+ * A new predicate is built even when 'expr' is already in that order; 'expr' itself is never
+ * returned.
+ *
+ * Returns null in any of the following cases:
+ * 1. 'expr' is not a BinaryPredicate, or its operator is not an equivalence
+ * 2. One of the operands is a constant
+ * 3. No child is bound by 'lhsTids', or no child is bound by 'rhsTids'
+ * 4. The same child would be used for both sides
+ */
+ public static BinaryPredicate getNormalizedEqPred(Expr expr, List<TupleId> lhsTids,
+ List<TupleId> rhsTids, Analyzer analyzer) {
+ if (!(expr instanceof BinaryPredicate)) {
+ return null;
+ }
+ BinaryPredicate pred = (BinaryPredicate) expr;
+ if (!pred.getOp().isEquivalence()) {
+ return null;
+ }
+ if (pred.getChild(0).isConstant() || pred.getChild(1).isConstant()) {
+ return null;
+ }
+
+ // Use the child bound by lhsTids as lhsExpr, e.g. for A join B on B.k = A.k this gives
+ // lhsExpr = A.k and rhsExpr = B.k, i.e. the order is changed to A.k = B.k.
+ Expr lhsExpr = Expr.getFirstBoundChild(pred, lhsTids);
+ Expr rhsExpr = Expr.getFirstBoundChild(pred, rhsTids);
+ // Reference comparison: both sides resolved to the very same child expr.
+ if (lhsExpr == null || rhsExpr == null || lhsExpr == rhsExpr) {
+ return null;
+ }
+
+ BinaryPredicate result = new BinaryPredicate(pred.getOp(), lhsExpr, rhsExpr);
+ result.analyzeNoThrow(analyzer);
+ return result;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 10ae31f..11375f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -48,6 +48,8 @@ import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ResultSink;
+import org.apache.doris.planner.RuntimeFilter;
+import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SetOperationNode;
import org.apache.doris.planner.UnionNode;
@@ -72,6 +74,8 @@ import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TResourceInfo;
+import org.apache.doris.thrift.TRuntimeFilterParams;
+import org.apache.doris.thrift.TRuntimeFilterTargetParams;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TScanRangeParams;
@@ -201,6 +205,17 @@ public class Coordinator {
// parallel execute
private final TUniqueId nextInstanceId;
+ // Brpc address and instance id of the fragment instance that merges runtime filters.
+ // Set by assignRuntimeFilterAddr to the first instance of the first (uppermost) fragment.
+ public TNetworkAddress runtimeFilterMergeAddr;
+ public TUniqueId runtimeFilterMergeInstanceId;
+ // Runtime filter ID -> every fragment instance (instance id + brpc address) of the fragments
+ // expected to use that runtime filter. Sent only to the merge instance.
+ public Map<RuntimeFilterId, List<FRuntimeFilterTargetParam>> ridToTargetParam = Maps.newHashMap();
+ // Runtime filters assigned by the planner (analyzer.getAssignedRuntimeFilter()); serialized
+ // into the merge instance's runtime filter params.
+ public List<RuntimeFilter> assignedRuntimeFilters = new ArrayList<>();
+ // Runtime filter ID -> total number of fragment instances that build that runtime filter.
+ public Map<RuntimeFilterId, Integer> ridToBuilderNum = Maps.newHashMap();
+
// Used for query/insert
public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
this.isBlockQuery = planner.isBlockQuery();
@@ -224,6 +239,7 @@ public class Coordinator {
this.nextInstanceId = new TUniqueId();
nextInstanceId.setHi(queryId.hi);
nextInstanceId.setLo(queryId.lo + 1);
+ this.assignedRuntimeFilters = analyzer.getAssignedRuntimeFilter();
}
// Used for broker load task/export task coordinator
@@ -804,6 +820,9 @@ public class Coordinator {
}
}
+ // assign runtime filter merge addr and target addr
+ assignRuntimeFilterAddr();
+
// compute destinations and # senders per exchange node
// (the root fragment doesn't have a destination)
for (FragmentExecParams params : fragmentExecParamsMap.values()) {
@@ -1117,6 +1136,31 @@ public class Coordinator {
}
}
+ /**
+ * Builds the runtime filter routing information consumed when per-instance exec params are
+ * generated:
+ * - ridToTargetParam: filter id -> (instance id, brpc address) of every instance of every
+ * fragment that expects to use the filter;
+ * - ridToBuilderNum: filter id -> total number of instances, over all builder fragments;
+ * - runtimeFilterMergeAddr / runtimeFilterMergeInstanceId: the first instance of the uppermost
+ * fragment, which merges the runtime filters.
+ */
+ private void assignRuntimeFilterAddr() throws Exception {
+ for (PlanFragment fragment : fragments) {
+ FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());
+ // Invert <fragment, runtimeFilterId> into <runtimeFilterId, target instances>.
+ for (RuntimeFilterId rid : fragment.getTargetRuntimeFilterIds()) {
+ List<FRuntimeFilterTargetParam> targets = ridToTargetParam.computeIfAbsent(rid, unused -> new ArrayList<>());
+ for (FInstanceExecParam instance : params.instanceExecParams) {
+ TNetworkAddress brpcAddr = toBrpcHost(instance.host);
+ targets.add(new FRuntimeFilterTargetParam(instance.instanceId, brpcAddr));
+ }
+ }
+ for (RuntimeFilterId rid : fragment.getBuilderRuntimeFilterIds()) {
+ int builderInstances = params.instanceExecParams.size();
+ ridToBuilderNum.merge(rid, builderInstances, Integer::sum);
+ }
+ }
+ // The uppermost fragment has one and only one instance; it acts as the merge node.
+ PlanFragment uppermost = fragments.get(0);
+ FInstanceExecParam mergeInstance = fragmentExecParamsMap.get(uppermost.getFragmentId()).instanceExecParams.get(0);
+ runtimeFilterMergeAddr = toBrpcHost(mergeInstance.host);
+ runtimeFilterMergeInstanceId = mergeInstance.instanceId;
+ }
+
+ // One fragment could only have one HashJoinNode
private boolean isColocateJoin(PlanNode node) {
// TODO(cmy): some internal process, such as broker load task, do not have ConnectContext.
// Any configurations needed by the Coordinator should be passed in Coordinator initialization.
@@ -1957,6 +2001,24 @@ public class Coordinator {
params.setQueryOptions(queryOptions);
params.params.setSendQueryStatisticsWithEveryBatch(
fragment.isTransferQueryStatisticsWithEveryBatch());
+ params.params.setRuntimeFilterParams(new TRuntimeFilterParams());
+ params.params.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
+ if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
+ for (Map.Entry<RuntimeFilterId, List<FRuntimeFilterTargetParam>> entry: ridToTargetParam.entrySet()) {
+ List<TRuntimeFilterTargetParams> targetParams = Lists.newArrayList();
+ for (FRuntimeFilterTargetParam targetParam: entry.getValue()) {
+ targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
+ targetParam.targetFragmentInstanceAddr));
+ }
+ params.params.runtime_filter_params.putToRidToTargetParam(entry.getKey().asInt(), targetParams);
+ }
+ for (Map.Entry<RuntimeFilterId, Integer> entry: ridToBuilderNum.entrySet()) {
+ params.params.runtime_filter_params.putToRuntimeFilterBuilderNum(entry.getKey().asInt(), entry.getValue());
+ }
+ for (RuntimeFilter rf: assignedRuntimeFilters) {
+ params.params.runtime_filter_params.putToRidToRuntimeFilter(rf.getFilterId().asInt(), rf.toThrift());
+ }
+ }
if (queryOptions.getQueryType() == TQueryType.LOAD) {
LoadErrorHub.Param param = Catalog.getCurrentCatalog().getLoadInstance().getLoadErrorHubInfo();
if (param != null) {
@@ -1966,7 +2028,6 @@ public class Coordinator {
}
}
}
-
paramsList.add(params);
}
return paramsList;
@@ -2089,6 +2150,17 @@ public class Coordinator {
fragmentProfile.get(backendExecState.profileFragmentId).addChild(backendExecState.profile);
}
}
+
+ /**
+ * One destination of a runtime filter: the id of a target fragment instance together with
+ * the brpc address it is reached at.
+ */
+ static class FRuntimeFilterTargetParam {
+ public TUniqueId targetFragmentInstanceId;
+ public TNetworkAddress targetFragmentInstanceAddr;
+
+ public FRuntimeFilterTargetParam(TUniqueId id, TNetworkAddress host) {
+ this.targetFragmentInstanceId = id;
+ this.targetFragmentInstanceAddr = host;
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java
new file mode 100644
index 0000000..8806263
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Used for encoding and decoding of session variable runtime_filter_type.
+ *
+ * The variable is stored as a bit mask of {@link TRuntimeFilterType} values. Users read and write
+ * it as a comma separated list of type names (IN, BLOOM_FILTER, MIN_MAX; case-insensitive) and/or
+ * non-negative numbers, which are OR-ed together.
+ */
+public class RuntimeFilterTypeHelper {
+ private static final Logger LOG = LogManager.getLogger(RuntimeFilterTypeHelper.class);
+
+ // Every runtime filter type bit that may be set in the session variable.
+ public static final long ALLOWED_MASK = (TRuntimeFilterType.IN.getValue() |
+ TRuntimeFilterType.BLOOM.getValue() | TRuntimeFilterType.MIN_MAX.getValue());
+
+ // User-visible name -> type bit. Lookups are case-insensitive; decoding iterates in name order.
+ private static final Map<String, Long> varValueSet = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+
+ static {
+ varValueSet.put("IN", (long) TRuntimeFilterType.IN.getValue());
+ varValueSet.put("BLOOM_FILTER", (long) TRuntimeFilterType.BLOOM.getValue());
+ varValueSet.put("MIN_MAX", (long) TRuntimeFilterType.MIN_MAX.getValue());
+ }
+
+ /**
+ * Converts a stored bit mask into the comma separated type names a user can read.
+ *
+ * @param varValue the stored mask; 0 decodes to the empty string
+ * @return the names of all set bits, joined with ','
+ * @throws DdlException if {@code varValue} has bits outside {@link #ALLOWED_MASK}
+ */
+ public static String decode(Long varValue) throws DdlException {
+ // 0 decodes to the empty string
+ if (varValue == 0) {
+ return "";
+ }
+ if ((varValue & ~ALLOWED_MASK) != 0) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, SessionVariable.RUNTIME_FILTER_TYPE, varValue);
+ }
+
+ List<String> names = new ArrayList<String>();
+ for (Map.Entry<String, Long> value : getSupportedVarValue().entrySet()) {
+ if ((varValue & value.getValue()) != 0) {
+ names.add(value.getKey());
+ }
+ }
+
+ return Joiner.on(',').join(names);
+ }
+
+ /**
+ * Converts a user supplied value into the bit mask stored in the session.
+ * The value is split on ','; items are trimmed and empty items ignored, so an empty string
+ * encodes to 0. Each item is either a supported type name or a non-negative number.
+ *
+ * @throws DdlException if an item is an unsupported name, a number too large for a long,
+ * or if the combined mask has bits outside {@link #ALLOWED_MASK}
+ */
+ public static Long encode(String varValue) throws DdlException {
+ List<String> names = Splitter.on(',').trimResults().omitEmptyStrings().splitToList(varValue);
+
+ // empty string parse to 0
+ long resultCode = 0;
+ for (String key : names) {
+ long code = 0;
+ if (StringUtils.isNumeric(key)) {
+ try {
+ code = Long.parseLong(key);
+ } catch (NumberFormatException e) {
+ // All-digit input that overflows a long: report it as a bad value instead of
+ // leaking an unchecked exception out of the SET statement.
+ ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, SessionVariable.RUNTIME_FILTER_TYPE, key);
+ }
+ } else {
+ code = getCodeFromString(key);
+ if (code == 0) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, SessionVariable.RUNTIME_FILTER_TYPE, key);
+ }
+ }
+ resultCode |= code;
+ if ((resultCode & ~ALLOWED_MASK) != 0) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, SessionVariable.RUNTIME_FILTER_TYPE, key);
+ }
+ }
+ return resultCode;
+ }
+
+ // Returns true if varValue is a supported type name (case-insensitive); false for null.
+ public static boolean isSupportedVarValue(String varValue) {
+ return varValue != null && getSupportedVarValue().containsKey(varValue);
+ }
+
+ // Returns the bit for a supported type name, or 0 if the name is not supported.
+ private static long getCodeFromString(String varValue) {
+ if (isSupportedVarValue(varValue)) {
+ return getSupportedVarValue().get(varValue);
+ }
+ return 0;
+ }
+
+ // Returns the (mutable) name -> bit map backing this helper.
+ public static Map<String, Long> getSupportedVarValue() {
+ return varValueSet;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 3146ae3..0dcae49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -115,6 +115,24 @@ public class SessionVariable implements Serializable, Writable {
// when true, the partition column must be set to NOT NULL.
public static final String ALLOW_PARTITION_COLUMN_NULLABLE = "allow_partition_column_nullable";
+ // Runtime filter mode. The planner accepts TRuntimeFilterMode names such as the default
+ // GLOBAL, LOCAL (same-fragment targets only) or REMOTE (cross-fragment targets only).
+ public static final String RUNTIME_FILTER_MODE = "runtime_filter_mode";
+ // Size in bytes of Bloom Filters used for runtime filters. Actual size of filter will
+ // be rounded up to the nearest power of two.
+ public static final String RUNTIME_BLOOM_FILTER_SIZE = "runtime_bloom_filter_size";
+ // Minimum runtime bloom filter size, in bytes
+ public static final String RUNTIME_BLOOM_FILTER_MIN_SIZE = "runtime_bloom_filter_min_size";
+ // Maximum runtime bloom filter size, in bytes
+ public static final String RUNTIME_BLOOM_FILTER_MAX_SIZE = "runtime_bloom_filter_max_size";
+ // Time in ms to wait until runtime filters are delivered.
+ public static final String RUNTIME_FILTER_WAIT_TIME_MS = "runtime_filter_wait_time_ms";
+ // Maximum number of bloom runtime filters allowed per query
+ public static final String RUNTIME_FILTERS_MAX_NUM = "runtime_filters_max_num";
+ // Bit mask of enabled runtime filter types (TRuntimeFilterType values), mainly for testing.
+ // Users may set it as a comma separated list of names: IN, BLOOM_FILTER, MIN_MAX.
+ public static final String RUNTIME_FILTER_TYPE = "runtime_filter_type";
+ // If the right table of a hash join has more rows than this value, the IN filter is ignored.
+ public static final String RUNTIME_FILTER_MAX_IN_NUM = "runtime_filter_max_in_num";
+
// max ms to wait transaction publish finish when exec insert stmt.
public static final String INSERT_VISIBLE_TIMEOUT_MS = "insert_visible_timeout_ms";
@@ -310,6 +328,23 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = EXTRACT_WIDE_RANGE_EXPR, needForward = true)
public boolean extractWideRangeExpr = true;
+ // Must name a TRuntimeFilterMode value (checked by the planner).
+ @VariableMgr.VarAttr(name = RUNTIME_FILTER_MODE)
+ private String runtimeFilterMode = "GLOBAL";
+ // Bloom filter size in bytes; default 2097152 = 2 MiB.
+ @VariableMgr.VarAttr(name = RUNTIME_BLOOM_FILTER_SIZE)
+ private int runtimeBloomFilterSize = 2097152;
+ // Lower bound for the bloom filter size in bytes; default 1048576 = 1 MiB.
+ @VariableMgr.VarAttr(name = RUNTIME_BLOOM_FILTER_MIN_SIZE)
+ private int runtimeBloomFilterMinSize = 1048576;
+ // Upper bound for the bloom filter size in bytes; default 16777216 = 16 MiB.
+ @VariableMgr.VarAttr(name = RUNTIME_BLOOM_FILTER_MAX_SIZE)
+ private int runtimeBloomFilterMaxSize = 16777216;
+ // Sent to the backends in TQueryOptions.
+ @VariableMgr.VarAttr(name = RUNTIME_FILTER_WAIT_TIME_MS)
+ private int runtimeFilterWaitTimeMs = 1000;
+ @VariableMgr.VarAttr(name = RUNTIME_FILTERS_MAX_NUM)
+ private int runtimeFiltersMaxNum = 10;
+ // Bit mask of TRuntimeFilterType values. The default 1 is meant to enable only the IN filter
+ // (assumes TRuntimeFilterType.IN has value 1 -- confirm against the thrift definition).
+ @VariableMgr.VarAttr(name = RUNTIME_FILTER_TYPE)
+ private int runtimeFilterType = 1;
+ // Sent to the backends in TQueryOptions.
+ @VariableMgr.VarAttr(name = RUNTIME_FILTER_MAX_IN_NUM)
+ private int runtimeFilterMaxInNum = 1024;
public long getMaxExecMemByte() {
return maxExecMemByte;
@@ -585,6 +620,70 @@ public class SessionVariable implements Serializable, Writable {
return allowPartitionColumnNullable;
}
+ /** Returns the runtime_filter_mode session variable exactly as stored (not case-normalized). */
+ public String getRuntimeFilterMode() {
+ return this.runtimeFilterMode;
+ }
+
+ /** Stores a new runtime_filter_mode value as-is; no validation happens here. */
+ public void setRuntimeFilterMode(String runtimeFilterMode) {
+ this.runtimeFilterMode = runtimeFilterMode;
+ }
+
+ /** Returns runtime_bloom_filter_size, the runtime bloom filter size in bytes. */
+ public int getRuntimeBloomFilterSize() {
+ return this.runtimeBloomFilterSize;
+ }
+
+ /** Stores a new runtime_bloom_filter_size value (bytes). */
+ public void setRuntimeBloomFilterSize(int runtimeBloomFilterSize) {
+ this.runtimeBloomFilterSize = runtimeBloomFilterSize;
+ }
+
+ /** Returns runtime_bloom_filter_min_size, the minimum runtime bloom filter size in bytes. */
+ public int getRuntimeBloomFilterMinSize() {
+ return this.runtimeBloomFilterMinSize;
+ }
+
+ /** Stores a new runtime_bloom_filter_min_size value (bytes). */
+ public void setRuntimeBloomFilterMinSize(int runtimeBloomFilterMinSize) {
+ this.runtimeBloomFilterMinSize = runtimeBloomFilterMinSize;
+ }
+
+ /** Returns runtime_bloom_filter_max_size, the maximum runtime bloom filter size in bytes. */
+ public int getRuntimeBloomFilterMaxSize() {
+ return this.runtimeBloomFilterMaxSize;
+ }
+
+ /** Stores a new runtime_bloom_filter_max_size value (bytes). */
+ public void setRuntimeBloomFilterMaxSize(int runtimeBloomFilterMaxSize) {
+ this.runtimeBloomFilterMaxSize = runtimeBloomFilterMaxSize;
+ }
+
+ /** Returns runtime_filter_wait_time_ms, how long (ms) to wait for runtime filters to arrive. */
+ public int getRuntimeFilterWaitTimeMs() {
+ return this.runtimeFilterWaitTimeMs;
+ }
+
+ /** Stores a new runtime_filter_wait_time_ms value (milliseconds). */
+ public void setRuntimeFilterWaitTimeMs(int runtimeFilterWaitTimeMs) {
+ this.runtimeFilterWaitTimeMs = runtimeFilterWaitTimeMs;
+ }
+
+ /** Returns runtime_filters_max_num, the maximum number of bloom runtime filters per query. */
+ public int getRuntimeFiltersMaxNum() {
+ return this.runtimeFiltersMaxNum;
+ }
+
+ /** Stores a new runtime_filters_max_num value. */
+ public void setRuntimeFiltersMaxNum(int runtimeFiltersMaxNum) {
+ this.runtimeFiltersMaxNum = runtimeFiltersMaxNum;
+ }
+
+ /** Returns runtime_filter_type, a bit mask of enabled TRuntimeFilterType values. */
+ public int getRuntimeFilterType() {
+ return this.runtimeFilterType;
+ }
+
+ /** Stores a new runtime_filter_type bit mask. */
+ public void setRuntimeFilterType(int runtimeFilterType) {
+ this.runtimeFilterType = runtimeFilterType;
+ }
+
+ /** Returns runtime_filter_max_in_num, the right-table size above which the IN filter is ignored. */
+ public int getRuntimeFilterMaxInNum() {
+ return this.runtimeFilterMaxInNum;
+ }
+
+ /** Stores a new runtime_filter_max_in_num value. */
+ public void setRuntimeFilterMaxInNum(int runtimeFilterMaxInNum) {
+ this.runtimeFilterMaxInNum = runtimeFilterMaxInNum;
+ }
+
public long getInsertVisibleTimeoutMs() {
if (insertVisibleTimeoutMs < MIN_INSERT_VISIBLE_TIMEOUT_MS) {
return MIN_INSERT_VISIBLE_TIMEOUT_MS;
@@ -658,6 +757,9 @@ public class SessionVariable implements Serializable, Writable {
tResult.setEnableSpilling(enableSpilling);
tResult.setEnableEnableExchangeNodeParallelMerge(enableExchangeNodeParallelMerge);
+
+ tResult.setRuntimeFilterWaitTimeMs(runtimeFilterWaitTimeMs);
+ tResult.setRuntimeFilterMaxInNum(runtimeFilterMaxInNum);
return tResult;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
index e48811a..b4c85e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
@@ -156,42 +156,44 @@ public class VariableMgr {
// Set value to a variable
private static boolean setValue(Object obj, Field field, String value) throws DdlException {
VarAttr attr = field.getAnnotation(VarAttr.class);
- String convertedVal = VariableVarConverters.convert(attr.name(), value);
+ if (VariableVarConverters.hasConverter(attr.name())) {
+ value = VariableVarConverters.encode(attr.name(), value).toString();
+ }
try {
switch (field.getType().getSimpleName()) {
case "boolean":
- if (convertedVal.equalsIgnoreCase("ON")
- || convertedVal.equalsIgnoreCase("TRUE")
- || convertedVal.equalsIgnoreCase("1")) {
+ if (value.equalsIgnoreCase("ON")
+ || value.equalsIgnoreCase("TRUE")
+ || value.equalsIgnoreCase("1")) {
field.setBoolean(obj, true);
- } else if (convertedVal.equalsIgnoreCase("OFF")
- || convertedVal.equalsIgnoreCase("FALSE")
- || convertedVal.equalsIgnoreCase("0")) {
+ } else if (value.equalsIgnoreCase("OFF")
+ || value.equalsIgnoreCase("FALSE")
+ || value.equalsIgnoreCase("0")) {
field.setBoolean(obj, false);
} else {
throw new IllegalAccessException();
}
break;
case "byte":
- field.setByte(obj, Byte.valueOf(convertedVal));
+ field.setByte(obj, Byte.valueOf(value));
break;
case "short":
- field.setShort(obj, Short.valueOf(convertedVal));
+ field.setShort(obj, Short.valueOf(value));
break;
case "int":
- field.setInt(obj, Integer.valueOf(convertedVal));
+ field.setInt(obj, Integer.valueOf(value));
break;
case "long":
- field.setLong(obj, Long.valueOf(convertedVal));
+ field.setLong(obj, Long.valueOf(value));
break;
case "float":
- field.setFloat(obj, Float.valueOf(convertedVal));
+ field.setFloat(obj, Float.valueOf(value));
break;
case "double":
- field.setDouble(obj, Double.valueOf(convertedVal));
+ field.setDouble(obj, Double.valueOf(value));
break;
case "String":
- field.set(obj, convertedVal);
+ field.set(obj, value);
break;
default:
// Unsupported type variable.
@@ -496,12 +498,12 @@ public class VariableMgr {
row.add(getValue(ctx.getObj(), ctx.getField()));
}
- if (row.size() > 1 && row.get(0).equalsIgnoreCase(SessionVariable.SQL_MODE)) {
+ if (row.size() > 1 && VariableVarConverters.hasConverter(row.get(0))) {
try {
- row.set(1, SqlModeHelper.decode(Long.valueOf(row.get(1))));
+ row.set(1, VariableVarConverters.decode(row.get(0), Long.valueOf(row.get(1))));
} catch (DdlException e) {
row.set(1, "");
- LOG.warn("Decode sql mode failed");
+ LOG.warn("Decode session variable failed");
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java
index 26d7dae..fcc06d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java
@@ -20,5 +20,7 @@ package org.apache.doris.qe;
import org.apache.doris.common.DdlException;
public interface VariableVarConverterI {
- public String convert(String value) throws DdlException;
+ public Long encode(String value) throws DdlException;
+
+ public String decode(Long value) throws DdlException;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverters.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverters.java
index 39b6155..8cecda1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverters.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverters.java
@@ -22,23 +22,46 @@ import org.apache.doris.common.DdlException;
import java.util.Map;
-// Helper class to drives the convert of session variables according to the converters.
-// You can define your converter that implements interface VariableVarConverterI in here.
-// Each converter should put in map (variable name -> converters) and only converts the variable
-// with specified name.
+/**
+ * Helper class to drives the convert of session variables according to the converters.
+ * You can define your converter that implements interface VariableVarConverterI in here.
+ * Each converter should put in map (variable name -> converters) and only converts the variable with specified name.
+ *
+ * The converted session variable is a special kind of variable.
+ * It's real type is int, so for example, when querying `select @@sql_mode`, the return column
+ * type is "int".
+ * But user usually set this variable by string, such as:
+ * `set @@sql_mode = 'STRICT_TRANS_TABLES'`
+ * or
+ * `set @@sql_mode = concat(@@sql_mode, 'STRICT_TRANS_TABLES')'`
+ */
public class VariableVarConverters {
public static final Map<String, VariableVarConverterI> converters = Maps.newHashMap();
+
static {
SqlModeConverter sqlModeConverter = new SqlModeConverter();
converters.put(SessionVariable.SQL_MODE, sqlModeConverter);
+ RuntimeFilterTypeConverter runtimeFilterTypeConverter = new RuntimeFilterTypeConverter();
+ converters.put(SessionVariable.RUNTIME_FILTER_TYPE, runtimeFilterTypeConverter);
+ }
+
+ public static Boolean hasConverter(String varName) {
+ return converters.containsKey(varName);
}
- public static String convert(String varName, String value) throws DdlException {
+ public static Long encode(String varName, String value) throws DdlException {
if (converters.containsKey(varName)) {
- return converters.get(varName).convert(value);
+ return converters.get(varName).encode(value);
}
- return value;
+ return 0L;
+ }
+
+ public static String decode(String varName, Long value) throws DdlException {
+ if (converters.containsKey(varName)) {
+ return converters.get(varName).decode(value);
+ }
+ return "";
}
/* Converters */
@@ -46,8 +69,26 @@ public class VariableVarConverters {
// Converter to convert sql mode variable
public static class SqlModeConverter implements VariableVarConverterI {
@Override
- public String convert(String value) throws DdlException {
- return SqlModeHelper.encode(value).toString();
+ public Long encode(String value) throws DdlException {
+ return SqlModeHelper.encode(value);
+ }
+
+ @Override
+ public String decode(Long value) throws DdlException {
+ return SqlModeHelper.decode(value);
+ }
+ }
+
+ // Converter to convert runtime filter type variable
+ public static class RuntimeFilterTypeConverter implements VariableVarConverterI {
+ @Override
+ public Long encode(String value) throws DdlException {
+ return RuntimeFilterTypeHelper.encode(value);
+ }
+
+ @Override
+ public String decode(Long value) throws DdlException {
+ return RuntimeFilterTypeHelper.decode(value);
}
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 4aac012..5aac2db 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -41,6 +41,7 @@ import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.thrift.TRuntimeFilterMode;
import org.apache.doris.utframe.UtFrameUtils;
import com.google.common.collect.Lists;
@@ -1317,6 +1318,84 @@ public class QueryPlanTest {
}
@Test
+ public void testRuntimeFilterMode() throws Exception {
+ connectContext.setDatabase("default_cluster:test");
+
+ String queryStr = "explain select * from jointest t2, jointest t1 where t1.k1 = t2.k1";
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "LOCAL");
+ String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(explainString.contains("runtime filter"));
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "REMOTE");
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertFalse(explainString.contains("runtime filter"));
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "OFF");
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertFalse(explainString.contains("runtime filter"));
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "GLOBAL");
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(explainString.contains("runtime filter"));
+
+ queryStr = "explain select * from jointest t2, jointest t1 where t1.k1 <=> t2.k1";
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "LOCAL");
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 7);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertFalse(explainString.contains("runtime filter"));
+
+ queryStr = "explain select * from jointest as a where k1 = (select count(1) from jointest as b where a.k1 = b.k1);";
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "GLOBAL");
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 7);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ System.out.println(explainString);
+ Assert.assertFalse(explainString.contains("runtime filter"));
+ }
+
+ @Test
+ public void testRuntimeFilterType() throws Exception {
+ connectContext.setDatabase("default_cluster:test");
+ String queryStr = "explain select * from jointest t2, jointest t1 where t1.k1 = t2.k1";
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "GLOBAL");
+
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 0);
+ String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertFalse(explainString.contains("runtime filter"));
+
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 1);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in] <- `t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in] -> `t2`.`k1`"));
+
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 2);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[bloom] <- `t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[bloom] -> `t2`.`k1`"));
+
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 3);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in] <- `t1`.`k1`, RF001[bloom] <- `t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in] -> `t2`.`k1`, RF001[bloom] -> `t2`.`k1`"));
+
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 4);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[min_max] <- `t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[min_max] -> `t2`.`k1`"));
+
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 5);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in] <- `t1`.`k1`, RF001[min_max] <- `t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in] -> `t2`.`k1`, RF001[min_max] -> `t2`.`k1`"));
+
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 6);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[bloom] <- `t1`.`k1`, RF001[min_max] <- `t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[bloom] -> `t2`.`k1`, RF001[min_max] -> `t2`.`k1`"));
+
+ Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 7);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in] <- `t1`.`k1`, RF001[bloom] <- `t1`.`k1`, RF002[min_max] <- `t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("runtime filters: RF000[in] -> `t2`.`k1`, RF001[bloom] -> `t2`.`k1`, RF002[min_max] -> `t2`.`k1`"));
+ }
+
+ @Test
public void testEmptyNode() throws Exception {
connectContext.setDatabase("default_cluster:test");
String emptyNode = "EMPTYSET";
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
new file mode 100644
index 0000000..09fe44a
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
@@ -0,0 +1,426 @@
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BaseTableRef;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.JoinOperator;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotId;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.TableRef;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TPartitionType;
+import org.apache.doris.thrift.TRuntimeFilterMode;
+
+import com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import mockit.Expectations;
+import mockit.Mocked;
+
+public class RuntimeFilterGeneratorTest {
+ private Analyzer analyzer;
+ private PlanFragment testPlanFragment;
+ private HashJoinNode hashJoinNode;
+ private OlapScanNode lhsScanNode;
+ private OlapScanNode rhsScanNode;
+ @Mocked
+ private ConnectContext connectContext;
+
+ @Before
+ public void setUp() throws AnalysisException {
+ Catalog catalog = Deencapsulation.newInstance(Catalog.class);
+ analyzer = new Analyzer(catalog, connectContext);
+ new Expectations() {
+ {
+ analyzer.getClusterName();
+ result = "default_cluster";
+ }
+ };
+ TableRef tableRef = new TableRef();
+ Deencapsulation.setField(tableRef, "isAnalyzed", true);
+ Deencapsulation.setField(tableRef, "joinOp", JoinOperator.INNER_JOIN);
+
+ TupleDescriptor lhsTupleDescriptor = new TupleDescriptor(new TupleId(0));
+ lhsScanNode = new OlapScanNode(new PlanNodeId(0), lhsTupleDescriptor, "LEFT SCAN");
+ TableName lhsTableName = new TableName("default_cluster:test_db", "test_lhs_tbl");
+ SlotRef lhsExpr = new SlotRef(lhsTableName, "test_lhs_col");
+ SlotDescriptor lhsSlotDescriptor = new SlotDescriptor(new SlotId(0), lhsTupleDescriptor);
+ Column k1 = new Column("test_lhs_col", PrimitiveType.BIGINT);
+ k1.setIsKey(true);
+ k1.setIsAllowNull(false);
+ lhsSlotDescriptor.setColumn(k1);
+ lhsExpr.setDesc(lhsSlotDescriptor);
+ Table lhsTable = new Table(0, "test_lhs_tbl", Table.TableType.OLAP, Lists.newArrayList(k1));
+ BaseTableRef lhsTableRef = new BaseTableRef(tableRef, lhsTable, lhsTableName);
+ lhsTableRef.analyze(analyzer);
+
+ TupleDescriptor rhsTupleDescriptor = new TupleDescriptor(new TupleId(1));
+ rhsScanNode = new OlapScanNode(new PlanNodeId(1), rhsTupleDescriptor, "RIGHT SCAN");
+ TableName rhsTableName = new TableName("default_cluster:test_db", "test_rhs_tbl");
+ SlotRef rhsExpr = new SlotRef(rhsTableName, "test_rhs_col");
+ SlotDescriptor rhsSlotDescriptor = new SlotDescriptor(new SlotId(1), rhsTupleDescriptor);
+ Column k2 = new Column("test_rhs_col", PrimitiveType.INT);
+ k2.setIsKey(true);
+ k2.setIsAllowNull(false);
+ rhsSlotDescriptor.setColumn(k2);
+ rhsExpr.setDesc(rhsSlotDescriptor);
+ Table rhsTable = new Table(0, "test_rhs_tbl", Table.TableType.OLAP, Lists.newArrayList(k2));
+ BaseTableRef rhsTableRef = new BaseTableRef(tableRef, rhsTable, rhsTableName);
+ rhsTableRef.analyze(analyzer);
+
+ ArrayList<Expr> testJoinExprs = new ArrayList<>();
+ BinaryPredicate eqJoinConjunct = new BinaryPredicate(BinaryPredicate.Operator.EQ, lhsExpr, rhsExpr);
+ testJoinExprs.add(eqJoinConjunct);
+
+ hashJoinNode = new HashJoinNode(new PlanNodeId(2), lhsScanNode, rhsScanNode, tableRef, testJoinExprs
+ , new ArrayList<>());
+ testPlanFragment = new PlanFragment(new PlanFragmentId(0), hashJoinNode
+ , new DataPartition(TPartitionType.UNPARTITIONED));
+ hashJoinNode.setFragment(testPlanFragment);
+ lhsScanNode.setFragment(testPlanFragment);
+ rhsScanNode.setFragment(testPlanFragment);
+
+ new Expectations() {
+ {
+ analyzer.getSlotDesc(new SlotId(0));
+ result = lhsSlotDescriptor;
+ analyzer.getSlotDesc(new SlotId(1));
+ result = rhsSlotDescriptor;
+
+ ConnectContext.get().getSessionVariable().getRuntimeFiltersMaxNum();
+ result = 8;
+ ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMaxSize();
+ result = 16777216;
+ ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMinSize();
+ result = 1048576;
+ ConnectContext.get().getSessionVariable().getRuntimeBloomFilterSize();
+ result = 2097152;
+ }
+ };
+ }
+
+ private void clearRuntimeFilterState() {
+ testPlanFragment.clearRuntimeFilters();
+ analyzer.clearAssignedRuntimeFilters();
+ hashJoinNode.clearRuntimeFilters();
+ lhsScanNode.clearRuntimeFilters();
+ }
+
+ @Test
+ public void testGenerateRuntimeFiltersMode() {
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeFilterMode();
+ result = "GLOBAL";
+ ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 7;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3);
+ Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 3);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[in] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF001[bloom] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF002[min_max] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[in] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF001[bloom] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF002[min_max] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeFilterMode();
+ result = "LOCAL";
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3);
+ Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 3);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[in] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF001[bloom] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF002[min_max] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[in] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF001[bloom] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF002[min_max] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeFilterMode();
+ result = "REMOTE";
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 0);
+ Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 0);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 0);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 0);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 0);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true), "");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false), "");
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testGenerateRuntimeFiltersModeException() {
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 8;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ }
+
+ @Test
+ public void testGenerateRuntimeFiltersType() {
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 0;
+ ConnectContext.get().getSessionVariable().getRuntimeFilterMode();
+ result = "GLOBAL";
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true), "");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false), "");
+ Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 0);
+ Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 0);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 0);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 0);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 0);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 1;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[in] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[in] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+ Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 1);
+ Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 1);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 1);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 1);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 1);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 2;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[bloom] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[bloom] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+ Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 1);
+ Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 1);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 1);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 1);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 1);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 3;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[in] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF001[bloom] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[in] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF001[bloom] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+ Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 2);
+ Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 2);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 2);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 2);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 2);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 4;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[min_max] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[min_max] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+ Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 1);
+ Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 1);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 1);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 1);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 1);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 5;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[in] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF001[min_max] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[in] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF001[min_max] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+ Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 2);
+ Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 2);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 2);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 2);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 2);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 6;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true)
+ , "RF000[bloom] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+ ", RF001[min_max] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n");
+ Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false)
+ , "RF000[bloom] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+ ", RF001[min_max] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n");
+ Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 2);
+ Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 2);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 2);
+ Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 2);
+ Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 2);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testGenerateRuntimeFiltersTypeExceptionLess() {
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = -1;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testGenerateRuntimeFiltersTypeExceptionMore() {
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 8;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ }
+
+ @Test
+ public void testGenerateRuntimeFiltersSize() {
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeFilterMode();
+ result = "GLOBAL";
+ ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+ result = 2;
+ }
+ };
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMaxSize();
+ result = 16777216;
+ ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMinSize();
+ result = 1048576;
+ ConnectContext.get().getSessionVariable().getRuntimeBloomFilterSize();
+ result = 2097152;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().get(0).toThrift().getBloomFilterSizeBytes(), 2097152);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMaxSize();
+ result = 16777216;
+ ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMinSize();
+ result = 1048576;
+ ConnectContext.get().getSessionVariable().getRuntimeBloomFilterSize();
+ result = 1;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().get(0).toThrift().getBloomFilterSizeBytes(), 1048576);
+
+ clearRuntimeFilterState();
+ new Expectations() {
+ {
+ ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMaxSize();
+ result = 16777216;
+ ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMinSize();
+ result = 1048576;
+ ConnectContext.get().getSessionVariable().getRuntimeBloomFilterSize();
+ result = 999999999;
+ }
+ };
+ RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+ Assert.assertEquals(analyzer.getAssignedRuntimeFilter().get(0).toThrift().getBloomFilterSizeBytes(), 16777216);
+
+ // Use ndv and fpp to calculate the minimum space required for bloom filter
+ Assert.assertEquals(1L <<
+ RuntimeFilter.GetMinLogSpaceForBloomFilter(1000000, 0.05), 1048576);
+ Assert.assertEquals(1L <<
+ RuntimeFilter.GetMinLogSpaceForBloomFilter(1000000, 0.1), 1048576);
+ Assert.assertEquals(1L <<
+ RuntimeFilter.GetMinLogSpaceForBloomFilter(1000000, 0.3), 524288);
+ Assert.assertEquals(1L <<
+ RuntimeFilter.GetMinLogSpaceForBloomFilter(10000000, 0.1), 8388608);
+ Assert.assertEquals(1L <<
+ RuntimeFilter.GetMinLogSpaceForBloomFilter(1000, 0.1), 1024);
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java
new file mode 100644
index 0000000..d0ce41c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.DdlException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RuntimeFilterTypeHelperTest {
+
+ @Test
+ public void testNormal() throws DdlException {
+ String runtimeFilterType = "";
+ Assert.assertEquals(new Long(0L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+ runtimeFilterType = "IN";
+ Assert.assertEquals(new Long(1L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+ runtimeFilterType = "BLOOM_FILTER";
+ Assert.assertEquals(new Long(2L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+ runtimeFilterType = "IN,BLOOM_FILTER";
+ Assert.assertEquals(new Long(3L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+ runtimeFilterType = "MIN_MAX";
+ Assert.assertEquals(new Long(4L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+ runtimeFilterType = "IN,MIN_MAX";
+ Assert.assertEquals(new Long(5L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+ runtimeFilterType = "MIN_MAX, BLOOM_FILTER";
+ Assert.assertEquals(new Long(6L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+ runtimeFilterType = "IN,BLOOM_FILTER,MIN_MAX";
+ Assert.assertEquals(new Long(7L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+ long runtimeFilterTypeValue = 0L;
+ Assert.assertEquals("", RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue));
+
+ runtimeFilterTypeValue = 1L;
+ Assert.assertEquals("IN", RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue));
+
+ runtimeFilterTypeValue = 3L;
+ Assert.assertEquals("BLOOM_FILTER,IN", RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue)); // Orderly
+
+ runtimeFilterTypeValue = 7L;
+ Assert.assertEquals("BLOOM_FILTER,IN,MIN_MAX", RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue)); // Orderly
+ }
+
+ @Test(expected = DdlException.class)
+ public void testInvalidSqlMode() throws DdlException {
+ RuntimeFilterTypeHelper.encode("BLOOM,IN");
+ Assert.fail("No exception throws");
+ }
+
+ @Test(expected = DdlException.class)
+ public void testInvalidDecode() throws DdlException {
+ RuntimeFilterTypeHelper.decode(10L);
+ Assert.fail("No exception throws");
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
index a053c38..5cba8c2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
@@ -167,6 +167,12 @@ public class VariableMgrTest {
setVar7.analyze(null);
VariableMgr.setVar(var, setVar7);
Assert.assertEquals("-08:00", VariableMgr.newSessionVariable().getTimeZone());
+
+ SetVar setVar8 = new SetVar(SetType.SESSION, "runtime_filter_type", new StringLiteral(
+ RuntimeFilterTypeHelper.encode("BLOOM_FILTER").toString()));
+ setVar8.analyze(null);
+ VariableMgr.setVar(var, setVar8);
+ Assert.assertEquals(2L, var.getRuntimeFilterType());
}
@Test(expected = UserException.class)
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 888af72..b2f0c89 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -141,29 +141,11 @@ struct TQueryOptions {
// whether enable parallel merge in exchange node
32: optional bool enable_enable_exchange_node_parallel_merge = false;
- // runtime filter run mode
- 33: optional string runtime_filter_mode = "GLOBAL";
-
- // Size in bytes of Bloom Filters used for runtime filters. Actual size of filter will
- // be rounded up to the nearest power of two.
- 34: optional i32 runtime_bloom_filter_size = 1048576
-
- // Minimum runtime bloom filter size, in bytes
- 35: optional i32 runtime_bloom_filter_min_size = 1048576
-
- // Maximum runtime bloom filter size, in bytes
- 36: optional i32 runtime_bloom_filter_max_size = 16777216
-
// Time in ms to wait until runtime filters are delivered.
- 37: optional i32 runtime_filter_wait_time_ms = 1000
-
- // Maximum number of bloom runtime filters allowed per query
- 38: optional i32 runtime_filters_max_num = 10
-
- // Runtime filter type used, For testing, Corresponds to TRuntimeFilterType
- 39: optional i32 runtime_filter_type = 1;
+ 33: optional i32 runtime_filter_wait_time_ms = 1000
- 40: optional i32 runtime_filter_max_in_num = 1024;
+ // If the right table of a hash join contains more than this many rows, the IN filter is ignored
+ 34: optional i32 runtime_filter_max_in_num = 1024;
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index d3b2cd0..8d968dc 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -381,11 +381,10 @@ struct THashJoinNode {
// anything from the ON or USING clauses (but *not* the WHERE clause) that's not an
// equi-join predicate
3: optional list<Exprs.TExpr> other_join_conjuncts
- 4: optional bool is_push_down
// If true, this join node can (but may choose not to) generate slot filters
// after constructing the build side that can be applied to the probe side.
- 5: optional bool add_probe_filters
+ 4: optional bool add_probe_filters
}
struct TMergeJoinNode {
@@ -726,7 +725,6 @@ struct TRuntimeFilterDesc {
9: optional i64 bloom_filter_size_bytes
}
-
// This is essentially a union of all messages corresponding to subclasses
// of PlanNode.
struct TPlanNode {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org