Posted to commits@doris.apache.org by mo...@apache.org on 2021/07/13 03:36:55 UTC

[incubator-doris] branch master updated: [Feature] Runtime Filtering for Doris (Background, Configuration, FE Implement, Tuning, Test ) (#6121)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 394a9a1  [Feature] Runtime Filtering for Doris (Background, Configuration, FE Implement, Tuning, Test ) (#6121)
394a9a1 is described below

commit 394a9a1472cb8bfe91f6649afcbeee7f0178dc62
Author: Xinyi Zou <zo...@foxmail.com>
AuthorDate: Tue Jul 13 11:36:01 2021 +0800

    [Feature] Runtime Filtering for Doris (Background, Configuration, FE Implement, Tuning, Test ) (#6121)
    
    - `RuntimeFilterGenerator` is used to generate Runtime Filter and assign it to the node that uses Runtime Filter in the query plan.
    
    - `RuntimeFilter` represents a filter in the query plan, including the specific properties of the filter, the binding method of expr and tuple slot, etc.
    
    - `RuntimeFilterTarget` indicates the filter information provided to ScanNode, including target expr, whether to merge, etc.
---
 .../java/org/apache/doris/analysis/Analyzer.java   |  46 +++
 .../org/apache/doris/analysis/BinaryPredicate.java |   2 +
 .../main/java/org/apache/doris/analysis/Expr.java  |  37 ++
 .../java/org/apache/doris/analysis/Predicate.java  |   5 +
 .../org/apache/doris/analysis/StringLiteral.java   |  30 +-
 .../org/apache/doris/analysis/SysVariableDesc.java |  20 +-
 .../doris/analysis/TupleIsNullPredicate.java       |  20 +
 .../util/BitUtil.java}                             |  23 +-
 .../apache/doris/planner/DistributedPlanner.java   |   3 -
 .../org/apache/doris/planner/HashJoinNode.java     |  15 +-
 .../org/apache/doris/planner/OlapScanNode.java     |   4 +
 .../org/apache/doris/planner/PlanFragment.java     |  30 ++
 .../java/org/apache/doris/planner/PlanNode.java    |  36 ++
 .../java/org/apache/doris/planner/Planner.java     |  10 +-
 .../org/apache/doris/planner/RuntimeFilter.java    | 457 +++++++++++++++++++++
 .../doris/planner/RuntimeFilterGenerator.java      | 400 ++++++++++++++++++
 .../org/apache/doris/planner/RuntimeFilterId.java  |  56 +++
 .../java/org/apache/doris/planner/ScanNode.java    |   2 +
 .../apache/doris/planner/SingleNodePlanner.java    |  31 ++
 .../main/java/org/apache/doris/qe/Coordinator.java |  74 +++-
 .../apache/doris/qe/RuntimeFilterTypeHelper.java   | 115 ++++++
 .../java/org/apache/doris/qe/SessionVariable.java  | 102 +++++
 .../main/java/org/apache/doris/qe/VariableMgr.java |  36 +-
 .../org/apache/doris/qe/VariableVarConverterI.java |   4 +-
 .../org/apache/doris/qe/VariableVarConverters.java |  59 ++-
 .../org/apache/doris/planner/QueryPlanTest.java    |  79 ++++
 .../doris/planner/RuntimeFilterGeneratorTest.java  | 426 +++++++++++++++++++
 .../doris/qe/RuntimeFilterTypeHelperTest.java      |  77 ++++
 .../java/org/apache/doris/qe/VariableMgrTest.java  |   6 +
 gensrc/thrift/PaloInternalService.thrift           |  24 +-
 gensrc/thrift/PlanNodes.thrift                     |   4 +-
 31 files changed, 2131 insertions(+), 102 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index 3634768..a879f08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -34,6 +34,7 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.IdGenerator;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.RuntimeFilter;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rewrite.BetweenToCompoundRule;
 import org.apache.doris.rewrite.ExprRewriteRule;
@@ -148,6 +149,9 @@ public class Analyzer {
     private boolean isUDFAllowed = true;
     // timezone specified for some operation, such as broker load
     private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
+
+    // The runtime filters that are expected to be used
+    private final List<RuntimeFilter> assignedRuntimeFilters = new ArrayList<>();
     
     public void setIsSubquery() {
         isSubquery = true;
@@ -163,6 +167,10 @@ public class Analyzer {
     public void setTimezone(String timezone) { this.timezone = timezone; }
     public String getTimezone() { return timezone; }
 
+    public void putAssignedRuntimeFilter(RuntimeFilter rf) { assignedRuntimeFilters.add(rf); }
+    public List<RuntimeFilter> getAssignedRuntimeFilter() { return assignedRuntimeFilters; }
+    public void clearAssignedRuntimeFilters() { assignedRuntimeFilters.clear(); }
+
     // state shared between all objects of an Analyzer tree
     // TODO: Many maps here contain properties about tuples, e.g., whether
     // a tuple is outer/semi joined, etc. Remove the maps in favor of making
@@ -551,6 +559,10 @@ public class Analyzer {
         return globalState.descTbl.getTupleDesc(id);
     }
 
+    public SlotDescriptor getSlotDesc(SlotId id) {
+        return globalState.descTbl.getSlotDesc(id);
+    }
+
     /**
      * Given a "table alias"."column alias", return the SlotDescriptor
      *
@@ -1809,4 +1821,38 @@ public class Analyzer {
             }
         }
     }
+
+    /**
+     * Column value transfer: returns true if a value of slot 'a' can transfer to slot 'b'.
+     *
+     * TODO(zxy) Use value-transfer graph to check
+     */
+    public boolean hasValueTransfer(SlotId a, SlotId b) {
+        return a.equals(b);
+    }
+
+    /**
+     * Returns sorted slot IDs with value transfers from 'srcSid'.
+     * Time complexity: O(V) where V = number of slots
+     *
+     * TODO(zxy) Use value-transfer graph to check
+     */
+    public List<SlotId> getValueTransferTargets(SlotId srcSid) {
+        List<SlotId> result = new ArrayList<>();
+        result.add(srcSid);
+        return result;
+    }
+
+    /**
+     * Returns true if any of the given slot ids or their value-transfer targets belong
+     * to an outer-joined tuple.
+     */
+    public boolean hasOuterJoinedValueTransferTarget(List<SlotId> sids) {
+        for (SlotId srcSid: sids) {
+            for (SlotId dstSid: getValueTransferTargets(srcSid)) {
+                if (isOuterJoined(getTupleId(dstSid))) return true;
+            }
+        }
+        return false;
+    }
 }
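
Note: hasValueTransfer and getValueTransferTargets above are identity-only placeholders until the value-transfer graph mentioned in the TODO(zxy) comments lands. Below is a minimal sketch of what a graph-backed check could look like; the ValueTransferGraphSketch name and edge model are illustrative only and are not part of this commit.

    import java.util.ArrayDeque;
    import java.util.Collections;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical sketch: an edge records "a value of slot 'src' can appear in
    // slot 'dst'", e.g. via an equality predicate or an inline-view projection.
    public class ValueTransferGraphSketch {
        private final Map<Integer, Set<Integer>> edges = new HashMap<>();

        void addTransfer(int srcSlot, int dstSlot) {
            edges.computeIfAbsent(srcSlot, k -> new HashSet<>()).add(dstSlot);
        }

        // BFS replacement for the identity-only hasValueTransfer(a, b).
        boolean hasValueTransfer(int a, int b) {
            if (a == b) return true;
            Set<Integer> visited = new HashSet<>();
            Deque<Integer> queue = new ArrayDeque<>();
            visited.add(a);
            queue.add(a);
            while (!queue.isEmpty()) {
                for (int next : edges.getOrDefault(queue.poll(), Collections.<Integer>emptySet())) {
                    if (next == b) return true;
                    if (visited.add(next)) queue.add(next);
                }
            }
            return false;
        }

        public static void main(String[] args) {
            ValueTransferGraphSketch g = new ValueTransferGraphSketch();
            g.addTransfer(1, 2); // e.g. from a predicate like "t1.a = t2.b"
            g.addTransfer(2, 3);
            System.out.println(g.hasValueTransfer(1, 3)); // true, via 1 -> 2 -> 3
        }
    }
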
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java
index 9a98ac5..8c3da5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java
@@ -133,6 +133,8 @@ public class BinaryPredicate extends Predicate implements Writable {
 
         public boolean isEquivalence() { return this == EQ || this == EQ_FOR_NULL; };
 
+        public boolean isUnNullSafeEquivalence() { return this == EQ; };
+
         public boolean isUnequivalence() { return this == NE; }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
index 3f73a31..cafd88f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
@@ -1704,4 +1704,41 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
         final Expr newExpr = ExpressionFunctions.INSTANCE.evalExpr(this);
         return newExpr != null ? newExpr : this;
     }
+
+    public String getStringValue() {
+        if (this instanceof LiteralExpr) {
+            return ((LiteralExpr) this).getStringValue();
+        }
+        return "";
+    }
+
+    public static Expr getFirstBoundChild(Expr expr, List<TupleId> tids) {
+        for (Expr child: expr.getChildren()) {
+            if (child.isBoundByTupleIds(tids)) return child;
+        }
+        return null;
+    }
+
+    /**
+     * Returns true if expr contains the specified function, otherwise false.
+     */
+    public boolean isContainsFunction(String functionName) {
+        if (fn == null) return false;
+        if (fn.functionName().equalsIgnoreCase(functionName))  return true;
+        for (Expr child: children) {
+            if (child.isContainsFunction(functionName)) return true;
+        }
+        return false;
+    }
+
+    /**
+     * Returns true if expr contains the specified class name, otherwise false.
+     */
+    public boolean isContainsClass(String className) {
+        if (this.getClass().getName().equalsIgnoreCase(className)) return true;
+        for (Expr child: children) {
+            if (child.isContainsClass(className)) return true;
+        }
+        return false;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Predicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Predicate.java
index 0f033b4..cc4f4dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Predicate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Predicate.java
@@ -98,6 +98,11 @@ public abstract class Predicate extends Expr {
                 && ((BinaryPredicate) expr).getOp().isEquivalence();
     }
 
+    public static boolean isUnNullSafeEquivalencePredicate(Expr expr) {
+        return (expr instanceof BinaryPredicate)
+                && ((BinaryPredicate) expr).getOp().isUnNullSafeEquivalence();
+    }
+
     public static boolean canPushDownPredicate(Expr expr) {
         if (!(expr instanceof Predicate)) {
             return false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java
index 8af05d3..ab202e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java
@@ -24,7 +24,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.io.Text;
-import org.apache.doris.qe.SqlModeHelper;
+import org.apache.doris.qe.VariableVarConverters;
 import org.apache.doris.thrift.TExprNode;
 import org.apache.doris.thrift.TExprNodeType;
 import org.apache.doris.thrift.TStringLiteral;
@@ -43,18 +43,8 @@ import java.util.Objects;
 public class StringLiteral extends LiteralExpr {
     private static final Logger LOG = LogManager.getLogger(StringLiteral.class);
     private String value;
-    /**
-     * the session variable `sql_mode` is a special kind of variable.
-     * it's real type is int, so when querying `select @@sql_mode`, the return column
-     * type is "int". but user usually set this variable by string, such as:
-     * `set @@sql_mode = 'STRICT_TRANS_TABLES'`
-     * or
-     * `set @@sql_mode = concat(@@sql_mode, 'STRICT_TRANS_TABLES')'`
-     * <p>
-     * So when it need to be cast to int, it means "cast 'STRICT_TRANS_TABLES' to Integer".
-     * To support this, we set `isSqlMode` to true, so that it can cast sql mode name to integer.
-     */
-    private boolean isSqlMode = false;
+    // Name of the session variable whose string value needs to be cast back to int, such as "cast 'STRICT_TRANS_TABLES' to Integer".
+    private String beConverted = "";
     
     public StringLiteral() {
         super();
@@ -73,8 +63,8 @@ public class StringLiteral extends LiteralExpr {
         value = other.value;
     }
 
-    public void setIsSqlMode(boolean val) {
-        this.isSqlMode = val;
+    public void setBeConverted(String val) {
+        this.beConverted = val;
     }
 
     @Override
@@ -203,20 +193,18 @@ public class StringLiteral extends LiteralExpr {
                 case SMALLINT:
                 case INT:
                 case BIGINT:
-                    if (isSqlMode) {
+                    if (VariableVarConverters.hasConverter(beConverted)) {
                         try {
-                            long sqlMode = SqlModeHelper.encode(value);
-                            return new IntLiteral(sqlMode, targetType);
+                            return new IntLiteral(VariableVarConverters.encode(beConverted, value), targetType);
                         } catch (DdlException e) {
                             throw new AnalysisException(e.getMessage());
                         }
                     }
                     return new IntLiteral(value, targetType);
                 case LARGEINT:
-                    if (isSqlMode) {
+                    if (VariableVarConverters.hasConverter(beConverted)) {
                         try {
-                            long sqlMode = SqlModeHelper.encode(value);
-                            return new LargeIntLiteral(String.valueOf(sqlMode));
+                            return new LargeIntLiteral(String.valueOf(VariableVarConverters.encode(beConverted, value)));
                         } catch (DdlException e) {
                             throw new AnalysisException(e.getMessage());
                         }
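
This change generalizes the former sql_mode-only cast logic: any session variable registered in VariableVarConverters can round-trip between its string form and its integer encoding. A usage sketch follows, assuming decode accepts the long produced by encode, as the call sites above suggest; the demo class name is made up for illustration.

    import org.apache.doris.common.DdlException;
    import org.apache.doris.qe.VariableVarConverters;

    public class ConverterRoundTripDemo {
        public static void main(String[] args) throws DdlException {
            if (VariableVarConverters.hasConverter("sql_mode")) {
                // Encode the human-readable mode name to its integer representation...
                long encoded = VariableVarConverters.encode("sql_mode", "STRICT_TRANS_TABLES");
                // ...and decode it back to the original string form.
                String decoded = VariableVarConverters.decode("sql_mode", encoded);
                System.out.println(encoded + " <-> " + decoded);
            }
        }
    }
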
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SysVariableDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SysVariableDesc.java
index 5675f52..c000baf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SysVariableDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SysVariableDesc.java
@@ -21,9 +21,8 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorReport;
-import org.apache.doris.qe.SessionVariable;
-import org.apache.doris.qe.SqlModeHelper;
 import org.apache.doris.qe.VariableMgr;
+import org.apache.doris.qe.VariableVarConverters;
 import org.apache.doris.thrift.TBoolLiteral;
 import org.apache.doris.thrift.TExprNode;
 import org.apache.doris.thrift.TExprNodeType;
@@ -72,10 +71,10 @@ public class SysVariableDesc extends Expr {
     @Override
     public void analyzeImpl(Analyzer analyzer) throws AnalysisException {
         VariableMgr.fillValue(analyzer.getContext().getSessionVariable(), this);
-        if (!Strings.isNullOrEmpty(name) && name.equalsIgnoreCase(SessionVariable.SQL_MODE)) {
+        if (!Strings.isNullOrEmpty(name) && VariableVarConverters.hasConverter(name)) {
             setType(Type.VARCHAR);
             try {
-                setStringValue(SqlModeHelper.decode(intValue));
+                setStringValue(VariableVarConverters.decode(name, intValue));
             } catch (DdlException e) {
                 ErrorReport.reportAnalysisException(e.getMessage());
             }
@@ -117,16 +116,13 @@ public class SysVariableDesc extends Expr {
     @Override
     public Expr getResultValue() throws AnalysisException {
         Expr expr = super.getResultValue();
-        if (!Strings.isNullOrEmpty(name) && name.equalsIgnoreCase(SessionVariable.SQL_MODE)) {
-            // SQL_MODE is a special variable. Its type is int, but it is usually set using a string.
-            // Such as `set sql_mode =  concat(@@sql_mode, "STRICT_TRANS_TABLES");`
-            // So we return the string type here so that it can correctly match the subsequent function signature.
-            // We will convert the string to int in VariableMgr.
-            // And we also set `isSqlMode` to true in StringLiteral, so that it can be cast back
+        if (!Strings.isNullOrEmpty(name) && VariableVarConverters.hasConverter(name)) {
+            // Return the string type here so that it can correctly match the subsequent function signature.
+            // We also set `beConverted` to the session variable name in StringLiteral, so that it can be cast back
             // to Integer when returning value.
             try {
-                StringLiteral s = new StringLiteral(SqlModeHelper.decode(intValue));
-                s.setIsSqlMode(true);
+                StringLiteral s = new StringLiteral(VariableVarConverters.decode(name, intValue));
+                s.setBeConverted(name);
                 return s;
             } catch (DdlException e) {
                 throw new AnalysisException(e.getMessage());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java
index 872c18c..c4cb13f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java
@@ -153,4 +153,24 @@ public class TupleIsNullPredicate extends Predicate {
     public String toSqlImpl() {
         return "TupleIsNull(" + Joiner.on(",").join(tupleIds) + ")";
     }
+
+    /**
+     * Recursive function that replaces all 'IF(TupleIsNull(), NULL, e)' exprs in
+     * 'expr' with e and returns the modified expr.
+     */
+    public static Expr unwrapExpr(Expr expr)  {
+        if (expr instanceof FunctionCallExpr) {
+            FunctionCallExpr fnCallExpr = (FunctionCallExpr) expr;
+            List<Expr> params = fnCallExpr.getParams().exprs();
+            if (fnCallExpr.getFnName().getFunction().equals("if") &&
+                    params.get(0) instanceof TupleIsNullPredicate &&
+                    Expr.IS_NULL_LITERAL.apply(params.get(1))) {
+                return unwrapExpr(params.get(2));
+            }
+        }
+        for (int i = 0; i < expr.getChildren().size(); ++i) {
+            expr.setChild(i, unwrapExpr(expr.getChild(i)));
+        }
+        return expr;
+    }
 }
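
To illustrate what unwrapExpr does, the toy below mirrors the same recursion over a minimal tree: every IF(TupleIsNull(...), NULL, e) node collapses to e, including nested occurrences. ToyExpr is purely illustrative and is not a Doris class.

    import java.util.ArrayList;
    import java.util.List;

    class ToyExpr {
        final String name; // e.g. "if", "TupleIsNull", "NULL", "col"
        final List<ToyExpr> children = new ArrayList<>();

        ToyExpr(String name, ToyExpr... kids) {
            this.name = name;
            for (ToyExpr k : kids) children.add(k);
        }

        // Same shape as TupleIsNullPredicate.unwrapExpr: collapse IF(TupleIsNull, NULL, e) to e.
        static ToyExpr unwrap(ToyExpr e) {
            if (e.name.equals("if") && e.children.size() == 3
                    && e.children.get(0).name.equals("TupleIsNull")
                    && e.children.get(1).name.equals("NULL")) {
                return unwrap(e.children.get(2));
            }
            for (int i = 0; i < e.children.size(); i++) {
                e.children.set(i, unwrap(e.children.get(i)));
            }
            return e;
        }

        @Override
        public String toString() {
            return children.isEmpty() ? name : name + children;
        }

        public static void main(String[] args) {
            ToyExpr wrapped = new ToyExpr("if",
                    new ToyExpr("TupleIsNull"), new ToyExpr("NULL"), new ToyExpr("col"));
            System.out.println(unwrap(wrapped)); // prints: col
        }
    }
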
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BitUtil.java
similarity index 53%
copy from fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java
copy to fe/fe-core/src/main/java/org/apache/doris/common/util/BitUtil.java
index 26d7dae..5702755 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BitUtil.java
@@ -15,10 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.qe;
+package org.apache.doris.common.util;
 
-import org.apache.doris.common.DdlException;
+public class BitUtil {
 
-public interface VariableVarConverterI {
-    public String convert(String value) throws DdlException;
+    // Returns the ceiling of log2 of 'val'. 'val' must be > 0.
+    public static int log2Ceiling(long val) {
+        // Formula is based on the Long.numberOfLeadingZeros() javadoc comment.
+        return 64 - Long.numberOfLeadingZeros(val - 1);
+    }
+
+    // Round up 'val' to the nearest power of two. 'val' must be > 0.
+    public static long roundUpToPowerOf2(long val) {
+        return 1L << log2Ceiling(val);
+    }
+
+    // Round up 'val' to the nearest multiple of a power-of-two 'factor'.
+    // 'val' must be > 0.
+    public static long roundUpToPowerOf2Factor(long val, long factor) {
+        return (val + (factor - 1)) & ~(factor - 1);
+    }
 }
+
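
A few hand-checked values for the BitUtil helpers, as a throwaway driver that is not part of the commit:

    import org.apache.doris.common.util.BitUtil;

    public class BitUtilDemo {
        public static void main(String[] args) {
            System.out.println(BitUtil.log2Ceiling(8));                 // 3: exact powers of two stay put
            System.out.println(BitUtil.log2Ceiling(9));                 // 4: non-powers round up
            System.out.println(BitUtil.roundUpToPowerOf2(9));           // 16
            System.out.println(BitUtil.roundUpToPowerOf2Factor(10, 8)); // 16: next multiple of 8
        }
    }
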
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 677bde5..5aa9bb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -427,7 +427,6 @@ public class DistributedPlanner {
             node.setChild(0, leftChildFragment.getPlanRoot());
             connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
             leftChildFragment.setPlanRoot(node);
-
             return leftChildFragment;
         } else {
             node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);
@@ -472,8 +471,6 @@ public class DistributedPlanner {
             rightChildFragment.setDestination(rhsExchange);
             rightChildFragment.setOutputPartition(rhsJoinPartition);
 
-            // TODO: Before we support global runtime filter, only shuffle join do not enable local runtime filter
-            node.setIsPushDown(false);
             return joinFragment;
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 2360d5c..6b06de2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -60,7 +60,6 @@ public class HashJoinNode extends PlanNode {
     private List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList();
     // join conjuncts from the JOIN clause that aren't equi-join predicates
     private  List<Expr> otherJoinConjuncts;
-    private boolean isPushDown = false;
     private DistributionMode distrMode;
     private boolean isColocate = false; //the flag for colocate join
     private String colocateReason = ""; // if can not do colocate join, set reason here
@@ -85,9 +84,6 @@ public class HashJoinNode extends PlanNode {
         this.otherJoinConjuncts = otherJoinConjuncts;
         children.add(outer);
         children.add(inner);
-        if (this.joinOp.isInnerJoin() || this.joinOp.isLeftSemiJoin()) {
-            this.isPushDown = true;
-        }
 
         // Inherits all the nullable tuple from the children
         // Mark tuples that form the "nullable" side of the outer join as nullable.
@@ -282,10 +278,6 @@ public class HashJoinNode extends PlanNode {
         }
     }
 
-    public void setIsPushDown(boolean isPushDown) {
-        this.isPushDown = isPushDown;
-    }
-
     @Override
     protected void toThrift(TPlanNode msg) {
         msg.node_type = TPlanNodeType.HASH_JOIN_NODE;
@@ -300,7 +292,6 @@ public class HashJoinNode extends PlanNode {
         for (Expr e : otherJoinConjuncts) {
             msg.hash_join_node.addToOtherJoinConjuncts(e.treeToThrift());
         }
-        msg.hash_join_node.setIsPushDown(isPushDown);
     }
 
     @Override
@@ -326,6 +317,12 @@ public class HashJoinNode extends PlanNode {
         if (!conjuncts.isEmpty()) {
             output.append(detailPrefix).append("other predicates: ").append(getExplainString(conjuncts)).append("\n");
         }
+        if (!runtimeFilters.isEmpty()) {
+            output.append(detailPrefix).append("runtime filters: ");
+            output.append(getRuntimeFilterExplainString(true));
+        }
+        output.append(detailPrefix).append(String.format(
+                "cardinality=%s", cardinality)).append("\n");
         return output.toString();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 62948c7..99e158e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -591,6 +591,10 @@ public class OlapScanNode extends ScanNode {
             output.append(prefix).append("PREDICATES: ").append(
                     getExplainString(conjuncts)).append("\n");
         }
+        if (!runtimeFilters.isEmpty()) {
+            output.append(prefix).append("runtime filters: ");
+            output.append(getRuntimeFilterExplainString(false));
+        }
 
         output.append(prefix).append(String.format(
                     "partitions=%s/%s",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index f714132..e865cf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -36,7 +36,9 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -127,6 +129,11 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     // default value is 1
     private int parallelExecNum = 1;
 
+    // Ids of the runtime filters produced by this fragment
+    private Set<RuntimeFilterId> builderRuntimeFilterIds;
+    // Ids of the runtime filters this fragment is expected to use
+    private Set<RuntimeFilterId> targetRuntimeFilterIds;
+
     /**
      * C'tor for fragment with specific partition; the output is by default broadcast.
      */
@@ -136,6 +143,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         this.dataPartition = partition;
         this.outputPartition = DataPartition.UNPARTITIONED;
         this.transferQueryStatisticsWithEveryBatch = false;
+        this.builderRuntimeFilterIds = new HashSet<>();
+        this.targetRuntimeFilterIds = new HashSet<>();
         setParallelExecNumIfExists();
         setFragmentInPlanTree(planRoot);
     }
@@ -177,6 +186,14 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         this.outputExprs = Expr.cloneList(outputExprs, null);
     }
 
+    public void setBuilderRuntimeFilterIds(RuntimeFilterId rid) {
+        this.builderRuntimeFilterIds.add(rid);
+    }
+
+    public void setTargetRuntimeFilterIds(RuntimeFilterId rid) {
+        this.targetRuntimeFilterIds.add(rid);
+    }
+
     /**
      * Finalize plan tree and create stream sink, if needed.
      */
@@ -344,6 +361,19 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         return fragmentId;
     }
 
+    public Set<RuntimeFilterId> getBuilderRuntimeFilterIds() {
+        return builderRuntimeFilterIds;
+    }
+
+    public Set<RuntimeFilterId> getTargetRuntimeFilterIds() {
+        return targetRuntimeFilterIds;
+    }
+
+    public void clearRuntimeFilters() {
+        builderRuntimeFilterIds.clear();
+        targetRuntimeFilterIds.clear();
+    }
+
     public void setTransferQueryStatisticsWithEveryBatch(boolean value) {
         transferQueryStatisticsWithEveryBatch = value;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index dd7f636..1233f45 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.planner;
 
+import com.google.common.base.Joiner;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ExprSubstitutionMap;
@@ -40,6 +41,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
@@ -117,6 +119,9 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
         return planNodeName;
     }
 
+    // Runtime filters assigned to this node.
+    protected List<RuntimeFilter> runtimeFilters = new ArrayList<>();
+
     protected PlanNode(PlanNodeId id, ArrayList<TupleId> tupleIds, String planNodeName) {
         this.id = id;
         this.limit = -1;
@@ -411,6 +416,10 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
         for (Expr e : conjuncts) {
             msg.addToConjuncts(e.treeToThrift());
         }
+        // Serialize any runtime filters
+        for (RuntimeFilter filter : runtimeFilters) {
+            msg.addToRuntimeFilters(filter.toThrift());
+        }
         msg.compact_data = compactData;
         toThrift(msg);
         container.addToNodes(msg);
@@ -659,4 +668,31 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
         }
         return null;
     }
+
+    protected void addRuntimeFilter(RuntimeFilter filter) { runtimeFilters.add(filter); }
+
+    protected Collection<RuntimeFilter> getRuntimeFilters() { return runtimeFilters; }
+
+    public void clearRuntimeFilters() { runtimeFilters.clear(); }
+
+    protected String getRuntimeFilterExplainString(boolean isBuildNode) {
+        if (runtimeFilters.isEmpty()) return "";
+        List<String> filtersStr = new ArrayList<>();
+        for (RuntimeFilter filter: runtimeFilters) {
+            StringBuilder filterStr = new StringBuilder();
+            filterStr.append(filter.getFilterId());
+            filterStr.append("[");
+            filterStr.append(filter.getType().toString().toLowerCase());
+            filterStr.append("]");
+            if (isBuildNode) {
+                filterStr.append(" <- ");
+                filterStr.append(filter.getSrcExpr().toSql());
+            } else {
+                filterStr.append(" -> ");
+                filterStr.append(filter.getTargetExpr(getId()).toSql());
+            }
+            filtersStr.add(filterStr.toString());
+        }
+        return Joiner.on(", ").join(filtersStr) + "\n";
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index fa7a101..16c0a37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.PlanTreeBuilder;
 import org.apache.doris.common.profile.PlanTreePrinter;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TQueryOptions;
@@ -38,6 +39,7 @@ import org.apache.doris.thrift.TQueryOptions;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import org.apache.doris.thrift.TRuntimeFilterMode;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -201,7 +203,13 @@ public class Planner {
         QueryStatisticsTransferOptimizer queryStatisticTransferOptimizer = new QueryStatisticsTransferOptimizer(rootFragment);
         queryStatisticTransferOptimizer.optimizeQueryStatisticsTransfer();
 
-        if (statement instanceof InsertStmt) {
+        // Create runtime filters.
+        if (!ConnectContext.get().getSessionVariable().getRuntimeFilterMode().toUpperCase()
+                .equals(TRuntimeFilterMode.OFF.name())) {
+            RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());
+        }
+
+        if (statement instanceof InsertStmt) {
             InsertStmt insertStmt = (InsertStmt) statement;
             rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments);
             rootFragment.setSink(insertStmt.getDataSink());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
new file mode 100644
index 0000000..a368208
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -0,0 +1,457 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.Predicate;
+import org.apache.doris.analysis.SlotId;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.analysis.TupleIsNullPredicate;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.thrift.TRuntimeFilterDesc;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Representation of a runtime filter. A runtime filter is generated from
+ * an equi-join predicate of the form <lhs_expr> = <rhs_expr>, where lhs_expr is the
+ * expr on which the filter is applied and must be bound by a single tuple id from
+ * the left plan subtree of the associated join node, while rhs_expr is the expr on
+ * which the filter is built and can be bound by any number of tuple ids from the
+ * right plan subtree. Every runtime filter must record the join node that constructs
+ * the filter and the scan nodes that apply the filter (destination nodes).
+ */
+public final class RuntimeFilter {
+    private final static Logger LOG = LogManager.getLogger(RuntimeFilter.class);
+
+    // Identifier of the filter (unique within a query)
+    private final RuntimeFilterId id;
+    // Join node that builds the filter
+    private final HashJoinNode builderNode;
+    // Expr (rhs of join predicate) on which the filter is built
+    private final Expr srcExpr;
+    // The position of expr in the join condition
+    private final int exprOrder;
+    // Expr (lhs of join predicate) from which the target exprs are generated.
+    private final Expr origTargetExpr;
+    // Runtime filter targets
+    private final List<RuntimeFilterTarget> targets = new ArrayList<>();
+    // Slots from base table tuples that have value transfer from the slots
+    // of 'origTargetExpr'. The slots are grouped by tuple id.
+    private final Map<TupleId, List<SlotId>> targetSlotsByTid;
+    // If true, the join node building this filter is executed using a broadcast join;
+    // set in the DistributedPlanner.createHashJoinFragment()
+    private boolean isBroadcastJoin;
+    // Estimate of the number of distinct values that will be inserted into this filter,
+    // globally across all instances of the source node. Used to compute an optimal size
+    // for the filter. A value of -1 means no estimate is available, and default filter
+    // parameters should be used.
+    private long ndvEstimate = -1;
+    // Size of the filter (in Bytes). Should be greater than zero for bloom filters.
+    private long filterSizeBytes = 0;
+    // If true, the filter is produced by a broadcast join and there is at least one
+    // destination scan node which is in the same fragment as the join; set in
+    // DistributedPlanner.createHashJoinFragment().
+    private boolean hasLocalTargets = false;
+    // If true, there is at least one destination scan node which is not in the same
+    // fragment as the join that produced the filter; set in
+    // DistributedPlanner.createHashJoinFragment().
+    private boolean hasRemoteTargets = false;
+    // If set, indicates that the filter can't be assigned to another scan node.
+    // Once set, it can't be unset.
+    private boolean finalized = false;
+    // The type of filter to build.
+    private TRuntimeFilterType runtimeFilterType;
+
+    /**
+     * Internal representation of a runtime filter target.
+     */
+    public static class RuntimeFilterTarget {
+        // Scan node that applies the filter
+        public ScanNode node;
+        // Expr on which the filter is applied
+        public Expr expr;
+        // Indicates if 'expr' is bound only by key columns
+        public final boolean isBoundByKeyColumns;
+        // Indicates if 'node' is in the same fragment as the join that produces the filter
+        public final boolean isLocalTarget;
+
+        public RuntimeFilterTarget(ScanNode targetNode, Expr targetExpr,
+                                   boolean isBoundByKeyColumns, boolean isLocalTarget) {
+            Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds()));
+            this.node = targetNode;
+            this.expr = targetExpr;
+            this.isBoundByKeyColumns = isBoundByKeyColumns;
+            this.isLocalTarget = isLocalTarget;
+        }
+
+        @Override
+        public String toString() {
+            return "Target Id: " + node.getId() + " " +
+                    "Target expr: " + expr.debugString() + " " +
+                    "Is only Bound By Key: " + isBoundByKeyColumns + " " +
+                    "Is local: " + isLocalTarget;
+        }
+    }
+
+    private RuntimeFilter(RuntimeFilterId filterId, HashJoinNode filterSrcNode, Expr srcExpr, int exprOrder,
+                          Expr origTargetExpr, Map<TupleId, List<SlotId>> targetSlots,
+                          TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) {
+        this.id = filterId;
+        this.builderNode = filterSrcNode;
+        this.srcExpr = srcExpr;
+        this.exprOrder = exprOrder;
+        this.origTargetExpr = origTargetExpr;
+        this.targetSlotsByTid = targetSlots;
+        this.runtimeFilterType = type;
+        computeNdvEstimate();
+        calculateFilterSize(filterSizeLimits);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof RuntimeFilter)) return false;
+        return ((RuntimeFilter) obj).id.equals(id);
+    }
+
+    @Override
+    public int hashCode() { return id.hashCode(); }
+
+    public void markFinalized() { finalized = true; }
+    public boolean isFinalized() { return finalized; }
+
+    /**
+     * Serializes a runtime filter to Thrift.
+     */
+    public TRuntimeFilterDesc toThrift() {
+        TRuntimeFilterDesc tFilter = new TRuntimeFilterDesc();
+        tFilter.setFilterId(id.asInt());
+        tFilter.setSrcExpr(srcExpr.treeToThrift());
+        tFilter.setExprOrder(exprOrder);
+        tFilter.setIsBroadcastJoin(isBroadcastJoin);
+        tFilter.setHasLocalTargets(hasLocalTargets);
+        tFilter.setHasRemoteTargets(hasRemoteTargets);
+        for (RuntimeFilterTarget target : targets) {
+            tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), target.expr.treeToThrift());
+        }
+        tFilter.setType(runtimeFilterType);
+        tFilter.setBloomFilterSizeBytes(filterSizeBytes);
+        return tFilter;
+    }
+
+    public List<RuntimeFilterTarget> getTargets() { return targets; }
+    public boolean hasTargets() { return !targets.isEmpty(); }
+    public Expr getSrcExpr() { return srcExpr; }
+    public Expr getOrigTargetExpr() { return origTargetExpr; }
+    public Map<TupleId, List<SlotId>> getTargetSlots() { return targetSlotsByTid; }
+    public RuntimeFilterId getFilterId() { return id; }
+    public TRuntimeFilterType getType() { return runtimeFilterType; }
+    public void setType(TRuntimeFilterType type) { runtimeFilterType = type; }
+    public boolean hasRemoteTargets() { return hasRemoteTargets; }
+    public HashJoinNode getBuilderNode() { return builderNode; }
+
+    /**
+     * Static function to create a RuntimeFilter from 'joinPredicate' that is assigned
+     * to the join node 'filterSrcNode'. Returns an instance of RuntimeFilter
+     * or null if a runtime filter cannot be generated from the specified predicate.
+     */
+    public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen, Analyzer analyzer,
+                                       Expr joinPredicate, int exprOrder, HashJoinNode filterSrcNode,
+                                       TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) {
+        Preconditions.checkNotNull(idGen);
+        Preconditions.checkNotNull(joinPredicate);
+        Preconditions.checkNotNull(filterSrcNode);
+        // Only consider binary equality predicates that do not use the null-safe equal operator.
+        // A null-safe equal predicate cannot be pushed down, because the runtime filter
+        // would filter out NULL values in child[0] while they are needed for the null-safe equal join.
+        // For example: select * from a join b where a.id<=>b.id
+        // NULL values in table a should be returned by the scan node instead of being filtered out by the runtime filter.
+        if (!Predicate.isUnNullSafeEquivalencePredicate(joinPredicate)) return null;
+
+        BinaryPredicate normalizedJoinConjunct =
+                SingleNodePlanner.getNormalizedEqPred(joinPredicate,
+                        filterSrcNode.getChild(0).getTupleIds(),
+                        filterSrcNode.getChild(1).getTupleIds(), analyzer);
+        if (normalizedJoinConjunct == null) return null;
+
+        // Ensure that the target expr does not contain TupleIsNull predicates as these
+        // can't be evaluated at a scan node.
+        Expr targetExpr =
+                TupleIsNullPredicate.unwrapExpr(normalizedJoinConjunct.getChild(0).clone());
+        Expr srcExpr = normalizedJoinConjunct.getChild(1);
+
+        if (srcExpr.getType().equals(ScalarType.createHllType())
+                || srcExpr.getType().equals(ScalarType.createType(PrimitiveType.BITMAP))) return null;
+
+        Map<TupleId, List<SlotId>> targetSlots = getTargetSlots(analyzer, targetExpr);
+        Preconditions.checkNotNull(targetSlots);
+        if (targetSlots.isEmpty()) return null;
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Generating runtime filter from predicate " + joinPredicate);
+        }
+        return new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, exprOrder,
+                targetExpr, targetSlots, type, filterSizeLimits);
+    }
+
+    /**
+     * Returns the ids of base table tuple slots on which a runtime filter expr can be
+     * applied. Due to the existence of equivalence classes, a filter expr may be
+     * applicable at multiple scan nodes. The returned slot ids are grouped by tuple id.
+     * Returns an empty collection if the filter expr cannot be applied at a base table
+     * or if applying the filter might lead to incorrect results.
+     * That is, returns the slot ids of the base tables expected to use this target expr.
+     */
+    private static Map<TupleId, List<SlotId>> getTargetSlots(Analyzer analyzer, Expr expr) {
+        // 'expr' is not a SlotRef and may contain multiple SlotRefs
+        List<TupleId> tids = new ArrayList<>();
+        List<SlotId> sids = new ArrayList<>();
+        expr.getIds(tids, sids);
+
+        /*
+          If the target expression evaluates to a non-NULL value for outer-join non-matches, then assigning the
+          filter below the nullable side of an outer join may produce incorrect query results.
+          This check is conservative but correct to keep the code simple. In particular, it would otherwise be
+          difficult to identify incorrect runtime filter assignments through outer-joined inline views because
+          the 'expr' has already been fully resolved.
+          TODO(zxy) We rely on the value-transfer graph to check whether 'expr' could potentially be assigned
+           below an outer-joined inline view.
+
+          Queries with the following characteristics may produce wrong results due to an incorrectly assigned
+          runtime filter:
+               1) The query has an outer join
+               2) A scan on the nullable side of that outer join has a runtime filter with a NULL-checking
+                  expression such as COALESCE/IFNULL/CASE
+               3) The latter point implies that there is another join above the outer join with a NULL-checking
+                  expression in its join condition
+
+           Reproduction:
+               TPC-DS 1T Benchmarks test
+               "
+                   select count(*) from store t1 left outer join store t2 on t1.s_store_sk = t2.s_store_sk
+                   where coalesce(t2.s_store_sk + 100, 100) in (select ifnull(100, s_store_sk) from store);
+
+                   select count(*) from store t1 left outer join store t2 on t1.s_store_sk = t2.s_store_sk
+                   where case when t2.s_store_sk is NULL then 100 else t2.s_store_sk end
+                   in (select ifnull(100, s_store_sk) from store limit 10);
+               "
+               We expect a count of 0. A count of 1024 is incorrect.
+               Query plan:
+                   |   4:HASH JOIN
+                   |   |  join op: LEFT SEMI JOIN (BROADCAST)
+                   |   |  equal join conjunct: coalesce(`t2`.`s_store_sk` + 100, 100) = ifnull(100, `s_store_sk`)
+                   |   |  runtime filters: RF000[in] <- ifnull(100, `s_store_sk`)
+                   |   |  cardinality=1002
+                   |   |----7:EXCHANGE
+                   |   3:HASH JOIN
+                   |   |  join op: LEFT OUTER JOIN
+                   |   |  equal join conjunct: `t1`.`s_store_sk` = `t2`.`s_store_sk`
+                   |   |----1:OlapScanNode
+                   |   |       TABLE: store
+                   |   |       runtime filters: RF000[in] -> coalesce(`t2`.`s_store_sk` + 100, 100)
+                   |   0:OlapScanNode
+                   |      TABLE: store
+               Explanation:
+                   RF000 filters out all rows in scan 01.
+                   In join 03 there are no join matches since the right-hand is empty. All rows from the right-hand
+                   side are nulled.
+                   The join condition in join 04 is now satisfied by all input rows because every "t2.s_store_sk" is NULL,
+                   so after the COALESCE() the join condition becomes 100 = 100.
+         */
+        if (analyzer.hasOuterJoinedValueTransferTarget(sids)) {
+            // Do not push down when contains NULL-checking expression COALESCE/IFNULL/CASE
+            // TODO(zxy) Returns true if 'p' evaluates to true when all its referenced slots are NULL, returns false
+            //  otherwise. Throws if backend expression evaluation fails.
+            if (expr.isContainsFunction("COALESCE") || expr.isContainsFunction("IFNULL")
+                    || expr.isContainsClass("org.apache.doris.analysis.CaseExpr"))
+                return Collections.emptyMap();
+        }
+
+        Map<TupleId, List<SlotId>> slotsByTid = new HashMap<>();
+        // We need to iterate over all the slots of 'expr' and check if they have
+        // equivalent slots that are bound by the same base table tuple(s).
+        for (SlotId slotId: sids) {
+            Map<TupleId, List<SlotId>> currSlotsByTid = getBaseTblEquivSlots(analyzer, slotId);
+            if (currSlotsByTid.isEmpty()) return Collections.emptyMap();
+            if (slotsByTid.isEmpty()) {
+                slotsByTid.putAll(currSlotsByTid);
+                continue;
+            }
+
+            // Compute the intersection between tuple ids from 'slotsByTid' and
+            // 'currSlotsByTid'. If the intersection is empty, an empty collection
+            // is returned.
+            Iterator<Map.Entry<TupleId, List<SlotId>>> iter = slotsByTid.entrySet().iterator();
+            while (iter.hasNext()) {
+                Map.Entry<TupleId, List<SlotId>> entry = iter.next();
+                List<SlotId> slotIds = currSlotsByTid.get(entry.getKey());
+                // Take the intersection of the tuple ids of all slots in expr to
+                // form <tupleid, slotid> and return.
+                // For A.a + B.b = C.c, when the tuple IDs of the two slots A.a and B.b are different,
+                // the filter cannot be pushed down, so the entry is removed. If A.a has a value transfer
+                // to B.a, then the tuple IDs of A.a and B.b intersect at B, so the target expr is usable;
+                // the tuple ID in the intersection identifies the scan node expected to use this runtime filter.
+                if (slotIds == null) {
+                    iter.remove();
+                } else {
+                    entry.getValue().addAll(slotIds);
+                }
+            }
+            if (slotsByTid.isEmpty()) return Collections.emptyMap();
+        }
+        return slotsByTid;
+    }
+
+    /**
+     * Static function that returns the ids of slots bound by base table tuples for which
+     * there is a value transfer from 'srcSid'. The slots are grouped by tuple id.
+     * That is, srcSid can be calculated from the <tuple id, slot id> of the base table.
+     */
+    private static Map<TupleId, List<SlotId>> getBaseTblEquivSlots(Analyzer analyzer,
+                                                                   SlotId srcSid) {
+        Map<TupleId, List<SlotId>> slotsByTid = new HashMap<>();
+        for (SlotId targetSid: analyzer.getValueTransferTargets(srcSid)) {
+            TupleDescriptor tupleDesc = analyzer.getSlotDesc(targetSid).getParent();
+            if (tupleDesc.getTable() == null) continue;
+            List<SlotId> sids = slotsByTid.computeIfAbsent(tupleDesc.getId(), k -> new ArrayList<>());
+            sids.add(targetSid);
+        }
+        return slotsByTid;
+    }
+
+    public Expr getTargetExpr(PlanNodeId targetPlanNodeId) {
+        for (RuntimeFilterTarget target: targets) {
+            if (target.node.getId() != targetPlanNodeId) continue;
+            return target.expr;
+        }
+        return null;
+    }
+
+    /**
+     * Estimates the selectivity of a runtime filter as the cardinality of the
+     * associated source join node over the cardinality of that join node's left
+     * child.
+     */
+    public double getSelectivity() {
+        if (builderNode.getCardinality() == -1
+                || builderNode.getChild(0).getCardinality() == -1
+                || builderNode.getChild(0).getCardinality() == 0) {
+            return -1;
+        }
+        return builderNode.getCardinality() / (double) builderNode.getChild(0).getCardinality();
+    }
+
+    public void addTarget(RuntimeFilterTarget target) { targets.add(target); }
+
+    public void setIsBroadcast(boolean isBroadcast) { isBroadcastJoin = isBroadcast; }
+
+    public void computeNdvEstimate() { ndvEstimate = builderNode.getChild(1).getCardinality(); }
+
+    public void extractTargetsPosition() {
+        Preconditions.checkNotNull(builderNode.getFragment());
+        Preconditions.checkState(hasTargets());
+        for (RuntimeFilterTarget target: targets) {
+            Preconditions.checkNotNull(target.node.getFragment());
+            hasLocalTargets = hasLocalTargets || target.isLocalTarget;
+            hasRemoteTargets = hasRemoteTargets || !target.isLocalTarget;
+        }
+    }
+
+    /**
+     * Sets the filter size (in bytes) required for a bloom filter to achieve the
+     * configured maximum false-positive rate based on the expected NDV. Also bounds the
+     * filter size between the max and minimum filter sizes supplied to it by
+     * 'filterSizeLimits'.
+     * Considering that the `IN` filter may be converted to the `Bloom Filter` when crossing fragments,
+     * the bloom filter size is always calculated.
+     */
+    private void calculateFilterSize(RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) {
+        if (ndvEstimate == -1) {
+            filterSizeBytes = filterSizeLimits.defaultVal;
+            return;
+        }
+        double fpp = FeConstants.default_bloom_filter_fpp;
+        int logFilterSize = GetMinLogSpaceForBloomFilter(ndvEstimate, fpp);
+        filterSizeBytes = 1L << logFilterSize;
+        filterSizeBytes = Math.max(filterSizeBytes, filterSizeLimits.minVal);
+        filterSizeBytes = Math.min(filterSizeBytes, filterSizeLimits.maxVal);
+    }
+
+    /**
+     * Returns the log (base 2) of the minimum number of bytes we need for a Bloom
+     * filter with 'ndv' unique elements and a false positive probability of less
+     * than 'fpp'.
+     */
+    public static int GetMinLogSpaceForBloomFilter(long ndv, double fpp) {
+        if (0 == ndv) return 0;
+        double k = 8; // BUCKET_WORDS
+        // m is the number of bits we would need to get the fpp specified
+        double m = -k * ndv / Math.log(1 - Math.pow(fpp, 1.0 / k));
+
+        // Handle case where ndv == 1 => ceil(log2(m/8)) < 0.
+        return Math.max(0, (int)(Math.ceil(Math.log(m / 8)/Math.log(2))));
+    }
+
+    /**
+     * Assigns this runtime filter to the corresponding plan nodes.
+     */
+    public void assignToPlanNodes() {
+        Preconditions.checkState(hasTargets());
+        builderNode.addRuntimeFilter(this);
+        for (RuntimeFilterTarget target: targets) {
+            target.node.addRuntimeFilter(this);
+            // fragment is expected to use this filter id
+            target.node.fragment_.setTargetRuntimeFilterIds(this.id);
+        }
+    }
+
+    public void registerToPlan(Analyzer analyzer) {
+        setIsBroadcast(getBuilderNode().getDistributionMode() == HashJoinNode.DistributionMode.BROADCAST);
+        if (LOG.isTraceEnabled()) LOG.trace("Runtime filter: " + debugString());
+        assignToPlanNodes();
+        analyzer.putAssignedRuntimeFilter(this);
+        getBuilderNode().fragment_.setBuilderRuntimeFilterIds(getFilterId());
+    }
+
+    public String debugString() {
+        return "FilterID: " + id + " " +
+                "Source: " + builderNode.getId() + " " +
+                "SrcExpr: " + getSrcExpr().debugString() + " " +
+                "Target(s): " +
+                Joiner.on(", ").join(targets) + " " +
+                "Selectivity: " + getSelectivity();
+    }
+}
\ No newline at end of file
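
To make the bloom filter sizing concrete, here is a small driver for GetMinLogSpaceForBloomFilter. The 5% false-positive rate is illustrative only; the production value comes from FeConstants.default_bloom_filter_fpp, and calculateFilterSize then clamps the result between the session's min and max bloom filter sizes.

    import org.apache.doris.planner.RuntimeFilter;

    public class BloomSizeDemo {
        public static void main(String[] args) {
            double fpp = 0.05; // illustrative; not the asserted Doris default
            // ndv = 1M: m = -8 * 1e6 / ln(1 - 0.05^(1/8)) ~= 6.87e6 bits ~= 859 KB,
            // and the next power of two is 2^20 bytes = 1 MB.
            System.out.println(RuntimeFilter.GetMinLogSpaceForBloomFilter(1000000, fpp)); // 20
            System.out.println(RuntimeFilter.GetMinLogSpaceForBloomFilter(1000, fpp));    // 10 -> 1 KB
            System.out.println(RuntimeFilter.GetMinLogSpaceForBloomFilter(1, fpp));       // 0, clamped by Math.max
        }
    }
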
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
new file mode 100644
index 0000000..7c53de3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
@@ -0,0 +1,400 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ExprSubstitutionMap;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotId;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.util.BitUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterMode;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class used for generating and assigning runtime filters to a query plan using
+ * runtime filter propagation. Runtime filter propagation is an optimization technique
+ * used to filter scanned tuples or scan ranges based on information collected at
+ * runtime. A runtime filter is constructed during the build phase of a join node, and is
+ * applied at, potentially, multiple scan nodes on the probe side of that join node.
+ * Runtime filters are generated from equal-join predicates but they do not replace the
+ * original predicates.
+ *
+ * MinMax filters are of a fixed size (except for those used for string type) and
+ * therefore only sizes for bloom filters need to be calculated. These calculations are
+ * based on the NDV estimates of the associated table columns, the min buffer size that
+ * can be allocated by the bufferpool, and the query options. Moreover, it is also bound
+ * by the MIN/MAX_BLOOM_FILTER_SIZE limits which are enforced on the query options before
+ * this phase of planning.
+ *
+ * Example: select * from T1, T2 where T1.a = T2.b and T2.c = '1';
+ * Assuming that T1 is a fact table and T2 is a significantly smaller dimension table, a
+ * runtime filter is constructed at the join node between tables T1 and T2 while building
+ * the hash table on the values of T2.b (rhs of the join condition) from the tuples of T2
+ * that satisfy predicate T2.c = '1'. The runtime filter is subsequently sent to the
+ * scan node of table T1 and is applied on the values of T1.a (lhs of the join condition)
+ * to prune tuples of T1 that cannot be part of the join result.
+ */
+public final class RuntimeFilterGenerator {
+    private final static Logger LOG = LogManager.getLogger(RuntimeFilterGenerator.class);
+
+    // Map of base table tuple ids to a list of runtime filters that
+    // can be applied at the corresponding scan nodes.
+    private final Map<TupleId, List<RuntimeFilter>> runtimeFiltersByTid = new HashMap<>();
+
+    // Generator for filter ids
+    private final IdGenerator<RuntimeFilterId> filterIdGenerator = RuntimeFilterId.createGenerator();
+
+    /**
+     * Internal class that encapsulates the max, min and default sizes used for creating
+     * bloom filter objects.
+     */
+    public static class FilterSizeLimits {
+        // Maximum filter size, in bytes, rounded up to a power of two.
+        public final long maxVal;
+
+        // Minimum filter size, in bytes, rounded up to a power of two.
+        public final long minVal;
+
+        // Pre-computed default filter size, in bytes, rounded up to a power of two.
+        public final long defaultVal;
+
+        public FilterSizeLimits(SessionVariable sessionVariable) {
+            // Round up all limits to a power of two
+            long maxLimit = sessionVariable.getRuntimeBloomFilterMaxSize();
+            maxVal = BitUtil.roundUpToPowerOf2(maxLimit);
+
+            long minLimit = sessionVariable.getRuntimeBloomFilterMinSize();
+            // Make sure minVal <= defaultVal <= maxVal
+            minVal = BitUtil.roundUpToPowerOf2(Math.min(minLimit, maxVal));
+
+            long defaultValue = sessionVariable.getRuntimeBloomFilterSize();
+            defaultValue = Math.max(defaultValue, minVal);
+            defaultVal = BitUtil.roundUpToPowerOf2(Math.min(defaultValue, maxVal));
+        }
+    }
+
+    // Contains size limits for bloom filters.
+    private final FilterSizeLimits bloomFilterSizeLimits;
+
+    private final Analyzer analyzer;
+    private final SessionVariable sessionVariable;
+
+    private RuntimeFilterGenerator(Analyzer analyzer) {
+        this.analyzer = analyzer;
+        this.sessionVariable = ConnectContext.get().getSessionVariable();
+        Preconditions.checkNotNull(this.sessionVariable);
+        bloomFilterSizeLimits = new FilterSizeLimits(sessionVariable);
+    }
+
+    /**
+     * Generates and assigns runtime filters to a query plan tree.
+     */
+    public static void generateRuntimeFilters(Analyzer analyzer, PlanNode plan) {
+        Preconditions.checkNotNull(analyzer);
+        int maxNumBloomFilters = ConnectContext.get().getSessionVariable().getRuntimeFiltersMaxNum();
+        int runtimeFilterType = ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+        Preconditions.checkState(maxNumBloomFilters >= 0);
+        RuntimeFilterGenerator filterGenerator = new RuntimeFilterGenerator(analyzer);
+        Preconditions.checkState(runtimeFilterType >= 0, "runtimeFilterType not expected");
+        Preconditions.checkState(runtimeFilterType
+                <= Arrays.stream(TRuntimeFilterType.values()).mapToInt(TRuntimeFilterType::getValue).sum()
+                , "runtimeFilterType not expected");
+        filterGenerator.generateFilters(plan);
+        List<RuntimeFilter> filters = filterGenerator.getRuntimeFilters();
+        if (filters.size() > maxNumBloomFilters) {
+            // If more than 'maxNumBloomFilters' were generated, sort them by increasing
+            // selectivity and keep the 'maxNumBloomFilters' most selective bloom filters.
+            filters.sort((a, b) -> {
+                double aSelectivity =
+                        a.getSelectivity() == -1 ? Double.MAX_VALUE : a.getSelectivity();
+                double bSelectivity =
+                        b.getSelectivity() == -1 ? Double.MAX_VALUE : b.getSelectivity();
+                return Double.compare(aSelectivity, bSelectivity);
+            });
+        }
+        // We only enforce a limit on the number of bloom filters as they are much more
+        // heavy-weight than the other filter types.
+        int numBloomFilters = 0;
+        for (RuntimeFilter filter: filters) {
+            filter.extractTargetsPosition();
+            // When a filter has a remote target, its producer and consumer are in different fragments,
+            // so the instances built by the producer must be merged. The IN filter currently has no
+            // merge logic, so it is replaced with a Bloom Filter here.
+            // Background: in the IN pushdown implemented by early Doris, when OlapScanNode and
+            // HashJoinNode were not in the same fragment, the IN filter was pushed down only to the
+            // nearest ExchangeNode. Although that could not reach the storage engine to improve scan
+            // performance, in some extreme cases it reduced the number of rows in the hash table built
+            // by HashJoinNode and thereby avoided OOM. To keep covering that case (seen in TPC-DS 1T
+            // query 17), the IN filter is converted to a Bloom Filter, but only when the Bloom Filter
+            // type is not already enabled; otherwise the IN filter is simply dropped.
+            if (filter.getType() == TRuntimeFilterType.IN && filter.hasRemoteTargets()) {
+                if ((runtimeFilterType & TRuntimeFilterType.BLOOM.getValue()) == 0) {
+                    filter.setType(TRuntimeFilterType.BLOOM);
+                } else {
+                    continue;
+                }
+            }
+            if (filter.getType() == TRuntimeFilterType.BLOOM) {
+                if (numBloomFilters >= maxNumBloomFilters) continue;
+                ++numBloomFilters;
+            }
+            filter.registerToPlan(analyzer);
+        }
+    }
+
+    /**
+     * Returns a list of all the registered runtime filters, ordered by filter ID.
+     */
+    public List<RuntimeFilter> getRuntimeFilters() {
+        Set<RuntimeFilter> resultSet = new HashSet<>();
+        for (List<RuntimeFilter> filters: runtimeFiltersByTid.values()) {
+            resultSet.addAll(filters);
+        }
+        List<RuntimeFilter> resultList = Lists.newArrayList(resultSet);
+        resultList.sort((a, b) -> a.getFilterId().compareTo(b.getFilterId()));
+        return resultList;
+    }
+
+    /**
+     * Generates the runtime filters for a query by recursively traversing the distributed
+     * plan tree rooted at 'root'. In the top-down traversal of the plan tree, candidate
+     * runtime filters are generated from equi-join predicates assigned to hash-join nodes.
+     * In the bottom-up traversal of the plan tree, the filters are assigned to destination
+     * (scan) nodes. Filters that cannot be assigned to a scan node are discarded.
+     */
+    private void generateFilters(PlanNode root) {
+        if (root instanceof HashJoinNode) {
+            HashJoinNode joinNode = (HashJoinNode) root;
+            List<Expr> joinConjuncts = new ArrayList<>();
+            // It's not correct to push runtime filters to the left side of a left outer,
+            // full outer or anti join if the filter corresponds to an equi-join predicate
+            // from the ON clause.
+            if (!joinNode.getJoinOp().isLeftOuterJoin()
+                    && !joinNode.getJoinOp().isFullOuterJoin()
+                    && !joinNode.getJoinOp().isAntiJoin()) {
+                joinConjuncts.addAll(joinNode.getEqJoinConjuncts());
+            }
+
+            // TODO(zxy) supports PlanNode.conjuncts generate runtime filter.
+            // PlanNode.conjuncts (call joinNode.getConjuncts() here) Different from HashJoinNode.eqJoinConjuncts
+            // and HashJoinNode.otherJoinConjuncts.
+            // In previous tests, it was found that using PlanNode.conjuncts to generate runtimeFilter may cause
+            // incorrect results. For example, When the `in` subquery is converted to join, the join conjunct will be
+            // saved in PlanNode.conjuncts. At this time, using the automatically generated join conjunct to generate
+            // a runtimeFilter, some rows may be missing in the result.
+            // SQL: select * from T as a where k1 = (select count(1) from T as b where a.k1 = b.k1);
+            // Table T has only one INT column. At this time, `a.k1 = b.k1` is in eqJoinConjuncts,
+            // `k1` = ifnull(xxx) is in conjuncts, the runtimeFilter generated according to conjuncts will cause
+            // the result to be empty, but the actual result should have data returned.
+
+            List<RuntimeFilter> filters = new ArrayList<>();
+            // All runtime filter objects generated from the same join conjunct share the same
+            // properties except the filter ID, so the repeated generation per type could perhaps be avoided.
+            for (TRuntimeFilterType type : TRuntimeFilterType.values()) {
+                if ((sessionVariable.getRuntimeFilterType() & type.getValue()) == 0) continue;
+                for (int i = 0; i < joinConjuncts.size(); i++) {
+                    Expr conjunct = joinConjuncts.get(i);
+                    RuntimeFilter filter = RuntimeFilter.create(filterIdGenerator,
+                            analyzer, conjunct, i, joinNode, type, bloomFilterSizeLimits);
+                    if (filter == null) continue;
+                    registerRuntimeFilter(filter);
+                    filters.add(filter);
+                }
+            }
+            generateFilters(root.getChild(0));
+            // Finalize every runtime filter of that join. This is to ensure that we don't
+            // assign a filter to a scan node from the right subtree of joinNode or ancestor
+            // join nodes in case we don't find a destination node in the left subtree.
+            for (RuntimeFilter runtimeFilter: filters) finalizeRuntimeFilter(runtimeFilter);
+            generateFilters(root.getChild(1));
+        } else if (root instanceof ScanNode) {
+            assignRuntimeFilters((ScanNode) root);
+        } else {
+            for (PlanNode childNode: root.getChildren()) {
+                generateFilters(childNode);
+            }
+        }
+    }
+
+    /**
+     * Registers a runtime filter with the tuple id of every scan node that is a candidate
+     * destination node for that filter.
+     */
+    private void registerRuntimeFilter(RuntimeFilter filter) {
+        Map<TupleId, List<SlotId>> targetSlotsByTid = filter.getTargetSlots();
+        Preconditions.checkState(targetSlotsByTid != null && !targetSlotsByTid.isEmpty());
+        for (TupleId tupleId: targetSlotsByTid.keySet()) {
+            registerRuntimeFilter(filter, tupleId);
+        }
+    }
+
+    /**
+     * Registers a runtime filter with a specific target tuple id.
+     */
+    private void registerRuntimeFilter(RuntimeFilter filter, TupleId targetTid) {
+        Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid));
+        List<RuntimeFilter> filters = runtimeFiltersByTid.computeIfAbsent(targetTid, k -> new ArrayList<>());
+        Preconditions.checkState(!filter.isFinalized());
+        filters.add(filter);
+    }
+
+    /**
+     * Finalizes a runtime filter by disassociating it from all the candidate target scan
+     * nodes that haven't been used as destinations for that filter. Also sets the
+     * finalized flag of that filter so that it can't be assigned to any other scan nodes.
+     */
+    private void finalizeRuntimeFilter(RuntimeFilter runtimeFilter) {
+        Set<TupleId> targetTupleIds = new HashSet<>();
+        for (RuntimeFilter.RuntimeFilterTarget target: runtimeFilter.getTargets()) {
+            targetTupleIds.addAll(target.node.getTupleIds());
+        }
+        for (TupleId tupleId: runtimeFilter.getTargetSlots().keySet()) {
+            if (!targetTupleIds.contains(tupleId)) {
+                runtimeFiltersByTid.get(tupleId).remove(runtimeFilter);
+            }
+        }
+        runtimeFilter.markFinalized();
+    }
+
+    /**
+     * Assigns runtime filters to a specific scan node 'scanNode'.
+     * The assigned filters are the ones for which 'scanNode' can be used as a destination
+     * node. The following constraints are enforced when assigning filters to 'scanNode':
+     * 1. If the RUNTIME_FILTER_MODE query option is set to LOCAL, a filter is only assigned
+     *    to 'scanNode' if the filter is produced within the same fragment that contains the
+     *    scan node; if set to REMOTE, only filters produced in a different fragment are assigned.
+     * 2. Only OLAP scan nodes are supported.
+     */
+    private void assignRuntimeFilters(ScanNode scanNode) {
+        if (!(scanNode instanceof OlapScanNode)) return;
+        TupleId tid = scanNode.getTupleIds().get(0);
+        if (!runtimeFiltersByTid.containsKey(tid)) return;
+        String runtimeFilterMode = sessionVariable.getRuntimeFilterMode();
+        Preconditions.checkState(Arrays.stream(TRuntimeFilterMode.values()).map(Enum::name).anyMatch(
+                p -> p.equals(runtimeFilterMode.toUpperCase())), "runtimeFilterMode not expected");
+        for (RuntimeFilter filter: runtimeFiltersByTid.get(tid)) {
+            if (filter.isFinalized()) continue;
+            Expr targetExpr = computeTargetExpr(filter, tid);
+            if (targetExpr == null) continue;
+            boolean isBoundByKeyColumns = isBoundByKeyColumns(analyzer, targetExpr, scanNode);
+            boolean isLocalTarget = isLocalTarget(filter, scanNode);
+            if (runtimeFilterMode.equals(TRuntimeFilterMode.LOCAL.name()) && !isLocalTarget) continue;
+            if (runtimeFilterMode.equals(TRuntimeFilterMode.REMOTE.name()) && isLocalTarget) continue;
+
+            RuntimeFilter.RuntimeFilterTarget target = new RuntimeFilter.RuntimeFilterTarget(
+                    scanNode, targetExpr, isBoundByKeyColumns, isLocalTarget);
+            filter.addTarget(target);
+        }
+    }
+
+    /**
+     * Check if 'targetNode' is local to the source node of 'filter'.
+     */
+    private static boolean isLocalTarget(RuntimeFilter filter, ScanNode targetNode) {
+        return targetNode.getFragment().getId().equals(filter.getBuilderNode().getFragment().getId());
+    }
+
+    /**
+     * Check if all the slots of 'targetExpr' are key columns.
+     */
+    private static boolean isBoundByKeyColumns(Analyzer analyzer, Expr targetExpr, ScanNode targetNode) {
+        Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds()));
+        List<SlotId> sids = new ArrayList<>();
+        targetExpr.getIds(null, sids);
+        for (SlotId sid : sids) {
+            // Look up the slot descriptor backing each slot referenced by targetExpr.
+            SlotDescriptor slotDesc = analyzer.getSlotDesc(sid);
+            if (slotDesc.getColumn() == null || !slotDesc.getColumn().isKey()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Computes the target expr for a specified runtime filter 'filter' to be applied at
+     * the scan node with target tuple descriptor 'targetTid'.
+     */
+    private Expr computeTargetExpr(RuntimeFilter filter, TupleId targetTid) {
+        Expr targetExpr = filter.getOrigTargetExpr();
+        // If there is a subquery on the probe side of the join, targetExpr may reference slots that
+        // are not bound by targetTid, so isBound(targetTid) returns false even though the filter could
+        // be pushed to a scan inside the subquery. When each such slot can be value-transferred to a
+        // slot of targetTid (e.g. for A.a + B.b = C.c with targetTid = B: if A.a and B.a are equivalent
+        // columns, replace A.a with B.a), rewrite targetExpr in terms of targetTid below.
+        if (!targetExpr.isBound(targetTid)) {
+            Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid));
+            // Modify the filter target expr using the equivalent slots from the scan node
+            // on which the filter will be applied.
+            ExprSubstitutionMap smap = new ExprSubstitutionMap();
+            List<SlotRef> exprSlots = new ArrayList<>();
+            // Collect all SlotRefs referenced by targetExpr; these correspond to the deduplicated
+            // slots recorded in the filter's target-slot map.
+            targetExpr.collect(SlotRef.class, exprSlots);
+            // Slot ids of targetTid that the slots of targetExpr may be mapped to.
+            List<SlotId> sids = filter.getTargetSlots().get(targetTid);
+            for (SlotRef slotRef: exprSlots) {
+                for (SlotId sid: sids) {
+                    if (analyzer.hasValueTransfer(slotRef.getSlotId(), sid)) {
+                        SlotRef newSlotRef = new SlotRef(analyzer.getSlotDesc(sid));
+                        newSlotRef.analyzeNoThrow(analyzer);
+                        smap.put(slotRef, newSlotRef);
+                        break;
+                    }
+                }
+            }
+            Preconditions.checkState(exprSlots.size() == smap.size());
+            try {
+                targetExpr = targetExpr.substitute(smap, analyzer, false);
+            } catch (Exception e) {
+                return null;
+            }
+        }
+        Type srcType = filter.getSrcExpr().getType();
+        // Types of targetExpr and srcExpr must be exactly the same since runtime filters are
+        // based on hashing.
+        if (!targetExpr.getType().equals(srcType)) {
+            try {
+                targetExpr = targetExpr.castTo(srcType);
+            } catch (Exception e) {
+                return null;
+            }
+        }
+        return targetExpr;
+    }
+}
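
A minimal standalone sketch of the clamping and power-of-two rounding that FilterSizeLimits
performs above. The roundUpToPowerOf2 helper below is a stand-in for the semantics assumed
from BitUtil, and the three limits mirror the session-variable defaults introduced later in
this patch; it is an illustration, not part of the change.

    public class FilterSizeLimitsSketch {
        // Stand-in for BitUtil.roundUpToPowerOf2: smallest power of two >= val.
        static long roundUpToPowerOf2(long val) {
            long bit = Long.highestOneBit(val);
            return bit == val ? val : bit << 1;
        }

        public static void main(String[] args) {
            long maxLimit = 16777216;     // runtime_bloom_filter_max_size default (16 MB)
            long minLimit = 1048576;      // runtime_bloom_filter_min_size default (1 MB)
            long defaultLimit = 2097152;  // runtime_bloom_filter_size default (2 MB)

            long maxVal = roundUpToPowerOf2(maxLimit);
            // Clamp so that minVal <= defaultVal <= maxVal, rounding each to a power of two.
            long minVal = roundUpToPowerOf2(Math.min(minLimit, maxVal));
            long defaultVal = roundUpToPowerOf2(Math.min(Math.max(defaultLimit, minVal), maxVal));
            System.out.println(minVal + " <= " + defaultVal + " <= " + maxVal);
        }
    }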
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterId.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterId.java
new file mode 100644
index 0000000..c4339b4
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterId.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.common.Id;
+import org.apache.doris.common.IdGenerator;
+
+public class RuntimeFilterId extends Id<RuntimeFilterId> {
+    // Construction only allowed via an IdGenerator.
+    protected RuntimeFilterId(int id) {
+        super(id);
+    }
+
+    public static IdGenerator<RuntimeFilterId> createGenerator() {
+        return new IdGenerator<RuntimeFilterId>() {
+            @Override
+            public RuntimeFilterId getNextId() {
+                return new RuntimeFilterId(nextId_++);
+            }
+
+            @Override
+            public RuntimeFilterId getMaxId() {
+                return new RuntimeFilterId(nextId_ - 1);
+            }
+        };
+    }
+
+    @Override
+    public String toString() {
+        return String.format("RF%03d", id);
+    }
+
+    @Override
+    public int hashCode() {
+        return id;
+    }
+
+    public int compareTo(RuntimeFilterId cmp) {
+        return Integer.compare(id, cmp.id);
+    }
+}
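
A short usage sketch for the ID generator above, assuming the Doris FE classpath (Id and
IdGenerator come from org.apache.doris.common). The RF%03d format is what surfaces as
RF000, RF001, ... in the EXPLAIN output exercised by the planner tests later in this patch,
assuming the generator starts numbering at 0.

    IdGenerator<RuntimeFilterId> gen = RuntimeFilterId.createGenerator();
    RuntimeFilterId first = gen.getNextId();   // toString() -> "RF000"
    RuntimeFilterId second = gen.getNextId();  // toString() -> "RF001"
    assert first.compareTo(second) < 0;        // ordered by numeric id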
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 2a71437..e039881 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -55,6 +55,8 @@ abstract public class ScanNode extends PlanNode {
         return result;
     }
 
+    public TupleDescriptor getTupleDesc() { return desc; }
+
     public void setColumnFilters(Map<String, PartitionColumnFilter> columnFilters) {
         this.columnFilters = columnFilters;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index ffd8cc3..9dcadd7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -2202,4 +2202,35 @@ public class SingleNodePlanner {
         }
         return analyzer.getUnassignedConjuncts(tupleIds);
     }
+
+    /**
+     * Returns a normalized version of a binary equality predicate 'expr' where the lhs
+     * child expr is bound by some tuple in 'lhsTids' and the rhs child expr is bound by
+     * some tuple in 'rhsTids'. Returns 'expr' if this predicate is already normalized.
+     * Returns null in any of the following cases:
+     * 1. It is not an equality predicate
+     * 2. One of the operands is a constant
+     * 3. Both children of this predicate are the same expr
+     * Normalization ensures the conditions above hold and reorders the operands so that the
+     * lhs/rhs of the predicate matches the lhs/rhs of the join node.
+     */
+    public static BinaryPredicate getNormalizedEqPred(Expr expr, List<TupleId> lhsTids,
+                                                      List<TupleId> rhsTids, Analyzer analyzer) {
+        if (!(expr instanceof BinaryPredicate)) return null;
+        BinaryPredicate pred = (BinaryPredicate) expr;
+        if (!pred.getOp().isEquivalence()) {
+            return null;
+        }
+        if (pred.getChild(0).isConstant() || pred.getChild(1).isConstant()) return null;
+
+        // Use the child bound by lhsTids as lhsExpr. For example, for A join B on B.k = A.k,
+        // lhsExpr = A.k and rhsExpr = B.k, i.e. the predicate is reordered to A.k = B.k.
+        Expr lhsExpr = Expr.getFirstBoundChild(pred, lhsTids);
+        Expr rhsExpr = Expr.getFirstBoundChild(pred, rhsTids);
+        if (lhsExpr == null || rhsExpr == null || lhsExpr == rhsExpr) return null;
+
+        BinaryPredicate result = new BinaryPredicate(pred.getOp(), lhsExpr, rhsExpr);
+        result.analyzeNoThrow(analyzer);
+        return result;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 10ae31f..11375f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -48,6 +48,8 @@ import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.Planner;
 import org.apache.doris.planner.ResultSink;
+import org.apache.doris.planner.RuntimeFilter;
+import org.apache.doris.planner.RuntimeFilterId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SetOperationNode;
 import org.apache.doris.planner.UnionNode;
@@ -72,6 +74,8 @@ import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TResourceInfo;
+import org.apache.doris.thrift.TRuntimeFilterParams;
+import org.apache.doris.thrift.TRuntimeFilterTargetParams;
 import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.thrift.TScanRangeParams;
@@ -201,6 +205,17 @@ public class Coordinator {
     // parallel execute
     private final TUniqueId nextInstanceId;
 
+    // Runtime filter merge instance address and ID
+    public TNetworkAddress runtimeFilterMergeAddr;
+    public TUniqueId runtimeFilterMergeInstanceId;
+    // Maps a runtime filter ID to the instance addresses of the fragments that are
+    // expected to use this runtime filter; each instance address appears at most once
+    public Map<RuntimeFilterId, List<FRuntimeFilterTargetParam>> ridToTargetParam = Maps.newHashMap();
+    // The runtime filters assigned to this query plan
+    public List<RuntimeFilter> assignedRuntimeFilters = new ArrayList<>();
+    // Maps a runtime filter ID to the number of instances that build it
+    public Map<RuntimeFilterId, Integer> ridToBuilderNum = Maps.newHashMap();
+
     // Used for query/insert
     public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
         this.isBlockQuery = planner.isBlockQuery();
@@ -224,6 +239,7 @@ public class Coordinator {
         this.nextInstanceId = new TUniqueId();
         nextInstanceId.setHi(queryId.hi);
         nextInstanceId.setLo(queryId.lo + 1);
+        this.assignedRuntimeFilters = analyzer.getAssignedRuntimeFilter();
     }
 
     // Used for broker load task/export task coordinator
@@ -804,6 +820,9 @@ public class Coordinator {
             }
         }
 
+        // assign runtime filter merge addr and target addr
+        assignRuntimeFilterAddr();
+
         // compute destinations and # senders per exchange node
         // (the root fragment doesn't have a destination)
         for (FragmentExecParams params : fragmentExecParamsMap.values()) {
@@ -1117,6 +1136,31 @@ public class Coordinator {
         }
     }
 
+    // Traverse the runtime filter IDs expected by each fragment, map each filter ID to the
+    // fragment's instance addresses, and select the instance that merges the runtime filters
+    private void assignRuntimeFilterAddr() throws Exception {
+        for (PlanFragment fragment: fragments) {
+            FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());
+            // Transform <fragment, runtimeFilterId> to <runtimeFilterId, fragment>
+            for (RuntimeFilterId rid: fragment.getTargetRuntimeFilterIds()) {
+                List<FRuntimeFilterTargetParam> targetFragments =
+                        ridToTargetParam.computeIfAbsent(rid, k -> new ArrayList<>());
+                for (final FInstanceExecParam instance : params.instanceExecParams) {
+                    targetFragments.add(new FRuntimeFilterTargetParam(instance.instanceId, toBrpcHost(instance.host)));
+                }
+            }
+
+            for (RuntimeFilterId rid: fragment.getBuilderRuntimeFilterIds()) {
+                ridToBuilderNum.merge(rid, params.instanceExecParams.size(), Integer::sum);
+            }
+        }
+        // Use the uppermost fragment as the merge node; the uppermost fragment has exactly one instance
+        FragmentExecParams uppermostParams = fragmentExecParamsMap.get(fragments.get(0).getFragmentId());
+        runtimeFilterMergeAddr = toBrpcHost(uppermostParams.instanceExecParams.get(0).host);
+        runtimeFilterMergeInstanceId = uppermostParams.instanceExecParams.get(0).instanceId;
+    }
+
+    // A fragment can contain at most one HashJoinNode
     private boolean isColocateJoin(PlanNode node) {
         // TODO(cmy): some internal process, such as broker load task, do not have ConnectContext.
         // Any configurations needed by the Coordinator should be passed in Coordinator initialization.
@@ -1957,6 +2001,24 @@ public class Coordinator {
                 params.setQueryOptions(queryOptions);
                 params.params.setSendQueryStatisticsWithEveryBatch(
                         fragment.isTransferQueryStatisticsWithEveryBatch());
+                params.params.setRuntimeFilterParams(new TRuntimeFilterParams());
+                params.params.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
+                if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
+                    for (Map.Entry<RuntimeFilterId, List<FRuntimeFilterTargetParam>> entry: ridToTargetParam.entrySet()) {
+                        List<TRuntimeFilterTargetParams> targetParams = Lists.newArrayList();
+                        for (FRuntimeFilterTargetParam targetParam: entry.getValue()) {
+                            targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
+                                    targetParam.targetFragmentInstanceAddr));
+                        }
+                        params.params.runtime_filter_params.putToRidToTargetParam(entry.getKey().asInt(), targetParams);
+                    }
+                    for (Map.Entry<RuntimeFilterId, Integer> entry: ridToBuilderNum.entrySet()) {
+                        params.params.runtime_filter_params.putToRuntimeFilterBuilderNum(entry.getKey().asInt(), entry.getValue());
+                    }
+                    for (RuntimeFilter rf: assignedRuntimeFilters) {
+                        params.params.runtime_filter_params.putToRidToRuntimeFilter(rf.getFilterId().asInt(), rf.toThrift());
+                    }
+                }
                 if (queryOptions.getQueryType() == TQueryType.LOAD) {
                     LoadErrorHub.Param param = Catalog.getCurrentCatalog().getLoadInstance().getLoadErrorHubInfo();
                     if (param != null) {
@@ -1966,7 +2028,6 @@ public class Coordinator {
                         }
                     }
                 }
-
                 paramsList.add(params);
             }
             return paramsList;
@@ -2089,6 +2150,17 @@ public class Coordinator {
             fragmentProfile.get(backendExecState.profileFragmentId).addChild(backendExecState.profile);
         }
     }
+
+    // Runtime filter target fragment instance param
+    static class FRuntimeFilterTargetParam {
+        public TUniqueId targetFragmentInstanceId;
+        public TNetworkAddress targetFragmentInstanceAddr;
+
+        public FRuntimeFilterTargetParam(TUniqueId id, TNetworkAddress host) {
+            this.targetFragmentInstanceId = id;
+            this.targetFragmentInstanceAddr = host;
+        }
+    }
 }
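
The ridToBuilderNum.merge(rid, ..., Integer::sum) call in assignRuntimeFilterAddr above
accumulates the builder instance count per filter ID across fragments. A minimal standalone
illustration of that Map.merge idiom, with hypothetical filter IDs and instance counts:

    Map<Integer, Integer> ridToBuilderNum = new HashMap<>();
    ridToBuilderNum.merge(0, 3, Integer::sum);  // key absent: stores 3
    ridToBuilderNum.merge(0, 2, Integer::sum);  // key present: stores 3 + 2 = 5
    ridToBuilderNum.merge(1, 4, Integer::sum);  // independent filter ID
    // ridToBuilderNum is now {0=5, 1=4}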
 
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java
new file mode 100644
index 0000000..8806263
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Used for encoding and decoding of session variable runtime_filter_type
+ */
+public class RuntimeFilterTypeHelper {
+    private static final Logger LOG = LogManager.getLogger(RuntimeFilterTypeHelper.class);
+
+    public final static long ALLOWED_MASK = (TRuntimeFilterType.IN.getValue() |
+            TRuntimeFilterType.BLOOM.getValue() | TRuntimeFilterType.MIN_MAX.getValue());
+
+    private final static Map<String, Long> varValueSet = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+
+    static {
+        varValueSet.put("IN", (long) TRuntimeFilterType.IN.getValue());
+        varValueSet.put("BLOOM_FILTER", (long) TRuntimeFilterType.BLOOM.getValue());
+        varValueSet.put("MIN_MAX", (long) TRuntimeFilterType.MIN_MAX.getValue());
+    }
+
+    // Convert the long variable value into a string the user can read
+    public static String decode(Long varValue) throws DdlException {
+        // 0 decodes to an empty string
+        if (varValue == 0) {
+            return "";
+        }
+        if ((varValue & ~ALLOWED_MASK) != 0) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, SessionVariable.RUNTIME_FILTER_TYPE, varValue);
+        }
+
+        List<String> names = new ArrayList<String>();
+        for (Map.Entry<String, Long> value : getSupportedVarValue().entrySet()) {
+            if ((varValue & value.getValue()) != 0) {
+                names.add(value.getKey());
+            }
+        }
+
+        return Joiner.on(',').join(names);
+    }
+
+    // Convert the string variable value into the long value stored in the session
+    public static Long encode(String varValue) throws DdlException {
+        List<String> names = Splitter.on(',').trimResults().omitEmptyStrings().splitToList(varValue);
+
+        // an empty string encodes to 0
+        long resultCode = 0;
+        for (String key : names) {
+            long code = 0;
+            if (StringUtils.isNumeric(key)) {
+                code |= Long.parseLong(key);
+            } else {
+                code = getCodeFromString(key);
+                if (code == 0) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, SessionVariable.RUNTIME_FILTER_TYPE, key);
+                }
+            }
+            resultCode |= code;
+            if ((resultCode & ~ALLOWED_MASK) != 0) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, SessionVariable.RUNTIME_FILTER_TYPE, key);
+            }
+        }
+        return resultCode;
+    }
+
+    // check if this variable value is supported
+    public static boolean isSupportedVarValue(String varValue) {
+        return varValue != null && getSupportedVarValue().containsKey(varValue);
+    }
+
+    // encode variable value from string to long
+    private static long getCodeFromString(String varValue) {
+        long code = 0;
+        if (isSupportedVarValue(varValue)) {
+            code |= getSupportedVarValue().get(varValue);
+        }
+        return code;
+    }
+
+    public static Map<String, Long> getSupportedVarValue() {
+        return varValueSet;
+    }
+}
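
runtime_filter_type is treated as a bitmask over TRuntimeFilterType: per the mapping above,
IN = 1, BLOOM_FILTER = 2 and MIN_MAX = 4, so the values 1 through 7 set by the planner tests
later in this patch cover every non-empty combination. A minimal sketch of the same mask
arithmetic, with the constants copied from that mapping rather than read from the thrift enum:

    long IN = 1, BLOOM_FILTER = 2, MIN_MAX = 4;
    long allowedMask = IN | BLOOM_FILTER | MIN_MAX;     // 7, as in ALLOWED_MASK

    long encoded = IN | MIN_MAX;                        // encode("IN,MIN_MAX") -> 5
    boolean usesBloom = (encoded & BLOOM_FILTER) != 0;  // false
    boolean valid = (encoded & ~allowedMask) == 0;      // true; a bit like 8 would be rejected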
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 3146ae3..0dcae49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -115,6 +115,24 @@ public class SessionVariable implements Serializable, Writable {
     // when true, the partition column must be set to NOT NULL.
     public static final String ALLOW_PARTITION_COLUMN_NULLABLE = "allow_partition_column_nullable";
 
+    // Runtime filter mode: OFF, LOCAL, GLOBAL or REMOTE
+    public static final String RUNTIME_FILTER_MODE = "runtime_filter_mode";
+    // Size in bytes of Bloom Filters used for runtime filters. Actual size of filter will
+    // be rounded up to the nearest power of two.
+    public static final String RUNTIME_BLOOM_FILTER_SIZE = "runtime_bloom_filter_size";
+    // Minimum runtime bloom filter size, in bytes
+    public static final String RUNTIME_BLOOM_FILTER_MIN_SIZE = "runtime_bloom_filter_min_size";
+    // Maximum runtime bloom filter size, in bytes
+    public static final String RUNTIME_BLOOM_FILTER_MAX_SIZE = "runtime_bloom_filter_max_size";
+    // Time in ms to wait until runtime filters are delivered.
+    public static final String RUNTIME_FILTER_WAIT_TIME_MS = "runtime_filter_wait_time_ms";
+    // Maximum number of bloom runtime filters allowed per query
+    public static final String RUNTIME_FILTERS_MAX_NUM = "runtime_filters_max_num";
+    // Runtime filter types in use; corresponds to TRuntimeFilterType (mainly for testing)
+    public static final String RUNTIME_FILTER_TYPE = "runtime_filter_type";
+    // If the row count of the right (build) table in a hash join exceeds this value, the IN filter is ignored
+    public static final String RUNTIME_FILTER_MAX_IN_NUM = "runtime_filter_max_in_num";
+
     // max ms to wait transaction publish finish when exec insert stmt.
     public static final String INSERT_VISIBLE_TIMEOUT_MS = "insert_visible_timeout_ms";
 
@@ -310,6 +328,23 @@ public class SessionVariable implements Serializable, Writable {
 
     @VariableMgr.VarAttr(name = EXTRACT_WIDE_RANGE_EXPR, needForward = true)
     public boolean extractWideRangeExpr = true;
+    @VariableMgr.VarAttr(name = RUNTIME_FILTER_MODE)
+    private String runtimeFilterMode = "GLOBAL";
+    @VariableMgr.VarAttr(name = RUNTIME_BLOOM_FILTER_SIZE)
+    private int runtimeBloomFilterSize = 2097152;
+    @VariableMgr.VarAttr(name = RUNTIME_BLOOM_FILTER_MIN_SIZE)
+    private int runtimeBloomFilterMinSize = 1048576;
+    @VariableMgr.VarAttr(name = RUNTIME_BLOOM_FILTER_MAX_SIZE)
+    private int runtimeBloomFilterMaxSize = 16777216;
+    @VariableMgr.VarAttr(name = RUNTIME_FILTER_WAIT_TIME_MS)
+    private int runtimeFilterWaitTimeMs = 1000;
+    @VariableMgr.VarAttr(name = RUNTIME_FILTERS_MAX_NUM)
+    private int runtimeFiltersMaxNum = 10;
+    // Default runtime filter type: the IN filter
+    @VariableMgr.VarAttr(name = RUNTIME_FILTER_TYPE)
+    private int runtimeFilterType = 1;
+    @VariableMgr.VarAttr(name = RUNTIME_FILTER_MAX_IN_NUM)
+    private int runtimeFilterMaxInNum = 1024;
 
     public long getMaxExecMemByte() {
         return maxExecMemByte;
@@ -585,6 +620,70 @@ public class SessionVariable implements Serializable, Writable {
         return allowPartitionColumnNullable;
     }
 
+    public String getRuntimeFilterMode() {
+        return runtimeFilterMode;
+    }
+
+    public void setRuntimeFilterMode(String runtimeFilterMode) {
+        this.runtimeFilterMode = runtimeFilterMode;
+    }
+
+    public int getRuntimeBloomFilterSize() {
+        return runtimeBloomFilterSize;
+    }
+
+    public void setRuntimeBloomFilterSize(int runtimeBloomFilterSize) {
+        this.runtimeBloomFilterSize = runtimeBloomFilterSize;
+    }
+
+    public int getRuntimeBloomFilterMinSize() {
+        return runtimeBloomFilterMinSize;
+    }
+
+    public void setRuntimeBloomFilterMinSize(int runtimeBloomFilterMinSize) {
+        this.runtimeBloomFilterMinSize = runtimeBloomFilterMinSize;
+    }
+
+    public int getRuntimeBloomFilterMaxSize() {
+        return runtimeBloomFilterMaxSize;
+    }
+
+    public void setRuntimeBloomFilterMaxSize(int runtimeBloomFilterMaxSize) {
+        this.runtimeBloomFilterMaxSize = runtimeBloomFilterMaxSize;
+    }
+
+    public int getRuntimeFilterWaitTimeMs() {
+        return runtimeFilterWaitTimeMs;
+    }
+
+    public void setRuntimeFilterWaitTimeMs(int runtimeFilterWaitTimeMs) {
+        this.runtimeFilterWaitTimeMs = runtimeFilterWaitTimeMs;
+    }
+
+    public int getRuntimeFiltersMaxNum() {
+        return runtimeFiltersMaxNum;
+    }
+
+    public void setRuntimeFiltersMaxNum(int runtimeFiltersMaxNum) {
+        this.runtimeFiltersMaxNum = runtimeFiltersMaxNum;
+    }
+
+    public int getRuntimeFilterType() {
+        return runtimeFilterType;
+    }
+
+    public void setRuntimeFilterType(int runtimeFilterType) {
+        this.runtimeFilterType = runtimeFilterType;
+    }
+
+    public int getRuntimeFilterMaxInNum() {
+        return runtimeFilterMaxInNum;
+    }
+
+    public void setRuntimeFilterMaxInNum(int runtimeFilterMaxInNum) {
+        this.runtimeFilterMaxInNum = runtimeFilterMaxInNum;
+    }
+
     public long getInsertVisibleTimeoutMs() {
         if (insertVisibleTimeoutMs < MIN_INSERT_VISIBLE_TIMEOUT_MS) {
             return MIN_INSERT_VISIBLE_TIMEOUT_MS;
@@ -658,6 +757,9 @@ public class SessionVariable implements Serializable, Writable {
 
         tResult.setEnableSpilling(enableSpilling);
         tResult.setEnableEnableExchangeNodeParallelMerge(enableExchangeNodeParallelMerge);
+
+        tResult.setRuntimeFilterWaitTimeMs(runtimeFilterWaitTimeMs);
+        tResult.setRuntimeFilterMaxInNum(runtimeFilterMaxInNum);
         return tResult;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
index e48811a..b4c85e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
@@ -156,42 +156,44 @@ public class VariableMgr {
     // Set value to a variable
     private static boolean setValue(Object obj, Field field, String value) throws DdlException {
         VarAttr attr = field.getAnnotation(VarAttr.class);
-        String convertedVal = VariableVarConverters.convert(attr.name(), value);
+        if (VariableVarConverters.hasConverter(attr.name())) {
+            value = VariableVarConverters.encode(attr.name(), value).toString();
+        }
         try {
             switch (field.getType().getSimpleName()) {
                 case "boolean":
-                    if (convertedVal.equalsIgnoreCase("ON")
-                            || convertedVal.equalsIgnoreCase("TRUE")
-                            || convertedVal.equalsIgnoreCase("1")) {
+                    if (value.equalsIgnoreCase("ON")
+                            || value.equalsIgnoreCase("TRUE")
+                            || value.equalsIgnoreCase("1")) {
                         field.setBoolean(obj, true);
-                    } else if (convertedVal.equalsIgnoreCase("OFF")
-                            || convertedVal.equalsIgnoreCase("FALSE")
-                            || convertedVal.equalsIgnoreCase("0")) {
+                    } else if (value.equalsIgnoreCase("OFF")
+                            || value.equalsIgnoreCase("FALSE")
+                            || value.equalsIgnoreCase("0")) {
                         field.setBoolean(obj, false);
                     } else {
                         throw new IllegalAccessException();
                     }
                     break;
                 case "byte":
-                    field.setByte(obj, Byte.valueOf(convertedVal));
+                    field.setByte(obj, Byte.valueOf(value));
                     break;
                 case "short":
-                    field.setShort(obj, Short.valueOf(convertedVal));
+                    field.setShort(obj, Short.valueOf(value));
                     break;
                 case "int":
-                    field.setInt(obj, Integer.valueOf(convertedVal));
+                    field.setInt(obj, Integer.valueOf(value));
                     break;
                 case "long":
-                    field.setLong(obj, Long.valueOf(convertedVal));
+                    field.setLong(obj, Long.valueOf(value));
                     break;
                 case "float":
-                    field.setFloat(obj, Float.valueOf(convertedVal));
+                    field.setFloat(obj, Float.valueOf(value));
                     break;
                 case "double":
-                    field.setDouble(obj, Double.valueOf(convertedVal));
+                    field.setDouble(obj, Double.valueOf(value));
                     break;
                 case "String":
-                    field.set(obj, convertedVal);
+                    field.set(obj, value);
                     break;
                 default:
                     // Unsupported type variable.
@@ -496,12 +498,12 @@ public class VariableMgr {
                     row.add(getValue(ctx.getObj(), ctx.getField()));
                 }
 
-                if (row.size() > 1 && row.get(0).equalsIgnoreCase(SessionVariable.SQL_MODE)) {
+                if (row.size() > 1 && VariableVarConverters.hasConverter(row.get(0))) {
                     try {
-                        row.set(1, SqlModeHelper.decode(Long.valueOf(row.get(1))));
+                        row.set(1, VariableVarConverters.decode(row.get(0), Long.valueOf(row.get(1))));
                     } catch (DdlException e) {
                         row.set(1, "");
-                        LOG.warn("Decode sql mode failed");
+                        LOG.warn("Decode session variable failed");
                     }
                 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java
index 26d7dae..fcc06d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java
@@ -20,5 +20,7 @@ package org.apache.doris.qe;
 import org.apache.doris.common.DdlException;
 
 public interface VariableVarConverterI {
-    public String convert(String value) throws DdlException;
+    public Long encode(String value) throws DdlException;
+
+    public String decode(Long value) throws DdlException;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverters.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverters.java
index 39b6155..8cecda1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverters.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverters.java
@@ -22,23 +22,46 @@ import org.apache.doris.common.DdlException;
 
 import java.util.Map;
 
-// Helper class to drives the convert of session variables according to the converters.
-// You can define your converter that implements interface VariableVarConverterI in here.
-// Each converter should put in map (variable name -> converters) and only converts the variable
-// with specified name.
+/**
+ * Helper class that drives the conversion of session variables via the registered converters.
+ * You can define your own converter here by implementing the VariableVarConverterI interface.
+ * Each converter must be put into the map (variable name -> converter) and only converts the
+ * variable with that name.
+ *
+ * A converted session variable is a special kind of variable.
+ * Its real type is int, so for example, when querying `select @@sql_mode`, the returned column
+ * type is "int".
+ * But users usually set such a variable with a string, such as:
+ * `set @@sql_mode = 'STRICT_TRANS_TABLES'`
+ * or
+ * `set @@sql_mode = concat(@@sql_mode, 'STRICT_TRANS_TABLES')`
+ */
 public class VariableVarConverters {
 
     public static final Map<String, VariableVarConverterI> converters = Maps.newHashMap();
+
     static {
         SqlModeConverter sqlModeConverter = new SqlModeConverter();
         converters.put(SessionVariable.SQL_MODE, sqlModeConverter);
+        RuntimeFilterTypeConverter runtimeFilterTypeConverter = new RuntimeFilterTypeConverter();
+        converters.put(SessionVariable.RUNTIME_FILTER_TYPE, runtimeFilterTypeConverter);
+    }
+
+    public static Boolean hasConverter(String varName) {
+        return converters.containsKey(varName);
     }
 
-    public static String convert(String varName, String value) throws DdlException {
+    public static Long encode(String varName, String value) throws DdlException {
         if (converters.containsKey(varName)) {
-            return converters.get(varName).convert(value);
+            return converters.get(varName).encode(value);
         }
-        return value;
+        return 0L;
+    }
+
+    public static String decode(String varName, Long value) throws DdlException {
+        if (converters.containsKey(varName)) {
+            return converters.get(varName).decode(value);
+        }
+        return "";
     }
 
     /* Converters */
@@ -46,8 +69,26 @@ public class VariableVarConverters {
     // Converter to convert sql mode variable
     public static class SqlModeConverter implements VariableVarConverterI {
         @Override
-        public String convert(String value) throws DdlException {
-            return SqlModeHelper.encode(value).toString();
+        public Long encode(String value) throws DdlException {
+            return SqlModeHelper.encode(value);
+        }
+
+        @Override
+        public String decode(Long value) throws DdlException {
+            return SqlModeHelper.decode(value);
+        }
+    }
+
+    // Converter to convert runtime filter type variable
+    public static class RuntimeFilterTypeConverter implements VariableVarConverterI {
+        @Override
+        public Long encode(String value) throws DdlException {
+            return RuntimeFilterTypeHelper.encode(value);
+        }
+
+        @Override
+        public String decode(Long value) throws DdlException {
+            return RuntimeFilterTypeHelper.decode(value);
         }
     }
 }
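
With the converter registered above, runtime_filter_type round-trips through
RuntimeFilterTypeHelper. A usage sketch, assuming the Doris FE classpath; both calls can
throw DdlException on unknown names or a mask outside ALLOWED_MASK:

    // `set runtime_filter_type = 'IN,BLOOM_FILTER'` goes through encode():
    Long mask = VariableVarConverters.encode(SessionVariable.RUNTIME_FILTER_TYPE, "IN,BLOOM_FILTER");  // 3
    // `show variables like 'runtime_filter_type'` decodes it back to names:
    String shown = VariableVarConverters.decode(SessionVariable.RUNTIME_FILTER_TYPE, mask);
    // "BLOOM_FILTER,IN" -- names come back in the map's case-insensitive alphabetical order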
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 4aac012..5aac2db 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -41,6 +41,7 @@ import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.thrift.TRuntimeFilterMode;
 import org.apache.doris.utframe.UtFrameUtils;
 
 import com.google.common.collect.Lists;
@@ -1317,6 +1318,84 @@ public class QueryPlanTest {
     }
 
     @Test
+    public void testRuntimeFilterMode() throws Exception {
+        connectContext.setDatabase("default_cluster:test");
+
+        String queryStr = "explain select * from jointest t2, jointest t1 where t1.k1 = t2.k1";
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "LOCAL");
+        String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("runtime filter"));
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "REMOTE");
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertFalse(explainString.contains("runtime filter"));
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "OFF");
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertFalse(explainString.contains("runtime filter"));
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "GLOBAL");
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("runtime filter"));
+
+        queryStr = "explain select * from jointest t2, jointest t1 where t1.k1 <=> t2.k1";
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "LOCAL");
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 7);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertFalse(explainString.contains("runtime filter"));
+
+        queryStr = "explain select * from jointest as a where k1 = (select count(1) from jointest as b where a.k1 = b.k1);";
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "GLOBAL");
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 7);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        System.out.println(explainString);
+        Assert.assertFalse(explainString.contains("runtime filter"));
+    }
+
+    @Test
+    public void testRuntimeFilterType() throws Exception {
+        connectContext.setDatabase("default_cluster:test");
+        String queryStr = "explain select * from jointest t2, jointest t1 where t1.k1 = t2.k1";
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "GLOBAL");
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 0);
+        String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertFalse(explainString.contains("runtime filter"));
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 1);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] <- `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] -> `t2`.`k1`"));
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 2);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[bloom] <- `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[bloom] -> `t2`.`k1`"));
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 3);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] <- `t1`.`k1`, RF001[bloom] <- `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] -> `t2`.`k1`, RF001[bloom] -> `t2`.`k1`"));
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 4);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[min_max] <- `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[min_max] -> `t2`.`k1`"));
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 5);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] <- `t1`.`k1`, RF001[min_max] <- `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] -> `t2`.`k1`, RF001[min_max] -> `t2`.`k1`"));
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 6);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[bloom] <- `t1`.`k1`, RF001[min_max] <- `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[bloom] -> `t2`.`k1`, RF001[min_max] -> `t2`.`k1`"));
+
+        Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 7);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] <- `t1`.`k1`, RF001[bloom] <- `t1`.`k1`, RF002[min_max] <- `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("runtime filters: RF000[in] -> `t2`.`k1`, RF001[bloom] -> `t2`.`k1`, RF002[min_max] -> `t2`.`k1`"));
+    }
+
+    @Test
     public void testEmptyNode() throws Exception {
         connectContext.setDatabase("default_cluster:test");
         String emptyNode = "EMPTYSET";
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
new file mode 100644
index 0000000..09fe44a
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java
@@ -0,0 +1,455 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BaseTableRef;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.JoinOperator;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotId;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.TableRef;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TPartitionType;
+import org.apache.doris.thrift.TRuntimeFilterMode;
+
+import com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import mockit.Expectations;
+import mockit.Mocked;
+
+public class RuntimeFilterGeneratorTest {
+    private Analyzer analyzer;
+    private PlanFragment testPlanFragment;
+    private HashJoinNode hashJoinNode;
+    private OlapScanNode lhsScanNode;
+    private OlapScanNode rhsScanNode;
+    @Mocked
+    private ConnectContext connectContext;
+
+    @Before
+    public void setUp() throws AnalysisException {
+        Catalog catalog = Deencapsulation.newInstance(Catalog.class);
+        analyzer = new Analyzer(catalog, connectContext);
+        new Expectations() {
+            {
+                analyzer.getClusterName();
+                result = "default_cluster";
+            }
+        };
+        TableRef tableRef = new TableRef();
+        Deencapsulation.setField(tableRef, "isAnalyzed", true);
+        Deencapsulation.setField(tableRef, "joinOp", JoinOperator.INNER_JOIN);
+
+        TupleDescriptor lhsTupleDescriptor = new TupleDescriptor(new TupleId(0));
+        lhsScanNode = new OlapScanNode(new PlanNodeId(0), lhsTupleDescriptor, "LEFT SCAN");
+        TableName lhsTableName = new TableName("default_cluster:test_db", "test_lhs_tbl");
+        SlotRef lhsExpr = new SlotRef(lhsTableName, "test_lhs_col");
+        SlotDescriptor lhsSlotDescriptor = new SlotDescriptor(new SlotId(0), lhsTupleDescriptor);
+        Column k1 = new Column("test_lhs_col", PrimitiveType.BIGINT);
+        k1.setIsKey(true);
+        k1.setIsAllowNull(false);
+        lhsSlotDescriptor.setColumn(k1);
+        lhsExpr.setDesc(lhsSlotDescriptor);
+        Table lhsTable = new Table(0, "test_lhs_tbl", Table.TableType.OLAP, Lists.newArrayList(k1));
+        BaseTableRef lhsTableRef = new BaseTableRef(tableRef, lhsTable, lhsTableName);
+        lhsTableRef.analyze(analyzer);
+
+        TupleDescriptor rhsTupleDescriptor = new TupleDescriptor(new TupleId(1));
+        rhsScanNode = new OlapScanNode(new PlanNodeId(1), rhsTupleDescriptor, "RIGHT SCAN");
+        TableName rhsTableName = new TableName("default_cluster:test_db", "test_rhs_tbl");
+        SlotRef rhsExpr = new SlotRef(rhsTableName, "test_rhs_col");
+        SlotDescriptor rhsSlotDescriptor = new SlotDescriptor(new SlotId(1), rhsTupleDescriptor);
+        Column k2 = new Column("test_rhs_col", PrimitiveType.INT);
+        k2.setIsKey(true);
+        k2.setIsAllowNull(false);
+        rhsSlotDescriptor.setColumn(k2);
+        rhsExpr.setDesc(rhsSlotDescriptor);
+        Table rhsTable = new Table(0, "test_rhs_tbl", Table.TableType.OLAP, Lists.newArrayList(k2));
+        BaseTableRef rhsTableRef = new BaseTableRef(tableRef, rhsTable, rhsTableName);
+        rhsTableRef.analyze(analyzer);
+
+        ArrayList<Expr> testJoinExprs = new ArrayList<>();
+        BinaryPredicate eqJoinConjunct = new BinaryPredicate(BinaryPredicate.Operator.EQ, lhsExpr, rhsExpr);
+        testJoinExprs.add(eqJoinConjunct);
+
+        hashJoinNode = new HashJoinNode(new PlanNodeId(2), lhsScanNode, rhsScanNode, tableRef, testJoinExprs
+                , new ArrayList<>());
+        testPlanFragment = new PlanFragment(new PlanFragmentId(0), hashJoinNode
+                , new DataPartition(TPartitionType.UNPARTITIONED));
+        hashJoinNode.setFragment(testPlanFragment);
+        lhsScanNode.setFragment(testPlanFragment);
+        rhsScanNode.setFragment(testPlanFragment);
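+        // Plan under test: HashJoinNode(lhsScanNode, rhsScanNode) on
+        // test_lhs_col = test_rhs_col, all inside one unpartitioned fragment.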
+
+        new Expectations() {
+            {
+                analyzer.getSlotDesc(new SlotId(0));
+                result = lhsSlotDescriptor;
+                analyzer.getSlotDesc(new SlotId(1));
+                result = rhsSlotDescriptor;
+
+                ConnectContext.get().getSessionVariable().getRuntimeFiltersMaxNum();
+                result = 8;
+                ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMaxSize();
+                result = 16777216;
+                ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMinSize();
+                result = 1048576;
+                ConnectContext.get().getSessionVariable().getRuntimeBloomFilterSize();
+                result = 2097152;
+            }
+        };
+    }
+
+    private void clearRuntimeFilterState() {
+        testPlanFragment.clearRuntimeFilters();
+        analyzer.clearAssignedRuntimeFilters();
+        hashJoinNode.clearRuntimeFilters();
+        lhsScanNode.clearRuntimeFilters();
+    }
+
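+    // runtime_filter_mode semantics: GLOBAL and LOCAL both assign all enabled filter
+    // types here because the build and probe sides share a single fragment, while
+    // REMOTE assigns none, since a one-fragment plan has no cross-fragment target.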
+    @Test
+    public void testGenerateRuntimeFiltersMode() {
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeFilterMode();
+                result = "GLOBAL";
+                ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 7;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals(3, testPlanFragment.getTargetRuntimeFilterIds().size());
+        Assert.assertEquals(3, testPlanFragment.getBuilderRuntimeFilterIds().size());
+        Assert.assertEquals(3, analyzer.getAssignedRuntimeFilter().size());
+        Assert.assertEquals(3, hashJoinNode.getRuntimeFilters().size());
+        Assert.assertEquals(3, lhsScanNode.getRuntimeFilters().size());
+        Assert.assertEquals("RF000[in] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF001[bloom] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF002[min_max] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n",
+                hashJoinNode.getRuntimeFilterExplainString(true));
+        Assert.assertEquals("RF000[in] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF001[bloom] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF002[min_max] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n",
+                lhsScanNode.getRuntimeFilterExplainString(false));
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeFilterMode();
+                result = "LOCAL";
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals(3, testPlanFragment.getTargetRuntimeFilterIds().size());
+        Assert.assertEquals(3, testPlanFragment.getBuilderRuntimeFilterIds().size());
+        Assert.assertEquals(3, analyzer.getAssignedRuntimeFilter().size());
+        Assert.assertEquals(3, hashJoinNode.getRuntimeFilters().size());
+        Assert.assertEquals(3, lhsScanNode.getRuntimeFilters().size());
+        Assert.assertEquals("RF000[in] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF001[bloom] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF002[min_max] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n",
+                hashJoinNode.getRuntimeFilterExplainString(true));
+        Assert.assertEquals("RF000[in] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF001[bloom] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF002[min_max] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n",
+                lhsScanNode.getRuntimeFilterExplainString(false));
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeFilterMode();
+                result = "REMOTE";
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals(0, testPlanFragment.getTargetRuntimeFilterIds().size());
+        Assert.assertEquals(0, testPlanFragment.getBuilderRuntimeFilterIds().size());
+        Assert.assertEquals(0, analyzer.getAssignedRuntimeFilter().size());
+        Assert.assertEquals(0, hashJoinNode.getRuntimeFilters().size());
+        Assert.assertEquals(0, lhsScanNode.getRuntimeFilters().size());
+        Assert.assertEquals("", hashJoinNode.getRuntimeFilterExplainString(true));
+        Assert.assertEquals("", lhsScanNode.getRuntimeFilterExplainString(false));
+    }
+
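+    // An out-of-range runtime_filter_type (8) makes filter generation fail fast.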
+    @Test(expected = IllegalStateException.class)
+    public void testGenerateRuntimeFiltersModeException() {
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 8;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+    }
+
+    @Test
+    public void testGenerateRuntimeFiltersType() {
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 0;
+                ConnectContext.get().getSessionVariable().getRuntimeFilterMode();
+                result = "GLOBAL";
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals("", hashJoinNode.getRuntimeFilterExplainString(true));
+        Assert.assertEquals("", lhsScanNode.getRuntimeFilterExplainString(false));
+        Assert.assertEquals(0, testPlanFragment.getTargetRuntimeFilterIds().size());
+        Assert.assertEquals(0, testPlanFragment.getBuilderRuntimeFilterIds().size());
+        Assert.assertEquals(0, analyzer.getAssignedRuntimeFilter().size());
+        Assert.assertEquals(0, hashJoinNode.getRuntimeFilters().size());
+        Assert.assertEquals(0, lhsScanNode.getRuntimeFilters().size());
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 1;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals("RF000[in] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n",
+                hashJoinNode.getRuntimeFilterExplainString(true));
+        Assert.assertEquals("RF000[in] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n",
+                lhsScanNode.getRuntimeFilterExplainString(false));
+        Assert.assertEquals(1, testPlanFragment.getTargetRuntimeFilterIds().size());
+        Assert.assertEquals(1, testPlanFragment.getBuilderRuntimeFilterIds().size());
+        Assert.assertEquals(1, analyzer.getAssignedRuntimeFilter().size());
+        Assert.assertEquals(1, hashJoinNode.getRuntimeFilters().size());
+        Assert.assertEquals(1, lhsScanNode.getRuntimeFilters().size());
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 2;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals("RF000[bloom] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n",
+                hashJoinNode.getRuntimeFilterExplainString(true));
+        Assert.assertEquals("RF000[bloom] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n",
+                lhsScanNode.getRuntimeFilterExplainString(false));
+        Assert.assertEquals(1, testPlanFragment.getTargetRuntimeFilterIds().size());
+        Assert.assertEquals(1, testPlanFragment.getBuilderRuntimeFilterIds().size());
+        Assert.assertEquals(1, analyzer.getAssignedRuntimeFilter().size());
+        Assert.assertEquals(1, hashJoinNode.getRuntimeFilters().size());
+        Assert.assertEquals(1, lhsScanNode.getRuntimeFilters().size());
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 3;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals("RF000[in] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF001[bloom] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n",
+                hashJoinNode.getRuntimeFilterExplainString(true));
+        Assert.assertEquals("RF000[in] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF001[bloom] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n",
+                lhsScanNode.getRuntimeFilterExplainString(false));
+        Assert.assertEquals(2, testPlanFragment.getTargetRuntimeFilterIds().size());
+        Assert.assertEquals(2, testPlanFragment.getBuilderRuntimeFilterIds().size());
+        Assert.assertEquals(2, analyzer.getAssignedRuntimeFilter().size());
+        Assert.assertEquals(2, hashJoinNode.getRuntimeFilters().size());
+        Assert.assertEquals(2, lhsScanNode.getRuntimeFilters().size());
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 4;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals("RF000[min_max] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n",
+                hashJoinNode.getRuntimeFilterExplainString(true));
+        Assert.assertEquals("RF000[min_max] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n",
+                lhsScanNode.getRuntimeFilterExplainString(false));
+        Assert.assertEquals(1, testPlanFragment.getTargetRuntimeFilterIds().size());
+        Assert.assertEquals(1, testPlanFragment.getBuilderRuntimeFilterIds().size());
+        Assert.assertEquals(1, analyzer.getAssignedRuntimeFilter().size());
+        Assert.assertEquals(1, hashJoinNode.getRuntimeFilters().size());
+        Assert.assertEquals(1, lhsScanNode.getRuntimeFilters().size());
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 5;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals("RF000[in] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF001[min_max] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n",
+                hashJoinNode.getRuntimeFilterExplainString(true));
+        Assert.assertEquals("RF000[in] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF001[min_max] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n",
+                lhsScanNode.getRuntimeFilterExplainString(false));
+        Assert.assertEquals(2, testPlanFragment.getTargetRuntimeFilterIds().size());
+        Assert.assertEquals(2, testPlanFragment.getBuilderRuntimeFilterIds().size());
+        Assert.assertEquals(2, analyzer.getAssignedRuntimeFilter().size());
+        Assert.assertEquals(2, hashJoinNode.getRuntimeFilters().size());
+        Assert.assertEquals(2, lhsScanNode.getRuntimeFilters().size());
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 6;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals("RF000[bloom] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" +
+                ", RF001[min_max] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n",
+                hashJoinNode.getRuntimeFilterExplainString(true));
+        Assert.assertEquals("RF000[bloom] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" +
+                ", RF001[min_max] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n",
+                lhsScanNode.getRuntimeFilterExplainString(false));
+        Assert.assertEquals(2, testPlanFragment.getTargetRuntimeFilterIds().size());
+        Assert.assertEquals(2, testPlanFragment.getBuilderRuntimeFilterIds().size());
+        Assert.assertEquals(2, analyzer.getAssignedRuntimeFilter().size());
+        Assert.assertEquals(2, hashJoinNode.getRuntimeFilters().size());
+        Assert.assertEquals(2, lhsScanNode.getRuntimeFilters().size());
+    }
+
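+    // Valid runtime_filter_type values form the bitmask range [0, 7]; -1 and 8 are rejected.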
+    @Test(expected = IllegalStateException.class)
+    public void testGenerateRuntimeFiltersTypeExceptionLess() {
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = -1;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testGenerateRuntimeFiltersTypeExceptionMore() {
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 8;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+    }
+
+    @Test
+    public void testGenerateRuntimeFiltersSize() {
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeFilterMode();
+                result = "GLOBAL";
+                ConnectContext.get().getSessionVariable().getRuntimeFilterType();
+                result = 2;
+            }
+        };
+
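+        // The configured runtime_bloom_filter_size is clamped to the range
+        // [runtime_bloom_filter_min_size, runtime_bloom_filter_max_size].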
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMaxSize();
+                result = 16777216;
+                ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMinSize();
+                result = 1048576;
+                ConnectContext.get().getSessionVariable().getRuntimeBloomFilterSize();
+                result = 2097152;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals(2097152, analyzer.getAssignedRuntimeFilter().get(0).toThrift().getBloomFilterSizeBytes());
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMaxSize();
+                result = 16777216;
+                ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMinSize();
+                result = 1048576;
+                ConnectContext.get().getSessionVariable().getRuntimeBloomFilterSize();
+                result = 1;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals(1048576, analyzer.getAssignedRuntimeFilter().get(0).toThrift().getBloomFilterSizeBytes());
+
+        clearRuntimeFilterState();
+        new Expectations() {
+            {
+                ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMaxSize();
+                result = 16777216;
+                ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMinSize();
+                result = 1048576;
+                ConnectContext.get().getSessionVariable().getRuntimeBloomFilterSize();
+                result = 999999999;
+            }
+        };
+        RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode);
+        Assert.assertEquals(16777216, analyzer.getAssignedRuntimeFilter().get(0).toThrift().getBloomFilterSizeBytes());
+
+        // Use ndv and fpp to calculate the minimum space required for bloom filter
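+        // The expected values are consistent with the standard sizing rule
+        // bits = -ndv * ln(fpp) / (ln 2)^2, with the byte count rounded up to the
+        // next power of two (GetMinLogSpaceForBloomFilter returns that log2).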
+        Assert.assertEquals(1048576, 1L <<
+                RuntimeFilter.GetMinLogSpaceForBloomFilter(1000000, 0.05));
+        Assert.assertEquals(1048576, 1L <<
+                RuntimeFilter.GetMinLogSpaceForBloomFilter(1000000, 0.1));
+        Assert.assertEquals(524288, 1L <<
+                RuntimeFilter.GetMinLogSpaceForBloomFilter(1000000, 0.3));
+        Assert.assertEquals(8388608, 1L <<
+                RuntimeFilter.GetMinLogSpaceForBloomFilter(10000000, 0.1));
+        Assert.assertEquals(1024, 1L <<
+                RuntimeFilter.GetMinLogSpaceForBloomFilter(1000, 0.1));
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java
new file mode 100644
index 0000000..d0ce41c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.DdlException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RuntimeFilterTypeHelperTest {
+
+    @Test
+    public void testNormal() throws DdlException {
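+        // encode() maps a comma-separated list of filter types to a bitmask
+        // (IN = 1, BLOOM_FILTER = 2, MIN_MAX = 4); decode() is its inverse.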
+        String runtimeFilterType = "";
+        Assert.assertEquals(new Long(0L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+        runtimeFilterType = "IN";
+        Assert.assertEquals(new Long(1L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+        runtimeFilterType = "BLOOM_FILTER";
+        Assert.assertEquals(new Long(2L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+        runtimeFilterType = "IN,BLOOM_FILTER";
+        Assert.assertEquals(new Long(3L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+        runtimeFilterType = "MIN_MAX";
+        Assert.assertEquals(new Long(4L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+        runtimeFilterType = "IN,MIN_MAX";
+        Assert.assertEquals(new Long(5L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+        runtimeFilterType = "MIN_MAX, BLOOM_FILTER";
+        Assert.assertEquals(new Long(6L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+        runtimeFilterType = "IN,BLOOM_FILTER,MIN_MAX";
+        Assert.assertEquals(new Long(7L), RuntimeFilterTypeHelper.encode(runtimeFilterType));
+
+        long runtimeFilterTypeValue = 0L;
+        Assert.assertEquals("", RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue));
+
+        runtimeFilterTypeValue = 1L;
+        Assert.assertEquals("IN", RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue));
+
+        runtimeFilterTypeValue = 3L;
+        Assert.assertEquals("BLOOM_FILTER,IN", RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue)); // Orderly
+
+        runtimeFilterTypeValue = 7L;
+        Assert.assertEquals("BLOOM_FILTER,IN,MIN_MAX", RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue)); // Orderly
+    }
+
+    @Test(expected = DdlException.class)
+    public void testInvalidEncode() throws DdlException {
+        RuntimeFilterTypeHelper.encode("BLOOM,IN");
+        Assert.fail("No exception throws");
+    }
+
+    @Test(expected = DdlException.class)
+    public void testInvalidDecode() throws DdlException {
+        RuntimeFilterTypeHelper.decode(10L);
+        Assert.fail("No exception thrown");
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
index a053c38..5cba8c2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
@@ -167,6 +167,13 @@ public class VariableMgrTest {
         setVar7.analyze(null);
         VariableMgr.setVar(var, setVar7);
         Assert.assertEquals("-08:00", VariableMgr.newSessionVariable().getTimeZone());
+
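+        // runtime_filter_type stores the encoded bitmask; "BLOOM_FILTER" encodes to 2.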
+        SetVar setVar8 = new SetVar(SetType.SESSION, "runtime_filter_type", new StringLiteral(
+                RuntimeFilterTypeHelper.encode("BLOOM_FILTER").toString()));
+        setVar8.analyze(null);
+        VariableMgr.setVar(var, setVar8);
+        Assert.assertEquals(2L, var.getRuntimeFilterType());
     }
 
     @Test(expected = UserException.class)
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 888af72..b2f0c89 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -141,29 +141,11 @@ struct TQueryOptions {
   // whether enable parallel merge in exchange node
   32: optional bool enable_enable_exchange_node_parallel_merge = false;
 
-  // runtime filter run mode
-  33: optional string runtime_filter_mode = "GLOBAL";
-
-  // Size in bytes of Bloom Filters used for runtime filters. Actual size of filter will
-  // be rounded up to the nearest power of two.
-  34: optional i32 runtime_bloom_filter_size = 1048576
-
-  // Minimum runtime bloom filter size, in bytes
-  35: optional i32 runtime_bloom_filter_min_size = 1048576
-
-  // Maximum runtime bloom filter size, in bytes
-  36: optional i32 runtime_bloom_filter_max_size = 16777216
-
   // Time in ms to wait until runtime filters are delivered.
-  37: optional i32 runtime_filter_wait_time_ms = 1000
-
-  // Maximum number of bloom runtime filters allowed per query
-  38: optional i32 runtime_filters_max_num = 10
-
-  // Runtime filter type used, For testing, Corresponds to TRuntimeFilterType
-  39: optional i32 runtime_filter_type = 1;
+  33: optional i32 runtime_filter_wait_time_ms = 1000
 
-  40: optional i32 runtime_filter_max_in_num = 1024;
+  // If the build (right) table of a hash join has more rows than this value, the IN filter is ignored
+  34: optional i32 runtime_filter_max_in_num = 1024;
 }
     
 
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index d3b2cd0..8d968dc 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -381,11 +381,10 @@ struct THashJoinNode {
   // anything from the ON or USING clauses (but *not* the WHERE clause) that's not an
   // equi-join predicate
   3: optional list<Exprs.TExpr> other_join_conjuncts
-  4: optional bool is_push_down
 
   // If true, this join node can (but may choose not to) generate slot filters
   // after constructing the build side that can be applied to the probe side.
-  5: optional bool add_probe_filters
+  4: optional bool add_probe_filters
 }
 
 struct TMergeJoinNode {
@@ -726,7 +725,6 @@ struct TRuntimeFilterDesc {
   9: optional i64 bloom_filter_size_bytes
 }
 
-
 // This is essentially a union of all messages corresponding to subclasses
 // of PlanNode.
 struct TPlanNode {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org