You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/01/03 15:45:16 UTC

[incubator-doris] branch master updated: [Dynamic Partition] Support for automatically adding partitions

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new af9529a  [Dynamic Partition] Support for automatically adding partitions
af9529a is described below

commit af9529a2071415830447ff8d1a417c8f542b8aed
Author: WingC <10...@qq.com>
AuthorDate: Fri Jan 3 09:45:04 2020 -0600

    [Dynamic Partition] Support for automatically adding partitions
    
    In some scenarios, when a user creates an OLAP table that is range-partitioned by time, the user needs to periodically add and remove partitions to ensure that the data stays valid. As a result, adding and removing partitions dynamically can be very useful for users.
---
 fe/src/main/cup/sql_parser.cup                     |   9 +-
 fe/src/main/java/org/apache/doris/alter/Alter.java |   4 +
 .../apache/doris/alter/SchemaChangeHandler.java    |   4 +
 .../analysis/ModifyTablePropertiesClause.java      |   7 +-
 .../doris/analysis/ShowDynamicPartitionStmt.java   |  87 ++++
 .../java/org/apache/doris/catalog/Catalog.java     |  72 ++-
 .../doris/catalog/DynamicPartitionProperty.java    |  82 ++++
 .../java/org/apache/doris/catalog/OlapTable.java   |  35 ++
 .../org/apache/doris/catalog/TableProperty.java    |  89 ++++
 .../doris/clone/DynamicPartitionScheduler.java     | 292 +++++++++++++
 .../main/java/org/apache/doris/common/Config.java  |  12 +
 .../java/org/apache/doris/common/ErrorCode.java    |  20 +-
 .../org/apache/doris/common/FeMetaVersion.java     |   6 +-
 .../doris/common/util/DynamicPartitionUtil.java    | 267 ++++++++++++
 .../org/apache/doris/journal/JournalEntity.java    |   6 +
 .../java/org/apache/doris/persist/EditLog.java     |  11 +-
 .../doris/persist/ModifyDynamicPartitionInfo.java  |  66 +++
 .../org/apache/doris/persist/OperationType.java    |   3 +
 .../java/org/apache/doris/qe/ShowExecutor.java     |  49 +++
 fe/src/main/jflex/sql_scanner.flex                 |   1 +
 .../apache/doris/alter/SchemaChangeJobV2Test.java  |  95 +++-
 .../doris/catalog/DynamicPartitionTableTest.java   | 482 +++++++++++++++++++++
 .../java/org/apache/doris/catalog/FakeEditLog.java |   6 +
 .../apache/doris/catalog/TablePropertyTest.java    |  74 ++++
 .../persist/ModifyDynamicPartitionInfoTest.java    |  68 +++
 25 files changed, 1836 insertions(+), 11 deletions(-)

diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup
index 9cc533a..9593dc9 100644
--- a/fe/src/main/cup/sql_parser.cup
+++ b/fe/src/main/cup/sql_parser.cup
@@ -196,7 +196,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A
     KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED,
     KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_COUNT, KW_CREATE, KW_CROSS, KW_CURRENT, KW_CURRENT_USER,
     KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_TIME, KW_DECIMAL, KW_DECOMMISSION, KW_DEFAULT, KW_DESC, KW_DESCRIBE,
-    KW_DELETE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE,
+    KW_DELETE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE,
     KW_ELSE, KW_END, KW_ENGINE, KW_ENGINES, KW_ENTER, KW_ERRORS, KW_EVENTS, KW_EXISTS, KW_EXPORT, KW_EXTERNAL, KW_EXTRACT,
     KW_FALSE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM, KW_FILE, KW_FIRST, KW_FLOAT, KW_FOR, KW_FORMAT, KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION,
     KW_GLOBAL, KW_GRANT, KW_GRANTS, KW_GROUP,
@@ -1939,6 +1939,11 @@ show_param ::=
     {:
         RESULT = new ShowDbStmt(parser.wild, parser.where);
     :}
+    /* Dynamic Partition */
+    | KW_DYNAMIC KW_PARTITION KW_TABLES opt_db:db
+    {:
+        RESULT = new ShowDynamicPartitionStmt(db);
+    :}
     /* Columns */
     | opt_full KW_COLUMNS from_or_in table_name:table opt_db:db opt_wild_where
     {:
@@ -4341,6 +4346,8 @@ keyword ::=
     {: RESULT = id; :}
     | KW_STOP:id
     {: RESULT = id; :}
+    | KW_DYNAMIC:id
+    {: RESULT = id; :}
     ;
 
 // Identifier that contain keyword
diff --git a/fe/src/main/java/org/apache/doris/alter/Alter.java b/fe/src/main/java/org/apache/doris/alter/Alter.java
index 062ad68..2767106 100644
--- a/fe/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/src/main/java/org/apache/doris/alter/Alter.java
@@ -56,6 +56,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.persist.AlterViewInfo;
 import org.apache.doris.qe.ConnectContext;
 
@@ -298,8 +299,10 @@ public class Alter {
                 Preconditions.checkState(alterClauses.size() == 1);
                 AlterClause alterClause = alterClauses.get(0);
                 if (alterClause instanceof DropPartitionClause) {
+                    DynamicPartitionUtil.checkAlterAllowed(olapTable);
                     Catalog.getInstance().dropPartition(db, olapTable, ((DropPartitionClause) alterClause));
                 } else if (alterClause instanceof ModifyPartitionClause) {
+                    DynamicPartitionUtil.checkAlterAllowed(olapTable);
                     Catalog.getInstance().modifyPartition(db, olapTable, ((ModifyPartitionClause) alterClause));
                 } else {
                     hasAddPartition = true;
@@ -316,6 +319,7 @@ public class Alter {
             Preconditions.checkState(alterClauses.size() == 1);
             AlterClause alterClause = alterClauses.get(0);
             if (alterClause instanceof AddPartitionClause) {
+                DynamicPartitionUtil.checkAlterAllowed((OlapTable) db.getTable(tableName));
                 Catalog.getInstance().addPartition(db, tableName, (AddPartitionClause) alterClause);
             } else {
                 Preconditions.checkState(false);
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 152b71a..92edff6 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -60,6 +60,7 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.Util;
@@ -1341,6 +1342,9 @@ public class SchemaChangeHandler extends AlterHandler {
                      */
                     sendClearAlterTask(db, olapTable);
                     return;
+                } else if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) {
+                    Catalog.getCurrentCatalog().modifyTableDynamicPartition(db, olapTable, properties);
+                    return;
                 }
             }
 
diff --git a/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
index c7678dd..e9b3018 100644
--- a/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
+++ b/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
@@ -17,8 +17,10 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.catalog.TableProperty;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.PropertyAnalyzer;
 
@@ -39,7 +41,8 @@ public class ModifyTablePropertiesClause extends AlterTableClause {
             throw new AnalysisException("Properties is not set");
         }
 
-        if (properties.size() != 1) {
+        if (properties.size() != 1
+                && !TableProperty.isSamePrefixProperties(properties, TableProperty.DYNAMIC_PARTITION_PROPERTY_PREFIX)) {
             throw new AnalysisException("Can only set one table property at a time");
         }
 
@@ -71,6 +74,8 @@ public class ModifyTablePropertiesClause extends AlterTableClause {
                 throw new AnalysisException(
                         "Property " + PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT + " should be v2");
             }
+        } else if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) {
+            // do nothing, dynamic properties will be analyzed in SchemaChangeHandler.process
         } else {
             throw new AnalysisException("Unknown table property: " + properties.keySet());
         }
diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowDynamicPartitionStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowDynamicPartitionStmt.java
new file mode 100644
index 0000000..30bfaa4
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/analysis/ShowDynamicPartitionStmt.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import com.google.common.base.Strings;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.qe.ShowResultSetMetaData;
+
+public class ShowDynamicPartitionStmt extends ShowStmt { // statement object whose SQL form is "SHOW DYNAMIC PARTITION TABLES [FROM db]" (see toSql)
+    private String db; // target database name; defaulted or cluster-qualified by analyze()
+    private static final ShowResultSetMetaData SHOW_DYNAMIC_PARTITION_META_DATA = // fixed result schema: ten varchar(20) columns
+            ShowResultSetMetaData.builder()
+                    .addColumn(new Column("TableName", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Enable", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TimeUnit", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("End", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Prefix", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Buckets", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("LastUpdateTime", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("LastSchedulerTime", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("State", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Msg", ScalarType.createVarchar(20)))
+                    .build();
+
+    ShowDynamicPartitionStmt(String db) { // package-private; db may still be null or empty here, analyze() resolves it
+        this.db = db;
+    }
+
+    public String getDb() { // returns db as currently stored (resolved only after analyze() has run)
+        return db;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws AnalysisException { // resolves db; reports ERR_NO_DB_ERROR when no db is given and no default exists
+        if (Strings.isNullOrEmpty(db)) {
+            db = analyzer.getDefaultDb(); // no db in the statement: fall back to the analyzer's default db
+            if (Strings.isNullOrEmpty(db)) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+            }
+        } else {
+            db = ClusterNamespace.getFullName(analyzer.getClusterName(), db); // qualify the given name with the analyzer's cluster name
+        }
+
+        // We do not check db privileges here, because a user may have no privileges on the db itself
+        // but still have privileges on tables inside it, in which case it should be allowed to see this db.
+    }
+
+    @Override
+    public String toSql() { // rebuilds the statement text from the current db value
+        StringBuilder sb = new StringBuilder();
+        sb.append("SHOW DYNAMIC PARTITION TABLES");
+        if (!Strings.isNullOrEmpty(db)) {
+            sb.append(" FROM ").append(db); // db is appended exactly as stored in this.db
+        }
+        return sb.toString();
+    }
+
+    @Override
+    public String toString() { // same text as toSql()
+        return toSql();
+    }
+
+    @Override
+    public ShowResultSetMetaData getMetaData() { // always the shared static schema above
+        return SHOW_DYNAMIC_PARTITION_META_DATA;
+    }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index 08a06ed..704193d 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -85,6 +85,7 @@ import org.apache.doris.catalog.OlapTable.OlapTableState;
 import org.apache.doris.catalog.Replica.ReplicaState;
 import org.apache.doris.catalog.Table.TableType;
 import org.apache.doris.clone.ColocateTableBalancer;
+import org.apache.doris.clone.DynamicPartitionScheduler;
 import org.apache.doris.clone.TabletChecker;
 import org.apache.doris.clone.TabletScheduler;
 import org.apache.doris.clone.TabletSchedulerStat;
@@ -104,12 +105,14 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.Daemon;
+import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.KuduUtil;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.QueryableReentrantLock;
 import org.apache.doris.common.util.SmallFileMgr;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.consistency.ConsistencyChecker;
 import org.apache.doris.deploy.DeployManager;
@@ -154,6 +157,7 @@ import org.apache.doris.persist.DatabaseInfo;
 import org.apache.doris.persist.DropInfo;
 import org.apache.doris.persist.DropLinkDbAndUpdateDbInfo;
 import org.apache.doris.persist.DropPartitionInfo;
+import org.apache.doris.persist.ModifyDynamicPartitionInfo;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.persist.ModifyPartitionInfo;
 import org.apache.doris.persist.PartitionPersistInfo;
@@ -370,6 +374,8 @@ public class Catalog {
 
     private SmallFileMgr smallFileMgr;
 
+    private DynamicPartitionScheduler dynamicPartitionScheduler;
+
     public List<Frontend> getFrontends(FrontendNodeType nodeType) {
         if (nodeType == null) {
             // get all
@@ -418,6 +424,10 @@ public class Catalog {
         return metaReplayState;
     }
 
+    public DynamicPartitionScheduler getDynamicPartitionScheduler() { // plain accessor for the dynamicPartitionScheduler field
+        return this.dynamicPartitionScheduler;
+    }
+
     private static class SingletonHolder {
         private static final Catalog INSTANCE = new Catalog();
     }
@@ -491,6 +501,9 @@ public class Catalog {
         this.routineLoadTaskScheduler = new RoutineLoadTaskScheduler(routineLoadManager);
 
         this.smallFileMgr = new SmallFileMgr();
+
+        this.dynamicPartitionScheduler = new DynamicPartitionScheduler("DynamicPartitionScheduler",
+                Config.dynamic_partition_check_interval_seconds * 1000L);
     }
 
     public static void destroyCheckpoint() {
@@ -1178,6 +1191,8 @@ public class Catalog {
         // start routine load scheduler
         routineLoadScheduler.start();
         routineLoadTaskScheduler.start();
+        // start dynamic partition task
+        dynamicPartitionScheduler.start();
     }
 
     // start threads that should running on all FE
@@ -3090,6 +3105,7 @@ public class Catalog {
     }
 
     public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause clause) throws DdlException {
+        DynamicPartitionUtil.checkAlterAllowed(olapTable);
         Preconditions.checkArgument(db.isWriteLockHeldByCurrentThread());
 
         String partitionName = clause.getPartitionName();
@@ -3387,6 +3403,9 @@ public class Catalog {
             }
             partitionInfo = partitionDesc.toPartitionInfo(baseSchema, partitionNameToId);
         } else {
+            if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(stmt.getProperties())) {
+                throw new DdlException("Only support dynamic partition properties on range partition table");
+            }
             long partitionId = getNextId();
             // use table name as single partition name
             partitionNameToId.put(tableName, partitionId);
@@ -3556,6 +3575,8 @@ public class Catalog {
                     PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(), DataProperty.DEFAULT_HDD_DATA_PROPERTY);
                     PropertyAnalyzer.analyzeReplicationNum(properties, FeConstants.default_replication_num);
 
+                    DynamicPartitionUtil.checkAndSetDynamicPartitionProperty(olapTable, properties);
+
                     if (properties != null && !properties.isEmpty()) {
                         // here, all properties should be checked
                         throw new DdlException("Unknown properties: " + properties);
@@ -3588,7 +3609,7 @@ public class Catalog {
             if (!db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists())) {
                 ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exists");
             }
-            
+
             // we have added these index to memory, only need to persist here
             if (getColocateTableIndex().isColocateTable(tableId)) {
                 GroupId groupId = getColocateTableIndex().getGroup(tableId);
@@ -3596,8 +3617,11 @@ public class Catalog {
                 ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, tableId, backendsPerBucketSeq);
                 editLog.logColocateAddTable(info);
             }
-            
             LOG.info("successfully create table[{};{}]", tableName, tableId);
+            // register or remove table from DynamicPartition after table created
+            DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable);
+            dynamicPartitionScheduler.createOrUpdateRuntimeInfo(
+                    tableName, DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime());
         } catch (DdlException e) {
             for (Long tabletId : tabletIdSet) {
                 Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
@@ -3879,6 +3903,11 @@ public class Catalog {
                 sb.append(colocateTable).append("\"");
             }
 
+            // 6. dynamic partition
+            if (olapTable.dynamicPartitionExists()) {
+                sb.append(olapTable.getTableProperty().getDynamicPartitionProperty().toString());
+            }
+
             sb.append("\n)");
         } else if (table.getType() == TableType.MYSQL) {
             MysqlTable mysqlTable = (MysqlTable) table;
@@ -4033,6 +4062,7 @@ public class Catalog {
                         }
                     }
                 } // end for partitions
+                DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(dbId, olapTable);
             }
         }
     }
@@ -5057,6 +5087,42 @@ public class Catalog {
         throw new DdlException("not implmented");
     }
 
+    public void modifyTableDynamicPartition(Database db, OlapTable table, Map<String, String> properties) throws DdlException { // applies dynamic-partition properties, updates scheduler info, then writes an edit-log entry
+        TableProperty tableProperty = table.getTableProperty();
+        if (tableProperty == null) { // table has no TableProperty yet
+            DynamicPartitionUtil.checkAndSetDynamicPartitionProperty(table, properties); // NOTE(review): presumably validates and attaches a new TableProperty to the table — confirm in DynamicPartitionUtil
+        } else {
+            Map<String, String> analyzedDynamicPartition = DynamicPartitionUtil.analyzeDynamicPartition(properties);
+            tableProperty.modifyTableProperties(analyzedDynamicPartition); // merge into the existing map and rebuild the dynamic-partition view
+        }
+
+        DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), table);
+        dynamicPartitionScheduler.createOrUpdateRuntimeInfo( // record the current time under LAST_UPDATE_TIME, keyed by table name
+                table.getName(), DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime());
+        ModifyDynamicPartitionInfo info = new ModifyDynamicPartitionInfo(db.getId(), table.getId(), table.getTableProperty().getProperties()); // NOTE(review): getTableProperty() is dereferenced again — assumes the null branch above set it; confirm
+        editLog.logDynamicPartition(info); // the logged info carries the table's full property map
+    }
+
+    public void replayModifyTableDynamicPartition(ModifyDynamicPartitionInfo info) { // re-applies a logged property change to the in-memory table under the db write lock
+        long dbId = info.getDbId();
+        long tableId = info.getTableId();
+        Map<String, String> properties = info.getProperties();
+
+        Database db = getDb(dbId); // NOTE(review): result is used below without a null check
+        db.writeLock();
+        try {
+            OlapTable olapTable = (OlapTable) db.getTable(tableId); // unchecked cast: assumes the logged table id refers to an OlapTable
+            TableProperty tableProperty = olapTable.getTableProperty();
+            if (tableProperty == null) {
+                olapTable.setTableProperty(new TableProperty(properties).buildDynamicProperty()); // first time: wrap the logged map and build the dynamic-partition view
+            } else {
+                tableProperty.modifyTableProperties(properties); // otherwise merge the logged map into the existing one
+            } // NOTE(review): unlike modifyTableDynamicPartition, registerOrRemoveDynamicPartitionTable is not called here — confirm this is intended
+        } finally {
+            db.writeUnlock(); // always release the lock, even if the cast or lookup above throws
+        }
+    }
+
     /*
      * used for handling AlterClusterStmt
      * (for client is the ALTER CLUSTER command).
@@ -6193,7 +6259,7 @@ public class Catalog {
                 throw new DdlException("Table " + tbl.getName() + " is not random distributed");
             }
             TableInfo tableInfo = TableInfo.createForModifyDistribution(db.getId(), tbl.getId());
-            editLog.logModifyDitrubutionType(tableInfo);
+            editLog.logModifyDistributionType(tableInfo);
             LOG.info("finished to modify distribution type of table: " + tbl.getName());
         } finally {
             db.writeUnlock();
diff --git a/fe/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java b/fe/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java
new file mode 100644
index 0000000..7906cae
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import java.util.Map;
+
+public class DynamicPartitionProperty{ // parsed, typed view of a table's "dynamic_partition.*" properties
+    public static final String TIME_UNIT = "dynamic_partition.time_unit"; // property keys read by the constructor
+    public static final String END = "dynamic_partition.end";
+    public static final String PREFIX = "dynamic_partition.prefix";
+    public static final String BUCKETS = "dynamic_partition.buckets";
+    public static final String ENABLE = "dynamic_partition.enable";
+
+    private boolean exist; // true only when a non-null, non-empty map was passed to the constructor
+
+    private boolean enable;
+    private String timeUnit;
+    private int end;
+    private String prefix;
+    private int buckets;
+
+    DynamicPartitionProperty(Map<String ,String> properties) { // package-private; NOTE(review): presumably expects already-validated properties — confirm with callers
+        if (properties != null && !properties.isEmpty()) {
+            this.exist = true;
+            this.enable = Boolean.parseBoolean(properties.get(ENABLE)); // anything but "true" (case-insensitive), including a missing key, yields false
+            this.timeUnit = properties.get(TIME_UNIT); // stored as given; null if the key is missing
+            this.end = Integer.parseInt(properties.get(END)); // throws NumberFormatException if the key is missing or not an integer
+            this.prefix = properties.get(PREFIX); // stored as given; null if the key is missing
+            this.buckets = Integer.parseInt(properties.get(BUCKETS)); // throws NumberFormatException if the key is missing or not an integer
+        } else {
+            this.exist = false; // all other fields keep their Java defaults (false / null / 0)
+        }
+    }
+
+    public boolean isExist() { // whether any dynamic-partition properties were supplied at construction
+        return exist;
+    }
+
+    public String getTimeUnit() {
+        return timeUnit;
+    }
+
+    public int getEnd() {
+        return end;
+    }
+
+    public String getPrefix() {
+        return prefix;
+    }
+
+    public int getBuckets() {
+        return buckets;
+    }
+
+    public boolean getEnable() {
+        return enable;
+    }
+
+    @Override
+    public String toString() { // renders five ,\n"key" = "value" entries; each entry, including the first, starts with a comma
+        return ",\n\"" + ENABLE + "\" = \"" + enable + "\"" +
+                ",\n\"" + TIME_UNIT + "\" = \"" + timeUnit + "\"" +
+                ",\n\"" + END + "\" = \"" + end + "\"" +
+                ",\n\"" + PREFIX + "\" = \"" + prefix + "\"" +
+                ",\n\"" + BUCKETS + "\" = \"" + buckets + "\"";
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
index ea2b3d9..459d023 100644
--- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -123,6 +123,8 @@ public class OlapTable extends Table {
     // The init value is -1, which means there is not partition and index at all.
     private long baseIndexId = -1;
 
+    private TableProperty tableProperty;
+
     public OlapTable() {
         // for persist
         super(TableType.OLAP);
@@ -144,6 +146,8 @@ public class OlapTable extends Table {
         this.colocateGroup = null;
 
         this.indexes = null;
+      
+        this.tableProperty = null;
     }
 
     public OlapTable(long id, String tableName, List<Column> baseSchema, KeysType keysType,
@@ -177,11 +181,28 @@ public class OlapTable extends Table {
         this.bfFpp = 0;
 
         this.colocateGroup = null;
+        
         if (indexes == null) {
             this.indexes = null;
         } else {
             this.indexes = indexes;
         }
+
+        this.tableProperty = null;
+    }
+
+    public void setTableProperty(TableProperty tableProperty) { // replaces the table's TableProperty reference (null is stored as-is)
+        this.tableProperty = tableProperty;
+    }
+
+    public TableProperty getTableProperty() { // may return null: both constructors initialise the field to null
+        return this.tableProperty;
+    }
+
+    public boolean dynamicPartitionExists() { // null-safe check that dynamic-partition properties are present on this table
+        return tableProperty != null // a TableProperty has been set
+                && tableProperty.getDynamicPartitionProperty() != null // its dynamic-partition view has been built
+                && tableProperty.getDynamicPartitionProperty().isExist(); // and that view was built from a non-empty map
     }
 
     public void setBaseIndexId(long baseIndexId) {
@@ -876,6 +897,14 @@ public class OlapTable extends Table {
         } else {
             out.writeBoolean(false);
         }
+      
+        //dynamicProperties
+        if (tableProperty == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            tableProperty.write(out);
+        }
     }
 
     public void readFields(DataInput in) throws IOException {
@@ -976,6 +1005,12 @@ public class OlapTable extends Table {
                 this.indexes = TableIndexes.read(in);
             }
         }
+        // dynamic partition
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_71) {
+            if (in.readBoolean()) {
+                tableProperty = TableProperty.read(in);
+            }
+        }
     }
 
     public boolean equals(Table table) {
diff --git a/fe/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/src/main/java/org/apache/doris/catalog/TableProperty.java
new file mode 100644
index 0000000..ed9d966
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import com.google.gson.annotations.SerializedName;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**  TableProperty holds additional, persisted properties of an OlapTable.
+ *  All properties are kept in a single string map.
+ *  Each kind of property is recognized by its key prefix, such as dynamic_partition.
+ *  When a new kind of property is added, write a method such as buildDynamicProperty to build it.
+ */
+public class TableProperty implements Writable {
+    public static final String DYNAMIC_PARTITION_PROPERTY_PREFIX = "dynamic_partition"; // key prefix that marks dynamic-partition entries
+
+    @SerializedName(value = "properties")
+    private Map<String, String> properties; // raw key/value properties; serialized under the JSON name "properties"
+
+    private DynamicPartitionProperty dynamicPartitionProperty; // derived from properties by buildDynamicProperty(); NOTE(review): whether Gson skips this un-annotated field depends on GsonUtils.GSON — confirm
+
+    public TableProperty(Map<String, String> properties) { // keeps the caller's map by reference (no copy); modifyTableProperties later mutates it
+        this.properties = properties;
+    }
+
+    public static boolean isSamePrefixProperties(Map<String, String> properties, String prefix) { // true if every key starts with prefix (also true for an empty map)
+        for (String value : properties.keySet()) { // note: despite its name, "value" iterates over property keys
+            if (!value.startsWith(prefix)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public TableProperty buildDynamicProperty() { // (re)builds dynamicPartitionProperty from the dynamic_partition* entries; returns this for chaining
+        HashMap<String, String> dynamicPartitionProperties = new HashMap<>();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (entry.getKey().startsWith(DYNAMIC_PARTITION_PROPERTY_PREFIX)) {
+                dynamicPartitionProperties.put(entry.getKey(), entry.getValue());
+            }
+        }
+        dynamicPartitionProperty = new DynamicPartitionProperty(dynamicPartitionProperties); // an empty map yields a property whose isExist() is false
+        return this;
+    }
+
+    void modifyTableProperties(Map<String, String> modifyProperties) { // package-private: overwrites/adds entries, then rebuilds the derived view
+        properties.putAll(modifyProperties);
+        buildDynamicProperty();
+    }
+
+    public Map<String, String> getProperties() { // returns the internal map itself, not a copy
+        return properties;
+    }
+
+    public DynamicPartitionProperty getDynamicPartitionProperty() { // null until buildDynamicProperty() has been called
+        return dynamicPartitionProperty;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException { // persists this object as a single JSON string
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static TableProperty read(DataInput in) throws IOException { // inverse of write(); also rebuilds dynamicPartitionProperty after parsing
+        return GsonUtils.GSON.fromJson(Text.readString(in), TableProperty.class).buildDynamicProperty();
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
new file mode 100644
index 0000000..19b87d3
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -0,0 +1,292 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+import org.apache.doris.analysis.AddPartitionClause;
+import org.apache.doris.analysis.DistributionDesc;
+import org.apache.doris.analysis.HashDistributionDesc;
+import org.apache.doris.analysis.PartitionKeyDesc;
+import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.analysis.SingleRangePartitionDesc;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DynamicPartitionProperty;
+import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.RangePartitionInfo;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableProperty;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.DynamicPartitionUtil;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class is used to periodically add or drop partitions on OlapTables which specify dynamic partition properties.
+ * Config.dynamic_partition_enable determines whether this feature is enabled, and
+ * Config.dynamic_partition_check_interval_seconds determines how often the task is performed.
+ */
+public class DynamicPartitionScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(DynamicPartitionScheduler.class);
+    public static final String LAST_SCHEDULER_TIME = "lastSchedulerTime";
+    public static final String LAST_UPDATE_TIME = "lastUpdateTime";
+    public static final String DYNAMIC_PARTITION_STATE = "dynamicPartitionState";
+    public static final String MSG = "msg";
+
+    private final String DEFAULT_RUNTIME_VALUE = "N/A";
+
+    private Map<String, Map<String, String>> runtimeInfos = Maps.newConcurrentMap();
+    private Set<Pair<Long, Long>> dynamicPartitionTableInfo = Sets.newConcurrentHashSet();
+    private boolean initialize;
+
+    public enum State { // stored as a string under DYNAMIC_PARTITION_STATE in a table's runtime info
+        NORMAL, // initial state; also restored by clearFailedMsg
+        ERROR // set by recordFailedMsg
+    }
+
+
+    public DynamicPartitionScheduler(String name, long intervalMs) {
+        super(name, intervalMs);
+        this.initialize = false; // runAfterCatalogReady() scans the catalog while this is still false
+    }
+
+    public void registerDynamicPartitionTable(Long dbId, Long tableId) {
+        dynamicPartitionTableInfo.add(new Pair<>(dbId, tableId)); // assumes Pair implements value equality so re-registering is a no-op — TODO confirm
+    }
+
+    public void removeDynamicPartitionTable(Long dbId, Long tableId) {
+        dynamicPartitionTableInfo.remove(new Pair<>(dbId, tableId)); // matches by a fresh Pair; assumes Pair implements value equality — TODO confirm
+    }
+
+    public String getRuntimeInfo(String tableName, String key) { // value of key for the table, falling back to the defaults
+        Map<String, String> recorded = runtimeInfos.get(tableName); // a default map is built only when nothing is recorded
+        return (recorded != null ? recorded : createDefaultRuntimeInfo()).getOrDefault(key, DEFAULT_RUNTIME_VALUE);
+    }
+
+    public void removeRuntimeInfo(String tableName) {
+        runtimeInfos.remove(tableName); // drops every runtime field recorded for the table; no-op when nothing is recorded
+    }
+
+    /**
+     * Sets {@code key} to {@code value} in the runtime info of {@code tableName}.
+     * A table seen for the first time starts from the default runtime info.
+     */
+    public void createOrUpdateRuntimeInfo(String tableName, String key, String value) {
+        // computeIfAbsent creates the per-table map atomically, so two concurrent first
+        // updates cannot replace each other's map and drop a key the way get-then-put could
+        runtimeInfos.computeIfAbsent(tableName, unused -> createDefaultRuntimeInfo())
+                .put(key, value);
+    }
+
+    private Map<String, String> createDefaultRuntimeInfo() { // NORMAL state, every other field set to DEFAULT_RUNTIME_VALUE
+        Map<String, String> defaults = Maps.newConcurrentMap();
+        defaults.put(DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
+        for (String placeholderField : new String[] {LAST_UPDATE_TIME, LAST_SCHEDULER_TIME, MSG}) {
+            defaults.put(placeholderField, DEFAULT_RUNTIME_VALUE);
+        }
+        return defaults;
+    }
+
+    private void dynamicAddPartition() { // per registered table: plan missing partitions under the db read lock, add them after unlocking
+        Iterator<Pair<Long, Long>> iterator = dynamicPartitionTableInfo.iterator();
+        while (iterator.hasNext()) {
+            Pair<Long, Long> tableInfo = iterator.next();
+            Long dbId = tableInfo.first;
+            Long tableId = tableInfo.second;
+            Database db = Catalog.getInstance().getDb(dbId);
+            if (db == null) {
+                iterator.remove(); // database not found any more: stop tracking this table
+                continue;
+            }
+            String tableName;
+            ArrayList<AddPartitionClause> addPartitionClauses = new ArrayList<>();
+            db.readLock();
+            try {
+                // Only OlapTable has DynamicPartitionProperty
+                OlapTable olapTable = (OlapTable) db.getTable(tableId);
+                if (olapTable == null
+                        || !olapTable.dynamicPartitionExists()
+                        || !olapTable.getTableProperty().getDynamicPartitionProperty().getEnable()) {
+                    iterator.remove(); // table missing or dynamic partition absent/disabled: unregister it
+                    continue;
+                }
+
+                if (olapTable.getState() != OlapTable.OlapTableState.NORMAL) {
+                    String errorMsg = "Table[" + olapTable.getName() + "]'s state is not NORMAL."
+                            + "Do not allow doing dynamic add partition. table state=" + olapTable.getState();
+                    recordFailedMsg(olapTable.getName(), errorMsg);
+                    LOG.info(errorMsg);
+                    continue;
+                }
+
+                // Determine the partition column type
+                // if column type is Date, format partition name as yyyyMMdd
+                // if column type is DateTime, format partition name as yyyyMMddHHmmss
+                // scheduler time should be recorded even if no partition is added
+                createOrUpdateRuntimeInfo(olapTable.getName(), LAST_SCHEDULER_TIME, TimeUtils.getCurrentFormatTime());
+                RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo();
+                Column partitionColumn = rangePartitionInfo.getPartitionColumns().get(0);
+                String partitionFormat;
+                try {
+                    partitionFormat = DynamicPartitionUtil.getPartitionFormat(partitionColumn);
+                } catch (DdlException e) {
+                    recordFailedMsg(olapTable.getName(), e.getMessage());
+                    continue;
+                }
+
+                Calendar calendar = Calendar.getInstance();
+                TableProperty tableProperty = olapTable.getTableProperty();
+                DynamicPartitionProperty dynamicPartitionProperty = tableProperty.getDynamicPartitionProperty();
+
+                for (int i = 0; i <= dynamicPartitionProperty.getEnd(); i++) { // offsets 0..end in time units, counted from now
+                    String dynamicPartitionPrefix = dynamicPartitionProperty.getPrefix();
+                    String prevBorder = DynamicPartitionUtil.getPartitionRange(dynamicPartitionProperty.getTimeUnit(),
+                            i, (Calendar) calendar.clone(), partitionFormat);
+                    String partitionName = dynamicPartitionPrefix + DynamicPartitionUtil.getFormattedPartitionName(prevBorder);
+
+                    // build the candidate range [prevBorder, nextBorder) and skip it if it overlaps an existing partition
+                    String nextBorder = DynamicPartitionUtil.getPartitionRange(dynamicPartitionProperty.getTimeUnit(),
+                            i + 1, (Calendar) calendar.clone(), partitionFormat);
+                    PartitionValue lowerValue = new PartitionValue(prevBorder);
+                    PartitionValue upperValue = new PartitionValue(nextBorder);
+                    PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+                    RangePartitionInfo info = (RangePartitionInfo) (partitionInfo);
+                    boolean isPartitionExists = false;
+                    Range<PartitionKey> addPartitionKeyRange = null;
+                    try {
+                        PartitionKey lowerBound = PartitionKey.createPartitionKey(Collections.singletonList(lowerValue), Collections.singletonList(partitionColumn));
+                        PartitionKey upperBound = PartitionKey.createPartitionKey(Collections.singletonList(upperValue), Collections.singletonList(partitionColumn));
+                        addPartitionKeyRange = Range.closedOpen(lowerBound, upperBound);
+                    } catch (AnalysisException e) {
+                        // keys.size is always equal to column.size, so this exception is not expected here
+                        LOG.error("Keys size is not equl to column size.");
+                        continue;
+                    }
+                    for (Range<PartitionKey> partitionKeyRange : info.getIdToRange().values()) {
+                        // only single-column partitions are supported for now
+                        try {
+                            RangePartitionInfo.checkRangeIntersect(partitionKeyRange, addPartitionKeyRange);
+                        } catch (DdlException e) {
+                            isPartitionExists = true; // a DdlException here is taken to mean the ranges intersect
+                            if (addPartitionKeyRange.equals(partitionKeyRange)) {
+                                clearFailedMsg(olapTable.getName()); // identical range: the partition is already there, not an error
+                            } else {
+                                recordFailedMsg(olapTable.getName(), e.getMessage()); // differing but intersecting range: report it
+                            }
+                            break;
+                        }
+                    }
+                    if (isPartitionExists) {
+                        continue;
+                    }
+
+                    // construct partition desc
+                    PartitionKeyDesc partitionKeyDesc = new PartitionKeyDesc(Collections.singletonList(lowerValue), Collections.singletonList(upperValue));
+                    HashMap<String, String> partitionProperties = new HashMap<>(1);
+                    partitionProperties.put("replication_num", String.valueOf(DynamicPartitionUtil.estimateReplicateNum(olapTable)));
+                    SingleRangePartitionDesc rangePartitionDesc = new SingleRangePartitionDesc(true, partitionName,
+                            partitionKeyDesc, partitionProperties);
+
+                    // construct distribution desc
+                    HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) olapTable.getDefaultDistributionInfo();
+                    List<String> distColumnNames = new ArrayList<>();
+                    for (Column distributionColumn : hashDistributionInfo.getDistributionColumns()) {
+                        distColumnNames.add(distributionColumn.getName());
+                    }
+                    DistributionDesc distributionDesc = new HashDistributionDesc(dynamicPartitionProperty.getBuckets(), distColumnNames);
+
+                    // queue the clause; the partition itself is added after the read lock is released
+                    addPartitionClauses.add(new AddPartitionClause(rangePartitionDesc, distributionDesc, null));
+                }
+                tableName = olapTable.getName();
+            } finally {
+                db.readUnlock();
+            }
+            for (AddPartitionClause addPartitionClause : addPartitionClauses) { // runs without holding the read lock
+                 try {
+                     Catalog.getCurrentCatalog().addPartition(db, tableName, addPartitionClause);
+                     clearFailedMsg(tableName);
+                 } catch (DdlException e) {
+                     recordFailedMsg(tableName, e.getMessage());
+                 }
+            }
+        }
+    }
+
+    private void recordFailedMsg(String tableName, String msg) { // logs the failure and exposes it through the table's runtime info
+        LOG.warn("dynamic add partition failed: " + msg);
+        createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.ERROR.toString());
+        createOrUpdateRuntimeInfo(tableName, MSG, msg);
+    }
+
+    private void clearFailedMsg(String tableName) { // resets the table's state to NORMAL and its message to the default value
+        createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
+        createOrUpdateRuntimeInfo(tableName, MSG, DEFAULT_RUNTIME_VALUE);
+    }
+
+    private void initDynamicPartitionTable() { // one scan of all databases that registers every dynamic partition table
+        for (Long dbId : Catalog.getInstance().getDbIds()) {
+            Database db = Catalog.getInstance().getDb(dbId);
+            if (db == null) {
+                continue; // no database found for this id: nothing to scan
+            }
+            db.readLock();
+            try {
+                for (Table table : db.getTables()) { // reuse the null-checked, locked db: a second getDb(dbId) may return null
+                    if (DynamicPartitionUtil.isDynamicPartitionTable(table)) {
+                        registerDynamicPartitionTable(db.getId(), table.getId());
+                    }
+                }
+            } finally {
+                db.readUnlock();
+            }
+        }
+        initialize = true; // marks the scan as done so runAfterCatalogReady() does not repeat it
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        if (!initialize) {
+            // scan the catalog for dynamic partition tables only until the first scan has completed
+            initDynamicPartitionTable();
+        }
+        if (Config.dynamic_partition_enable) { // checked on every run, so the feature can be switched while the daemon runs
+            dynamicAddPartition();
+        }
+    }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java
index 753c5de..3aa4b95 100644
--- a/fe/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/src/main/java/org/apache/doris/common/Config.java
@@ -945,5 +945,17 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true)
     public static boolean disable_cluster_feature = true;
+
+    /*
+     * Decide how often to check dynamic partition
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int dynamic_partition_check_interval_seconds = 600;
+
+    /*
+     * If set to true, the dynamic partition feature is enabled
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean dynamic_partition_enable = false;
 }
 
diff --git a/fe/src/main/java/org/apache/doris/common/ErrorCode.java b/fe/src/main/java/org/apache/doris/common/ErrorCode.java
index 9b2ab9f..21a7204 100644
--- a/fe/src/main/java/org/apache/doris/common/ErrorCode.java
+++ b/fe/src/main/java/org/apache/doris/common/ErrorCode.java
@@ -214,7 +214,25 @@ public enum ErrorCode {
             "Colocate tables distribution columns must have the same data type: %s should be %s"), 
     ERR_COLOCATE_NOT_COLOCATE_TABLE(5064, new byte[] { '4', '2', '0', '0', '0' },
             "Table %s is not a colocated table"),
-    ERR_INVALID_OPERATION(5065, new byte[] { '4', '2', '0', '0', '0' }, "Operation %s is invalid");
+    ERR_INVALID_OPERATION(5065, new byte[] { '4', '2', '0', '0', '0' }, "Operation %s is invalid"),
+    ERROR_DYNAMIC_PARTITION_TIME_UNIT(5065, new byte[] {'4', '2', '0', '0', '0'},
+            "Unsupported time unit %s. Expect DAY WEEK MONTH."),
+    ERROR_DYNAMIC_PARTITION_END_ZERO(5066, new byte[] {'4', '2', '0', '0', '0'},
+            "Dynamic partition end must greater than 0"),
+    ERROR_DYNAMIC_PARTITION_END_FORMAT(5066, new byte[] {'4', '2', '0', '0', '0'},
+            "Invalid dynamic partition end %s"),
+    ERROR_DYNAMIC_PARTITION_END_EMPTY(5066, new byte[] {'4', '2', '0', '0', '0'},
+            "Dynamic partition end is empty"),
+    ERROR_DYNAMIC_PARTITION_BUCKETS_ZERO(5067, new byte[] {'4', '2', '0', '0', '0'},
+            "Dynamic partition buckets must greater than 0"),
+    ERROR_DYNAMIC_PARTITION_BUCKETS_FORMAT(5067, new byte[] {'4', '2', '0', '0', '0'},
+            "Invalid dynamic partition buckets %s"),
+    ERROR_DYNAMIC_PARTITION_BUCKETS_EMPTY(5066, new byte[] {'4', '2', '0', '0', '0'},
+            "Dynamic partition buckets is empty"),
+    ERROR_DYNAMIC_PARTITION_ENABLE(5068, new byte[] {'4', '2', '0', '0', '0'},
+            "Invalid dynamic partition enable: %s. Expected true or false"),
+    ERROR_DYNAMIC_PARTITION_PREFIX(5069, new byte[] {'4', '2', '0', '0', '0'},
+            "Invalid dynamic partition prefix: %s.");
 
     ErrorCode(int code, byte[] sqlState, String errorMsg) {
         this.code = code;
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 834062e..2a9539c 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -147,10 +147,12 @@ public final class FeMetaVersion {
     public static final int VERSION_67 = 67;
     // for es table context
     public static final int VERSION_68 = 68;
-    // modofy password checking logic
+    // modify password checking logic
     public static final int VERSION_69 = 69;
     // for indexes
     public static final int VERSION_70 = 70;
+    // dynamic partition
+    public static final int VERSION_71 = 71;
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_70;
+    public static final int VERSION_CURRENT = VERSION_71;
 }
diff --git a/fe/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java b/fe/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java
new file mode 100644
index 0000000..83f5097
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.common.util;
+
+import com.google.common.base.Strings;
+import org.apache.doris.analysis.TimestampArithmeticExpr.TimeUnit;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DynamicPartitionProperty;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.RangePartitionInfo;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableProperty;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeNameFormat;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DynamicPartitionUtil {
+    private static final String TIMESTAMP_FORMAT = "yyyyMMdd";
+    private static final String DATE_FORMAT = "yyyy-MM-dd";
+    private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+    public static void checkTimeUnit(String timeUnit) throws DdlException { // accepts DAY, WEEK or MONTH, ignoring case
+        for (TimeUnit accepted : new TimeUnit[] {TimeUnit.DAY, TimeUnit.WEEK, TimeUnit.MONTH}) {
+            if (accepted.toString().equalsIgnoreCase(timeUnit)) {
+                return;
+            }
+        }
+        ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_TIME_UNIT, timeUnit);
+    }
+
+    private static void checkPrefix(String prefix) throws DdlException { // the prefix must pass the partition-name check on its own
+        try {
+            FeNameFormat.checkPartitionName(prefix);
+        } catch (AnalysisException e) { // reported under the dynamic partition error code; e itself is not passed on
+            ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_PREFIX, prefix);
+        }
+    }
+
+    private static void checkEnd(String end) throws DdlException { // "end" must be the text of an integer greater than 0
+        if (Strings.isNullOrEmpty(end)) { // NOTE(review): relies on reportDdlException throwing; otherwise parseInt below sees null — confirm
+            ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_END_EMPTY);
+        }
+        try {
+            if (Integer.parseInt(end) <= 0) { // zero and negative values are rejected
+                ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_END_ZERO, end);
+            }
+        } catch (NumberFormatException e) { // text that Integer.parseInt cannot parse
+            ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_END_FORMAT, end);
+        }
+    }
+
+    private static void checkBuckets(String buckets) throws DdlException { // "buckets" must be the text of an integer greater than 0
+        if (Strings.isNullOrEmpty(buckets)) { // NOTE(review): relies on reportDdlException throwing; otherwise parseInt below sees null — confirm
+            ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_BUCKETS_EMPTY);
+        }
+        try {
+            if (Integer.parseInt(buckets) <= 0) { // zero and negative values are rejected
+                ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_BUCKETS_ZERO, buckets);
+            }
+        } catch (NumberFormatException e) { // text that Integer.parseInt cannot parse
+            ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_BUCKETS_FORMAT, buckets);
+        }
+    }
+
+    private static void checkEnable(String enable) throws DdlException {
+        // equalsIgnoreCase(null) is false, so null or empty input is reported without a separate check
+        if (!(Boolean.TRUE.toString().equalsIgnoreCase(enable) || Boolean.FALSE.toString().equalsIgnoreCase(enable))) {
+            ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_ENABLE, enable);
+        }
+    }
+
+    public static boolean checkDynamicPartitionPropertiesExist(Map<String, String> properties) { // true if any dynamic partition key is present
+        String[] dynamicPartitionKeys = {DynamicPartitionProperty.TIME_UNIT, DynamicPartitionProperty.END,
+                DynamicPartitionProperty.PREFIX, DynamicPartitionProperty.BUCKETS, DynamicPartitionProperty.ENABLE};
+        for (String dynamicPartitionKey : dynamicPartitionKeys) {
+            if (properties != null && properties.containsKey(dynamicPartitionKey)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static boolean checkInputDynamicPartitionProperties(Map<String, String> properties, PartitionInfo partitionInfo) throws DdlException{
+        if (properties == null || properties.isEmpty()) {
+            return false; // nothing supplied, nothing to validate
+        }
+        if (partitionInfo.getType() != PartitionType.RANGE || partitionInfo.isMultiColumnPartition()) { // runs before any key is inspected
+            throw new DdlException("Dynamic partition only support single-column range partition");
+        }
+        String timeUnit = properties.get(DynamicPartitionProperty.TIME_UNIT);
+        String prefix = properties.get(DynamicPartitionProperty.PREFIX);
+        String end = properties.get(DynamicPartitionProperty.END);
+        String buckets = properties.get(DynamicPartitionProperty.BUCKETS);
+        String enable = properties.get(DynamicPartitionProperty.ENABLE);
+        if (!((Strings.isNullOrEmpty(enable) && // all-or-nothing: once any of the five is set, all five are required
+                Strings.isNullOrEmpty(timeUnit) &&
+                Strings.isNullOrEmpty(prefix) &&
+                Strings.isNullOrEmpty(end) &&
+                Strings.isNullOrEmpty(buckets)))) {
+            if (Strings.isNullOrEmpty(enable)) { // the first missing property, in this order, decides the message
+                throw new DdlException("Must assign dynamic_partition.enable properties");
+            }
+            if (Strings.isNullOrEmpty(timeUnit)) {
+                throw new DdlException("Must assign dynamic_partition.time_unit properties");
+            }
+            if (Strings.isNullOrEmpty(prefix)) {
+                throw new DdlException("Must assign dynamic_partition.prefix properties");
+            }
+            if (Strings.isNullOrEmpty(end)) {
+                throw new DdlException("Must assign dynamic_partition.end properties");
+            }
+            if (Strings.isNullOrEmpty(buckets)) {
+                throw new DdlException("Must assign dynamic_partition.buckets properties");
+            }
+        }
+        return true; // NOTE(review): also reached when none of the five properties is set — confirm callers expect true here
+    }
+
+    public static void registerOrRemoveDynamicPartitionTable(long dbId, OlapTable olapTable) {
+        if (olapTable.getTableProperty() != null // tables without a dynamic partition property are left untouched
+                && olapTable.getTableProperty().getDynamicPartitionProperty() != null) {
+            if (olapTable.getTableProperty().getDynamicPartitionProperty().getEnable()) { // enabled: register with the scheduler
+                Catalog.getCurrentCatalog().getDynamicPartitionScheduler().registerDynamicPartitionTable(dbId, olapTable.getId());
+            } else { // disabled: remove it from the scheduler
+                Catalog.getCurrentCatalog().getDynamicPartitionScheduler().removeDynamicPartitionTable(dbId, olapTable.getId());
+            }
+        }
+    }
+
+    /**
+     * Validates each dynamic partition entry found in {@code properties} and moves it into a new map.
+     * Keys that are absent are skipped. An entry is removed from {@code properties} only after its
+     * check has returned.
+     *
+     * @return the validated entries, keyed by their original property names
+     * @throws DdlException raised by the per-key checks
+     */
+    public static Map<String, String> analyzeDynamicPartition(Map<String, String> properties) throws DdlException {
+        // properties should not be empty, check properties before call this function
+        Map<String, String> validated = new HashMap<>();
+        // Map.remove hands back the value that was just checked, so its result is
+        // passed straight to put
+        if (properties.containsKey(DynamicPartitionProperty.TIME_UNIT)) {
+            checkTimeUnit(properties.get(DynamicPartitionProperty.TIME_UNIT));
+            validated.put(DynamicPartitionProperty.TIME_UNIT, properties.remove(DynamicPartitionProperty.TIME_UNIT));
+        }
+        if (properties.containsKey(DynamicPartitionProperty.PREFIX)) {
+            checkPrefix(properties.get(DynamicPartitionProperty.PREFIX));
+            validated.put(DynamicPartitionProperty.PREFIX, properties.remove(DynamicPartitionProperty.PREFIX));
+        }
+        if (properties.containsKey(DynamicPartitionProperty.END)) {
+            checkEnd(properties.get(DynamicPartitionProperty.END));
+            validated.put(DynamicPartitionProperty.END, properties.remove(DynamicPartitionProperty.END));
+        }
+        if (properties.containsKey(DynamicPartitionProperty.BUCKETS)) {
+            checkBuckets(properties.get(DynamicPartitionProperty.BUCKETS));
+            validated.put(DynamicPartitionProperty.BUCKETS, properties.remove(DynamicPartitionProperty.BUCKETS));
+        }
+        if (properties.containsKey(DynamicPartitionProperty.ENABLE)) {
+            checkEnable(properties.get(DynamicPartitionProperty.ENABLE));
+            validated.put(DynamicPartitionProperty.ENABLE, properties.remove(DynamicPartitionProperty.ENABLE));
+        }
+        return validated;
+    }
+
+    public static void checkAlterAllowed(OlapTable olapTable) throws DdlException { // throws while dynamic partition exists and is enabled
+        TableProperty tableProperty = olapTable.getTableProperty();
+        if (tableProperty != null && // NOTE(review): getDynamicPartitionProperty() is dereferenced without a null check — confirm it is always built
+                tableProperty.getDynamicPartitionProperty().isExist() &&
+                tableProperty.getDynamicPartitionProperty().getEnable()) {
+            throw new DdlException("Cannot modify partition on a Dynamic Partition Table, set `dynamic_partition.enable` to false firstly.");
+        }
+    }
+
+    public static boolean isDynamicPartitionTable(Table table) {
+        if (!(table instanceof OlapTable) || // only range-partitioned OlapTables can qualify
+                !(((OlapTable) table).getPartitionInfo().getType().equals(PartitionType.RANGE))) {
+            return false;
+        }
+        RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) ((OlapTable) table).getPartitionInfo();
+        TableProperty tableProperty = ((OlapTable) table).getTableProperty();
+        if (tableProperty == null || !tableProperty.getDynamicPartitionProperty().isExist()) {
+            return false; // no dynamic partition property on this table
+        }
+
+        return rangePartitionInfo.getPartitionColumns().size() == 1 && tableProperty.getDynamicPartitionProperty().getEnable(); // single partition column, enabled
+    }
+
+    /** Installs the dynamic partition entries of {@code properties} on the table; properties should be checked before calling this method. */
+    public static void checkAndSetDynamicPartitionProperty(OlapTable olapTable, Map<String, String> properties) throws DdlException {
+        if (!checkInputDynamicPartitionProperties(properties, olapTable.getPartitionInfo())) {
+            return;
+        }
+        // analyzeDynamicPartition also removes the entries it recognises from the caller's map
+        Map<String, String> validated = analyzeDynamicPartition(properties);
+        olapTable.setTableProperty(new TableProperty(validated).buildDynamicProperty());
+    }
+
+    public static String getPartitionFormat(Column column) throws DdlException { // date pattern used to render partition borders
+        if (column.getDataType().equals(PrimitiveType.DATE)) {
+            return DATE_FORMAT;
+        }
+        if (column.getDataType().equals(PrimitiveType.DATETIME)) {
+            return DATETIME_FORMAT;
+        }
+        if (PrimitiveType.getIntegerTypes().contains(column.getDataType())) {
+            return TIMESTAMP_FORMAT; // TODO: For Integer Type, only support format it as yyyyMMdd now
+        }
+        throw new DdlException("Dynamic Partition Only Support DATE, DATETIME and INTEGER Type Now.");
+    }
+
+    public static String getFormattedPartitionName(String name) {
+        return name.replaceAll("[-: ]", ""); // drops every '-', ':' and ' ', e.g. "2020-01-03" -> "20200103"
+    }
+
+    public static String getPartitionRange(String timeUnit, int offset, Calendar calendar, String format) {
+        // shifts the given calendar in place by offset units; any unit other than DAY or WEEK is handled as MONTH
+        int calendarField = Calendar.MONTH;
+        if (timeUnit.equalsIgnoreCase(TimeUnit.DAY.toString())) {
+            calendarField = Calendar.DAY_OF_MONTH;
+        } else if (timeUnit.equalsIgnoreCase(TimeUnit.WEEK.toString())) {
+            calendarField = Calendar.WEEK_OF_MONTH;
+        }
+        calendar.add(calendarField, offset);
+        return new SimpleDateFormat(format).format(calendar.getTime());
+    }
+
+    public static int estimateReplicateNum(OlapTable table) {
+        // replication num of the partition with the largest id; 3 when no partition has a positive id
+        long largestPartitionId = 0;
+        for (Partition candidate : table.getPartitions()) {
+            largestPartitionId = Math.max(largestPartitionId, candidate.getId());
+        }
+        if (largestPartitionId > 0) {
+            return table.getPartitionInfo().getReplicationNum(largestPartitionId);
+        }
+        return 3;
+    }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
index 4282468..ddffb73 100644
--- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -54,6 +54,7 @@ import org.apache.doris.persist.DatabaseInfo;
 import org.apache.doris.persist.DropInfo;
 import org.apache.doris.persist.DropLinkDbAndUpdateDbInfo;
 import org.apache.doris.persist.DropPartitionInfo;
+import org.apache.doris.persist.ModifyDynamicPartitionInfo;
 import org.apache.doris.persist.HbPackage;
 import org.apache.doris.persist.ModifyPartitionInfo;
 import org.apache.doris.persist.OperationType;
@@ -497,6 +498,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_DYNAMIC_PARTITION: {
+                data = ModifyDynamicPartitionInfo.read(in);
+                isRead = true;
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index 3d6c016..2e26de1 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -697,6 +697,11 @@ public class EditLog {
                     catalog.replayConvertDistributionType(tableInfo);
                     break;
                 }
+                case OperationType.OP_DYNAMIC_PARTITION: {
+                    ModifyDynamicPartitionInfo modifyDynamicPartitionInfo = (ModifyDynamicPartitionInfo) journal.getData();
+                    catalog.replayModifyTableDynamicPartition(modifyDynamicPartitionInfo);
+                    break;
+                }
                 default: {
                     IOException e = new IOException();
                     LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1195,7 +1200,11 @@ public class EditLog {
         logEdit(OperationType.OP_ALTER_JOB_V2, alterJob);
     }
 
-    public void logModifyDitrubutionType(TableInfo tableInfo) {
+    public void logModifyDistributionType(TableInfo tableInfo) {
         logEdit(OperationType.OP_MODIFY_DISTRIBUTION_TYPE, tableInfo);
     }
+
+    /**
+     * Records {@code info} in the edit log under {@code OperationType.OP_DYNAMIC_PARTITION}.
+     *
+     * @param info dynamic partition modification to log
+     */
+    public void logDynamicPartition(ModifyDynamicPartitionInfo info) {
+        logEdit(OperationType.OP_DYNAMIC_PARTITION, info);
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/persist/ModifyDynamicPartitionInfo.java b/fe/src/main/java/org/apache/doris/persist/ModifyDynamicPartitionInfo.java
new file mode 100644
index 0000000..90051be
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/persist/ModifyDynamicPartitionInfo.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import com.google.gson.annotations.SerializedName;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Journal payload describing a change to a table's dynamic partition properties.
+ * Instances are written and read as a single JSON string produced by Gson.
+ */
+public class ModifyDynamicPartitionInfo implements Writable {
+
+    @SerializedName(value = "dbId")
+    private long dbId;
+    @SerializedName(value = "tableId")
+    private long tableId;
+    @SerializedName(value = "properties")
+    private Map<String, String> properties = new HashMap<>();
+
+    /**
+     * @param dbId       id of the database that owns the table
+     * @param tableId    id of the modified table
+     * @param properties property map to record; stored by reference, not copied
+     */
+    public ModifyDynamicPartitionInfo(long dbId, long tableId, Map<String, String> properties) {
+        this.properties = properties;
+        this.tableId = tableId;
+        this.dbId = dbId;
+    }
+
+    /** @return id of the database that owns the table */
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    /** @return id of the modified table */
+    public long getTableId() {
+        return this.tableId;
+    }
+
+    /** @return the recorded property map (the internal instance, not a copy) */
+    public Map<String, String> getProperties() {
+        return this.properties;
+    }
+
+    /** Serializes this object to JSON and writes it to {@code out} as one string. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    /**
+     * Reads one string from {@code in} and parses it as the JSON form of this class.
+     *
+     * @param in input positioned at a string written by {@link #write(DataOutput)}
+     * @return the deserialized instance
+     * @throws IOException if reading the string fails
+     */
+    public static ModifyDynamicPartitionInfo read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, ModifyDynamicPartitionInfo.class);
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java b/fe/src/main/java/org/apache/doris/persist/OperationType.java
index f6a8352..0e117a0 100644
--- a/fe/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java
@@ -153,4 +153,7 @@ public class OperationType {
     // small files 251~260
     public static final short OP_CREATE_SMALL_FILE = 251;
     public static final short OP_DROP_SMALL_FILE = 252;
+
+    // dynamic partition 261~265
+    public static final short OP_DYNAMIC_PARTITION = 261;
 }
diff --git a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
index bc2ce5c..3f5f3ec 100644
--- a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -35,6 +35,7 @@ import org.apache.doris.analysis.ShowCreateTableStmt;
 import org.apache.doris.analysis.ShowDataStmt;
 import org.apache.doris.analysis.ShowDbStmt;
 import org.apache.doris.analysis.ShowDeleteStmt;
+import org.apache.doris.analysis.ShowDynamicPartitionStmt;
 import org.apache.doris.analysis.ShowEnginesStmt;
 import org.apache.doris.analysis.ShowExportStmt;
 import org.apache.doris.analysis.ShowFrontendsStmt;
@@ -69,6 +70,7 @@ import org.apache.doris.catalog.AggregateFunction;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DynamicPartitionProperty;
 import org.apache.doris.catalog.Function;
 import org.apache.doris.catalog.Index;
 import org.apache.doris.catalog.MaterializedIndex;
@@ -83,6 +85,7 @@ import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.View;
+import org.apache.doris.clone.DynamicPartitionScheduler;
 import org.apache.doris.cluster.BaseParam;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
@@ -238,6 +241,8 @@ public class ShowExecutor {
             handleAdminShowConfig();
         } else if (stmt instanceof ShowSmallFilesStmt) {
             handleShowSmallFiles();
+        } else if (stmt instanceof ShowDynamicPartitionStmt) {
+            handleShowDynamicPartition();
         } else if (stmt instanceof ShowIndexStmt) {
             handleShowIndex();
         } else {
@@ -1438,6 +1443,50 @@ public class ShowExecutor {
         resultSet = new ShowResultSet(showStmt.getMetaData(), results);
     }
 
+    /**
+     * Handles a {@link ShowDynamicPartitionStmt}: emits one row per OLAP table in the target
+     * database that has dynamic partition properties and that the current user may SHOW.
+     *
+     * Each row holds the table name, the enable flag, the upper-cased time unit, end, prefix,
+     * buckets, and four runtime values read from the dynamic partition scheduler
+     * (last update time, last scheduler time, state, message).
+     *
+     * Side effect: for OLAP tables without dynamic partition properties, any runtime info the
+     * scheduler keeps under that table name is removed.
+     *
+     * A result set is always produced; when the database cannot be found it is empty.
+     */
+    private void handleShowDynamicPartition() {
+        ShowDynamicPartitionStmt showDynamicPartitionStmt = (ShowDynamicPartitionStmt) stmt;
+        List<List<String>> rows = Lists.newArrayList();
+        Database db = ctx.getCatalog().getDb(showDynamicPartitionStmt.getDb());
+        if (db != null) {
+            // The scheduler does not depend on the table being visited, so look it up once.
+            DynamicPartitionScheduler dynamicPartitionScheduler = Catalog.getCurrentCatalog().getDynamicPartitionScheduler();
+            db.readLock();
+            try {
+                for (Table tbl : db.getTables()) {
+                    if (!(tbl instanceof OlapTable)) {
+                        continue;
+                    }
+
+                    OlapTable olapTable = (OlapTable) tbl;
+                    if (!olapTable.dynamicPartitionExists()) {
+                        // Drop runtime info kept for a table that has no dynamic partition properties.
+                        dynamicPartitionScheduler.removeRuntimeInfo(olapTable.getName());
+                        continue;
+                    }
+                    // check tbl privs
+                    if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
+                            db.getFullName(), olapTable.getName(),
+                            PrivPredicate.SHOW)) {
+                        continue;
+                    }
+                    DynamicPartitionProperty dynamicPartitionProperty = olapTable.getTableProperty().getDynamicPartitionProperty();
+                    String tableName = olapTable.getName();
+                    rows.add(Lists.newArrayList(
+                            tableName,
+                            String.valueOf(dynamicPartitionProperty.getEnable()),
+                            dynamicPartitionProperty.getTimeUnit().toUpperCase(),
+                            String.valueOf(dynamicPartitionProperty.getEnd()),
+                            dynamicPartitionProperty.getPrefix(),
+                            String.valueOf(dynamicPartitionProperty.getBuckets()),
+                            dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.LAST_UPDATE_TIME),
+                            dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.LAST_SCHEDULER_TIME),
+                            dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.DYNAMIC_PARTITION_STATE),
+                            dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.MSG)));
+                }
+            } finally {
+                db.readUnlock();
+            }
+        }
+        // Publish the result set even when the database was not found, so the statement
+        // never finishes with resultSet left unassigned.
+        resultSet = new ShowResultSet(showDynamicPartitionStmt.getMetaData(), rows);
+    }
 }
 
 
diff --git a/fe/src/main/jflex/sql_scanner.flex b/fe/src/main/jflex/sql_scanner.flex
index 56a9368..53322e7 100644
--- a/fe/src/main/jflex/sql_scanner.flex
+++ b/fe/src/main/jflex/sql_scanner.flex
@@ -160,6 +160,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("distinctpcsa", new Integer(SqlParserSymbols.KW_DISTINCTPCSA));
         keywordMap.put("distributed", new Integer(SqlParserSymbols.KW_DISTRIBUTED));
         keywordMap.put("distribution", new Integer(SqlParserSymbols.KW_DISTRIBUTION));
+        keywordMap.put("dynamic", new Integer(SqlParserSymbols.KW_DYNAMIC));
         keywordMap.put("buckets", new Integer(SqlParserSymbols.KW_BUCKETS));
         keywordMap.put("div", new Integer(SqlParserSymbols.KW_DIV));
         keywordMap.put("double", new Integer(SqlParserSymbols.KW_DOUBLE));
diff --git a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
index 15ce167..6d6705e 100644
--- a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
+++ b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
@@ -27,11 +27,14 @@ import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.ColumnDef;
 import org.apache.doris.analysis.ColumnDef.DefaultValue;
 import org.apache.doris.analysis.ColumnPosition;
+import org.apache.doris.analysis.ModifyTablePropertiesClause;
 import org.apache.doris.analysis.TypeDef;
+import org.apache.doris.backup.CatalogMocker;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.CatalogTestUtil;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DynamicPartitionProperty;
 import org.apache.doris.catalog.FakeCatalog;
 import org.apache.doris.catalog.FakeEditLog;
 import org.apache.doris.catalog.MaterializedIndex;
@@ -46,6 +49,7 @@ import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.UserException;
@@ -58,10 +62,13 @@ import org.apache.doris.transaction.GlobalTransactionMgr;
 
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -80,6 +87,9 @@ public class SchemaChangeJobV2Test {
             false, AggregateType.MAX, false, new DefaultValue(true, "1"), "");
     private static AddColumnClause addColumnClause = new AddColumnClause(newCol, new ColumnPosition("v"), null, null);
 
+    @Rule
+    public ExpectedException expectedEx = ExpectedException.none();
+
     @Before
     public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException,
             InvocationTargetException, NoSuchMethodException, SecurityException, AnalysisException {
@@ -161,7 +171,7 @@ public class SchemaChangeJobV2Test {
         Assert.assertEquals(2, testPartition.getMaterializedIndices(IndexExtState.ALL).size());
         Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.VISIBLE).size());
         Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.SHADOW).size());
-        
+
         // runWaitingTxnJob
         schemaChangeHandler.runAfterCatalogReady();
         Assert.assertEquals(JobState.RUNNING, schemaChangeJob.getJobState());
@@ -187,9 +197,90 @@ public class SchemaChangeJobV2Test {
                 shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), testPartition.getVisibleVersionHash(), shadowReplica.getDataSize(), shadowReplica.getRowCount());
             }
         }
-        
+
         schemaChangeHandler.runAfterCatalogReady();
         Assert.assertEquals(JobState.FINISHED, schemaChangeJob.getJobState());
     }
 
+    /**
+     * Applies a complete set of dynamic partition properties to a table through the schema
+     * change handler, then changes each property in turn and checks the table reflects it.
+     * The same property map is reused for every step, so later steps carry earlier changes.
+     */
+    @Test
+    public void testModifyDynamicPartitionNormal() throws UserException {
+        FakeCatalog.setCatalog(masterCatalog);
+        SchemaChangeHandler schemaChangeHandler = Catalog.getInstance().getSchemaChangeHandler();
+
+        // Step 0: all five properties at once.
+        Map<String, String> properties = new HashMap<>();
+        properties.put(DynamicPartitionProperty.ENABLE, "true");
+        properties.put(DynamicPartitionProperty.TIME_UNIT, "day");
+        properties.put(DynamicPartitionProperty.END, "3");
+        properties.put(DynamicPartitionProperty.PREFIX, "p");
+        properties.put(DynamicPartitionProperty.BUCKETS, "30");
+        ArrayList<AlterClause> initialClauses = new ArrayList<>();
+        initialClauses.add(new ModifyTablePropertiesClause(properties));
+
+        Database db = CatalogMocker.mockDb();
+        OlapTable olapTable = (OlapTable) db.getTable(CatalogMocker.TEST_TBL2_ID);
+        schemaChangeHandler.process(initialClauses, "default_cluster", db, olapTable);
+
+        DynamicPartitionProperty applied = olapTable.getTableProperty().getDynamicPartitionProperty();
+        Assert.assertTrue(applied.isExist());
+        Assert.assertTrue(applied.getEnable());
+        Assert.assertEquals("day", applied.getTimeUnit());
+        Assert.assertEquals(3, applied.getEnd());
+        Assert.assertEquals("p", applied.getPrefix());
+        Assert.assertEquals(30, applied.getBuckets());
+
+        // Step 1: dynamic_partition.enable = false
+        properties.put(DynamicPartitionProperty.ENABLE, "false");
+        ArrayList<AlterClause> disableClauses = new ArrayList<>();
+        disableClauses.add(new ModifyTablePropertiesClause(properties));
+        schemaChangeHandler.process(disableClauses, "default_cluster", db, olapTable);
+        Assert.assertFalse(olapTable.getTableProperty().getDynamicPartitionProperty().getEnable());
+
+        // Step 2: dynamic_partition.time_unit = week
+        properties.put(DynamicPartitionProperty.TIME_UNIT, "week");
+        ArrayList<AlterClause> timeUnitClauses = new ArrayList<>();
+        timeUnitClauses.add(new ModifyTablePropertiesClause(properties));
+        schemaChangeHandler.process(timeUnitClauses, "default_cluster", db, olapTable);
+        Assert.assertEquals("week", olapTable.getTableProperty().getDynamicPartitionProperty().getTimeUnit());
+
+        // Step 3: dynamic_partition.end = 10
+        properties.put(DynamicPartitionProperty.END, "10");
+        ArrayList<AlterClause> endClauses = new ArrayList<>();
+        endClauses.add(new ModifyTablePropertiesClause(properties));
+        schemaChangeHandler.process(endClauses, "default_cluster", db, olapTable);
+        Assert.assertEquals(10, olapTable.getTableProperty().getDynamicPartitionProperty().getEnd());
+
+        // Step 4: dynamic_partition.prefix = p1
+        properties.put(DynamicPartitionProperty.PREFIX, "p1");
+        ArrayList<AlterClause> prefixClauses = new ArrayList<>();
+        prefixClauses.add(new ModifyTablePropertiesClause(properties));
+        schemaChangeHandler.process(prefixClauses, "default_cluster", db, olapTable);
+        Assert.assertEquals("p1", olapTable.getTableProperty().getDynamicPartitionProperty().getPrefix());
+
+        // Step 5: dynamic_partition.buckets = 3
+        properties.put(DynamicPartitionProperty.BUCKETS, "3");
+        ArrayList<AlterClause> bucketClauses = new ArrayList<>();
+        bucketClauses.add(new ModifyTablePropertiesClause(properties));
+        schemaChangeHandler.process(bucketClauses, "default_cluster", db, olapTable);
+        Assert.assertEquals(3, olapTable.getTableProperty().getDynamicPartitionProperty().getBuckets());
+    }
+
+    /**
+     * Sends an ALTER carrying exactly one dynamic partition property to a freshly mocked table
+     * and verifies that it is rejected with a {@link DdlException} naming the missing property.
+     *
+     * The check is done with try/catch rather than the {@code ExpectedException} rule: with the
+     * rule, the first thrown exception ends the calling test method, so a test that invokes
+     * this helper several times would only ever exercise its first case.
+     *
+     * @param propertyKey     the single property to set
+     * @param propertyValue   value for {@code propertyKey}
+     * @param missPropertyKey property expected to be reported as missing
+     * @throws UserException if the handler fails with something other than a DdlException
+     */
+    public void modifyDynamicPartitionWithoutTableProperty(String propertyKey, String propertyValue, String missPropertyKey)
+            throws UserException {
+        FakeCatalog.setCatalog(masterCatalog);
+        SchemaChangeHandler schemaChangeHandler = Catalog.getInstance().getSchemaChangeHandler();
+        ArrayList<AlterClause> alterClauses = new ArrayList<>();
+        Map<String, String> properties = new HashMap<>();
+        properties.put(propertyKey, propertyValue);
+        alterClauses.add(new ModifyTablePropertiesClause(properties));
+
+        Database db = CatalogMocker.mockDb();
+        OlapTable olapTable = (OlapTable) db.getTable(CatalogMocker.TEST_TBL2_ID);
+
+        String expectedMessage = String.format("Must assign %s properties", missPropertyKey);
+        try {
+            schemaChangeHandler.process(alterClauses, "default_cluster", db, olapTable);
+        } catch (DdlException e) {
+            String actualMessage = String.valueOf(e.getMessage());
+            Assert.assertTrue("Unexpected message: " + actualMessage, actualMessage.contains(expectedMessage));
+            return;
+        }
+        Assert.fail("Expected DdlException containing: " + expectedMessage);
+    }
+
+    /**
+     * Drives {@code modifyDynamicPartitionWithoutTableProperty} with one case per dynamic
+     * partition property, in the order listed below.
+     */
+    @Test
+    public void testModifyDynamicPartitionWithoutTableProperty() throws UserException {
+        // Each row: {property to set, its value, property expected to be reported as missing}
+        String[][] cases = {
+                {DynamicPartitionProperty.ENABLE, "false", DynamicPartitionProperty.TIME_UNIT},
+                {DynamicPartitionProperty.TIME_UNIT, "day", DynamicPartitionProperty.ENABLE},
+                {DynamicPartitionProperty.END, "3", DynamicPartitionProperty.ENABLE},
+                {DynamicPartitionProperty.PREFIX, "p", DynamicPartitionProperty.ENABLE},
+                {DynamicPartitionProperty.BUCKETS, "30", DynamicPartitionProperty.ENABLE},
+        };
+        for (String[] oneCase : cases) {
+            modifyDynamicPartitionWithoutTableProperty(oneCase[0], oneCase[1], oneCase[2]);
+        }
+    }
 }
diff --git a/fe/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
new file mode 100644
index 0000000..9f98a41
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
@@ -0,0 +1,482 @@
+package org.apache.doris.catalog;
+
+import com.google.common.collect.Lists;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.ColumnDef;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.HashDistributionDesc;
+import org.apache.doris.analysis.KeysDesc;
+import org.apache.doris.analysis.PartitionKeyDesc;
+import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.analysis.RangePartitionDesc;
+import org.apache.doris.analysis.SingleRangePartitionDesc;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.TypeDef;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.task.AgentBatchTask;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class DynamicPartitionTableTest {
+    private TableName dbTableName;
+    private String dbName = "testDb";
+    private String tableName = "testTable";
+    private String clusterName = "default";
+    private List<Long> beIds = Lists.newArrayList();
+    private List<String> columnNames = Lists.newArrayList();
+    private List<ColumnDef> columnDefs = Lists.newArrayList();
+
+    private Catalog catalog = Catalog.getInstance();
+    private Database db = new Database();
+    private Analyzer analyzer;
+
+    private Map<String, String> properties;
+    private List<SingleRangePartitionDesc> singleRangePartitionDescs;
+
+    @Injectable
+    ConnectContext connectContext;
+
+    @Rule
+    public ExpectedException expectedEx = ExpectedException.none();
+
+    /**
+     * Builds the shared fixture: the table name, three backend ids, three column definitions
+     * (key1 and key2 as INT, key3 as VARCHAR(10)), a full set of dynamic partition properties,
+     * one range partition "p1", and the JMockit stubs used by every test.
+     */
+    @Before
+    public void setUp() throws Exception {
+        dbTableName = new TableName(dbName, tableName);
+
+        beIds.add(1L);
+        beIds.add(2L);
+        beIds.add(3L);
+
+        columnNames.add("key1");
+        columnNames.add("key2");
+        columnNames.add("key3");
+
+        columnDefs.add(new ColumnDef("key1", new TypeDef(ScalarType.createType(PrimitiveType.INT))));
+        columnDefs.add(new ColumnDef("key2", new TypeDef(ScalarType.createType(PrimitiveType.INT))));
+        columnDefs.add(new ColumnDef("key3", new TypeDef(ScalarType.createVarchar(10))));
+
+        analyzer = new Analyzer(catalog, connectContext);
+
+        // Complete property set; individual tests remove one entry to provoke a failure.
+        properties = new HashMap<>();
+        properties.put(DynamicPartitionProperty.ENABLE, "true");
+        properties.put(DynamicPartitionProperty.PREFIX, "p");
+        properties.put(DynamicPartitionProperty.TIME_UNIT, "day");
+        properties.put(DynamicPartitionProperty.END, "3");
+        properties.put(DynamicPartitionProperty.BUCKETS, "30");
+
+        singleRangePartitionDescs = new LinkedList<>();
+        singleRangePartitionDescs.add(new SingleRangePartitionDesc(false, "p1",
+                new PartitionKeyDesc(Lists.newArrayList(new PartitionValue("-128"))), null));
+
+        // Replace AgentBatchTask.run() with a no-op.
+        new MockUp<AgentBatchTask>() {
+            @Mock
+            void run() {
+                return;
+            }
+        };
+
+        // Make CountDownLatch.await(timeout, unit) report success immediately
+        // (presumably so table creation does not wait on backend tasks).
+        new MockUp<CountDownLatch>() {
+            @Mock
+            boolean await(long timeout, TimeUnit unit) {
+                return true;
+            }
+        };
+
+        // Partially mock analyzer and catalog; the analyzer reports the test cluster name.
+        new Expectations(analyzer, catalog) {{
+            analyzer.getClusterName();
+            minTimes = 0;
+            result = clusterName;
+        }};
+
+        dbTableName.analyze(analyzer);
+    }
+
+    /**
+     * Creates a range-partitioned OLAP table on key1 with the full dynamic partition property
+     * set from {@code setUp}. The test passes when neither analyze nor createTable throws.
+     */
+    @Test
+    public void testNormal(@Injectable SystemInfoService systemInfoService,
+                           @Injectable PaloAuth paloAuth,
+                           @Injectable EditLog editLog) throws UserException {
+        // Stub the catalog collaborators: database lookup, backend selection,
+        // privilege check (always granted) and the edit log.
+        new Expectations(catalog) {
+            {
+                catalog.getDb(dbTableName.getDb());
+                minTimes = 0;
+                result = db;
+
+                Catalog.getCurrentSystemInfo();
+                minTimes = 0;
+                result = systemInfoService;
+
+                systemInfoService.checkClusterCapacity(anyString);
+                minTimes = 0;
+                systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString);
+                minTimes = 0;
+                result = beIds;
+
+                catalog.getAuth();
+                minTimes = 0;
+                result = paloAuth;
+                paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE);
+                minTimes = 0;
+                result = true;
+
+                catalog.getEditLog();
+                minTimes = 0;
+                result = editLog;
+            }
+        };
+
+        CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap",
+                new KeysDesc(KeysType.AGG_KEYS, columnNames),
+                new RangePartitionDesc(Lists.newArrayList("key1"), singleRangePartitionDescs),
+                new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, "");
+        stmt.analyze(analyzer);
+
+        catalog.createTable(stmt);
+    }
+
+    /**
+     * Removes the ENABLE property from the otherwise complete property set and expects
+     * createTable to fail with a DdlException whose message contains
+     * "Must assign dynamic_partition.enable properties".
+     */
+    @Test
+    public void testMissEnable(@Injectable SystemInfoService systemInfoService,
+                               @Injectable PaloAuth paloAuth,
+                               @Injectable EditLog editLog) throws UserException {
+        // Stub the catalog collaborators: database lookup, backend selection,
+        // privilege check (always granted) and the edit log.
+        new Expectations(catalog) {
+            {
+                catalog.getDb(dbTableName.getDb());
+                minTimes = 0;
+                result = db;
+
+                Catalog.getCurrentSystemInfo();
+                minTimes = 0;
+                result = systemInfoService;
+
+                systemInfoService.checkClusterCapacity(anyString);
+                minTimes = 0;
+                systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString);
+                minTimes = 0;
+                result = beIds;
+
+                catalog.getAuth();
+                minTimes = 0;
+                result = paloAuth;
+                paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE);
+                minTimes = 0;
+                result = true;
+
+                catalog.getEditLog();
+                minTimes = 0;
+                result = editLog;
+            }
+        };
+
+        properties.remove(DynamicPartitionProperty.ENABLE);
+
+        CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap",
+                new KeysDesc(KeysType.AGG_KEYS, columnNames),
+                new RangePartitionDesc(Lists.newArrayList("key1"), singleRangePartitionDescs),
+                new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, "");
+        stmt.analyze(analyzer);
+
+        // Registered after analyze(), so an exception thrown by analyze() itself fails the test.
+        expectedEx.expect(DdlException.class);
+        expectedEx.expectMessage("Must assign dynamic_partition.enable properties");
+
+        catalog.createTable(stmt);
+    }
+
+    /**
+     * Removes the PREFIX property from the otherwise complete property set and expects
+     * createTable to fail with a DdlException whose message contains
+     * "Must assign dynamic_partition.prefix properties".
+     */
+    @Test
+    public void testMissPrefix(@Injectable SystemInfoService systemInfoService,
+                               @Injectable PaloAuth paloAuth,
+                               @Injectable EditLog editLog) throws UserException {
+        // Stub the catalog collaborators: database lookup, backend selection,
+        // privilege check (always granted) and the edit log.
+        new Expectations(catalog) {
+            {
+                catalog.getDb(dbTableName.getDb());
+                minTimes = 0;
+                result = db;
+
+                Catalog.getCurrentSystemInfo();
+                minTimes = 0;
+                result = systemInfoService;
+
+                systemInfoService.checkClusterCapacity(anyString);
+                minTimes = 0;
+                systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString);
+                minTimes = 0;
+                result = beIds;
+
+                catalog.getAuth();
+                minTimes = 0;
+                result = paloAuth;
+                paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE);
+                minTimes = 0;
+                result = true;
+
+                catalog.getEditLog();
+                minTimes = 0;
+                result = editLog;
+            }
+        };
+
+        properties.remove(DynamicPartitionProperty.PREFIX);
+
+        CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap",
+                new KeysDesc(KeysType.AGG_KEYS, columnNames),
+                new RangePartitionDesc(Lists.newArrayList("key1"), singleRangePartitionDescs),
+                new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, "");
+        stmt.analyze(analyzer);
+
+        // Registered after analyze(), so an exception thrown by analyze() itself fails the test.
+        expectedEx.expect(DdlException.class);
+        expectedEx.expectMessage("Must assign dynamic_partition.prefix properties");
+
+        catalog.createTable(stmt);
+    }
+
+    /**
+     * Removes the TIME_UNIT property from the otherwise complete property set and expects
+     * createTable to fail with a DdlException whose message contains
+     * "Must assign dynamic_partition.time_unit properties".
+     */
+    @Test
+    public void testMissTimeUnit(@Injectable SystemInfoService systemInfoService,
+                                 @Injectable PaloAuth paloAuth,
+                                 @Injectable EditLog editLog) throws UserException {
+        // Stub the catalog collaborators: database lookup, backend selection,
+        // privilege check (always granted) and the edit log.
+        new Expectations(catalog) {
+            {
+                catalog.getDb(dbTableName.getDb());
+                minTimes = 0;
+                result = db;
+
+                Catalog.getCurrentSystemInfo();
+                minTimes = 0;
+                result = systemInfoService;
+
+                systemInfoService.checkClusterCapacity(anyString);
+                minTimes = 0;
+                systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString);
+                minTimes = 0;
+                result = beIds;
+
+                catalog.getAuth();
+                minTimes = 0;
+                result = paloAuth;
+                paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE);
+                minTimes = 0;
+                result = true;
+
+                catalog.getEditLog();
+                minTimes = 0;
+                result = editLog;
+            }
+        };
+
+        properties.remove(DynamicPartitionProperty.TIME_UNIT);
+
+        CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap",
+                new KeysDesc(KeysType.AGG_KEYS, columnNames),
+                new RangePartitionDesc(Lists.newArrayList("key1"), singleRangePartitionDescs),
+                new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, "");
+        stmt.analyze(analyzer);
+
+        // Registered after analyze(), so an exception thrown by analyze() itself fails the test.
+        expectedEx.expect(DdlException.class);
+        expectedEx.expectMessage("Must assign dynamic_partition.time_unit properties");
+
+        catalog.createTable(stmt);
+    }
+
+    /**
+     * Removes the END property from the otherwise complete property set and expects
+     * createTable to fail with a DdlException whose message contains
+     * "Must assign dynamic_partition.end properties".
+     */
+    @Test
+    public void testMissEnd(@Injectable SystemInfoService systemInfoService,
+                            @Injectable PaloAuth paloAuth,
+                            @Injectable EditLog editLog) throws UserException {
+        // Stub the catalog collaborators: database lookup, backend selection,
+        // privilege check (always granted) and the edit log.
+        new Expectations(catalog) {
+            {
+                catalog.getDb(dbTableName.getDb());
+                minTimes = 0;
+                result = db;
+
+                Catalog.getCurrentSystemInfo();
+                minTimes = 0;
+                result = systemInfoService;
+
+                systemInfoService.checkClusterCapacity(anyString);
+                minTimes = 0;
+                systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString);
+                minTimes = 0;
+                result = beIds;
+
+                catalog.getAuth();
+                minTimes = 0;
+                result = paloAuth;
+                paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE);
+                minTimes = 0;
+                result = true;
+
+                catalog.getEditLog();
+                minTimes = 0;
+                result = editLog;
+            }
+        };
+
+        properties.remove(DynamicPartitionProperty.END);
+
+        CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap",
+                new KeysDesc(KeysType.AGG_KEYS, columnNames),
+                new RangePartitionDesc(Lists.newArrayList("key1"), singleRangePartitionDescs),
+                new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, "");
+        stmt.analyze(analyzer);
+
+        // Registered after analyze(), so an exception thrown by analyze() itself fails the test.
+        expectedEx.expect(DdlException.class);
+        expectedEx.expectMessage("Must assign dynamic_partition.end properties");
+
+        catalog.createTable(stmt);
+    }
+
+    @Test
+    public void testMissBuckets(@Injectable SystemInfoService systemInfoService,
+                                @Injectable PaloAuth paloAuth,
+                                @Injectable EditLog editLog) throws UserException {
+        new Expectations(catalog) {
+            {
+                catalog.getDb(dbTableName.getDb());
+                minTimes = 0;
+                result = db;
+
+                Catalog.getCurrentSystemInfo();
+                minTimes = 0;
+                result = systemInfoService;
+                // Capacity check and backend selection are stubbed; the latter hands back the fixture's beIds.
+                systemInfoService.checkClusterCapacity(anyString);
+                minTimes = 0;
+                systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString);
+                minTimes = 0;
+                result = beIds;
+                // The table-level privilege check is stubbed to succeed.
+                catalog.getAuth();
+                minTimes = 0;
+                result = paloAuth;
+                paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE);
+                minTimes = 0;
+                result = true;
+
+                catalog.getEditLog();
+                minTimes = 0;
+                result = editLog;
+            }
+        };
+        // Scenario under test: create the table after removing DynamicPartitionProperty.BUCKETS from the properties.
+        properties.remove(DynamicPartitionProperty.BUCKETS);
+
+        CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap",
+                new KeysDesc(KeysType.AGG_KEYS, columnNames),
+                new RangePartitionDesc(Lists.newArrayList("key1"), singleRangePartitionDescs),
+                new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, "");
+        stmt.analyze(analyzer);
+        // NOTE(review): expectedEx looks like a JUnit ExpectedException rule declared outside this method - confirm.
+        expectedEx.expect(DdlException.class);
+        expectedEx.expectMessage("Must assign dynamic_partition.buckets properties");
+
+        catalog.createTable(stmt);
+    }
+
+    @Test
+    public void testNotAllowed(@Injectable SystemInfoService systemInfoService,
+                               @Injectable PaloAuth paloAuth,
+                               @Injectable EditLog editLog) throws UserException {
+        new Expectations(catalog) {
+            {
+                catalog.getDb(dbTableName.getDb());
+                minTimes = 0;
+                result = db;
+
+                Catalog.getCurrentSystemInfo();
+                minTimes = 0;
+                result = systemInfoService;
+                // Capacity check and backend selection are stubbed; the latter hands back the fixture's beIds.
+                systemInfoService.checkClusterCapacity(anyString);
+                minTimes = 0;
+                systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString);
+                minTimes = 0;
+                result = beIds;
+                // The table-level privilege check is stubbed to succeed.
+                catalog.getAuth();
+                minTimes = 0;
+                result = paloAuth;
+                paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE);
+                minTimes = 0;
+                result = true;
+
+                catalog.getEditLog();
+                minTimes = 0;
+                result = editLog;
+            }
+        };
+        // Scenario under test: the statement keeps the fixture's properties but passes null as the partition desc.
+        CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap",
+                new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
+                new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, "");
+        stmt.analyze(analyzer);
+        // NOTE(review): expectedEx looks like a JUnit ExpectedException rule declared outside this method - confirm.
+        expectedEx.expect(DdlException.class);
+        expectedEx.expectMessage("Only support dynamic partition properties on range partition table");
+
+        catalog.createTable(stmt);
+    }
+
+    @Test
+    public void testNotAllowedInMultiPartitions(@Injectable SystemInfoService systemInfoService,
+                                                @Injectable PaloAuth paloAuth,
+                                                @Injectable EditLog editLog) throws UserException {
+        new Expectations(catalog) {
+            {
+                catalog.getDb(dbTableName.getDb());
+                minTimes = 0;
+                result = db;
+
+                Catalog.getCurrentSystemInfo();
+                minTimes = 0;
+                result = systemInfoService;
+                // Capacity check and backend selection are stubbed; the latter hands back the fixture's beIds.
+                systemInfoService.checkClusterCapacity(anyString);
+                minTimes = 0;
+                systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString);
+                minTimes = 0;
+                result = beIds;
+                // The table-level privilege check is stubbed to succeed.
+                catalog.getAuth();
+                minTimes = 0;
+                result = paloAuth;
+                paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE);
+                minTimes = 0;
+                result = true;
+
+                catalog.getEditLog();
+                minTimes = 0;
+                result = editLog;
+            }
+        };
+        // Scenario under test: a range partition over two columns, so the single partition carries two key values.
+        List<SingleRangePartitionDesc> rangePartitionDescs = new LinkedList<>();
+        rangePartitionDescs.add(new SingleRangePartitionDesc(false, "p1",
+                new PartitionKeyDesc(Lists.newArrayList(new PartitionValue("-128"), new PartitionValue("100"))), null));
+
+        CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap",
+                new KeysDesc(KeysType.AGG_KEYS, columnNames),
+                new RangePartitionDesc(Lists.newArrayList("key1", "key2"), rangePartitionDescs),
+                new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, "");
+        stmt.analyze(analyzer);
+        // NOTE(review): expectedEx looks like a JUnit ExpectedException rule declared outside this method - confirm.
+        expectedEx.expect(DdlException.class);
+        expectedEx.expectMessage("Dynamic partition only support single-column range partition");
+
+        catalog.createTable(stmt);
+    }
+}
\ No newline at end of file
diff --git a/fe/src/test/java/org/apache/doris/catalog/FakeEditLog.java b/fe/src/test/java/org/apache/doris/catalog/FakeEditLog.java
index 4d45afc..3c179f1 100644
--- a/fe/src/test/java/org/apache/doris/catalog/FakeEditLog.java
+++ b/fe/src/test/java/org/apache/doris/catalog/FakeEditLog.java
@@ -22,6 +22,7 @@ import org.apache.doris.alter.RollupJob;
 import org.apache.doris.alter.SchemaChangeJob;
 import org.apache.doris.cluster.Cluster;
 import org.apache.doris.persist.EditLog;
+import org.apache.doris.persist.ModifyDynamicPartitionInfo;
 import org.apache.doris.persist.RoutineLoadOperation;
 import org.apache.doris.transaction.TransactionState;
 
@@ -97,6 +98,11 @@ public class FakeEditLog extends MockUp<EditLog> {
 
     }
 
+    @Mock
+    public void logDynamicPartition(ModifyDynamicPartitionInfo info) {
+        // No-op: this @Mock stands in for EditLog.logDynamicPartition and records nothing.
+    }
+
     public TransactionState getTransaction(long transactionId) {
         return allTransactionState.get(transactionId);
     }
diff --git a/fe/src/test/java/org/apache/doris/catalog/TablePropertyTest.java b/fe/src/test/java/org/apache/doris/catalog/TablePropertyTest.java
new file mode 100644
index 0000000..ca69bd5
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/catalog/TablePropertyTest.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+
+/** Verifies that a {@link TableProperty} survives a write/read round trip through a file. */
+public class TablePropertyTest {
+    private static final String FILE_NAME = "./TablePropertyTest";
+
+    /** Best-effort removal of the scratch file produced by the test. */
+    @After
+    public void tearDown() {
+        new File(FILE_NAME).delete();
+    }
+
+    /** Writes a TableProperty to disk, reads it back and compares every exposed value. */
+    @Test
+    public void testNormal() throws IOException {
+        File file = new File(FILE_NAME);
+        HashMap<String, String> properties = new HashMap<>();
+        properties.put(DynamicPartitionProperty.ENABLE, "true");
+        properties.put(DynamicPartitionProperty.TIME_UNIT, "day");
+        properties.put(DynamicPartitionProperty.END, "3");
+        properties.put(DynamicPartitionProperty.PREFIX, "p");
+        properties.put(DynamicPartitionProperty.BUCKETS, "30");
+        properties.put("otherProperty", "unknownProperty");
+
+        // 1. Write objects to file. try-with-resources flushes and closes the stream even when
+        //    write() throws; FileOutputStream creates the file if it is missing.
+        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file))) {
+            new TableProperty(properties).write(out);
+        }
+
+        // 2. Read objects from file. Assertions use JUnit's (expected, actual) argument order.
+        try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
+            TableProperty readTableProperty = TableProperty.read(in);
+            DynamicPartitionProperty actual = readTableProperty.getDynamicPartitionProperty();
+            DynamicPartitionProperty expected = new DynamicPartitionProperty(properties);
+            Assert.assertEquals(properties, readTableProperty.getProperties());
+            Assert.assertEquals(expected.getEnable(), actual.getEnable());
+            Assert.assertEquals(expected.getBuckets(), actual.getBuckets());
+            Assert.assertEquals(expected.getPrefix(), actual.getPrefix());
+            Assert.assertEquals(expected.getEnd(), actual.getEnd());
+            Assert.assertEquals(expected.getTimeUnit(), actual.getTimeUnit());
+        }
+    }
+}
diff --git a/fe/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java b/fe/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java
new file mode 100644
index 0000000..27406e2
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.catalog.DynamicPartitionProperty;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+
+/** Verifies that a {@link ModifyDynamicPartitionInfo} survives a write/read round trip through a file. */
+public class ModifyDynamicPartitionInfoTest {
+    private final String fileName = "./ModifyDynamicPartitionInfoTest";
+
+    /** Best-effort removal of the scratch file produced by the test. */
+    @After
+    public void tearDown() {
+        new File(fileName).delete();
+    }
+
+    /** Writes an info object to disk, reads it back and compares ids and properties. */
+    @Test
+    public void testNormal() throws IOException {
+        File file = new File(fileName);
+        HashMap<String, String> properties = new HashMap<>();
+        properties.put(DynamicPartitionProperty.ENABLE, "true");
+        properties.put(DynamicPartitionProperty.TIME_UNIT, "day");
+        properties.put(DynamicPartitionProperty.END, "3");
+        properties.put(DynamicPartitionProperty.PREFIX, "p");
+        properties.put(DynamicPartitionProperty.BUCKETS, "30");
+        ModifyDynamicPartitionInfo modifyDynamicPartitionInfo = new ModifyDynamicPartitionInfo(100L, 200L, properties);
+
+        // 1. Write objects to file; try-with-resources flushes and closes the stream even when write() throws.
+        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file))) {
+            modifyDynamicPartitionInfo.write(out);
+        }
+
+        // 2. Read objects from file. Assertions use JUnit's (expected, actual) argument order.
+        try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
+            ModifyDynamicPartitionInfo readModifyDynamicPartitionInfo = ModifyDynamicPartitionInfo.read(in);
+            Assert.assertEquals(100L, readModifyDynamicPartitionInfo.getDbId());
+            Assert.assertEquals(200L, readModifyDynamicPartitionInfo.getTableId());
+            Assert.assertEquals(properties, readModifyDynamicPartitionInfo.getProperties());
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org