Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/23 14:17:58 UTC

[doris] branch branch-1.2-lts updated (768ff43c65 -> d078112585)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


    from 768ff43c65 [Compile](lzo) fix lzo decompressor compiler error (#15956)
     new 5d5c2e9541 [fix](planner)wrong result when has order by under join (#15974)
     new 5e97bce8c0 [fix](fe)fix anti join bug (#15955)
     new 784e8d0331 [fix](regression-test) Fix the build for Java UDF Case (#15851)
     new 6ec00a340c [enhance](planner)convert 'or' into 'in-predicate' (#15737)
     new d078112585 (improvement)[bucket] Add auto bucket implement (#15250)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 fe/fe-core/src/main/cup/sql_parser.cup             |  20 +-
 .../java/org/apache/doris/analysis/Analyzer.java   |  93 +++++--
 .../org/apache/doris/analysis/BinaryPredicate.java |   7 +
 .../apache/doris/analysis/CompoundPredicate.java   |  26 ++
 .../org/apache/doris/analysis/CreateTableStmt.java |  34 ++-
 .../apache/doris/analysis/DistributionDesc.java    |  50 ++--
 .../doris/analysis/HashDistributionDesc.java       |  49 +---
 .../org/apache/doris/analysis/JoinOperator.java    |  13 +
 .../org/apache/doris/analysis/LiteralExpr.java     |   5 +
 .../doris/analysis/RandomDistributionDesc.java     |  33 +--
 .../java/org/apache/doris/analysis/SlotRef.java    |  12 +
 .../org/apache/doris/catalog/DistributionInfo.java |  20 ++
 .../main/java/org/apache/doris/catalog/Env.java    |   6 +
 .../apache/doris/catalog/HashDistributionInfo.java |  26 +-
 .../java/org/apache/doris/catalog/OlapTable.java   |  47 +++-
 .../doris/catalog/RandomDistributionInfo.java      |  20 +-
 .../org/apache/doris/catalog/TableProperty.java    |   8 +
 .../doris/clone/DynamicPartitionScheduler.java     |  86 +++++-
 .../java/org/apache/doris/common/FeConstants.java  |   4 +
 .../apache/doris/common/util/AutoBucketUtils.java  |  98 +++++++
 .../apache/doris/common/util/PropertyAnalyzer.java |  25 +-
 .../apache/doris/datasource/InternalCatalog.java   |  36 ++-
 .../org/apache/doris/planner/ExchangeNode.java     |   5 +-
 .../apache/doris/planner/SingleNodePlanner.java    |   9 +-
 .../doris/rewrite/ExtractCommonFactorsRule.java    | 128 ++++++---
 fe/fe-core/src/main/jflex/sql_scanner.flex         |   1 +
 .../org/apache/doris/analysis/SelectStmtTest.java  |  26 +-
 .../doris/common/util/AutoBucketUtilsTest.java     | 294 +++++++++++++++++++++
 .../java/org/apache/doris/planner/PlannerTest.java |   2 +-
 .../org/apache/doris/planner/QueryPlanTest.java    |  59 ++++-
 .../ExtractCommonFactorsRuleFunctionTest.java      |  12 +-
 .../org/apache/doris/utframe/UtFrameUtils.java     |  83 +++++-
 .../data/performance_p0/redundant_conjuncts.out    |   2 +-
 regression-test/data/query_p0/join/test_join.out   |  80 ++++++
 regression-test/java-udf-src/pom.xml               |   8 +
 .../test_autobucket.groovy}                        |  40 ++-
 .../suites/query_p0/join/test_join.groovy          |  10 +-
 .../limit/OffsetInSubqueryWithJoin.groovy}         |  24 +-
 38 files changed, 1234 insertions(+), 267 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java
 create mode 100644 fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java
 copy regression-test/suites/{correctness_p0/test_keywords.groovy => autobucket/test_autobucket.groovy} (55%)
 copy regression-test/suites/{nereids_syntax_p0/group_by_constant.groovy => query_p0/limit/OffsetInSubqueryWithJoin.groovy} (61%)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 05/05: (improvement)[bucket] Add auto bucket implement (#15250)

Posted by mo...@apache.org.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d07811258500093691f020d0c92966cdc26e4e27
Author: Drogon <ja...@gmail.com>
AuthorDate: Wed Jan 18 19:50:18 2023 +0800

    (improvement)[bucket] Add auto bucket implement (#15250)
---
 fe/fe-core/src/main/cup/sql_parser.cup             |  20 +-
 .../org/apache/doris/analysis/CreateTableStmt.java |  34 ++-
 .../apache/doris/analysis/DistributionDesc.java    |  50 ++--
 .../doris/analysis/HashDistributionDesc.java       |  49 +---
 .../doris/analysis/RandomDistributionDesc.java     |  33 +--
 .../org/apache/doris/catalog/DistributionInfo.java |  20 ++
 .../main/java/org/apache/doris/catalog/Env.java    |   6 +
 .../apache/doris/catalog/HashDistributionInfo.java |  26 +-
 .../java/org/apache/doris/catalog/OlapTable.java   |  47 +++-
 .../doris/catalog/RandomDistributionInfo.java      |  20 +-
 .../org/apache/doris/catalog/TableProperty.java    |   8 +
 .../doris/clone/DynamicPartitionScheduler.java     |  86 +++++-
 .../java/org/apache/doris/common/FeConstants.java  |   4 +
 .../apache/doris/common/util/AutoBucketUtils.java  |  98 +++++++
 .../apache/doris/common/util/PropertyAnalyzer.java |  25 +-
 .../apache/doris/datasource/InternalCatalog.java   |  36 ++-
 fe/fe-core/src/main/jflex/sql_scanner.flex         |   1 +
 .../doris/common/util/AutoBucketUtilsTest.java     | 294 +++++++++++++++++++++
 .../org/apache/doris/utframe/UtFrameUtils.java     |  83 +++++-
 .../suites/autobucket/test_autobucket.groovy       |  41 +++
 20 files changed, 835 insertions(+), 146 deletions(-)

diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 83d105c734..12b4643552 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -45,6 +45,7 @@ import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.View;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Version;
 import org.apache.doris.mysql.MysqlPassword;
 import org.apache.doris.load.loadv2.LoadTask;
@@ -602,7 +603,8 @@ terminal String
     KW_WORK,
     KW_WRITE,
     KW_YEAR,
-    KW_MTMV;
+    KW_MTMV,
+    KW_AUTO;
 
 terminal COMMA, COLON, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT;
 terminal BITAND, BITOR, BITXOR, BITNOT;
@@ -2884,12 +2886,16 @@ opt_distribution ::=
     /* Hash distributed */
     | KW_DISTRIBUTED KW_BY KW_HASH LPAREN ident_list:columns RPAREN opt_distribution_number:numDistribution
     {:
-        RESULT = new HashDistributionDesc(numDistribution, columns);
+        int bucketNum = (numDistribution == null ? -1 : numDistribution);
+        boolean is_auto_bucket = (numDistribution == null);
+        RESULT = new HashDistributionDesc(bucketNum, is_auto_bucket, columns);
     :}
     /* Random distributed */
     | KW_DISTRIBUTED KW_BY KW_RANDOM opt_distribution_number:numDistribution
     {:
-        RESULT = new RandomDistributionDesc(numDistribution);
+        int bucketNum = (numDistribution == null ? -1 : numDistribution);
+        boolean is_auto_bucket = (numDistribution == null);
+        RESULT = new RandomDistributionDesc(bucketNum, is_auto_bucket);
     :}
     ;
 
@@ -2907,13 +2913,17 @@ opt_rollup ::=
 opt_distribution_number ::=
     /* Empty */
     {:
-        /* If distribution number is null, default distribution number is 10. */
-        RESULT = 10;
+        /* If distribution number is null, default distribution number is FeConstants.default_bucket_num. */
+        RESULT = FeConstants.default_bucket_num;
     :}
     | KW_BUCKETS INTEGER_LITERAL:numDistribution
     {:
         RESULT = numDistribution.intValue();
     :}
+    | KW_BUCKETS KW_AUTO
+    {:
+        RESULT = null;
+    :}
     ;
 
 opt_keys ::=
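With this grammar change, opt_distribution_number can end in three ways. An omitted clause yields the default bucket count, BUCKETS <n> yields n, and BUCKETS AUTO yields null. The HASH and RANDOM actions then turn null into a bucket count of -1 plus an auto-bucket flag. The plain-Java sketch below mirrors that mapping outside of CUP. It is not taken from the commit: BucketSpec, parseNumber and fromParsed are hypothetical names, and the clause parsing is a simplified stand-in for the real grammar. The default of 10 is an assumption taken from the removed comment, because the value of FeConstants.default_bucket_num is not shown in this email.

```java
/** Hypothetical holder for the (bucketNum, is_auto_bucket) pair built by the grammar actions. */
final class BucketSpec {
    // Assumption: stands in for FeConstants.default_bucket_num (the removed code used 10).
    static final int ASSUMED_DEFAULT_BUCKET_NUM = 10;

    final int bucketNum;
    final boolean autoBucket;

    private BucketSpec(int bucketNum, boolean autoBucket) {
        this.bucketNum = bucketNum;
        this.autoBucket = autoBucket;
    }

    /** Simplified opt_distribution_number: "" -> default, "BUCKETS n" -> n, "BUCKETS AUTO" -> null. */
    static Integer parseNumber(String clause) {
        String c = clause.trim();
        if (c.isEmpty()) {
            return ASSUMED_DEFAULT_BUCKET_NUM;
        }
        String[] parts = c.split("\\s+");
        if (parts.length != 2 || !parts[0].equalsIgnoreCase("BUCKETS")) {
            throw new IllegalArgumentException("unexpected clause: " + clause);
        }
        if (parts[1].equalsIgnoreCase("AUTO")) {
            return null; // no explicit number: auto bucket
        }
        return Integer.parseInt(parts[1]);
    }

    /** Same mapping as the HASH and RANDOM actions: null becomes -1 plus the auto flag. */
    static BucketSpec fromParsed(Integer numDistribution) {
        int bucketNum = (numDistribution == null ? -1 : numDistribution);
        boolean isAutoBucket = (numDistribution == null);
        return new BucketSpec(bucketNum, isAutoBucket);
    }
}
```

The -1 placeholder would fail the numBucket <= 0 check in the distribution descs' analyze(). It therefore has to be replaced before that check runs, which is the job of CreateTableStmt.maybeRewriteByAutoBucket below.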
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index 826aee4663..867cb6aa38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -31,6 +31,8 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.AutoBucketUtils;
+import org.apache.doris.common.util.ParseUtil;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.Util;
@@ -94,6 +96,32 @@ public class CreateTableStmt extends DdlStmt {
         engineNames.add("jdbc");
     }
 
+    // If auto bucket is enabled, rewrite the distribution bucket num and
+    // set properties[PropertyAnalyzer.PROPERTIES_AUTO_BUCKET] = "true".
+    private static Map<String, String> maybeRewriteByAutoBucket(DistributionDesc distributionDesc,
+            Map<String, String> properties) throws AnalysisException {
+        if (distributionDesc == null || !distributionDesc.isAutoBucket()) {
+            return properties;
+        }
+
+        // auto bucket is enabled
+        Map<String, String> newProperties = properties;
+        if (newProperties == null) {
+            newProperties = new HashMap<String, String>();
+        }
+        newProperties.put(PropertyAnalyzer.PROPERTIES_AUTO_BUCKET, "true");
+
+        if (!newProperties.containsKey(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)) {
+            distributionDesc.setBuckets(FeConstants.default_bucket_num);
+        } else {
+            long partitionSize = ParseUtil
+                    .analyzeDataVolumn(newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE));
+            distributionDesc.setBuckets(AutoBucketUtils.getBucketsNum(partitionSize));
+        }
+
+        return newProperties;
+    }
+
     public CreateTableStmt() {
         // for persist
         tableName = new TableName();
@@ -260,7 +288,11 @@ public class CreateTableStmt extends DdlStmt {
     }
 
     @Override
-    public void analyze(Analyzer analyzer) throws UserException {
+    public void analyze(Analyzer analyzer) throws UserException, AnalysisException {
+        if (Strings.isNullOrEmpty(engineName) || engineName.equalsIgnoreCase("olap")) {
+            this.properties = maybeRewriteByAutoBucket(distributionDesc, properties);
+        }
+
         super.analyze(analyzer);
         tableName.analyze(analyzer);
         FeNameFormat.checkTableName(tableName.getTbl());
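The rewrite runs at the top of analyze(), before super.analyze(). The parser's -1 placeholder is therefore replaced by a concrete count before anything validates it. That count is either the default or AutoBucketUtils.getBucketsNum applied to the parsed estimate_partition_size. A minimal sketch of the same control flow follows, with the Doris dependencies passed in as functions. DescStub, AutoBucketRewriteSketch, the key strings and the default of 10 are assumptions. This email does not show the PropertyAnalyzer constants, FeConstants.default_bucket_num, or the bodies of ParseUtil.analyzeDataVolumn and AutoBucketUtils.getBucketsNum.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongToIntFunction;
import java.util.function.ToLongFunction;

/** Hypothetical stand-in for the bucket state of DistributionDesc. */
final class DescStub {
    int buckets;
    final boolean autoBucket;

    DescStub(int buckets, boolean autoBucket) {
        this.buckets = buckets;
        this.autoBucket = autoBucket;
    }
}

final class AutoBucketRewriteSketch {
    // Assumed key names; the real strings are PropertyAnalyzer constants not shown here.
    static final String AUTO_BUCKET_KEY = "auto_bucket";
    static final String ESTIMATE_SIZE_KEY = "estimate_partition_size";
    // Assumption: stands in for FeConstants.default_bucket_num.
    static final int DEFAULT_BUCKETS = 10;

    /**
     * Same shape as CreateTableStmt.maybeRewriteByAutoBucket. parseSize stands in for
     * ParseUtil.analyzeDataVolumn and estimateBuckets for AutoBucketUtils.getBucketsNum.
     */
    static Map<String, String> maybeRewrite(DescStub desc, Map<String, String> properties,
            ToLongFunction<String> parseSize, LongToIntFunction estimateBuckets) {
        if (desc == null || !desc.autoBucket) {
            return properties; // explicit BUCKETS n: nothing to rewrite
        }
        // Reuses (and mutates) the caller's map when there is one, as the original does.
        Map<String, String> newProperties =
                (properties == null) ? new HashMap<String, String>() : properties;
        newProperties.put(AUTO_BUCKET_KEY, "true");
        if (!newProperties.containsKey(ESTIMATE_SIZE_KEY)) {
            desc.buckets = DEFAULT_BUCKETS;
        } else {
            long partitionSize = parseSize.applyAsLong(newProperties.get(ESTIMATE_SIZE_KEY));
            desc.buckets = estimateBuckets.applyAsInt(partitionSize);
        }
        return newProperties;
    }
}
```

For a quick check, a toy estimator such as bytes -> (int) (bytes / 100) can stand in for AutoBucketUtils.getBucketsNum. It has no relation to the real sizing rule.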
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DistributionDesc.java
index 3e755c750c..02005a3985 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DistributionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DistributionDesc.java
@@ -22,57 +22,47 @@ import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.common.io.Text;
-import org.apache.doris.common.io.Writable;
 
 import org.apache.commons.lang.NotImplementedException;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 
-public class DistributionDesc implements Writable {
+public class DistributionDesc {
     protected DistributionInfoType type;
+    protected int numBucket;
+    protected boolean autoBucket;
 
-    public DistributionDesc() {
+    public DistributionDesc(int numBucket) {
+        this(numBucket, false);
+    }
 
+    public DistributionDesc(int numBucket, boolean autoBucket) {
+        this.numBucket = numBucket;
+        this.autoBucket = autoBucket;
     }
 
-    public void analyze(Set<String> colSet, List<ColumnDef> columnDefs, KeysDesc keysDesc) throws AnalysisException {
-        throw new NotImplementedException();
+    public int getBuckets() {
+        return numBucket;
     }
 
-    public String toSql() {
-        throw new NotImplementedException();
+    public int setBuckets(int numBucket) {
+        return this.numBucket = numBucket;
     }
 
-    public DistributionInfo toDistributionInfo(List<Column> columns) throws DdlException {
-        throw new NotImplementedException();
+    public boolean isAutoBucket() {
+        return autoBucket;
     }
 
-    public static DistributionDesc read(DataInput in) throws IOException {
-        DistributionInfoType type = DistributionInfoType.valueOf(Text.readString(in));
-        if (type == DistributionInfoType.HASH) {
-            DistributionDesc desc = new HashDistributionDesc();
-            desc.readFields(in);
-            return desc;
-        } else if (type == DistributionInfoType.RANDOM) {
-            DistributionDesc desc = new RandomDistributionDesc();
-            desc.readFields(in);
-            return desc;
-        } else {
-            throw new IOException("Unknown distribution type: " + type);
-        }
+    public void analyze(Set<String> colSet, List<ColumnDef> columnDefs, KeysDesc keysDesc) throws AnalysisException {
+        throw new NotImplementedException();
     }
 
-    @Override
-    public void write(DataOutput out) throws IOException {
-        Text.writeString(out, type.name());
+    public String toSql() {
+        throw new NotImplementedException();
     }
 
-    public void readFields(DataInput in) throws IOException {
+    public DistributionInfo toDistributionInfo(List<Column> columns) throws DdlException {
         throw new NotImplementedException();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java
index 6d14fb3844..8b312a6ba7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java
@@ -25,29 +25,25 @@ import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.common.io.Text;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 
 public class HashDistributionDesc extends DistributionDesc {
-    private int numBucket;
     private List<String> distributionColumnNames;
 
-    public HashDistributionDesc() {
+    public HashDistributionDesc(int numBucket, List<String> distributionColumnNames) {
+        super(numBucket);
         type = DistributionInfoType.HASH;
-        distributionColumnNames = Lists.newArrayList();
+        this.distributionColumnNames = distributionColumnNames;
     }
 
-    public HashDistributionDesc(int numBucket, List<String> distributionColumnNames) {
+    public HashDistributionDesc(int numBucket, boolean autoBucket, List<String> distributionColumnNames) {
+        super(numBucket, autoBucket);
         type = DistributionInfoType.HASH;
-        this.numBucket = numBucket;
         this.distributionColumnNames = distributionColumnNames;
     }
 
@@ -55,14 +51,10 @@ public class HashDistributionDesc extends DistributionDesc {
         return distributionColumnNames;
     }
 
-    public int getBuckets() {
-        return numBucket;
-    }
-
     @Override
     public void analyze(Set<String> colSet, List<ColumnDef> columnDefs, KeysDesc keysDesc) throws AnalysisException {
         if (numBucket <= 0) {
-            throw new AnalysisException("Number of hash distribution should be larger than zero.");
+            throw new AnalysisException("Number of hash distribution should be greater than zero.");
         }
         if (distributionColumnNames == null || distributionColumnNames.size() == 0) {
             throw new AnalysisException("Number of hash column should be larger than zero.");
@@ -100,7 +92,11 @@ public class HashDistributionDesc extends DistributionDesc {
             i++;
         }
         stringBuilder.append(")\n");
-        stringBuilder.append("BUCKETS ").append(numBucket);
+        if (autoBucket) {
+            stringBuilder.append("BUCKETS AUTO");
+        } else {
+            stringBuilder.append("BUCKETS ").append(numBucket);
+        }
         return stringBuilder.toString();
     }
 
@@ -139,27 +135,8 @@ public class HashDistributionDesc extends DistributionDesc {
             }
         }
 
-        HashDistributionInfo hashDistributionInfo = new HashDistributionInfo(numBucket, distributionColumns);
+        HashDistributionInfo hashDistributionInfo =
+                                new HashDistributionInfo(numBucket, autoBucket, distributionColumns);
         return hashDistributionInfo;
     }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        super.write(out);
-
-        out.writeInt(numBucket);
-        int count = distributionColumnNames.size();
-        out.writeInt(count);
-        for (String colName : distributionColumnNames) {
-            Text.writeString(out, colName);
-        }
-    }
-
-    public void readFields(DataInput in) throws IOException {
-        numBucket = in.readInt();
-        int count = in.readInt();
-        for (int i = 0; i < count; i++) {
-            distributionColumnNames.add(Text.readString(in));
-        }
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java
index e445aa5bdf..3b89dfaff7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java
@@ -23,28 +23,24 @@ import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
 import org.apache.doris.catalog.RandomDistributionInfo;
 import org.apache.doris.common.AnalysisException;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 
 public class RandomDistributionDesc extends DistributionDesc {
-    int numBucket;
-
-    public RandomDistributionDesc() {
+    public RandomDistributionDesc(int numBucket) {
+        super(numBucket);
         type = DistributionInfoType.RANDOM;
     }
 
-    public RandomDistributionDesc(int numBucket) {
+    public RandomDistributionDesc(int numBucket, boolean autoBucket) {
+        super(numBucket, autoBucket);
         type = DistributionInfoType.RANDOM;
-        this.numBucket = numBucket;
     }
 
     @Override
     public void analyze(Set<String> colSet, List<ColumnDef> columnDefs, KeysDesc keysDesc) throws AnalysisException {
         if (numBucket <= 0) {
-            throw new AnalysisException("Number of random distribution should be larger than zero.");
+            throw new AnalysisException("Number of random distribution should be greater than zero.");
         }
     }
 
@@ -52,23 +48,18 @@ public class RandomDistributionDesc extends DistributionDesc {
     public String toSql() {
         StringBuilder stringBuilder = new StringBuilder();
         stringBuilder.append("DISTRIBUTED BY RANDOM\n")
-                .append("BUCKETS ").append(numBucket);
+                .append("BUCKETS ");
+        if (autoBucket) {
+            stringBuilder.append("AUTO");
+        } else {
+            stringBuilder.append(numBucket);
+        }
         return stringBuilder.toString();
     }
 
     @Override
     public DistributionInfo toDistributionInfo(List<Column> columns) {
-        RandomDistributionInfo randomDistributionInfo = new RandomDistributionInfo(numBucket);
+        RandomDistributionInfo randomDistributionInfo = new RandomDistributionInfo(numBucket, autoBucket);
         return randomDistributionInfo;
     }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        super.write(out);
-        out.writeInt(numBucket);
-    }
-
-    public void readFields(DataInput in) throws IOException {
-        numBucket = in.readInt();
-    }
 }
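After this refactor, the bucket count and the auto flag live in DistributionDesc. Both subclasses echo BUCKETS AUTO back in toSql() instead of the resolved number, and analyze() still rejects any non-positive count. A trimmed sketch of that hierarchy follows. DescSketch, HashDescSketch and RandomDescSketch are hypothetical, and IllegalStateException stands in for Doris's AnalysisException. The hash column list is joined without whatever quoting the real toSql() applies, because that part of the method is not shown in the diff.

```java
import java.util.List;

/** Hypothetical mirror of the reworked DistributionDesc: bucket state lives in the base class. */
abstract class DescSketch {
    protected int numBucket;
    protected final boolean autoBucket;

    DescSketch(int numBucket, boolean autoBucket) {
        this.numBucket = numBucket;
        this.autoBucket = autoBucket;
    }

    void setBuckets(int numBucket) {
        this.numBucket = numBucket;
    }

    /** Same rule as analyze(): a non-positive count (such as the -1 placeholder) is rejected. */
    void checkBuckets(String kind) {
        if (numBucket <= 0) {
            throw new IllegalStateException(
                    "Number of " + kind + " distribution should be greater than zero.");
        }
    }

    /** Shared tail of toSql(): AUTO is echoed back instead of the resolved number. */
    protected String bucketsClause() {
        return autoBucket ? "BUCKETS AUTO" : "BUCKETS " + numBucket;
    }

    abstract String toSql();
}

final class HashDescSketch extends DescSketch {
    private final List<String> columns;

    HashDescSketch(int numBucket, boolean autoBucket, List<String> columns) {
        super(numBucket, autoBucket);
        this.columns = columns;
    }

    @Override
    String toSql() {
        // Simplified column list; the real column formatting is not shown in the diff.
        return "DISTRIBUTED BY HASH(" + String.join(", ", columns) + ")\n" + bucketsClause();
    }
}

final class RandomDescSketch extends DescSketch {
    RandomDescSketch(int numBucket, boolean autoBucket) {
        super(numBucket, autoBucket);
    }

    @Override
    String toSql() {
        return "DISTRIBUTED BY RANDOM\n" + bucketsClause();
    }
}
```

Echoing AUTO rather than the resolved number keeps the regenerated DDL in auto-bucket mode. A replayed statement would then go through the same rewrite instead of freezing today's estimate.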
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java
index e7f66c42f1..227900905e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java
@@ -40,12 +40,28 @@ public abstract class DistributionInfo implements Writable {
     @SerializedName(value = "type")
     protected DistributionInfoType type;
 
+    @SerializedName(value = "bucketNum")
+    protected int bucketNum;
+
+    @SerializedName(value = "autoBucket")
+    protected boolean autoBucket;
+
     public DistributionInfo() {
         // for persist
     }
 
     public DistributionInfo(DistributionInfoType type) {
+        this(type, 0, false);
+    }
+
+    public DistributionInfo(DistributionInfoType type, int bucketNum) {
+        this(type, bucketNum, false);
+    }
+
+    public DistributionInfo(DistributionInfoType type, int bucketNum, boolean autoBucket) {
         this.type = type;
+        this.bucketNum = bucketNum;
+        this.autoBucket = autoBucket;
     }
 
     public DistributionInfoType getType() {
@@ -62,6 +78,10 @@ public abstract class DistributionInfo implements Writable {
         throw new NotImplementedException("not implemented");
     }
 
+    public void markAutoBucket() {
+        autoBucket = true;
+    }
+
     public DistributionDesc toDistributionDesc() {
         throw new NotImplementedException();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 2cfd39f70a..42368f40da 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -2981,6 +2981,12 @@ public class Env {
                 sb.append(olapTable.getCompressionType()).append("\"");
             }
 
+            // estimate_partition_size
+            if (!olapTable.getEstimatePartitionSize().equals("")) {
+                sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE).append("\" = \"");
+                sb.append(olapTable.getEstimatePartitionSize()).append("\"");
+            }
+
             // unique key table with merge on write
             if (olapTable.getEnableUniqueKeyMergeOnWrite()) {
                 sb.append(",\n\"").append(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE).append("\" = \"");
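The SHOW CREATE TABLE output gains an estimate_partition_size entry only when the property is non-empty. The entry uses the same comma, newline, "key" = "value" shape as the neighbouring properties. A small sketch of that rule follows. ShowCreatePropsSketch and its methods are hypothetical. The literal key string is an assumption, since the value of PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE is not shown, and "replication_num" in the check is only an illustrative preceding entry.

```java
/** Hypothetical helper reproducing how Env appends one table property to SHOW CREATE TABLE. */
final class ShowCreatePropsSketch {
    // Assumed literal; the real key is PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE.
    static final String ESTIMATE_KEY = "estimate_partition_size";

    /** Appends a comma, a newline, then "key" = "value" to the property list being built. */
    static void appendProperty(StringBuilder sb, String key, String value) {
        sb.append(",\n\"").append(key).append("\" = \"").append(value).append("\"");
    }

    /** Mirrors the new Env branch: an empty string means the property was never set. */
    static void maybeAppendEstimate(StringBuilder sb, String estimatePartitionSize) {
        if (!estimatePartitionSize.equals("")) {
            appendProperty(sb, ESTIMATE_KEY, estimatePartitionSize);
        }
    }
}
```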
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java
index a651b8c0ab..d746bd355f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java
@@ -37,8 +37,6 @@ import java.util.Objects;
 public class HashDistributionInfo extends DistributionInfo {
     @SerializedName(value = "distributionColumns")
     private List<Column> distributionColumns;
-    @SerializedName(value = "bucketNum")
-    private int bucketNum;
 
     public HashDistributionInfo() {
         super();
@@ -46,9 +44,13 @@ public class HashDistributionInfo extends DistributionInfo {
     }
 
     public HashDistributionInfo(int bucketNum, List<Column> distributionColumns) {
-        super(DistributionInfoType.HASH);
+        super(DistributionInfoType.HASH, bucketNum);
+        this.distributionColumns = distributionColumns;
+    }
+
+    public HashDistributionInfo(int bucketNum, boolean autoBucket, List<Column> distributionColumns) {
+        super(DistributionInfoType.HASH, bucketNum, autoBucket);
         this.distributionColumns = distributionColumns;
-        this.bucketNum = bucketNum;
     }
 
     public List<Column> getDistributionColumns() {
@@ -65,6 +67,7 @@ public class HashDistributionInfo extends DistributionInfo {
         this.bucketNum = bucketNum;
     }
 
+    @Override
     public void write(DataOutput out) throws IOException {
         super.write(out);
         int columnCount = distributionColumns.size();
@@ -75,6 +78,7 @@ public class HashDistributionInfo extends DistributionInfo {
         out.writeInt(bucketNum);
     }
 
+    @Override
     public void readFields(DataInput in) throws IOException {
         super.readFields(in);
         int columnCount = in.readInt();
@@ -117,7 +121,7 @@ public class HashDistributionInfo extends DistributionInfo {
         for (Column col : distributionColumns) {
             distriColNames.add(col.getName());
         }
-        DistributionDesc distributionDesc = new HashDistributionDesc(bucketNum, distriColNames);
+        DistributionDesc distributionDesc = new HashDistributionDesc(bucketNum, autoBucket, distriColNames);
         return distributionDesc;
     }
 
@@ -133,7 +137,11 @@ public class HashDistributionInfo extends DistributionInfo {
         String colList = Joiner.on(", ").join(colNames);
         builder.append(colList);
 
-        builder.append(") BUCKETS ").append(bucketNum);
+        if (autoBucket) {
+            builder.append(") BUCKETS AUTO");
+        } else {
+            builder.append(") BUCKETS ").append(bucketNum);
+        }
         return builder.toString();
     }
 
@@ -148,7 +156,11 @@ public class HashDistributionInfo extends DistributionInfo {
         }
         builder.append("]; ");
 
-        builder.append("bucket num: ").append(bucketNum).append("; ");
+        if (autoBucket) {
+            builder.append("bucket num: auto;");
+        } else {
+            builder.append("bucket num: ").append(bucketNum).append(";");
+        }
 
         return builder.toString();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 87ba47bb27..c83acb1af4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -591,8 +591,8 @@ public class OlapTable extends Table {
         if (full) {
             return indexIdToMeta.get(indexId).getSchema();
         } else {
-            return indexIdToMeta.get(indexId).getSchema().stream().filter(column ->
-                    column.isVisible()).collect(Collectors.toList());
+            return indexIdToMeta.get(indexId).getSchema().stream().filter(column -> column.isVisible())
+                    .collect(Collectors.toList());
         }
     }
 
@@ -1124,7 +1124,6 @@ public class OlapTable extends Table {
         return false;
     }
 
-
     @Override
     public void write(DataOutput out) throws IOException {
         super.write(out);
@@ -1274,6 +1273,9 @@ public class OlapTable extends Table {
         if (in.readBoolean()) {
             tableProperty = TableProperty.read(in);
         }
+        if (isAutoBucket()) {
+            defaultDistributionInfo.markAutoBucket();
+        }
 
         // temp partitions
         tempPartitions = TempPartitions.read(in);
@@ -1617,6 +1619,36 @@ public class OlapTable extends Table {
         tableProperty.buildInMemory();
     }
 
+    public Boolean isAutoBucket() {
+        if (tableProperty != null) {
+            return tableProperty.isAutoBucket();
+        }
+        return false;
+    }
+
+    public void setIsAutoBucket(boolean isAutoBucket) {
+        if (tableProperty == null) {
+            tableProperty = new TableProperty(new HashMap<>());
+        }
+        tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_AUTO_BUCKET,
+                Boolean.valueOf(isAutoBucket).toString());
+    }
+
+    public void setEstimatePartitionSize(String estimatePartitionSize) {
+        if (tableProperty == null) {
+            tableProperty = new TableProperty(new HashMap<>());
+        }
+        tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE,
+                estimatePartitionSize);
+    }
+
+    public String getEstimatePartitionSize() {
+        if (tableProperty != null) {
+            return tableProperty.getEstimatePartitionSize();
+        }
+        return "";
+    }
+
     public boolean getEnableLightSchemaChange() {
         if (tableProperty != null) {
             return tableProperty.getUseSchemaLightChange();
@@ -1870,11 +1902,11 @@ public class OlapTable extends Table {
             return false;
         }
         List<Expr> partitionExps = aggregateInfo.getPartitionExprs() != null
-                ? aggregateInfo.getPartitionExprs() : groupingExps;
+                ? aggregateInfo.getPartitionExprs()
+                : groupingExps;
         DistributionInfo distribution = getDefaultDistributionInfo();
         if (distribution instanceof HashDistributionInfo) {
-            List<Column> distributeColumns =
-                    ((HashDistributionInfo) distribution).getDistributionColumns();
+            List<Column> distributeColumns = ((HashDistributionInfo) distribution).getDistributionColumns();
             PartitionInfo partitionInfo = getPartitionInfo();
             if (partitionInfo instanceof RangePartitionInfo) {
                 List<Column> rangeColumns = partitionInfo.getPartitionColumns();
@@ -1882,8 +1914,7 @@ public class OlapTable extends Table {
                     return false;
                 }
             }
-            List<SlotRef> partitionSlots =
-                    partitionExps.stream().map(Expr::unwrapSlotRef).collect(Collectors.toList());
+            List<SlotRef> partitionSlots = partitionExps.stream().map(Expr::unwrapSlotRef).collect(Collectors.toList());
             if (partitionSlots.contains(null)) {
                 return false;
             }
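None of the hunks shown add autoBucket to the Writable write()/readFields() paths of the distribution infos. Instead, OlapTable.readFields() re-derives the flag from the table properties once they are loaded and calls markAutoBucket() on the default distribution info. A trimmed sketch of that pattern follows. TableSketch, Dist, afterPropertiesRead() and the key string are hypothetical, and a null map models tableProperty == null.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, trimmed mirror of OlapTable + TableProperty handling of the auto-bucket flag. */
final class TableSketch {
    // Assumed key; the real one is PropertyAnalyzer.PROPERTIES_AUTO_BUCKET.
    static final String AUTO_BUCKET_KEY = "auto_bucket";

    /** Stand-in for DistributionInfo, reduced to the flag and markAutoBucket(). */
    static final class Dist {
        boolean autoBucket;

        void markAutoBucket() {
            autoBucket = true;
        }
    }

    Map<String, String> properties; // null models tableProperty == null
    final Dist defaultDistributionInfo = new Dist();

    /** Like OlapTable/TableProperty.isAutoBucket: no properties or no key reads as false. */
    boolean isAutoBucket() {
        if (properties == null) {
            return false;
        }
        return Boolean.parseBoolean(properties.getOrDefault(AUTO_BUCKET_KEY, "false"));
    }

    /** Like OlapTable.setIsAutoBucket: creates the property map on first use. */
    void setIsAutoBucket(boolean isAutoBucket) {
        if (properties == null) {
            properties = new HashMap<>();
        }
        properties.put(AUTO_BUCKET_KEY, Boolean.toString(isAutoBucket));
    }

    /** The step added to OlapTable.readFields(): re-derive the flag once properties are loaded. */
    void afterPropertiesRead() {
        if (isAutoBucket()) {
            defaultDistributionInfo.markAutoBucket();
        }
    }
}
```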
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java
index 2e11b5cfd4..31371e7799 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java
@@ -29,21 +29,21 @@ import java.util.Objects;
  * Random partition.
  */
 public class RandomDistributionInfo extends DistributionInfo {
-
-    private int bucketNum;
-
     public RandomDistributionInfo() {
         super();
     }
 
     public RandomDistributionInfo(int bucketNum) {
-        super(DistributionInfoType.RANDOM);
-        this.bucketNum = bucketNum;
+        super(DistributionInfoType.RANDOM, bucketNum);
+    }
+
+    public RandomDistributionInfo(int bucketNum, boolean autoBucket) {
+        super(DistributionInfoType.RANDOM, bucketNum, autoBucket);
     }
 
     @Override
     public DistributionDesc toDistributionDesc() {
-        DistributionDesc distributionDesc = new RandomDistributionDesc(bucketNum);
+        DistributionDesc distributionDesc = new RandomDistributionDesc(bucketNum, autoBucket);
         return distributionDesc;
     }
 
@@ -55,15 +55,21 @@ public class RandomDistributionInfo extends DistributionInfo {
     @Override
     public String toSql() {
         StringBuilder builder = new StringBuilder();
-        builder.append("DISTRIBUTED BY RANDOM BUCKETS ").append(bucketNum);
+        if (autoBucket) {
+            builder.append("DISTRIBUTED BY RANDOM() BUCKETS AUTO");
+        } else {
+            builder.append("DISTRIBUTED BY RANDOM() BUCKETS ").append(bucketNum);
+        }
         return builder.toString();
     }
 
+    @Override
     public void write(DataOutput out) throws IOException {
         super.write(out);
         out.writeInt(bucketNum);
     }
 
+    @Override
     public void readFields(DataInput in) throws IOException {
         super.readFields(in);
         bucketNum = in.readInt();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index 860d404cd0..04db48067e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -244,6 +244,14 @@ public class TableProperty implements Writable {
         return isInMemory;
     }
 
+    public boolean isAutoBucket() {
+        return Boolean.parseBoolean(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_AUTO_BUCKET, "false"));
+    }
+
+    public String getEstimatePartitionSize() {
+        return properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE, "");
+    }
+
     public TStorageFormat getStorageFormat() {
         // Force convert all V1 table to V2 table
         if (TStorageFormat.V1 == storageFormat) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index 754126caaa..0640a82cca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.DynamicPartitionProperty;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.catalog.RangePartitionInfo;
@@ -42,6 +43,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.AutoBucketUtils;
 import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.PropertyAnalyzer;
@@ -140,6 +142,75 @@ public class DynamicPartitionScheduler extends MasterDaemon {
         return defaultRuntimeInfo;
     }
 
+    // exponential moving average
+    private static long ema(ArrayList<Long> history, int period) {
+        double alpha = 2.0 / (period + 1);
+        double ema = history.get(0);
+        for (int i = 1; i < history.size(); i++) {
+            ema = alpha * history.get(i) + (1 - alpha) * ema;
+        }
+        return (long) ema;
+    }
+
+    private static long getNextPartitionSize(ArrayList<Long> historyPartitionsSize) {
+        if (historyPartitionsSize.size() < 2) {
+            return historyPartitionsSize.get(0);
+        }
+
+        int size = historyPartitionsSize.size() > 7 ? 7 : historyPartitionsSize.size();
+
+        boolean isAscending = true;
+        for (int i = 1; i < size; i++) {
+            if (historyPartitionsSize.get(i) < historyPartitionsSize.get(i - 1)) {
+                isAscending = false;
+                break;
+            }
+        }
+
+        if (isAscending) {
+            ArrayList<Long> historyDeltaSize = Lists.newArrayList();
+            for (int i = 1; i < size; i++) {
+                historyDeltaSize.add(historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1));
+            }
+            return historyPartitionsSize.get(size - 1) + ema(historyDeltaSize, 7);
+        } else {
+            return ema(historyPartitionsSize, 7);
+        }
+    }
+
+    private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table) {
+        if (!table.isAutoBucket()) {
+            return property.getBuckets();
+        }
+
+        List<Partition> partitions = Lists.newArrayList();
+        RangePartitionInfo info = (RangePartitionInfo) (table.getPartitionInfo());
+        List<Map.Entry<Long, PartitionItem>> idToItems = new ArrayList<>(info.getIdToItem(false).entrySet());
+        idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) o.getValue()).getItems().upperEndpoint()));
+        for (Map.Entry<Long, PartitionItem> idToItem : idToItems) {
+            Partition partition = table.getPartition(idToItem.getKey());
+            if (partition != null) {
+                partitions.add(partition);
+            }
+        }
+
+        // auto bucket
+        if (partitions.size() == 0) {
+            return property.getBuckets();
+        }
+
+        ArrayList<Long> partitionSizeArray = Lists.newArrayList();
+        for (Partition partition : partitions) {
+            if (partition.getVisibleVersion() >= 2) {
+                partitionSizeArray.add(partition.getDataSize());
+            }
+        }
+
+        // multiply by 5 to approximate the uncompressed size (assumes 5:1 compression)
+        long uncompressedPartitionSize = getNextPartitionSize(partitionSizeArray) * 5;
+        return AutoBucketUtils.getBucketsNum(uncompressedPartitionSize);
+    }
+
     private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTable olapTable,
             Column partitionColumn, String partitionFormat) {
         ArrayList<AddPartitionClause> addPartitionClauses = new ArrayList<>();
@@ -231,21 +302,22 @@ public class DynamicPartitionScheduler extends MasterDaemon {
 
             String partitionName = dynamicPartitionProperty.getPrefix()
                     + DynamicPartitionUtil.getFormattedPartitionName(dynamicPartitionProperty.getTimeZone(),
-                    prevBorder, dynamicPartitionProperty.getTimeUnit());
+                            prevBorder, dynamicPartitionProperty.getTimeUnit());
             SinglePartitionDesc rangePartitionDesc = new SinglePartitionDesc(true, partitionName,
                     partitionKeyDesc, partitionProperties);
 
             DistributionDesc distributionDesc = null;
             DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
+            int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable);
             if (distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH) {
                 HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
                 List<String> distColumnNames = new ArrayList<>();
                 for (Column distributionColumn : hashDistributionInfo.getDistributionColumns()) {
                     distColumnNames.add(distributionColumn.getName());
                 }
-                distributionDesc = new HashDistributionDesc(dynamicPartitionProperty.getBuckets(), distColumnNames);
+                distributionDesc = new HashDistributionDesc(bucketsNum, distColumnNames);
             } else {
-                distributionDesc = new RandomDistributionDesc(dynamicPartitionProperty.getBuckets());
+                distributionDesc = new RandomDistributionDesc(bucketsNum);
             }
             // add partition according to partition desc and distribution desc
             addPartitionClauses.add(new AddPartitionClause(rangePartitionDesc, distributionDesc, null, false));
@@ -265,8 +337,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
     }
 
     private void setStoragePolicyProperty(HashMap<String, String> partitionProperties,
-                                          DynamicPartitionProperty property, ZonedDateTime now, int offset,
-                                          String storagePolicyName) {
+            DynamicPartitionProperty property, ZonedDateTime now, int offset,
+            String storagePolicyName) {
         partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, storagePolicyName);
         String baseTime = DynamicPartitionUtil.getPartitionRangeString(
                 property, now, offset, DynamicPartitionUtil.DATETIME_FORMAT);
@@ -341,8 +413,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
                             dynamicPartitionProperty, range.lowerEndpoint().toString(), partitionFormat);
                     String upperBorderOfReservedHistory = DynamicPartitionUtil.getHistoryPartitionRangeString(
                             dynamicPartitionProperty, range.upperEndpoint().toString(), partitionFormat);
-                    Range<PartitionKey> reservedHistoryPartitionKeyRange
-                            = getClosedRange(db, olapTable, partitionColumn, partitionFormat,
+                    Range<PartitionKey> reservedHistoryPartitionKeyRange = getClosedRange(db, olapTable,
+                            partitionColumn, partitionFormat,
                             lowerBorderOfReservedHistory, upperBorderOfReservedHistory);
                     reservedHistoryPartitionKeyRangeList.add(reservedHistoryPartitionKeyRange);
                 } catch (IllegalArgumentException e) {
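The size prediction in getNextPartitionSize() can be restated as a standalone sketch. This is a simplified re-implementation for illustration, not the Doris code. The class and method names are hypothetical, and the input is assumed to be per-partition data sizes ordered oldest first, as produced by sorting on the range upper endpoint. The sketch keeps the diff's rules. If the first (up to) seven sizes never decrease, it predicts the last of them plus an EMA (alpha = 2 / (7 + 1) = 0.25) of their deltas; otherwise it returns the EMA of all sizes. It adds one thing: an explicit empty-history guard. In the diff, partitionSizeArray keeps only partitions whose visible version is at least 2, so it appears it can still be empty after the partitions.size() == 0 check. In that case historyPartitionsSize.get(0) would throw IndexOutOfBoundsException.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionSizeForecast {
    // Exponential moving average seeded with the first sample,
    // smoothing factor alpha = 2 / (period + 1).
    static long ema(List<Long> history, int period) {
        double alpha = 2.0 / (period + 1);
        double ema = history.get(0);
        for (int i = 1; i < history.size(); i++) {
            ema = alpha * history.get(i) + (1 - alpha) * ema;
        }
        return (long) ema;
    }

    // Mirrors getNextPartitionSize() from the diff, plus a guard for an empty
    // history (hypothetical fallback of 0; the diff has no such branch).
    static long nextPartitionSize(List<Long> history) {
        if (history.isEmpty()) {
            return 0L;
        }
        if (history.size() < 2) {
            return history.get(0);
        }
        int size = Math.min(history.size(), 7);
        boolean ascending = true;
        for (int i = 1; i < size; i++) {
            if (history.get(i) < history.get(i - 1)) {
                ascending = false;
                break;
            }
        }
        if (ascending) {
            List<Long> deltas = new ArrayList<>();
            for (int i = 1; i < size; i++) {
                deltas.add(history.get(i) - history.get(i - 1));
            }
            // Last size in the window plus the smoothed per-partition growth.
            return history.get(size - 1) + ema(deltas, 7);
        }
        // Not monotonically growing: smooth the sizes themselves.
        return ema(history, 7);
    }

    public static void main(String[] args) {
        System.out.println(nextPartitionSize(List.of(100L, 200L, 300L)));
        System.out.println(nextPartitionSize(List.of(300L, 100L)));
    }
}
```

With sizes 100, 200, 300 both deltas are 100, so the prediction is 300 + 100 = 400. With 300, 100 the sequence is not ascending and the EMA gives 0.25 * 100 + 0.75 * 300 = 250. As written, the ascending check and the delta window use the first seven entries of the list. Once more than seven partitions exist, those are the oldest ones.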
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index dfde23e0a0..1b05795220 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -22,6 +22,10 @@ import org.apache.doris.persist.meta.FeMetaFormat;
 public class FeConstants {
     // Database and table's default configurations, we will never change them
     public static short default_replication_num = 3;
+
+    // Default bucket number; also used by auto bucket when estimate_partition_size is not set
+    public static int default_bucket_num = 10;
+
     /*
      * Those two fields is responsible for determining the default key columns in duplicate table.
      * If user does not specify key of duplicate table in create table stmt,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java
new file mode 100644
index 0000000000..49ec8c6bf7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.DiskInfo.DiskState;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class AutoBucketUtils {
+    private static Logger logger = LogManager.getLogger(AutoBucketUtils.class);
+
+    static final long SIZE_100MB = 100 * 1024 * 1024L;
+    static final long SIZE_1GB = 1 * 1024 * 1024 * 1024L;
+    static final long SIZE_1TB = 1024 * SIZE_1GB;
+
+    private static int getBENum() {
+        SystemInfoService infoService = Env.getCurrentSystemInfo();
+        ImmutableMap<Long, Backend> backends = infoService.getBackendsInCluster(null);
+
+        int activeBENum = 0;
+        for (Backend backend : backends.values()) {
+            if (backend.isAlive()) {
+                ++activeBENum;
+            }
+        }
+        return activeBENum;
+    }
+
+    private static int getBucketsNumByBEDisks() {
+        SystemInfoService infoService = Env.getCurrentSystemInfo();
+        ImmutableMap<Long, Backend> backends = infoService.getBackendsInCluster(null);
+
+        int buckets = 0;
+        for (Backend backend : backends.values()) {
+            if (!backend.isLoadAvailable()) {
+                continue;
+            }
+
+            ImmutableMap<String, DiskInfo> disks = backend.getDisks();
+            for (DiskInfo diskInfo : disks.values()) {
+                if (diskInfo.getState() == DiskState.ONLINE && diskInfo.hasPathHash()) {
+                    buckets += (diskInfo.getAvailableCapacityB() - 1) / (50 * SIZE_1GB) + 1;
+                }
+            }
+        }
+        return buckets;
+    }
+
+    private static int convertParitionSizeToBucketsNum(long partitionSize) {
+        partitionSize /= 5; // for compression 5:1
+
+        // <= 100MB, 1 bucket
+        // <= 1GB, 2 buckets
+        // > 1GB, round up to ceil(size / 1GB)
+        if (partitionSize <= SIZE_100MB) {
+            return 1;
+        } else if (partitionSize <= SIZE_1GB) {
+            return 2;
+        } else {
+            return (int) ((partitionSize - 1) / SIZE_1GB + 1);
+        }
+    }
+
+    public static int getBucketsNum(long partitionSize) {
+        int bucketsNumByPartitionSize = convertParitionSizeToBucketsNum(partitionSize);
+        int bucketsNumByBE = getBucketsNumByBEDisks();
+        int bucketsNum = Math.min(128, Math.min(bucketsNumByPartitionSize, bucketsNumByBE));
+        int beNum = getBENum();
+        logger.debug("AutoBucketsUtil: bucketsNumByPartitionSize {}, bucketsNumByBE {}, bucketsNum {}, beNum {}",
+                bucketsNumByPartitionSize, bucketsNumByBE, bucketsNum, beNum);
+        if (bucketsNum < bucketsNumByPartitionSize && bucketsNum < beNum) {
+            bucketsNum = beNum;
+        }
+        logger.debug("AutoBucketsUtil: final bucketsNum {}", bucketsNum);
+        return bucketsNum;
+    }
+}
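The bucket-count rule in AutoBucketUtils.getBucketsNum() can be sketched as a pure function that takes the cluster state as arguments instead of reading it from Env. This is an illustrative restatement, not the Doris API. bucketsBySize, bucketsByDisks and bucketsNum are hypothetical names. availableBytesPerDisk stands in for the online disks of load-available backends, and aliveBackends stands in for getBENum().

```java
final class AutoBucketSketch {
    static final long SIZE_100MB = 100L * 1024 * 1024;
    static final long SIZE_1GB = 1024L * 1024 * 1024;
    static final long SIZE_50GB = 50 * SIZE_1GB;

    // Same thresholds as convertParitionSizeToBucketsNum() in the diff.
    static int bucketsBySize(long uncompressedSize) {
        long size = uncompressedSize / 5; // assumes 5:1 compression, as the diff does
        if (size <= SIZE_100MB) {
            return 1;
        }
        if (size <= SIZE_1GB) {
            return 2;
        }
        return (int) ((size - 1) / SIZE_1GB + 1); // ceil(size / 1GB)
    }

    // Disk budget: one bucket per started 50GB of available capacity on each disk.
    static int bucketsByDisks(long[] availableBytesPerDisk) {
        int buckets = 0;
        for (long avail : availableBytesPerDisk) {
            buckets += (int) ((avail - 1) / SIZE_50GB + 1);
        }
        return buckets;
    }

    // Combination rule from getBucketsNum(): cap at 128 and at the disk budget;
    // if the cap cut into the size-based estimate, raise it to the alive-BE count.
    static int bucketsNum(long uncompressedSize, long[] availableBytesPerDisk, int aliveBackends) {
        int bySize = bucketsBySize(uncompressedSize);
        int byDisks = bucketsByDisks(availableBytesPerDisk);
        int n = Math.min(128, Math.min(bySize, byDisks));
        if (n < bySize && n < aliveBackends) {
            n = aliveBackends;
        }
        return n;
    }

    public static void main(String[] args) {
        long[] threeDisks = {100 * SIZE_1GB, 100 * SIZE_1GB, 100 * SIZE_1GB};
        System.out.println(bucketsNum(50 * SIZE_1GB, threeDisks, 3)); // 6: disk budget wins
        System.out.println(bucketsNum(50 * SIZE_1GB, new long[] {100 * SIZE_1GB}, 4)); // 4: raised to alive BEs
        System.out.println(bucketsNum(300 * 1024 * 1024L, threeDisks, 3)); // 1: 60MB after the 5:1 discount
    }
}
```

DynamicPartitionScheduler multiplies the predicted size by 5 before calling getBucketsNum(), and convertParitionSizeToBucketsNum() divides by 5 again. For dynamic partitions, the thresholds therefore effectively apply to the predicted on-disk data size. The alive-BE floor is applied after the 128 cap, so on a cluster with more than 128 alive backends the result could exceed 128.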
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index ff59546543..c4fdf9b9d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -92,6 +92,10 @@ public class PropertyAnalyzer {
 
     public static final String PROPERTIES_INMEMORY = "in_memory";
 
+    // _auto_bucket is only set when CREATE TABLE rewrites BUCKETS AUTO, and cannot be changed afterwards
+    public static final String PROPERTIES_AUTO_BUCKET = "_auto_bucket";
+    public static final String PROPERTIES_ESTIMATE_PARTITION_SIZE = "estimate_partition_size";
+
     public static final String PROPERTIES_TABLET_TYPE = "tablet_type";
 
     public static final String PROPERTIES_STRICT_RANGE = "strict_range";
@@ -131,7 +135,7 @@ public class PropertyAnalyzer {
     /**
      * check and replace members of DataProperty by properties.
      *
-     * @param properties key->value for members to change.
+     * @param properties      key->value for members to change.
      * @param oldDataProperty old DataProperty
      * @return new DataProperty
      * @throws AnalysisException property has invalid key->value
@@ -246,7 +250,8 @@ public class PropertyAnalyzer {
             throws AnalysisException {
         Short replicationNum = oldReplicationNum;
         String propKey = Strings.isNullOrEmpty(prefix)
-                ? PROPERTIES_REPLICATION_NUM : prefix + "." + PROPERTIES_REPLICATION_NUM;
+                ? PROPERTIES_REPLICATION_NUM
+                : prefix + "." + PROPERTIES_REPLICATION_NUM;
         if (properties != null && properties.containsKey(propKey)) {
             try {
                 replicationNum = Short.valueOf(properties.get(propKey));
@@ -348,7 +353,7 @@ public class PropertyAnalyzer {
     }
 
     public static Set<String> analyzeBloomFilterColumns(Map<String, String> properties, List<Column> columns,
-                                                        KeysType keysType) throws AnalysisException {
+            KeysType keysType) throws AnalysisException {
         Set<String> bfColumns = null;
         if (properties != null && properties.containsKey(PROPERTIES_BF_COLUMNS)) {
             bfColumns = Sets.newHashSet();
@@ -484,7 +489,7 @@ public class PropertyAnalyzer {
     }
 
     // analyzeCompressionType will parse the compression type from properties
-    public static TCompressionType analyzeCompressionType(Map<String, String> properties) throws  AnalysisException {
+    public static TCompressionType analyzeCompressionType(Map<String, String> properties) throws AnalysisException {
         String compressionType = "";
         if (properties != null && properties.containsKey(PROPERTIES_COMPRESSION)) {
             compressionType = properties.get(PROPERTIES_COMPRESSION);
@@ -546,6 +551,15 @@ public class PropertyAnalyzer {
         return defaultVal;
     }
 
+    public static String analyzeEstimatePartitionSize(Map<String, String> properties) {
+        String estimatePartitionSize = "";
+        if (properties != null && properties.containsKey(PROPERTIES_ESTIMATE_PARTITION_SIZE)) {
+            estimatePartitionSize = properties.get(PROPERTIES_ESTIMATE_PARTITION_SIZE);
+            properties.remove(PROPERTIES_ESTIMATE_PARTITION_SIZE);
+        }
+        return estimatePartitionSize;
+    }
+
     public static String analyzeStoragePolicy(Map<String, String> properties) throws AnalysisException {
         String storagePolicy = "";
         if (properties != null && properties.containsKey(PROPERTIES_STORAGE_POLICY)) {
@@ -761,7 +775,6 @@ public class PropertyAnalyzer {
         throw new AnalysisException(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE + " must be `true` or `false`");
     }
 
-
     /**
      * Check the type property of the catalog props.
      */
@@ -777,5 +790,3 @@ public class PropertyAnalyzer {
         }
     }
 }
-
-
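The property helpers added to PropertyAnalyzer and TableProperty can be illustrated with a standalone sketch. It copies their logic into hypothetical static methods, takeEstimatePartitionSize and isAutoBucket, over a plain HashMap; these are not Doris APIs. estimate_partition_size is consumed: the key is removed once read, so a second read returns "". _auto_bucket is parsed with Boolean.parseBoolean, so a missing key, or any value other than "true" (case-insensitive), means false. The "10G" value below is arbitrary, since the sketch does not parse the size.

```java
import java.util.HashMap;
import java.util.Map;

final class EstimateSizeProperty {
    static final String PROPERTIES_AUTO_BUCKET = "_auto_bucket";
    static final String PROPERTIES_ESTIMATE_PARTITION_SIZE = "estimate_partition_size";

    // Same logic as analyzeEstimatePartitionSize(): return the value (or "")
    // and remove the key from the map.
    static String takeEstimatePartitionSize(Map<String, String> properties) {
        String value = "";
        if (properties != null && properties.containsKey(PROPERTIES_ESTIMATE_PARTITION_SIZE)) {
            value = properties.remove(PROPERTIES_ESTIMATE_PARTITION_SIZE);
        }
        return value;
    }

    // Same parsing rule as TableProperty.isAutoBucket(): absent means "false".
    static boolean isAutoBucket(Map<String, String> properties) {
        return Boolean.parseBoolean(properties.getOrDefault(PROPERTIES_AUTO_BUCKET, "false"));
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(PROPERTIES_ESTIMATE_PARTITION_SIZE, "10G"); // arbitrary value
        System.out.println(takeEstimatePartitionSize(props)); // 10G
        System.out.println(takeEstimatePartitionSize(props).isEmpty()); // true: key was consumed
        System.out.println(isAutoBucket(props)); // false: key absent
    }
}
```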
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 415d306f31..0466ac4af0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -241,7 +241,6 @@ public class InternalCatalog implements CatalogIf<Database> {
         return INTERNAL_CATALOG_NAME;
     }
 
-
     @Override
     public List<String> getDbNames() {
         return Lists.newArrayList(fullNameToDb.keySet());
@@ -734,12 +733,12 @@ public class InternalCatalog implements CatalogIf<Database> {
             if (Strings.isNullOrEmpty(newPartitionName)) {
                 if (olapTable.getPartition(partitionName) != null) {
                     throw new DdlException("partition[" + partitionName + "] "
-                        + "already exist in table[" + tableName + "]");
+                            + "already exist in table[" + tableName + "]");
                 }
             } else {
                 if (olapTable.getPartition(newPartitionName) != null) {
                     throw new DdlException("partition[" + newPartitionName + "] "
-                        + "already exist in table[" + tableName + "]");
+                            + "already exist in table[" + tableName + "]");
                 }
             }
 
@@ -932,7 +931,7 @@ public class InternalCatalog implements CatalogIf<Database> {
     }
 
     public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay,
-                                      long recycleTime) {
+            long recycleTime) {
         if (table.getType() == TableType.ELASTICSEARCH) {
             esRepository.deRegisterTable(table.getId());
         } else if (table.getType() == TableType.OLAP) {
@@ -964,7 +963,7 @@ public class InternalCatalog implements CatalogIf<Database> {
     }
 
     public void replayDropTable(Database db, long tableId, boolean isForceDrop,
-                                Long recycleTime) throws MetaNotFoundException {
+            Long recycleTime) throws MetaNotFoundException {
         Table table = db.getTableOrMetaException(tableId);
         db.writeLock();
         table.writeLock();
@@ -1002,10 +1001,10 @@ public class InternalCatalog implements CatalogIf<Database> {
             schemaHash = olapTable.getSchemaHashByIndexId(info.getIndexId());
         }
 
-        Replica replica =
-                new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(), schemaHash, info.getDataSize(),
-                        info.getRemoteDataSize(), info.getRowCount(), ReplicaState.NORMAL, info.getLastFailedVersion(),
-                        info.getLastSuccessVersion());
+        Replica replica = new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(), schemaHash,
+                info.getDataSize(),
+                info.getRemoteDataSize(), info.getRowCount(), ReplicaState.NORMAL, info.getLastFailedVersion(),
+                info.getLastSuccessVersion());
         tablet.addReplica(replica);
     }
 
@@ -1368,8 +1367,8 @@ public class InternalCatalog implements CatalogIf<Database> {
                 if (distributionInfo.getType() == DistributionInfoType.HASH) {
                     HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
                     List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns();
-                    List<Column> defaultDistriCols
-                            = ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns();
+                    List<Column> defaultDistriCols = ((HashDistributionInfo) defaultDistributionInfo)
+                            .getDistributionColumns();
                     if (!newDistriCols.equals(defaultDistriCols)) {
                         throw new DdlException(
                                 "Cannot assign hash distribution with different distribution cols. " + "default is: "
@@ -1629,7 +1628,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                 olapTable.dropTempPartition(info.getPartitionName(), true);
             } else {
                 Partition partition = olapTable.dropPartition(info.getDbId(), info.getPartitionName(),
-                                info.isForceDrop());
+                        info.isForceDrop());
                 if (!info.isForceDrop() && partition != null && info.getRecycleTime() != 0) {
                     Env.getCurrentRecycleBin().setRecycleTimeByIdForReplay(partition.getId(), info.getRecycleTime());
                 }
@@ -1660,7 +1659,7 @@ public class InternalCatalog implements CatalogIf<Database> {
             DistributionInfo distributionInfo, TStorageMedium storageMedium, ReplicaAllocation replicaAlloc,
             Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long> tabletIdSet, List<Index> indexes,
             boolean isInMemory, TStorageFormat storageFormat, TTabletType tabletType, TCompressionType compressionType,
-            DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite,  String storagePolicy,
+            DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy,
             IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction) throws DdlException {
         // create base index first.
         Preconditions.checkArgument(baseIndexId != -1);
@@ -1920,6 +1919,17 @@ public class InternalCatalog implements CatalogIf<Database> {
 
         olapTable.setReplicationAllocation(replicaAlloc);
 
+        // set auto bucket
+        boolean isAutoBucket = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_AUTO_BUCKET,
+                false);
+        olapTable.setIsAutoBucket(isAutoBucket);
+
+        // set estimate partition size
+        if (isAutoBucket) {
+            String estimatePartitionSize = PropertyAnalyzer.analyzeEstimatePartitionSize(properties);
+            olapTable.setEstimatePartitionSize(estimatePartitionSize);
+        }
+
         // set in memory
         boolean isInMemory = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_INMEMORY,
                 false);
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex
index 1f50df0105..7cbc2c896b 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -465,6 +465,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("write", new Integer(SqlParserSymbols.KW_WRITE));
         keywordMap.put("year", new Integer(SqlParserSymbols.KW_YEAR));
         keywordMap.put("mtmv", new Integer(SqlParserSymbols.KW_MTMV));
+        keywordMap.put("auto", new Integer(SqlParserSymbols.KW_AUTO));
    }
     
   // map from token id to token description
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java
new file mode 100644
index 0000000000..fa61bd48a2
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ResultSetMetaData;
+import org.apache.doris.qe.ShowResultSet;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TDisk;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.utframe.UtFrameUtils;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.core.StringContains;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+public class AutoBucketUtilsTest {
+    private static String databaseName = "AutoBucketUtilsTest";
+    // use a unique dir so that it won't conflict with other unit tests which
+    // may also start a mocked Frontend
+    private static String runningDirBase = "fe";
+    private static String runningDir = runningDirBase + "/mocked/AutoBucketUtilsTest/" + UUID.randomUUID().toString()
+            + "/";
+    private static List<Backend> backends = Lists.newArrayList();
+    private static Random random = new Random(System.currentTimeMillis());
+    private ConnectContext connectContext;
+
+    // create a cluster of beNum backends, each with diskNum disks of diskCapacity bytes
+    private static void createClusterWithBackends(int beNum, int diskNum, long diskCapacity) throws Exception {
+        UtFrameUtils.createDorisClusterWithMultiTag(runningDir, beNum);
+        // must set disk info, or the tablet scheduler won't work
+        backends = Env.getCurrentSystemInfo().getClusterBackends(SystemInfoService.DEFAULT_CLUSTER);
+        for (Backend be : backends) {
+            setDiskInfos(diskNum, diskCapacity, be);
+        }
+    }
+
+    private static ImmutableMap<Long, Backend> createBackends(int beNum, int diskNum, long diskCapacity)
+            throws Exception {
+        // must set disk info, or the tablet scheduler won't work
+        Map<Long, Backend> backends = Maps.newHashMap();
+        for (int i = 0; i < beNum; ++i) {
+            Backend be = new Backend(10000 + i, "127.0.0." + (i + 1), 9000 + i);
+            be.setAlive(true);
+            backends.put(be.getId(), be);
+        }
+        for (Backend be : backends.values()) {
+            setDiskInfos(diskNum, diskCapacity, be);
+        }
+        return ImmutableMap.copyOf(backends);
+    }
+
+    private static void setDiskInfos(int diskNum, long diskCapacity, Backend be) {
+        Map<String, TDisk> backendDisks = Maps.newHashMap();
+        for (int i = 0; i < diskNum; ++i) {
+            TDisk disk = new TDisk();
+            disk.setRootPath("/home/doris/" + UUID.randomUUID().toString());
+            disk.setDiskTotalCapacity(diskCapacity);
+            disk.setDataUsedCapacity(0);
+            disk.setUsed(true);
+            disk.setDiskAvailableCapacity(disk.disk_total_capacity - disk.data_used_capacity);
+            disk.setPathHash(random.nextLong());
+            disk.setStorageMedium(TStorageMedium.HDD);
+            backendDisks.put(disk.getRootPath(), disk);
+        }
+        be.updateDisks(backendDisks);
+    }
+
+    private void expectations(Env env, EditLog editLog, SystemInfoService systemInfoService,
+            ImmutableMap<Long, Backend> backends) {
+        new Expectations() {
+            {
+                Env.getCurrentSystemInfo();
+                minTimes = 0;
+                result = systemInfoService;
+
+                systemInfoService.getBackendsInCluster(null);
+                minTimes = 0;
+                result = backends;
+
+                Env.getCurrentEnv();
+                minTimes = 0;
+                result = env;
+
+                env.getEditLog();
+                minTimes = 0;
+                result = editLog;
+
+                editLog.logBackendStateChange((Backend) any);
+                minTimes = 0;
+            }
+        };
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        FeConstants.runningUnitTest = true;
+        FeConstants.tablet_checker_interval_ms = 1000;
+        FeConstants.default_scheduler_interval_millisecond = 100;
+        Config.tablet_repair_delay_factor_second = 1;
+        connectContext = UtFrameUtils.createDefaultCtx();
+    }
+
+    @After
+    public void tearDown() {
+        Env.getCurrentEnv().clear();
+        UtFrameUtils.cleanDorisFeDir(runningDirBase);
+    }
+
+    private static String genTableNameWithoutDatabase(String estimatePartitionSize) {
+        return "size_" + estimatePartitionSize;
+    }
+
+    private static String genTableName(String estimatePartitionSize) {
+        return databaseName + "." + genTableNameWithoutDatabase(estimatePartitionSize);
+    }
+
+    private static String genTableNameByTag(String estimatePartitionSize, String tag) {
+        return databaseName + "." + genTableNameWithoutDatabase(estimatePartitionSize) + "_" + tag;
+    }
+
+    private static String genCreateTableSql(String estimatePartitionSize) {
+        return "CREATE TABLE IF NOT EXISTS " + genTableName(estimatePartitionSize) + "\n"
+                + "(\n"
+                + "`user_id` LARGEINT NOT NULL\n"
+                + ")\n"
+                + "DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO\n"
+                + "PROPERTIES (\n"
+                + "\"estimate_partition_size\" = \"" + estimatePartitionSize + "\",\n"
+                + "\"replication_num\" = \"1\"\n"
+                + ")";
+    }
+
+    private void createTable(String sql) throws Exception {
+        // create database first
+        UtFrameUtils.createDatabase(connectContext, databaseName);
+        UtFrameUtils.createTable(connectContext, sql);
+    }
+
+    private void createTableBySize(String estimatePartitionSize) throws Exception {
+        createTable(genCreateTableSql(estimatePartitionSize));
+    }
+
+    private int getPartitionBucketNum(String tableName) throws Exception {
+        ShowResultSet result = UtFrameUtils.showPartitionsByName(connectContext, tableName);
+        ResultSetMetaData metaData = result.getMetaData();
+
+        for (int i = 0; i < metaData.getColumnCount(); ++i) {
+            if (metaData.getColumn(i).getName().equalsIgnoreCase("buckets")) {
+                return Integer.valueOf(result.getResultRows().get(0).get(i));
+            }
+        }
+
+        throw new Exception("No buckets column in show partitions result");
+    }
+
+    // also has checked create table && show partitions
+    @Test
+    public void testWithoutEstimatePartitionSize() throws Exception {
+        String tableName = genTableName("");
+        String sql = "CREATE TABLE IF NOT EXISTS " + tableName + "\n"
+                + "(\n"
+                + "`user_id` LARGEINT NOT NULL\n"
+                + ")\n"
+                + "DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO\n"
+                + "PROPERTIES (\n"
+                + "\"replication_num\" = \"1\"\n"
+                + ")";
+
+        createClusterWithBackends(1, 1, 2000000000);
+
+        createTable(sql);
+        ShowResultSet showCreateTableResult = UtFrameUtils.showCreateTableByName(connectContext, tableName);
+        String showCreateTableResultSql = showCreateTableResult.getResultRows().get(0).get(1);
+        MatcherAssert.assertThat(showCreateTableResultSql,
+                StringContains.containsString("DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO\n"));
+        int bucketNum = getPartitionBucketNum(tableName);
+        Assert.assertEquals(FeConstants.default_bucket_num, bucketNum);
+    }
+
+    @Test
+    public void test100MB(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+            throws Exception {
+        long estimatePartitionSize = AutoBucketUtils.SIZE_100MB;
+        ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2000000000);
+        expectations(env, editLog, systemInfoService, backends);
+        Assert.assertEquals(1, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+    }
+
+    @Test
+    public void test500MB(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+            throws Exception {
+        long estimatePartitionSize = 5 * AutoBucketUtils.SIZE_100MB;
+        ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2000000000);
+        expectations(env, editLog, systemInfoService, backends);
+        Assert.assertEquals(1, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+    }
+
+    @Test
+    public void test1G(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+            throws Exception {
+        long estimatePartitionSize = AutoBucketUtils.SIZE_1GB;
+        ImmutableMap<Long, Backend> backends = createBackends(3, 2, 500 * AutoBucketUtils.SIZE_1GB);
+        expectations(env, editLog, systemInfoService, backends);
+        Assert.assertEquals(2, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+    }
+
+    @Test
+    public void test100G(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+            throws Exception {
+        long estimatePartitionSize = 100 * AutoBucketUtils.SIZE_1GB;
+        ImmutableMap<Long, Backend> backends = createBackends(3, 2, 500 * AutoBucketUtils.SIZE_1GB);
+        expectations(env, editLog, systemInfoService, backends);
+        Assert.assertEquals(20, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+    }
+
+    @Test
+    public void test500G_0(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+            throws Exception {
+        long estimatePartitionSize = 500 * AutoBucketUtils.SIZE_1GB;
+        ImmutableMap<Long, Backend> backends = createBackends(3, 1, AutoBucketUtils.SIZE_1TB);
+        expectations(env, editLog, systemInfoService, backends);
+        Assert.assertEquals(63, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+    }
+
+    @Test
+    public void test500G_1(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+            throws Exception {
+        long estimatePartitionSize = 500 * AutoBucketUtils.SIZE_1GB;
+        ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2 * AutoBucketUtils.SIZE_1TB);
+        expectations(env, editLog, systemInfoService, backends);
+        Assert.assertEquals(100, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+    }
+
+    @Test
+    public void test500G_2(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+            throws Exception {
+        long estimatePartitionSize = 500 * AutoBucketUtils.SIZE_1GB;
+        ImmutableMap<Long, Backend> backends = createBackends(1, 1, 100 * AutoBucketUtils.SIZE_1TB);
+        expectations(env, editLog, systemInfoService, backends);
+        Assert.assertEquals(100, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+    }
+
+    @Test
+    public void test1T_0(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+            throws Exception {
+        long estimatePartitionSize = AutoBucketUtils.SIZE_1TB;
+        ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2 * AutoBucketUtils.SIZE_1TB);
+        expectations(env, editLog, systemInfoService, backends);
+        Assert.assertEquals(128, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+    }
+
+    @Test
+    public void test1T_1(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+            throws Exception {
+        long estimatePartitionSize = AutoBucketUtils.SIZE_1TB;
+        ImmutableMap<Long, Backend> backends = createBackends(200, 7, 4 * AutoBucketUtils.SIZE_1TB);
+        expectations(env, editLog, systemInfoService, backends);
+        Assert.assertEquals(200, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index adc5a86a39..0161152904 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -18,14 +18,20 @@
 package org.apache.doris.utframe;
 
 import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.ExplainOptions;
 import org.apache.doris.analysis.QueryStmt;
+import org.apache.doris.analysis.ShowCreateTableStmt;
+import org.apache.doris.analysis.ShowPartitionsStmt;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
 import org.apache.doris.analysis.StatementBase;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -35,6 +41,8 @@ import org.apache.doris.planner.Planner;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.ShowExecutor;
+import org.apache.doris.qe.ShowResultSet;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
@@ -62,6 +70,7 @@ import java.net.ServerSocket;
 import java.net.SocketException;
 import java.nio.channels.SocketChannel;
 import java.nio.file.Files;
+import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -265,7 +274,7 @@ public class UtFrameUtils {
         // start be
         MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort,
                 beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort),
-            new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
+                new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
         backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort));
         backend.start();
 
@@ -305,7 +314,7 @@ public class UtFrameUtils {
                     datagramSocket.setReuseAddress(true);
                     break;
                 } catch (SocketException e) {
-                    System.out.println("The port " + port  + " is invalid and try another port.");
+                    System.out.println("The port " + port + " is invalid and try another port.");
                 }
             } catch (IOException e) {
                 throw new IllegalStateException("Could not find a free TCP/IP port to start HTTP Server on");
@@ -355,8 +364,8 @@ public class UtFrameUtils {
     }
 
     public static String getStmtDigest(ConnectContext connectContext, String originStmt) throws Exception {
-        SqlScanner input =
-                new SqlScanner(new StringReader(originStmt), connectContext.getSessionVariable().getSqlMode());
+        SqlScanner input = new SqlScanner(new StringReader(originStmt),
+                connectContext.getSessionVariable().getSqlMode());
         SqlParser parser = new SqlParser(input);
         StatementBase statementBase = SqlParserUtils.getFirstStmt(parser);
         Preconditions.checkState(statementBase instanceof QueryStmt);
@@ -370,4 +379,70 @@ public class UtFrameUtils {
         String realVNodeName = idx + ":V" + nodeName;
         return planResult.contains(realNodeName) || planResult.contains(realVNodeName);
     }
+
+    public static void createDatabase(ConnectContext ctx, String db) throws Exception {
+        String createDbStmtStr = "CREATE DATABASE " + db;
+        CreateDbStmt createDbStmt = (CreateDbStmt) parseAndAnalyzeStmt(createDbStmtStr, ctx);
+        Env.getCurrentEnv().createDb(createDbStmt);
+    }
+
+    public static void createTable(ConnectContext ctx, String sql) throws Exception {
+        try {
+            createTables(ctx, sql);
+        } catch (ConcurrentModificationException e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    public static void createTables(ConnectContext ctx, String... sqls) throws Exception {
+        for (String sql : sqls) {
+            CreateTableStmt stmt = (CreateTableStmt) parseAndAnalyzeStmt(sql, ctx);
+            Env.getCurrentEnv().createTable(stmt);
+        }
+        updateReplicaPathHash();
+    }
+
+    public static ShowResultSet showCreateTable(ConnectContext ctx, String sql) throws Exception {
+        ShowCreateTableStmt stmt = (ShowCreateTableStmt) parseAndAnalyzeStmt(sql, ctx);
+        ShowExecutor executor = new ShowExecutor(ctx, stmt);
+        return executor.execute();
+    }
+
+    public static ShowResultSet showCreateTableByName(ConnectContext ctx, String table) throws Exception {
+        String sql = "show create table " + table;
+        return showCreateTable(ctx, sql);
+    }
+
+    public static ShowResultSet showPartitions(ConnectContext ctx, String sql) throws Exception {
+        ShowPartitionsStmt stmt = (ShowPartitionsStmt) parseAndAnalyzeStmt(sql, ctx);
+        ShowExecutor executor = new ShowExecutor(ctx, stmt);
+        return executor.execute();
+    }
+
+    public static ShowResultSet showPartitionsByName(ConnectContext ctx, String table) throws Exception {
+        String sql = "show partitions from " + table;
+        return showPartitions(ctx, sql);
+    }
+
+    private static void updateReplicaPathHash() {
+        com.google.common.collect.Table<Long, Long, Replica> replicaMetaTable = Env.getCurrentInvertedIndex()
+                .getReplicaMetaTable();
+        for (com.google.common.collect.Table.Cell<Long, Long, Replica> cell : replicaMetaTable.cellSet()) {
+            long beId = cell.getColumnKey();
+            Backend be = Env.getCurrentSystemInfo().getBackend(beId);
+            if (be == null) {
+                continue;
+            }
+            Replica replica = cell.getValue();
+            TabletMeta tabletMeta = Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey());
+            ImmutableMap<String, DiskInfo> diskMap = be.getDisks();
+            for (DiskInfo diskInfo : diskMap.values()) {
+                if (diskInfo.getStorageMedium() == tabletMeta.getStorageMedium()) {
+                    replica.setPathHash(diskInfo.getPathHash());
+                    break;
+                }
+            }
+        }
+    }
 }
diff --git a/regression-test/suites/autobucket/test_autobucket.groovy b/regression-test/suites/autobucket/test_autobucket.groovy
new file mode 100644
index 0000000000..29945e0f9a
--- /dev/null
+++ b/regression-test/suites/autobucket/test_autobucket.groovy
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_autobucket") {
+    sql "drop table if exists autobucket_test"
+    result = sql """
+        CREATE TABLE `autobucket_test` (
+          `user_id` largeint(40) NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`user_id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        )
+        """
+
+    result = sql "show create table autobucket_test"
+    assertTrue(result.toString().containsIgnoreCase("BUCKETS AUTO"))
+
+    result = sql "show partitions from autobucket_test"
+    logger.info("${result}")
+    // XXX: buckets at pos(8), next maybe impl by sql meta
+    assertEquals(Integer.valueOf(result.get(0).get(8)), 10)
+
+    sql "drop table if exists autobucket_test"
+}
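
The Java unit test reads the bucket count from SHOW PARTITIONS by looking up the column named "buckets" in the result metadata (getPartitionBucketNum). The Groovy regression test instead reads position 8 directly, as its XXX comment notes. Below is a minimal sketch of the name-based lookup. It uses plain lists in place of ShowResultSet, and the class name, header, and row values are hypothetical:

```java
import java.util.List;

public class BucketColumnLookup {
    // Returns the index of the column whose name equals `wanted`, ignoring case, or -1.
    public static int columnIndex(List<String> columnNames, String wanted) {
        for (int i = 0; i < columnNames.size(); ++i) {
            if (columnNames.get(i).equalsIgnoreCase(wanted)) {
                return i;
            }
        }
        return -1;
    }

    // Reads the bucket count from the first row of a SHOW PARTITIONS-style result.
    public static int bucketNum(List<String> columnNames, List<List<String>> rows) {
        int idx = columnIndex(columnNames, "buckets");
        if (idx < 0) {
            throw new IllegalStateException("No buckets column in show partitions result");
        }
        return Integer.parseInt(rows.get(0).get(idx));
    }

    public static void main(String[] args) {
        // Hypothetical header and row; the real SHOW PARTITIONS layout has more columns.
        List<String> header = List.of("PartitionId", "PartitionName", "Buckets");
        List<List<String>> rows = List.of(List.of("10001", "autobucket_test", "10"));
        System.out.println(bucketNum(header, rows)); // prints 10
    }
}
```

A lookup by name keeps working if columns are added or reordered. A fixed index would silently read the wrong value in that case.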




[doris] 02/05: [fix](fe)fix anti join bug (#15955)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5e97bce8c01ff98118c7625074aa4a9be5882d60
Author: starocean999 <40...@users.noreply.github.com>
AuthorDate: Tue Jan 17 20:25:00 2023 +0800

    [fix](fe)fix anti join bug (#15955)
    
    * [fix](fe)fix anti join bug
    
    * fix fe ut
---
 .../java/org/apache/doris/analysis/Analyzer.java   | 93 ++++++++++++++++------
 .../org/apache/doris/analysis/JoinOperator.java    | 13 +++
 .../apache/doris/planner/SingleNodePlanner.java    |  9 +--
 .../java/org/apache/doris/planner/PlannerTest.java |  2 +-
 .../org/apache/doris/planner/QueryPlanTest.java    |  2 +-
 regression-test/data/query_p0/join/test_join.out   | 80 +++++++++++++++++++
 .../suites/query_p0/join/test_join.groovy          | 10 ++-
 7 files changed, 178 insertions(+), 31 deletions(-)
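
This commit sorts ON-clause conjuncts of semi and anti joins into two lists. One list is for NULL_AWARE_LEFT_ANTI_JOIN. The other is shared by the non-null-aware LEFT/RIGHT SEMI and ANTI joins. Before the change, only anti joins had a dedicated list, and semi joins fell back to getUnassignedConjuncts on their tuple ids. Below is a simplified sketch of the routing. It assumes a reduced JoinOp enum and uses integer ids in place of TableRef and ExprId. All names are illustrative, not Doris classes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AntiJoinConjunctRouting {
    // Reduced, hypothetical stand-in for org.apache.doris.analysis.JoinOperator.
    public enum JoinOp {
        INNER_JOIN, LEFT_SEMI_JOIN, RIGHT_SEMI_JOIN,
        LEFT_ANTI_JOIN, RIGHT_ANTI_JOIN, NULL_AWARE_LEFT_ANTI_JOIN;

        public boolean isAntiJoinNullAware() {
            return this == NULL_AWARE_LEFT_ANTI_JOIN;
        }

        public boolean isSemiOrAntiJoinNoNullAware() {
            return this == LEFT_SEMI_JOIN || this == LEFT_ANTI_JOIN
                    || this == RIGHT_SEMI_JOIN || this == RIGHT_ANTI_JOIN;
        }

        public boolean isSemiAntiJoin() {
            return isAntiJoinNullAware() || isSemiOrAntiJoinNoNullAware();
        }
    }

    // rhs table-ref id -> ids of the conjuncts registered for that join, one map per category
    public final Map<Integer, List<Integer>> nullAwareAnti = new HashMap<>();
    public final Map<Integer, List<Integer>> semiAntiNoNullAware = new HashMap<>();

    public void register(int rhsRefId, JoinOp op, int conjunctId) {
        if (!op.isSemiAntiJoin()) {
            return; // other join kinds are ignored in this sketch
        }
        Map<Integer, List<Integer>> target = op.isAntiJoinNullAware() ? nullAwareAnti : semiAntiNoNullAware;
        target.computeIfAbsent(rhsRefId, k -> new ArrayList<>()).add(conjunctId);
    }

    public static void main(String[] args) {
        AntiJoinConjunctRouting a = new AntiJoinConjunctRouting();
        a.register(1, JoinOp.LEFT_SEMI_JOIN, 100);
        a.register(2, JoinOp.NULL_AWARE_LEFT_ANTI_JOIN, 200);
        a.register(3, JoinOp.RIGHT_ANTI_JOIN, 300);
        a.register(4, JoinOp.INNER_JOIN, 400);
        System.out.println(a.nullAwareAnti);       // {2=[200]}
        System.out.println(a.semiAntiNoNullAware); // {1=[100], 3=[300]}
    }
}
```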

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index 946718b2c6..649f054e39 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -358,7 +358,9 @@ public class Analyzer {
         // corresponding value could be an empty list. There is no entry for non-outer joins.
         public final Map<TupleId, List<ExprId>> conjunctsByOjClause = Maps.newHashMap();
 
-        public final Map<TupleId, List<ExprId>> conjunctsByAntiJoinClause = Maps.newHashMap();
+        public final Map<TupleId, List<ExprId>> conjunctsByAntiJoinNullAwareClause = Maps.newHashMap();
+
+        public final Map<TupleId, List<ExprId>> conjunctsBySemiAntiJoinNoNullAwareClause = Maps.newHashMap();
 
         // map from registered conjunct to its containing outer join On clause (represented
         // by its right-hand side table ref); only conjuncts that can only be correctly
@@ -1387,10 +1389,27 @@ public class Analyzer {
      * Return all unassigned conjuncts of the anti join referenced by
      * right-hand side table ref.
      */
-    public List<Expr> getUnassignedAntiJoinConjuncts(TableRef ref) {
-        Preconditions.checkState(ref.getJoinOp().isAntiJoin());
+    public List<Expr> getUnassignedAntiJoinNullAwareConjuncts(TableRef ref) {
+        Preconditions.checkState(ref.getJoinOp().isAntiJoinNullAware());
+        List<Expr> result = Lists.newArrayList();
+        List<ExprId> candidates = globalState.conjunctsByAntiJoinNullAwareClause.get(ref.getId());
+        if (candidates == null) {
+            return result;
+        }
+        for (ExprId conjunctId : candidates) {
+            if (!globalState.assignedConjuncts.contains(conjunctId)) {
+                Expr e = globalState.conjuncts.get(conjunctId);
+                Preconditions.checkState(e != null);
+                result.add(e);
+            }
+        }
+        return result;
+    }
+
+    public List<Expr> getUnassignedSemiAntiJoinNoNullAwareConjuncts(TableRef ref) {
+        Preconditions.checkState(ref.getJoinOp().isSemiOrAntiJoinNoNullAware());
         List<Expr> result = Lists.newArrayList();
-        List<ExprId> candidates = globalState.conjunctsByAntiJoinClause.get(ref.getId());
+        List<ExprId> candidates = globalState.conjunctsBySemiAntiJoinNoNullAwareClause.get(ref.getId());
         if (candidates == null) {
             return result;
         }
@@ -1480,6 +1499,30 @@ public class Analyzer {
         return (tblRef.getJoinOp().isAntiJoin()) ? tblRef : null;
     }
 
+    public boolean isAntiJoinedNullAwareConjunct(Expr e) {
+        return getAntiJoinNullAwareRef(e) != null;
+    }
+
+    private TableRef getAntiJoinNullAwareRef(Expr e) {
+        TableRef tblRef = globalState.sjClauseByConjunct.get(e.getId());
+        if (tblRef == null) {
+            return null;
+        }
+        return (tblRef.getJoinOp().isAntiJoinNullAware()) ? tblRef : null;
+    }
+
+    public boolean isAntiJoinedNoNullAwareConjunct(Expr e) {
+        return getAntiJoinNoNullAwareRef(e) != null;
+    }
+
+    public TableRef getAntiJoinNoNullAwareRef(Expr e) {
+        TableRef tblRef = globalState.sjClauseByConjunct.get(e.getId());
+        if (tblRef == null) {
+            return null;
+        }
+        return (tblRef.getJoinOp().isAntiJoinNoNullAware()) ? tblRef : null;
+    }
+
     public boolean isFullOuterJoined(TupleId tid) {
         return globalState.fullOuterJoinedTupleIds.containsKey(tid);
     }
@@ -1697,11 +1740,14 @@ public class Analyzer {
                 globalState.ojClauseByConjunct.put(conjunct.getId(), rhsRef);
                 ojConjuncts.add(conjunct.getId());
             }
-            if (rhsRef.getJoinOp().isSemiJoin()) {
+            if (rhsRef.getJoinOp().isSemiAntiJoin()) {
                 globalState.sjClauseByConjunct.put(conjunct.getId(), rhsRef);
-                if (rhsRef.getJoinOp().isAntiJoin()) {
-                    globalState.conjunctsByAntiJoinClause.computeIfAbsent(rhsRef.getId(), k -> Lists.newArrayList())
-                            .add(conjunct.getId());
+                if (rhsRef.getJoinOp().isAntiJoinNullAware()) {
+                    globalState.conjunctsByAntiJoinNullAwareClause.computeIfAbsent(rhsRef.getId(),
+                            k -> Lists.newArrayList()).add(conjunct.getId());
+                } else {
+                    globalState.conjunctsBySemiAntiJoinNoNullAwareClause.computeIfAbsent(rhsRef.getId(),
+                            k -> Lists.newArrayList()).add(conjunct.getId());
                 }
             }
             if (rhsRef.getJoinOp().isInnerJoin()) {
@@ -1721,7 +1767,7 @@ public class Analyzer {
      */
     private void markConstantConjunct(Expr conjunct, boolean fromHavingClause)
             throws AnalysisException {
-        if (!conjunct.isConstant() || isOjConjunct(conjunct) || isAntiJoinedConjunct(conjunct)) {
+        if (!conjunct.isConstant() || isOjConjunct(conjunct)) {
             return;
         }
         if ((!fromHavingClause && !hasEmptySpjResultSet)
@@ -1738,24 +1784,25 @@ public class Analyzer {
                     conjunct.analyze(this);
                 }
                 final Expr newConjunct = conjunct.getResultValue();
-                if (newConjunct instanceof BoolLiteral) {
-                    final BoolLiteral value = (BoolLiteral) newConjunct;
-                    if (!value.getValue()) {
-                        if (fromHavingClause) {
-                            hasEmptyResultSet = true;
-                        } else {
-                            hasEmptySpjResultSet = true;
-                        }
+                if (newConjunct instanceof BoolLiteral || newConjunct instanceof NullLiteral) {
+                    boolean evalResult = true;
+                    if (newConjunct instanceof BoolLiteral) {
+                        evalResult = ((BoolLiteral) newConjunct).getValue();
+                    } else {
+                        evalResult = false;
                     }
-                    markConjunctAssigned(conjunct);
-                }
-                if (newConjunct instanceof NullLiteral) {
                     if (fromHavingClause) {
-                        hasEmptyResultSet = true;
+                        hasEmptyResultSet = !evalResult;
                     } else {
-                        hasEmptySpjResultSet = true;
+                        if (isAntiJoinedNoNullAwareConjunct(conjunct)) {
+                            hasEmptySpjResultSet = evalResult;
+                        } else {
+                            hasEmptySpjResultSet = !evalResult;
+                        }
+                    }
+                    if (hasEmptyResultSet || hasEmptySpjResultSet) {
+                        markConjunctAssigned(conjunct);
                     }
-                    markConjunctAssigned(conjunct);
                 }
             } catch (AnalysisException ex) {
                 throw new AnalysisException("Error evaluating \"" + conjunct.toSql() + "\"", ex);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java
index de33961db1..08c0321b25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java
@@ -71,6 +71,19 @@ public enum JoinOperator {
                 || this == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
     }
 
+    public boolean isSemiOrAntiJoinNoNullAware() {
+        return this == JoinOperator.LEFT_SEMI_JOIN || this == JoinOperator.LEFT_ANTI_JOIN
+                || this == JoinOperator.RIGHT_SEMI_JOIN || this == JoinOperator.RIGHT_ANTI_JOIN;
+    }
+
+    public boolean isAntiJoinNullAware() {
+        return this == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
+    }
+
+    public boolean isAntiJoinNoNullAware() {
+        return this == JoinOperator.LEFT_ANTI_JOIN || this == JoinOperator.RIGHT_ANTI_JOIN;
+    }
+
     public boolean isLeftSemiJoin() {
         return this.thriftJoinOp == TJoinOp.LEFT_SEMI_JOIN;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index e6ab97d2a4..854da5ddcc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -2069,11 +2069,10 @@ public class SingleNodePlanner {
             // Also assign conjuncts from On clause. All remaining unassigned conjuncts
             // that can be evaluated by this join are assigned in createSelectPlan().
             ojConjuncts = analyzer.getUnassignedOjConjuncts(innerRef);
-        } else if (innerRef.getJoinOp().isAntiJoin()) {
-            ojConjuncts = analyzer.getUnassignedAntiJoinConjuncts(innerRef);
-        } else if (innerRef.getJoinOp().isSemiJoin()) {
-            final List<TupleId> tupleIds = innerRef.getAllTupleIds();
-            ojConjuncts = analyzer.getUnassignedConjuncts(tupleIds, false);
+        } else if (innerRef.getJoinOp().isAntiJoinNullAware()) {
+            ojConjuncts = analyzer.getUnassignedAntiJoinNullAwareConjuncts(innerRef);
+        } else if (innerRef.getJoinOp().isSemiOrAntiJoinNoNullAware()) {
+            ojConjuncts = analyzer.getUnassignedSemiAntiJoinNoNullAwareConjuncts(innerRef);
         }
         analyzer.markConjunctsAssigned(ojConjuncts);
         if (eqJoinConjuncts.isEmpty() || innerRef.isMark()) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
index 3d2293c670..2b5e0c3266 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
@@ -405,7 +405,7 @@ public class PlannerTest extends TestWithFeService {
         compare.accept("select * from db1.tbl2 where k1 = 2.0", "select * from db1.tbl2 where k1 = 2");
         compare.accept("select * from db1.tbl2 where k1 = 2.1", "select * from db1.tbl2 where 2 = 2.1");
         compare.accept("select * from db1.tbl2 where k1 != 2.0", "select * from db1.tbl2 where k1 != 2");
-        compare.accept("select * from db1.tbl2 where k1 != 2.1", "select * from db1.tbl2");
+        compare.accept("select * from db1.tbl2 where k1 != 2.1", "select * from db1.tbl2 where TRUE");
         compare.accept("select * from db1.tbl2 where k1 <= 2.0", "select * from db1.tbl2 where k1 <= 2");
         compare.accept("select * from db1.tbl2 where k1 <= 2.1", "select * from db1.tbl2 where k1 <= 2");
         compare.accept("select * from db1.tbl2 where k1 >= 2.0", "select * from db1.tbl2 where k1 >= 2");
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 4f41e70c45..9d8e560c11 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -1025,7 +1025,7 @@ public class QueryPlanTest extends TestWithFeService {
         String explainString = getSQLPlanOrErrorMsg("explain " + sql);
         Assert.assertTrue(explainString.contains("PLAN FRAGMENT"));
         Assert.assertTrue(explainString.contains("NESTED LOOP JOIN"));
-        Assert.assertTrue(!explainString.contains("PREDICATES"));
+        Assert.assertTrue(!explainString.contains("PREDICATES") || explainString.contains("PREDICATES: TRUE"));
     }
 
     @Test
diff --git a/regression-test/data/query_p0/join/test_join.out b/regression-test/data/query_p0/join/test_join.out
index c19bf29042..94d5e31dae 100644
--- a/regression-test/data/query_p0/join/test_join.out
+++ b/regression-test/data/query_p0/join/test_join.out
@@ -2714,3 +2714,83 @@ false	true	true	false	false
 5	2.200	null	\N	2019-09-09T00:00	8.9	2	\N	2	\N	\N	8.9
 5	2.200	null	\N	2019-09-09T00:00	8.9	3	\N	null	2019-09-09	\N	8.9
 
+-- !sql --
+\N
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+11
+12
+13
+14
+15
+
+-- !sql --
+
+-- !sql --
+
+-- !sql --
+\N
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+11
+12
+13
+14
+15
+
+-- !sql --
+\N
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+11
+12
+13
+14
+15
+
+-- !sql --
+
+-- !sql --
+
+-- !sql --
+\N
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+11
+12
+13
+14
+15
+
diff --git a/regression-test/suites/query_p0/join/test_join.groovy b/regression-test/suites/query_p0/join/test_join.groovy
index 146ab61de4..5137156f6a 100644
--- a/regression-test/suites/query_p0/join/test_join.groovy
+++ b/regression-test/suites/query_p0/join/test_join.groovy
@@ -1219,7 +1219,15 @@ suite("test_join", "query,p0") {
     sql"""drop table ${table_3}"""
     sql"""drop table ${table_4}"""
 
-
+    qt_sql """select k1 from baseall left semi join test on true order by k1;"""
+    qt_sql """select k1 from baseall left semi join test on false order by k1;"""
+    qt_sql """select k1 from baseall left anti join test on true order by k1;"""
+    qt_sql """select k1 from baseall left anti join test on false order by k1;"""
+
+    qt_sql """select k1 from test right semi join baseall on true order by k1;"""
+    qt_sql """select k1 from test right semi join baseall on false order by k1;"""
+    qt_sql """select k1 from test right anti join baseall on true order by k1;"""
+    qt_sql """select k1 from test right anti join baseall on false order by k1;"""
 
     // test bucket shuffle join, github issue #6171
     sql"""create database if not exists test_issue_6171"""
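
The reworked markConstantConjunct no longer skips anti-join conjuncts. When a conjunct folds to a constant, NULL is treated as false. For a non-null-aware anti join, the result set is marked empty if the constant is true. For any other non-HAVING conjunct, it is marked empty if the constant is false. A constant that does not empty the result is no longer marked assigned, which is why PlannerTest now expects `where TRUE`. Below is a simplified sketch of that decision. It ignores the HAVING branch, and the class and method names are hypothetical:

```java
public class ConstantConjunctRule {
    // folded: the constant the conjunct folds to (null models a NULL literal).
    // Returns true when the analyzer would mark the non-HAVING result set as empty.
    public static boolean emptiesResult(Boolean folded, boolean onNonNullAwareAntiJoin) {
        boolean evalResult = folded != null && folded; // NULL behaves like false
        return onNonNullAwareAntiJoin ? evalResult : !evalResult;
    }

    public static void main(String[] args) {
        System.out.println("semi ON true  -> empty? " + emptiesResult(true, false));  // false
        System.out.println("semi ON false -> empty? " + emptiesResult(false, false)); // true
        System.out.println("anti ON true  -> empty? " + emptiesResult(true, true));   // true
        System.out.println("anti ON false -> empty? " + emptiesResult(false, true));  // false
    }
}
```

With `baseall` joined to a non-empty `test`, this matches the expected output in test_join.out. A semi join ON true and an anti join ON false return every row of baseall, while the other two cases return nothing.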




[doris] 01/05: [fix](planner)wrong result when has order by under join (#15974)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5d5c2e95415feb34f7af30d0482af1c3ad8cb73a
Author: morrySnow <10...@users.noreply.github.com>
AuthorDate: Tue Jan 17 20:20:56 2023 +0800

    [fix](planner)wrong result when has order by under join (#15974)
---
 .../org/apache/doris/planner/ExchangeNode.java     |  5 +--
 .../query_p0/limit/OffsetInSubqueryWithJoin.groovy | 42 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)
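
One reading of the ExchangeNode change: when an exchange sits directly on another ExchangeNode, copying `inputNode.offset` applies the offset twice. Reapplying a LIMIT to rows that are already limited changes nothing, but reapplying an OFFSET drops rows. The new regression test exercises this with two rows of id 1 and `limit 1, 1`. Below is a minimal sketch of the arithmetic on plain lists. It is not Doris planner code, and the class name is hypothetical:

```java
import java.util.List;

public class OffsetOnce {
    // Applies OFFSET/LIMIT to an already-sorted list of rows.
    public static List<Integer> offsetLimit(List<Integer> rows, int offset, int limit) {
        int from = Math.min(offset, rows.size());
        int to = Math.min(from + limit, rows.size());
        return rows.subList(from, to);
    }

    public static void main(String[] args) {
        List<Integer> sorted = List.of(1, 1); // the test table's rows, ordered by id
        // Applied once: LIMIT 1, 1 keeps the second row.
        List<Integer> once = offsetLimit(sorted, 1, 1);
        // Applied again by a stacked exchange that copied the same offset.
        List<Integer> twice = offsetLimit(once, 1, 1);
        System.out.println(once);  // [1]
        System.out.println(twice); // []
    }
}
```

With the offset applied twice, the IN subquery yields no ids, so the outer query wrongly returns nothing instead of both rows.
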

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 0629af2295..d613fd9102 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -84,9 +84,10 @@ public class ExchangeNode extends PlanNode {
         if (inputNode.getFragment().isPartitioned()) {
             limit = inputNode.limit;
         }
-        offset = inputNode.offset;
+        if (!(inputNode instanceof ExchangeNode)) {
+            offset = inputNode.offset;
+        }
         computeTupleIds();
-
     }
 
     public boolean isMergingExchange() {
diff --git a/regression-test/suites/query_p0/limit/OffsetInSubqueryWithJoin.groovy b/regression-test/suites/query_p0/limit/OffsetInSubqueryWithJoin.groovy
new file mode 100644
index 0000000000..da0c7231f4
--- /dev/null
+++ b/regression-test/suites/query_p0/limit/OffsetInSubqueryWithJoin.groovy
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_offset_in_subquery_with_join", "query") {
+    // define a sql table
+    def testTable = "test_offset_in_subquery_with_join"
+
+    sql """
+        drop table if exists ${testTable}
+    """
+
+    sql """
+        create table if not exists ${testTable}(id int) distributed by hash(id) properties('replication_num'='1')
+    """
+
+    sql """
+        insert into ${testTable} values (1), (1);
+    """
+
+    test {
+        sql "select * from $testTable where id in (select id from $testTable order by id limit 1, 1)"
+        result([
+                [1],
+                [1]
+        ])
+    }
+
+}

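The one-line ExchangeNode fix stops an ExchangeNode from copying `offset` from a child that is itself an ExchangeNode. The sketch below is a simplified model of the regression test's wrong result. It assumes, as the fix suggests, that before the fix the `limit 1, 1` offset was applied by the exchange below and then again by a parent exchange that inherited the same offset. The class and method names are hypothetical and this is not Doris planner code.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetSketch {
    // Applies OFFSET/LIMIT to rows that are already sorted, like a node
    // that forwards rows to its parent after skipping `offset` rows.
    static List<Integer> applyOffsetLimit(List<Integer> rows, int offset, int limit) {
        int from = Math.min(offset, rows.size());
        int to = Math.min(from + limit, rows.size());
        return new ArrayList<>(rows.subList(from, to));
    }

    public static void main(String[] args) {
        List<Integer> sorted = List.of(1, 1); // test table rows ordered by id
        // Correct: "order by id limit 1, 1" is applied once.
        List<Integer> once = applyOffsetLimit(sorted, 1, 1);
        // Assumed buggy plan: a parent exchange applies the copied offset again.
        List<Integer> twice = applyOffsetLimit(once, 1, 1);
        System.out.println(once);  // [1]
        System.out.println(twice); // []
    }
}
```

With the subquery returning `[1]`, `id in (...)` matches both rows, which is the `[[1], [1]]` result the test expects. An empty subquery result would instead return no rows at all.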



[doris] 03/05: [fix](regression-test) Fix the build for Java UDF Case (#15851)

Posted by mo...@apache.org.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 784e8d03312cbebe66e19385088246dda0f44ae6
Author: Adonis Ling <ad...@gmail.com>
AuthorDate: Tue Jan 17 20:25:53 2023 +0800

    [fix](regression-test) Fix the build for Java UDF Case (#15851)
    
    Opening the project in IntelliJ IDEA reveals the cause: Apache Maven 3.8.1 and newer block HTTP repositories by default. We therefore fix the build by adding, in pom.xml, an HTTPS repository that hosts the required package. We also pin the maven-compiler-plugin version to 3.10.1.
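The sketch below is a rough model of which repository URLs the default HTTP blocking catches. It assumes the behaviour of the `maven-default-http-blocker` mirror shipped with Maven 3.8.1 (`mirrorOf` `external:http:*`), which blocks plain-HTTP repositories except those on localhost. This is a simplification and not Maven's source. It ignores other protocol variants. The example URLs other than the MapR one are hypothetical.

```java
import java.net.URI;

public class HttpBlockerSketch {
    // Simplified assumption: a repository is blocked when it uses plain HTTP
    // and its host is not local (localhost or 127.0.0.1).
    static boolean isBlocked(String repositoryUrl) {
        URI uri = URI.create(repositoryUrl);
        boolean http = "http".equalsIgnoreCase(uri.getScheme());
        String host = uri.getHost();
        boolean local = "localhost".equals(host) || "127.0.0.1".equals(host);
        return http && !local;
    }

    public static void main(String[] args) {
        // Hypothetical external HTTP repository: blocked.
        System.out.println(isBlocked("http://repo.example.com/maven2"));
        // The HTTPS repository added to pom.xml: not blocked.
        System.out.println(isBlocked(
                "https://repository.mapr.com/nexus/content/groups/mapr-public/conjars"));
    }
}
```

Under this model, switching the repository to HTTPS is enough to pass the blocker. Weakening the blocker in `settings.xml` would not be needed.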
---
 regression-test/java-udf-src/pom.xml | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/regression-test/java-udf-src/pom.xml b/regression-test/java-udf-src/pom.xml
index d0cd56f498..2b01da10ad 100644
--- a/regression-test/java-udf-src/pom.xml
+++ b/regression-test/java-udf-src/pom.xml
@@ -30,6 +30,13 @@ under the License.
         <maven.compiler.target>8</maven.compiler.target>
     </properties>
 
+    <repositories>
+        <repository>
+            <id>MapR</id>
+            <url>https://repository.mapr.com/nexus/content/groups/mapr-public/conjars</url>
+        </repository>
+    </repositories>
+
     <dependencies>
         <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
         <dependency>
@@ -68,6 +75,7 @@ under the License.
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.10.1</version>
                 <configuration>
                     <source>8</source>
                     <target>8</target>




[doris] 04/05: [enhance](planner)convert 'or' into 'in-predicate' (#15737)

Posted by mo...@apache.org.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6ec00a340caa1f77876472bc5d18e6ca540b8153
Author: minghong <en...@gmail.com>
AuthorDate: Wed Jan 18 12:33:20 2023 +0800

    [enhance](planner)convert 'or' into 'in-predicate' (#15737)
    
    In a previous [PR 12872](https://github.com/apache/doris/pull/12872), we converted multiple equality predicates on the same slot into an `in predicate`. For example, `a = 1 or a = 2` => `a in (1, 2)`.
    
    This PR makes four changes to the OR-to-IN conversion:
    1. Fix a bug where `NOT IN` was merged with equality predicates: `a = 1 or a not in (2, 3)` was rewritten to `a in (1, 2, 3)`, which changes the result.
    2. Extend the rule to more cases:
      - merge predicates on more than one slot: `a = 1 or a = 2 or b = 3 or b = 4` => `a in (1, 2) or b in (3, 4)`
      - skip not-equal and not-in predicates while merging: `a = 1 or a = 2 or b != 3 or c not in (1, 2)` => `a in (1, 2) or b != 3 or c not in (1, 2)`
    3. Rewrite recursively, so that ORs nested inside AND operands are converted as well.
    4. OrToIn is implemented in ExtractCommonFactorsRule, which generates new exprs; OrToIn must also be applied to those generated exprs. For example, `(a=1 and b=2) or (a=3 and b=4)` => `(a=1 or a=3) and (b=2 or b=4) and [(a=1 and b=2) or (a=3 and b=4)]` => `a in (1, 3) and b in (2, 4) and [(a=1 and b=2) or (a=3 and b=4)]`
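The grouping described in items 1 and 2 above can be sketched outside the planner. This is a minimal model assuming a hypothetical `Disjunct` type of the form "column op values" instead of Doris `Expr`s, and it omits the recursive AND handling. It groups `=` and positive `IN` disjuncts by column in first-seen order. Groups smaller than the threshold are left unchanged. Larger groups are deduplicated into a single `IN` list. `!=`, `NOT IN` and other operators are appended unmerged.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class OrToInSketch {
    // Hypothetical, simplified disjunct "<column> <op> <values>"; real Doris Exprs are richer.
    static final class Disjunct {
        final String column;
        final String op; // "=", "IN", "!=", "NOT IN", ">", ...
        final List<String> values;

        Disjunct(String column, String op, List<String> values) {
            this.column = column;
            this.op = op;
            this.values = values;
        }

        boolean mergeable() {
            // Only equality and positive IN lists may be folded into one IN predicate.
            return op.equals("=") || op.equals("IN");
        }

        @Override
        public String toString() {
            boolean list = op.equals("IN") || op.equals("NOT IN");
            return list ? column + " " + op + " (" + String.join(", ", values) + ")"
                        : column + " " + op + " " + values.get(0);
        }
    }

    static String rewriteOrToIn(List<Disjunct> disjuncts, int threshold) {
        // LinkedHashMap keeps columns in first-seen order, giving stable output.
        Map<String, List<Disjunct>> byColumn = new LinkedHashMap<>();
        List<Disjunct> notMerged = new ArrayList<>();
        for (Disjunct d : disjuncts) {
            if (d.mergeable()) {
                byColumn.computeIfAbsent(d.column, k -> new ArrayList<>()).add(d);
            } else {
                notMerged.add(d);
            }
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Disjunct>> e : byColumn.entrySet()) {
            List<Disjunct> group = e.getValue();
            if (group.size() < threshold) {
                // Too few predicates on this column: keep them unchanged.
                group.forEach(d -> out.add(d.toString()));
            } else {
                Set<String> values = new LinkedHashSet<>(); // deduplicate, keep order
                group.forEach(d -> values.addAll(d.values));
                out.add(e.getKey() + " IN (" + String.join(", ", values) + ")");
            }
        }
        // Unmerged disjuncts are appended after the merged groups.
        notMerged.forEach(d -> out.add(d.toString()));
        return String.join(" OR ", out);
    }

    public static void main(String[] args) {
        List<Disjunct> input = List.of(
                new Disjunct("a", "=", List.of("1")),
                new Disjunct("a", "=", List.of("2")),
                new Disjunct("b", "!=", List.of("3")),
                new Disjunct("c", "NOT IN", List.of("1", "2")));
        // prints: a IN (1, 2) OR b != 3 OR c NOT IN (1, 2)
        System.out.println(rewriteOrToIn(input, 2));
    }
}
```

The NOT IN case shows why item 1 was a bug. For `a = 4`, `a = 1 or a not in (2, 3)` is true, but `a in (1, 2, 3)` is false. Because merged groups come first, a lone `k1 = 300` is printed ahead of `k1 > 1000 OR k1 < 200`. This matches the updated expectation in ExtractCommonFactorsRuleFunctionTest.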
    
    In addition, this PR adds toString() to BinaryPredicate, CompoundPredicate, LiteralExpr and SlotRef.
---
 .../org/apache/doris/analysis/BinaryPredicate.java |   7 ++
 .../apache/doris/analysis/CompoundPredicate.java   |  26 +++++
 .../org/apache/doris/analysis/LiteralExpr.java     |   5 +
 .../java/org/apache/doris/analysis/SlotRef.java    |  12 ++
 .../doris/rewrite/ExtractCommonFactorsRule.java    | 128 +++++++++++++++------
 .../org/apache/doris/analysis/SelectStmtTest.java  |  26 +++--
 .../org/apache/doris/planner/QueryPlanTest.java    |  57 +++++++--
 .../ExtractCommonFactorsRuleFunctionTest.java      |  12 +-
 .../data/performance_p0/redundant_conjuncts.out    |   2 +-
 9 files changed, 215 insertions(+), 60 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java
index 6a0d1002e4..e1f24c4530 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java
@@ -218,6 +218,13 @@ public class BinaryPredicate extends Predicate implements Writable {
         this.op = op;
     }
 
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append(children.get(0)).append(" ").append(op).append(" ").append(children.get(1));
+        return builder.toString();
+    }
+
     @Override
     public Expr negate() {
         Operator newOp = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CompoundPredicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CompoundPredicate.java
index 7ce22bb732..f1e41ad0b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CompoundPredicate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CompoundPredicate.java
@@ -213,6 +213,27 @@ public class CompoundPredicate extends Predicate {
         return conjunctivePred;
     }
 
+    /**
+     * Creates a disjunctive predicate from a list of exprs,
+     * reserve the expr order
+     */
+    public static Expr createDisjunctivePredicate(List<Expr> disjunctions) {
+        Expr result = null;
+        for (Expr expr : disjunctions) {
+            if (result == null) {
+                result = expr;
+                continue;
+            }
+            result = new CompoundPredicate(CompoundPredicate.Operator.OR, result, expr);
+        }
+        return result;
+    }
+
+    public static boolean isOr(Expr expr) {
+        return expr instanceof CompoundPredicate
+                && ((CompoundPredicate) expr).getOp() == Operator.OR;
+    }
+
     @Override
     public Expr getResultValue() throws AnalysisException {
         recursiveResetChildrenResult();
@@ -261,4 +282,9 @@ public class CompoundPredicate extends Predicate {
     public void finalizeImplForNereids() throws AnalysisException {
 
     }
+
+    @Override
+    public String toString() {
+        return toSqlImpl();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java
index 0528e33eed..022f1b41d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java
@@ -262,4 +262,9 @@ public abstract class LiteralExpr extends Expr implements Comparable<LiteralExpr
     public void finalizeImplForNereids() throws AnalysisException {
 
     }
+
+    @Override
+    public String toString() {
+        return getStringValue();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
index 2c9db0fdad..0650acfb28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
@@ -484,4 +484,16 @@ public class SlotRef extends Expr {
     public void finalizeImplForNereids() throws AnalysisException {
 
     }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        if (tblName != null) {
+            builder.append(tblName).append(".");
+        }
+        if (label != null) {
+            builder.append(label);
+        }
+        return builder.toString();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExtractCommonFactorsRule.java b/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExtractCommonFactorsRule.java
index 74130d52af..d28fde13ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExtractCommonFactorsRule.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExtractCommonFactorsRule.java
@@ -43,11 +43,13 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This rule extracts common predicate from multiple disjunctions when it is applied
@@ -113,7 +115,8 @@ public class ExtractCommonFactorsRule implements ExprRewriteRule {
      * 4. Construct new expr:
      * @return: a and b' and (b or (e and f))
      */
-    private Expr extractCommonFactors(List<List<Expr>> exprs, Analyzer analyzer, ExprRewriter.ClauseType clauseType) {
+    private Expr extractCommonFactors(List<List<Expr>> exprs, Analyzer analyzer, ExprRewriter.ClauseType clauseType)
+            throws AnalysisException {
         if (exprs.size() < 2) {
             return null;
         }
@@ -187,12 +190,19 @@ public class ExtractCommonFactorsRule implements ExprRewriteRule {
         }
         Expr result = null;
         if (CollectionUtils.isNotEmpty(commonFactorList)) {
+            commonFactorList = commonFactorList.stream().map(expr -> {
+                try {
+                    return apply(expr, analyzer, clauseType);
+                } catch (AnalysisException e) {
+                    throw new RuntimeException(e);
+                }
+            }).collect(Collectors.toList());
             result = new CompoundPredicate(CompoundPredicate.Operator.AND,
                     makeCompound(commonFactorList, CompoundPredicate.Operator.AND),
-                    makeCompoundRemaining(remainingOrClause, CompoundPredicate.Operator.OR));
+                    makeCompoundRemaining(remainingOrClause, CompoundPredicate.Operator.OR, analyzer, clauseType));
             result.setPrintSqlInParens(true);
         } else {
-            result = makeCompoundRemaining(remainingOrClause, CompoundPredicate.Operator.OR);
+            result = makeCompoundRemaining(remainingOrClause, CompoundPredicate.Operator.OR, analyzer, clauseType);
         }
         if (LOG.isDebugEnabled()) {
             LOG.debug("equal ors: " + result.toSql());
@@ -430,7 +440,8 @@ public class ExtractCommonFactorsRule implements ExprRewriteRule {
         return result;
     }
 
-    private Expr makeCompoundRemaining(List<Expr> exprs, CompoundPredicate.Operator op) {
+    private Expr makeCompoundRemaining(List<Expr> exprs, CompoundPredicate.Operator op,
+            Analyzer analyzer, ExprRewriter.ClauseType clauseType) throws AnalysisException {
         if (CollectionUtils.isEmpty(exprs)) {
             return null;
         }
@@ -441,7 +452,7 @@ public class ExtractCommonFactorsRule implements ExprRewriteRule {
         Expr rewritePredicate = null;
         // only OR will be rewrite to IN
         if (op == CompoundPredicate.Operator.OR) {
-            rewritePredicate = rewriteOrToIn(exprs);
+            rewritePredicate = rewriteOrToIn(exprs, analyzer, clauseType);
             // IF rewrite finished, rewritePredicate will not be null
             // IF not rewrite, do compoundPredicate
             if (rewritePredicate != null) {
@@ -457,60 +468,109 @@ public class ExtractCommonFactorsRule implements ExprRewriteRule {
         return result;
     }
 
-    private Expr rewriteOrToIn(List<Expr> exprs) {
+    private Expr rewriteOrToIn(List<Expr> exprs, Analyzer analyzer, ExprRewriter.ClauseType clauseType)
+            throws AnalysisException {
         // remainingOR  expr = BP IP
         InPredicate inPredicate = null;
-        boolean isOrToInAllowed = true;
-        Set<String> slotSet = new LinkedHashSet<>();
-
         int rewriteThreshold;
         if (ConnectContext.get() == null) {
             rewriteThreshold = 2;
         } else {
             rewriteThreshold = ConnectContext.get().getSessionVariable().getRewriteOrToInPredicateThreshold();
         }
+        List<Expr> notMergedExprs = Lists.newArrayList();
+        /**
+         * col1= 1 or col1=2 or col2=3 or col2=4 or col1 != 5 or col1 not in (2)
+         * ==>
+         * slotNameToMergeExprsMap:
+         *  {
+         *      col1:[col1=1, col1=2],
+         *      col2:[col2=3, col2=4]
+         *  }
+         * notMergedExprs: [col1 != 5, col1 not in (2)]
+         */
+        Map<String, List<Expr>> slotNameToMergeExprsMap = new HashMap<>();
+        /*
+        slotNameForMerge is keys of slotNameToMergeExprsMap, but reserves slot orders in original expr.
+        To reserve orders, we can get stable output, and hence good for unit/regression test.
+         */
+        List<String> slotNameForMerge = Lists.newArrayList();
 
         for (int i = 0; i < exprs.size(); i++) {
             Expr predicate = exprs.get(i);
-            if (!(predicate instanceof BinaryPredicate) && !(predicate instanceof InPredicate)) {
-                isOrToInAllowed = false;
-                break;
+            if (predicate instanceof CompoundPredicate
+                    && ((CompoundPredicate) predicate).getOp() == Operator.AND) {
+                CompoundPredicate and = (CompoundPredicate) predicate;
+                Expr left = and.getChild(0);
+                if (left instanceof CompoundPredicate) {
+                    left = apply(and.getChild(0), analyzer, clauseType);
+                    if (CompoundPredicate.isOr(left)) {
+                        left.setPrintSqlInParens(true);
+                    }
+                }
+                Expr right = and.getChild(1);
+                if (right instanceof CompoundPredicate) {
+                    right = apply(and.getChild(1), analyzer, clauseType);
+                    if (CompoundPredicate.isOr(right)) {
+                        right.setPrintSqlInParens(true);
+                    }
+                }
+                notMergedExprs.add(new CompoundPredicate(Operator.AND, left, right));
+            } else if (!(predicate instanceof BinaryPredicate) && !(predicate instanceof InPredicate)) {
+                notMergedExprs.add(predicate);
             } else if (!(predicate.getChild(0) instanceof SlotRef)) {
-                isOrToInAllowed = false;
-                break;
+                notMergedExprs.add(predicate);
             } else if (!(predicate.getChild(1) instanceof LiteralExpr)) {
-                isOrToInAllowed = false;
-                break;
+                notMergedExprs.add(predicate);
             } else if (predicate instanceof BinaryPredicate
                     && ((BinaryPredicate) predicate).getOp() != BinaryPredicate.Operator.EQ) {
-                isOrToInAllowed = false;
-                break;
+                notMergedExprs.add(predicate);
+            } else if (predicate instanceof InPredicate
+                    && ((InPredicate) predicate).isNotIn()) {
+                notMergedExprs.add(predicate);
             } else {
                 TableName tableName = ((SlotRef) predicate.getChild(0)).getTableName();
+                String columnWithTable;
                 if (tableName != null) {
                     String tblName = tableName.toString();
-                    String columnWithTable = tblName + "." + ((SlotRef) predicate.getChild(0)).getColumnName();
-                    slotSet.add(columnWithTable);
+                    columnWithTable = tblName + "." + ((SlotRef) predicate.getChild(0)).getColumnName();
                 } else {
-                    slotSet.add(((SlotRef) predicate.getChild(0)).getColumnName());
+                    columnWithTable = ((SlotRef) predicate.getChild(0)).getColumnName();
                 }
+                slotNameToMergeExprsMap.computeIfAbsent(columnWithTable, key -> {
+                    slotNameForMerge.add(columnWithTable);
+                    return Lists.newArrayList();
+                });
+
+                slotNameToMergeExprsMap.get(columnWithTable).add(predicate);
             }
         }
-
-        // isOrToInAllowed : true, means can rewrite
-        // slotSet.size : nums of columnName in exprs, should be 1
-        if (isOrToInAllowed && slotSet.size() == 1) {
-            if (exprs.size() < rewriteThreshold) {
-                return null;
+        Expr notMerged = null;
+        if (!notMergedExprs.isEmpty()) {
+            notMerged = CompoundPredicate.createDisjunctivePredicate(notMergedExprs);
+        }
+        List<Expr> rewritten = Lists.newArrayList();
+        if (!slotNameToMergeExprsMap.isEmpty()) {
+            for (String columnNameWithTable : slotNameForMerge) {
+                List<Expr> toMerge = slotNameToMergeExprsMap.get(columnNameWithTable);
+                if (toMerge.size() < rewriteThreshold) {
+                    rewritten.addAll(toMerge);
+                } else {
+                    List<Expr> deduplicationExprs = getDeduplicationList(toMerge);
+                    inPredicate = new InPredicate(deduplicationExprs.get(0),
+                            deduplicationExprs.subList(1, deduplicationExprs.size()), false);
+                    rewritten.add(inPredicate);
+                }
             }
-
-            // get deduplication list
-            List<Expr> deduplicationExprs = getDeduplicationList(exprs);
-            inPredicate = new InPredicate(deduplicationExprs.get(0),
-                    deduplicationExprs.subList(1, deduplicationExprs.size()), false);
         }
-
-        return inPredicate;
+        if (rewritten.isEmpty()) {
+            return notMerged;
+        } else {
+            if (notMerged != null) {
+                rewritten.add(notMerged);
+            }
+            return CompoundPredicate.createDisjunctivePredicate(rewritten);
+        }
     }
 
     public List<Expr> getDeduplicationList(List<Expr> exprs) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
index 7e0137e6bd..39dc947fe1 100755
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
@@ -289,7 +289,6 @@ public class SelectStmtTest {
         String betweenExpanded3 = "`t1`.`k4` >= 50 AND `t1`.`k4` <= 250";
 
         String rewrittenSql = stmt.toSql();
-        System.out.println(rewrittenSql);
         Assert.assertTrue(rewrittenSql.contains(commonExpr1));
         Assert.assertEquals(rewrittenSql.indexOf(commonExpr1), rewrittenSql.lastIndexOf(commonExpr1));
         Assert.assertTrue(rewrittenSql.contains(commonExpr2));
@@ -330,13 +329,18 @@ public class SelectStmtTest {
                 + ")";
         SelectStmt stmt2 = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql2, ctx);
         stmt2.rewriteExprs(new Analyzer(ctx.getEnv(), ctx).getExprRewriter());
-        String fragment3 = "((`t1`.`k1` = `t2`.`k3` AND `t2`.`k2` = 'United States'"
-                + " AND `t2`.`k3` IN ('CO', 'IL', 'MN') "
-                + "AND `t1`.`k4` >= 100 AND `t1`.`k4` <= 200) "
-                + "OR (`t1`.`k1` = `t2`.`k1` AND `t2`.`k2` = 'United States1' "
-                + "AND `t2`.`k3` IN ('OH', 'MT', 'NM') AND `t1`.`k4` >= 150 AND `t1`.`k4` <= 300) "
-                + "OR (`t1`.`k1` = `t2`.`k1` AND `t2`.`k2` = 'United States' AND `t2`.`k3` IN ('TX', 'MO', 'MI') "
-                + "AND `t1`.`k4` >= 50 AND `t1`.`k4` <= 250))";
+        String fragment3 =
+                "(((`t1`.`k4` >= 50 AND `t1`.`k4` <= 300) AND `t2`.`k2` IN ('United States', 'United States1') "
+                        + "AND `t2`.`k3` IN ('CO', 'IL', 'MN', 'OH', 'MT', 'NM', 'TX', 'MO', 'MI')) "
+                        + "AND `t1`.`k1` = `t2`.`k3` AND `t2`.`k2` = 'United States' "
+                        + "AND `t2`.`k3` IN ('CO', 'IL', 'MN') AND `t1`.`k4` >= 100 AND `t1`.`k4` <= 200 "
+                        + "OR "
+                        + "`t1`.`k1` = `t2`.`k1` AND `t2`.`k2` = 'United States1' "
+                        + "AND `t2`.`k3` IN ('OH', 'MT', 'NM') AND `t1`.`k4` >= 150 AND `t1`.`k4` <= 300 "
+                        + "OR "
+                        + "`t1`.`k1` = `t2`.`k1` AND `t2`.`k2` = 'United States' "
+                        + "AND `t2`.`k3` IN ('TX', 'MO', 'MI') "
+                        + "AND `t1`.`k4` >= 50 AND `t1`.`k4` <= 250)";
         Assert.assertTrue(stmt2.toSql().contains(fragment3));
 
         String sql3 = "select\n"
@@ -396,7 +400,7 @@ public class SelectStmtTest {
         SelectStmt stmt7 = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql7, ctx);
         stmt7.rewriteExprs(new Analyzer(ctx.getEnv(), ctx).getExprRewriter());
         Assert.assertTrue(stmt7.toSql()
-                .contains("`t2`.`k1` IS NOT NULL OR (`t1`.`k1` IS NOT NULL " + "AND `t1`.`k2` IS NOT NULL)"));
+                .contains("`t2`.`k1` IS NOT NULL OR `t1`.`k1` IS NOT NULL AND `t1`.`k2` IS NOT NULL"));
 
         String sql8 = "select\n"
                 + "   avg(t1.k4)\n"
@@ -408,13 +412,13 @@ public class SelectStmtTest {
         SelectStmt stmt8 = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql8, ctx);
         stmt8.rewriteExprs(new Analyzer(ctx.getEnv(), ctx).getExprRewriter());
         Assert.assertTrue(stmt8.toSql()
-                .contains("`t2`.`k1` IS NOT NULL AND `t1`.`k1` IS NOT NULL" + " AND `t1`.`k1` IS NOT NULL"));
+                .contains("`t2`.`k1` IS NOT NULL AND `t1`.`k1` IS NOT NULL AND `t1`.`k1` IS NOT NULL"));
 
         String sql9 = "select * from db1.tbl1 where (k1='shutdown' and k4<1) or (k1='switchOff' and k4>=1)";
         SelectStmt stmt9 = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql9, ctx);
         stmt9.rewriteExprs(new Analyzer(ctx.getEnv(), ctx).getExprRewriter());
         Assert.assertTrue(
-                stmt9.toSql().contains("(`k1` = 'shutdown' AND `k4` < 1)" + " OR (`k1` = 'switchOff' AND `k4` >= 1)"));
+                stmt9.toSql().contains("`k1` = 'shutdown' AND `k4` < 1 OR `k1` = 'switchOff' AND `k4` >= 1"));
     }
 
     @Test
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 9d8e560c11..5b403f95b4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -2220,40 +2220,81 @@ public class QueryPlanTest extends TestWithFeService {
         Assert.assertTrue(explainString.contains("PREAGGREGATION: ON"));
     }
 
+    /*
+    NOTE:
+    explainString.contains("PREDICATES: xxx\n")
+    add '\n' at the end of line to ensure there are no other predicates
+     */
     @Test
     public void testRewriteOrToIn() throws Exception {
         connectContext.setDatabase("default_cluster:test");
         String sql = "SELECT * from test1 where query_time = 1 or query_time = 2 or query_time in (3, 4)";
         String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` IN (1, 2, 3, 4)"));
+        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` IN (1, 2, 3, 4)\n"));
 
         sql = "SELECT * from test1 where (query_time = 1 or query_time = 2) and query_time in (3, 4)";
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` IN (1, 2), `query_time` IN (3, 4)"));
+        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` IN (1, 2), `query_time` IN (3, 4)\n"));
 
         sql = "SELECT * from test1 where (query_time = 1 or query_time = 2 or scan_bytes = 2) and scan_bytes in (2, 3)";
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("PREDICATES: (`query_time` = 1 OR `query_time` = 2 OR `scan_bytes` = 2), `scan_bytes` IN (2, 3)"));
+        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` IN (1, 2) OR `scan_bytes` = 2, `scan_bytes` IN (2, 3)\n"));
 
         sql = "SELECT * from test1 where (query_time = 1 or query_time = 2) and (scan_bytes = 2 or scan_bytes = 3)";
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` IN (1, 2), `scan_bytes` IN (2, 3)"));
+        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` IN (1, 2), `scan_bytes` IN (2, 3)\n"));
 
         sql = "SELECT * from test1 where query_time = 1 or query_time = 2 or query_time = 3 or query_time = 1";
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` IN (1, 2, 3)"));
+        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` IN (1, 2, 3)\n"));
 
         sql = "SELECT * from test1 where query_time = 1 or query_time = 2 or query_time in (3, 2)";
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` IN (1, 2, 3)"));
+        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` IN (1, 2, 3)\n"));
 
         connectContext.getSessionVariable().setRewriteOrToInPredicateThreshold(100);
         sql = "SELECT * from test1 where query_time = 1 or query_time = 2 or query_time in (3, 4)";
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("PREDICATES: (`query_time` = 1 OR `query_time` = 2 OR `query_time` IN (3, 4))"));
+        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` = 1 OR `query_time` = 2 OR `query_time` IN (3, 4)\n"));
+        connectContext.getSessionVariable().setRewriteOrToInPredicateThreshold(2);
 
         sql = "SELECT * from test1 where (query_time = 1 or query_time = 2) and query_time in (3, 4)";
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("PREDICATES: (`query_time` = 1 OR `query_time` = 2), `query_time` IN (3, 4)"));
+        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` IN (1, 2), `query_time` IN (3, 4)\n"));
+
+        //test we can handle `!=` and `not in`
+        sql = "select * from test1 where (query_time = 1 or query_time = 2 or query_time!= 3 or query_time not in (5, 6))";
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
+        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` IN (1, 2) OR `query_time` != 3 OR `query_time` NOT IN (5, 6)\n"));
+
+        //test we can handle merge 2 or more columns
+        sql = "select * from test1 where (query_time = 1 or query_time = 2 or scan_rows = 3 or scan_rows = 4)";
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
+        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` IN (1, 2) OR `scan_rows` IN (3, 4)"));
+
+        //merge in-pred or in-pred
+        sql = "select * from test1 where (query_time = 1 or query_time = 2 or query_time = 3 or query_time = 4)";
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
+        Assert.assertTrue(explainString.contains("PREDICATES: `query_time` IN (1, 2, 3, 4)\n"));
+
+        //rewrite recursively
+        sql = "select * from test1 "
+                + "where query_id=client_ip "
+                + "      and (stmt_id=1 or stmt_id=2 or stmt_id=3 "
+                + "           or (user='abc' and (state = 'a' or state='b' or state in ('c', 'd'))))"
+                + "      or (db not in ('x', 'y')) ";
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
+        Assert.assertTrue(explainString.contains(
+                "PREDICATES: `query_id` = `client_ip` "
+                        + "AND (`stmt_id` IN (1, 2, 3) OR `user` = 'abc' AND `state` IN ('a', 'b', 'c', 'd')) "
+                        + "OR (`db` NOT IN ('x', 'y'))\n"));
+
+        //ExtractCommonFactorsRule may generate more expr, test the rewriteOrToIn applied on generated exprs
+        sql = "select * from test1 where (stmt_id=1 and state='a') or (stmt_id=2 and state='b')";
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
+        Assert.assertTrue(explainString.contains(
+                "PREDICATES: `state` IN ('a', 'b'), `stmt_id` IN (1, 2),"
+                        + " `stmt_id` = 1 AND `state` = 'a' OR `stmt_id` = 2 AND `state` = 'b'\n"
+        ));
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/rewrite/ExtractCommonFactorsRuleFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/rewrite/ExtractCommonFactorsRuleFunctionTest.java
index 2f994be85b..5458f30043 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/rewrite/ExtractCommonFactorsRuleFunctionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/rewrite/ExtractCommonFactorsRuleFunctionTest.java
@@ -100,15 +100,15 @@ public class ExtractCommonFactorsRuleFunctionTest {
     public void testWideCommonFactorsWithOrPredicate() throws Exception {
         String query = "select * from tb1 where tb1.k1 > 1000 or tb1.k1 < 200 or tb1.k1 = 300";
         String planString = dorisAssert.query(query).explainQuery();
-        Assert.assertTrue(planString.contains("PREDICATES: (`tb1`.`k1` > 1000 OR `tb1`.`k1` < 200 OR `tb1`.`k1` = 300)"));
+        Assert.assertTrue(planString.contains("PREDICATES: `tb1`.`k1` = 300 OR `tb1`.`k1` > 1000 OR `tb1`.`k1` < 200"));
     }
 
     @Test
     public void testWideCommonFactorsWithEqualPredicate() throws Exception {
         String query = "select * from tb1, tb2 where (tb1.k1=1 and tb2.k1=1) or (tb1.k1 =2 and tb2.k1=2)";
         String planString = dorisAssert.query(query).explainQuery();
-        Assert.assertTrue(planString.contains("(`tb1`.`k1` = 1 OR `tb1`.`k1` = 2)"));
-        Assert.assertTrue(planString.contains("(`tb2`.`k1` = 1 OR `tb2`.`k1` = 2)"));
+        Assert.assertTrue(planString.contains("`tb1`.`k1` IN (1, 2)"));
+        Assert.assertTrue(planString.contains("`tb2`.`k1` IN (1, 2)"));
         Assert.assertTrue(planString.contains("NESTED LOOP JOIN"));
     }
 
@@ -259,10 +259,10 @@ public class ExtractCommonFactorsRuleFunctionTest {
         Assert.assertTrue(planString.contains("`l_partkey` = `p_partkey`"));
         Assert.assertTrue(planString.contains("`l_shipmode` IN ('AIR', 'AIR REG')"));
         Assert.assertTrue(planString.contains("`l_shipinstruct` = 'DELIVER IN PERSON'"));
-        Assert.assertTrue(planString.contains("((`l_quantity` >= 9 AND `l_quantity` <= 19) "
-                + "OR (`l_quantity` >= 20 AND `l_quantity` <= 36))"));
+        Assert.assertTrue(planString.contains("`l_quantity` >= 9 AND `l_quantity` <= 19 "
+                + "OR `l_quantity` >= 20 AND `l_quantity` <= 36"));
         Assert.assertTrue(planString.contains("`p_size` >= 1"));
-        Assert.assertTrue(planString.contains("(`p_brand` = 'Brand#11' OR `p_brand` = 'Brand#21' OR `p_brand` = 'Brand#32')"));
+        Assert.assertTrue(planString.contains("`p_brand` IN ('Brand#11', 'Brand#21', 'Brand#32')"));
         Assert.assertTrue(planString.contains("`p_size` <= 15"));
         Assert.assertTrue(planString.contains("`p_container` IN ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG', 'MED BAG', "
                 + "'MED BOX', 'MED PKG', 'MED PACK', 'LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')"));
diff --git a/regression-test/data/performance_p0/redundant_conjuncts.out b/regression-test/data/performance_p0/redundant_conjuncts.out
index 98178f31aa..3baa5b3d93 100644
--- a/regression-test/data/performance_p0/redundant_conjuncts.out
+++ b/regression-test/data/performance_p0/redundant_conjuncts.out
@@ -23,7 +23,7 @@ PLAN FRAGMENT 0
 
   0:VOlapScanNode
      TABLE: default_cluster:regression_test_performance_p0.redundant_conjuncts(redundant_conjuncts), PREAGGREGATION: OFF. Reason: No AggregateInfo
-     PREDICATES: (`k1` = 1 OR `k1` = 2)
+     PREDICATES: `k1` = 1 OR `k1` = 2
      partitions=0/1, tablets=0/0, tabletList=
      cardinality=0, avgRowSize=8.0, numNodes=1
 


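The updated expected plan strings in this commit show what the new OR-to-IN rewrite does:
- Equality disjuncts on the same column, including an existing IN list, are merged into one IN predicate.
- A column with a single value stays an equality.
- The merged predicates are printed ahead of the remaining disjuncts.
- Predicates nested under AND are rewritten recursively.

The following is a minimal standalone Java sketch of that idea (Java 16+ for records and pattern matching). It is written against a hypothetical toy expression model: OrToInSketch, Expr, Eq, In, And, Or and Other are illustrative names, not Doris's analysis classes, and this is not the Doris implementation. It does not deduplicate values, does not model NOT IN, and skips any type or threshold checks the real planner may apply. redundant_conjuncts.out still expects `k1` = 1 OR `k1` = 2, so the real rule evidently has conditions this sketch ignores.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of predicates; literals are stored as already-quoted text.
public class OrToInSketch {
    interface Expr {}
    record Eq(String col, String lit) implements Expr {}
    record In(String col, List<String> lits) implements Expr {}
    record And(Expr left, Expr right) implements Expr {}
    record Or(Expr left, Expr right) implements Expr {}
    record Other(String text) implements Expr {} // any predicate we do not rewrite, e.g. `k1` > 1000

    // Collect the leaves of a (possibly nested) OR tree in left-to-right order.
    static void flattenOr(Expr e, List<Expr> acc) {
        if (e instanceof Or o) {
            flattenOr(o.left(), acc);
            flattenOr(o.right(), acc);
        } else {
            acc.add(e);
        }
    }

    static Expr rewrite(Expr e) {
        if (e instanceof And a) {
            // Recurse so ORs nested under AND are rewritten too.
            return new And(rewrite(a.left()), rewrite(a.right()));
        }
        if (!(e instanceof Or)) {
            return e;
        }
        List<Expr> disjuncts = new ArrayList<>();
        flattenOr(e, disjuncts);
        // Group literal values by column, keeping first-appearance order.
        Map<String, List<String>> valuesByCol = new LinkedHashMap<>();
        List<Expr> rest = new ArrayList<>();
        for (Expr d : disjuncts) {
            if (d instanceof Eq eq) {
                valuesByCol.computeIfAbsent(eq.col(), k -> new ArrayList<>()).add(eq.lit());
            } else if (d instanceof In inList) {
                valuesByCol.computeIfAbsent(inList.col(), k -> new ArrayList<>()).addAll(inList.lits());
            } else {
                rest.add(rewrite(d));
            }
        }
        // Merged predicates first, then the untouched disjuncts.
        List<Expr> out = new ArrayList<>();
        valuesByCol.forEach((col, lits) ->
                out.add(lits.size() == 1 ? new Eq(col, lits.get(0)) : new In(col, lits)));
        out.addAll(rest);
        Expr result = out.get(0);
        for (int i = 1; i < out.size(); i++) {
            result = new Or(result, out.get(i));
        }
        return result;
    }

    static String render(Expr e) {
        if (e instanceof Eq eq) return "`" + eq.col() + "` = " + eq.lit();
        if (e instanceof In inList) return "`" + inList.col() + "` IN (" + String.join(", ", inList.lits()) + ")";
        if (e instanceof And a) return wrap(a.left()) + " AND " + wrap(a.right());
        if (e instanceof Or o) return render(o.left()) + " OR " + render(o.right());
        return ((Other) e).text();
    }

    // AND binds tighter than OR, so an OR under AND needs parentheses.
    static String wrap(Expr e) {
        return e instanceof Or ? "(" + render(e) + ")" : render(e);
    }

    public static void main(String[] args) {
        // k1 > 1000 OR k1 < 200 OR k1 = 300
        Expr q1 = new Or(new Or(new Other("`k1` > 1000"), new Other("`k1` < 200")), new Eq("k1", "300"));
        System.out.println(render(rewrite(q1)));
        // prints: `k1` = 300 OR `k1` > 1000 OR `k1` < 200

        // stmt_id=1 OR stmt_id=2 OR stmt_id=3 OR (user='abc' AND (state='a' OR state='b' OR state IN ('c','d')))
        Expr state = new Or(new Or(new Eq("state", "'a'"), new Eq("state", "'b'")),
                new In("state", List.of("'c'", "'d'")));
        Expr q2 = new Or(new Or(new Or(new Eq("stmt_id", "1"), new Eq("stmt_id", "2")), new Eq("stmt_id", "3")),
                new And(new Eq("user", "'abc'"), state));
        System.out.println(render(rewrite(q2)));
        // prints: `stmt_id` IN (1, 2, 3) OR `user` = 'abc' AND `state` IN ('a', 'b', 'c', 'd')
    }
}
```

Both sample outputs mirror the orderings asserted in the tests above (the single `k1` = 300 equality moved to the front, and the nested `state` values folded into one IN list). That is only a consistency check against the expected strings, not a claim about how Doris orders predicates internally.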