You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/06/08 14:02:28 UTC

[incubator-doris] branch master updated: [feature-wip](multi-catalog)(step2) Introduce Internal Data Source (#9953)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f56e17ef2 [feature-wip](multi-catalog)(step2) Introduce Internal Data Source (#9953)
5f56e17ef2 is described below

commit 5f56e17ef215e932990da880182d8ad421bc62c1
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Jun 8 22:02:22 2022 +0800

    [feature-wip](multi-catalog)(step2) Introduce Internal Data Source (#9953)
---
 .../java/org/apache/doris/catalog/Catalog.java     | 2784 +----------------
 .../java/org/apache/doris/catalog/Database.java    |    4 +-
 .../java/org/apache/doris/catalog/DatabaseIf.java  |   17 +-
 .../main/java/org/apache/doris/common/Config.java  |    7 +
 .../java/org/apache/doris/common/MetaReader.java   |    2 +-
 .../org/apache/doris/datasource/DataSourceIf.java  |   31 +-
 .../org/apache/doris/datasource/DataSourceMgr.java |  116 +-
 .../doris/datasource/DataSourceMgrProperty.java    |   48 +
 .../doris/datasource/DataSourceProperty.java       |   45 +
 .../doris/datasource/EsExternalDataSource.java     |   10 -
 .../doris/datasource/ExternalDataSource.java       |   96 +-
 .../doris/datasource/HMSExternalDataSource.java    |   12 -
 .../doris/datasource/InternalDataSource.java       | 3157 +++++++++++++++++++-
 .../org/apache/doris/httpv2/rest/ShowAction.java   |    9 +-
 .../java/org/apache/doris/catalog/FakeEditLog.java |    1 -
 15 files changed, 3514 insertions(+), 2825 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 94c42a4786..a42add1079 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -20,19 +20,16 @@ package org.apache.doris.catalog;
 import org.apache.doris.alter.Alter;
 import org.apache.doris.alter.AlterJobV2;
 import org.apache.doris.alter.AlterJobV2.JobType;
-import org.apache.doris.alter.DecommissionType;
 import org.apache.doris.alter.MaterializedViewHandler;
 import org.apache.doris.alter.SchemaChangeHandler;
 import org.apache.doris.alter.SystemHandler;
 import org.apache.doris.analysis.AddPartitionClause;
-import org.apache.doris.analysis.AddRollupClause;
 import org.apache.doris.analysis.AdminCheckTabletsStmt;
 import org.apache.doris.analysis.AdminCheckTabletsStmt.CheckType;
 import org.apache.doris.analysis.AdminCleanTrashStmt;
 import org.apache.doris.analysis.AdminCompactTableStmt;
 import org.apache.doris.analysis.AdminSetConfigStmt;
 import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
-import org.apache.doris.analysis.AlterClause;
 import org.apache.doris.analysis.AlterClusterStmt;
 import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
 import org.apache.doris.analysis.AlterDatabaseQuotaStmt.QuotaType;
@@ -40,13 +37,10 @@ import org.apache.doris.analysis.AlterDatabaseRename;
 import org.apache.doris.analysis.AlterSystemStmt;
 import org.apache.doris.analysis.AlterTableStmt;
 import org.apache.doris.analysis.AlterViewStmt;
-import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BackupStmt;
 import org.apache.doris.analysis.CancelAlterSystemStmt;
 import org.apache.doris.analysis.CancelAlterTableStmt;
 import org.apache.doris.analysis.CancelBackupStmt;
-import org.apache.doris.analysis.ColumnDef;
-import org.apache.doris.analysis.ColumnDef.DefaultValue;
 import org.apache.doris.analysis.ColumnRenameClause;
 import org.apache.doris.analysis.CreateClusterStmt;
 import org.apache.doris.analysis.CreateDbStmt;
@@ -55,11 +49,8 @@ import org.apache.doris.analysis.CreateMaterializedViewStmt;
 import org.apache.doris.analysis.CreateTableAsSelectStmt;
 import org.apache.doris.analysis.CreateTableLikeStmt;
 import org.apache.doris.analysis.CreateTableStmt;
-import org.apache.doris.analysis.CreateUserStmt;
 import org.apache.doris.analysis.CreateViewStmt;
-import org.apache.doris.analysis.DataSortInfo;
 import org.apache.doris.analysis.DdlStmt;
-import org.apache.doris.analysis.DecommissionBackendClause;
 import org.apache.doris.analysis.DistributionDesc;
 import org.apache.doris.analysis.DropClusterStmt;
 import org.apache.doris.analysis.DropDbStmt;
@@ -67,17 +58,12 @@ import org.apache.doris.analysis.DropFunctionStmt;
 import org.apache.doris.analysis.DropMaterializedViewStmt;
 import org.apache.doris.analysis.DropPartitionClause;
 import org.apache.doris.analysis.DropTableStmt;
-import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.FunctionName;
-import org.apache.doris.analysis.HashDistributionDesc;
 import org.apache.doris.analysis.InstallPluginStmt;
-import org.apache.doris.analysis.KeysDesc;
 import org.apache.doris.analysis.LinkDbStmt;
 import org.apache.doris.analysis.MigrateDbStmt;
 import org.apache.doris.analysis.ModifyDistributionClause;
-import org.apache.doris.analysis.PartitionDesc;
 import org.apache.doris.analysis.PartitionRenameClause;
-import org.apache.doris.analysis.QueryStmt;
 import org.apache.doris.analysis.RecoverDbStmt;
 import org.apache.doris.analysis.RecoverPartitionStmt;
 import org.apache.doris.analysis.RecoverTableStmt;
@@ -85,24 +71,15 @@ import org.apache.doris.analysis.ReplacePartitionClause;
 import org.apache.doris.analysis.RestoreStmt;
 import org.apache.doris.analysis.RollupRenameClause;
 import org.apache.doris.analysis.ShowAlterStmt.AlterType;
-import org.apache.doris.analysis.SinglePartitionDesc;
-import org.apache.doris.analysis.TableName;
-import org.apache.doris.analysis.TableRef;
 import org.apache.doris.analysis.TableRenameClause;
 import org.apache.doris.analysis.TruncateTableStmt;
-import org.apache.doris.analysis.TypeDef;
 import org.apache.doris.analysis.UninstallPluginStmt;
-import org.apache.doris.analysis.UserDesc;
-import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.backup.BackupHandler;
 import org.apache.doris.blockrule.SqlBlockRuleMgr;
 import org.apache.doris.catalog.ColocateTableIndex.GroupId;
-import org.apache.doris.catalog.Database.DbState;
 import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
-import org.apache.doris.catalog.MaterializedIndex.IndexState;
 import org.apache.doris.catalog.OlapTable.OlapTableState;
-import org.apache.doris.catalog.Replica.ReplicaState;
 import org.apache.doris.catalog.Replica.ReplicaStatus;
 import org.apache.doris.catalog.Table.TableType;
 import org.apache.doris.clone.ColocateTableCheckerAndBalancer;
@@ -112,7 +89,6 @@ import org.apache.doris.clone.TabletScheduler;
 import org.apache.doris.clone.TabletSchedulerStat;
 import org.apache.doris.cluster.BaseParam;
 import org.apache.doris.cluster.Cluster;
-import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
@@ -122,8 +98,6 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeMetaVersion;
-import org.apache.doris.common.FeNameFormat;
-import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.MetaHeader;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.MetaReader;
@@ -141,19 +115,17 @@ import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.QueryableReentrantLock;
 import org.apache.doris.common.util.SmallFileMgr;
-import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.consistency.ConsistencyChecker;
+import org.apache.doris.datasource.DataSourceMgr;
+import org.apache.doris.datasource.InternalDataSource;
 import org.apache.doris.deploy.DeployManager;
 import org.apache.doris.deploy.impl.AmbariDeployManager;
 import org.apache.doris.deploy.impl.K8sDeployManager;
 import org.apache.doris.deploy.impl.LocalFileDeployManager;
 import org.apache.doris.external.elasticsearch.EsRepository;
-import org.apache.doris.external.hudi.HudiProperty;
 import org.apache.doris.external.hudi.HudiTable;
-import org.apache.doris.external.hudi.HudiUtils;
-import org.apache.doris.external.iceberg.IcebergCatalogMgr;
 import org.apache.doris.external.iceberg.IcebergTableCreationRecordMgr;
 import org.apache.doris.ha.BDBHA;
 import org.apache.doris.ha.FrontendNodeType;
@@ -194,10 +166,7 @@ import org.apache.doris.persist.BackendIdsUpdateInfo;
 import org.apache.doris.persist.BackendReplicasInfo;
 import org.apache.doris.persist.BackendTabletsInfo;
 import org.apache.doris.persist.ClusterInfo;
-import org.apache.doris.persist.ColocatePersistInfo;
 import org.apache.doris.persist.DatabaseInfo;
-import org.apache.doris.persist.DropDbInfo;
-import org.apache.doris.persist.DropInfo;
 import org.apache.doris.persist.DropLinkDbAndUpdateDbInfo;
 import org.apache.doris.persist.DropPartitionInfo;
 import org.apache.doris.persist.EditLog;
@@ -233,25 +202,18 @@ import org.apache.doris.statistics.StatisticsJobScheduler;
 import org.apache.doris.statistics.StatisticsManager;
 import org.apache.doris.statistics.StatisticsTaskScheduler;
 import org.apache.doris.system.Backend;
-import org.apache.doris.system.Backend.BackendState;
 import org.apache.doris.system.Frontend;
 import org.apache.doris.system.HeartbeatMgr;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
-import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.CompactionTask;
-import org.apache.doris.task.CreateReplicaTask;
 import org.apache.doris.task.DropReplicaTask;
 import org.apache.doris.task.MasterTaskExecutor;
 import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TCompressionType;
 import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
-import org.apache.doris.thrift.TStorageType;
-import org.apache.doris.thrift.TTabletType;
-import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 import org.apache.doris.transaction.PublishVersionDaemon;
@@ -264,13 +226,11 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
 import com.sleepycat.je.rep.InsufficientLogException;
 import com.sleepycat.je.rep.NetworkRestore;
 import com.sleepycat.je.rep.NetworkRestoreConfig;
+import lombok.Setter;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -287,7 +247,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -327,16 +286,10 @@ public class Catalog {
     // Using QueryableReentrantLock to print owner thread in debug mode.
     private QueryableReentrantLock lock;
 
-    private ConcurrentHashMap<Long, Database> idToDb;
-    private ConcurrentHashMap<String, Database> fullNameToDb;
-
-    private ConcurrentHashMap<Long, Cluster> idToCluster;
-    private ConcurrentHashMap<String, Cluster> nameToCluster;
-
+    private DataSourceMgr dataSourceMgr;
     private Load load;
     private LoadManager loadManager;
     private StreamLoadRecordMgr streamLoadRecordMgr;
-    private IcebergTableCreationRecordMgr icebergTableCreationRecordMgr;
     private RoutineLoadManager routineLoadManager;
     private SqlBlockRuleMgr sqlBlockRuleMgr;
     private ExportMgr exportMgr;
@@ -355,7 +308,6 @@ public class Catalog {
     private Daemon replayer;
     private Daemon timePrinter;
     private Daemon listener;
-    private EsRepository esRepository;  // it is a daemon, so add it here
 
     private boolean isFirstTimeStartUp = false;
     private boolean isElectable;
@@ -369,6 +321,7 @@ public class Catalog {
     private BlockingQueue<FrontendNodeType> typeTransferQueue;
 
     // false if default_cluster is not created.
+    @Setter
     private boolean isDefaultClusterCreated = false;
 
     // node name is used for bdbje NodeName.
@@ -522,6 +475,14 @@ public class Catalog {
         return this.dynamicPartitionScheduler;
     }
 
+    public DataSourceMgr getDataSourceMgr() {
+        return dataSourceMgr;
+    }
+
+    public InternalDataSource getInternalDataSource() {
+        return dataSourceMgr.getInternalDataSource();
+    }
+
     private static class SingletonHolder {
         private static final Catalog INSTANCE = new Catalog();
     }
@@ -532,8 +493,7 @@ public class Catalog {
 
     // if isCheckpointCatalog is true, it means that we should not collect thread pool metric
     private Catalog(boolean isCheckpointCatalog) {
-        this.idToDb = new ConcurrentHashMap<>();
-        this.fullNameToDb = new ConcurrentHashMap<>();
+        this.dataSourceMgr = new DataSourceMgr();
         this.load = new Load();
         this.routineLoadManager = new RoutineLoadManager();
         this.sqlBlockRuleMgr = new SqlBlockRuleMgr();
@@ -575,9 +535,6 @@ public class Catalog {
 
         this.metaReplayState = new MetaReplayState();
 
-        this.idToCluster = new ConcurrentHashMap<>();
-        this.nameToCluster = new ConcurrentHashMap<>();
-
         this.isDefaultClusterCreated = false;
 
         this.brokerMgr = new BrokerMgr();
@@ -595,8 +552,6 @@ public class Catalog {
         this.auth = new PaloAuth();
         this.domainResolver = new DomainResolver(auth);
 
-        this.esRepository = new EsRepository();
-
         this.metaContext = new MetaContext();
         this.metaContext.setThreadLocalInfo();
 
@@ -617,7 +572,6 @@ public class Catalog {
         this.loadJobScheduler = new LoadJobScheduler();
         this.loadManager = new LoadManager(loadJobScheduler);
         this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager", Config.fetch_stream_load_record_interval_second * 1000);
-        this.icebergTableCreationRecordMgr = new IcebergTableCreationRecordMgr();
         this.loadEtlChecker = new LoadEtlChecker(loadManager);
         this.loadLoadingChecker = new LoadLoadingChecker(loadManager);
         this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadManager);
@@ -696,10 +650,6 @@ public class Catalog {
         return tabletChecker;
     }
 
-    public ConcurrentHashMap<String, Database> getFullNameToDb() {
-        return fullNameToDb;
-    }
-
     public AuditEventProcessor getAuditEventProcessor() {
         return auditEventProcessor;
     }
@@ -766,7 +716,6 @@ public class Catalog {
         return statisticsTaskScheduler;
     }
 
-
     // Use tryLock to avoid potential dead lock
     private boolean tryLock(boolean mustLock) {
         while (true) {
@@ -1408,7 +1357,7 @@ public class Catalog {
         // start daemon thread to update global partition in memory information periodically
         partitionInMemoryInfoCollector.start();
         streamLoadRecordMgr.start();
-        icebergTableCreationRecordMgr.start();
+        getInternalDataSource().getIcebergTableCreationRecordMgr().start();
     }
 
     // start threads that should running on all FE
@@ -1416,8 +1365,8 @@ public class Catalog {
         tabletStatMgr.start();
         // load and export job label cleaner thread
         labelCleaner.start();
-        // ES state store
-        esRepository.start();
+        // es repository
+        getInternalDataSource().getEsRepository().start();
         // domain resolver
         domainResolver.start();
     }
@@ -1627,44 +1576,6 @@ public class Catalog {
         MetaReader.read(curFile, this);
     }
 
-    public void recreateTabletInvertIndex() {
-        if (isCheckpointThread()) {
-            return;
-        }
-
-        // create inverted index
-        TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
-        for (Database db : this.fullNameToDb.values()) {
-            long dbId = db.getId();
-            for (Table table : db.getTables()) {
-                if (table.getType() != TableType.OLAP) {
-                    continue;
-                }
-
-                OlapTable olapTable = (OlapTable) table;
-                long tableId = olapTable.getId();
-                Collection<Partition> allPartitions = olapTable.getAllPartitions();
-                for (Partition partition : allPartitions) {
-                    long partitionId = partition.getId();
-                    TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(
-                            partitionId).getStorageMedium();
-                    for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
-                        long indexId = index.getId();
-                        int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
-                        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium);
-                        for (Tablet tablet : index.getTablets()) {
-                            long tabletId = tablet.getId();
-                            invertedIndex.addTablet(tabletId, tabletMeta);
-                            for (Replica replica : tablet.getReplicas()) {
-                                invertedIndex.addReplica(tabletId, replica);
-                            }
-                        }
-                    } // end for indices
-                } // end for partitions
-            } // end for tables
-        } // end for dbs
-    }
-
     public long loadHeader(DataInputStream dis, MetaHeader metaHeader, long checksum) throws IOException, DdlException {
         switch (metaHeader.getMetaFormat()) {
             case COR1:
@@ -1721,21 +1632,7 @@ public class Catalog {
     }
 
     public long loadDb(DataInputStream dis, long checksum) throws IOException, DdlException {
-        int dbCount = dis.readInt();
-        long newChecksum = checksum ^ dbCount;
-        for (long i = 0; i < dbCount; ++i) {
-            Database db = new Database();
-            db.readFields(dis);
-            newChecksum ^= db.getId();
-            idToDb.put(db.getId(), db);
-            fullNameToDb.put(db.getFullName(), db);
-            if (db.getDbState() == DbState.LINK) {
-                fullNameToDb.put(db.getAttachDb(), db);
-            }
-            globalTransactionMgr.addDatabaseTransactionMgr(db.getId());
-        }
-        LOG.info("finished replay databases from image");
-        return newChecksum;
+        return getInternalDataSource().loadDb(dis, checksum);
     }
 
     public long loadLoadJob(DataInputStream dis, long checksum) throws IOException, DdlException {
@@ -2044,19 +1941,7 @@ public class Catalog {
     }
 
     public long saveDb(CountingDataOutputStream dos, long checksum) throws IOException {
-        int dbCount = idToDb.size() - nameToCluster.keySet().size();
-        checksum ^= dbCount;
-        dos.writeInt(dbCount);
-        for (Map.Entry<Long, Database> entry : idToDb.entrySet()) {
-            Database db = entry.getValue();
-            String dbName = db.getFullName();
-            // Don't write information_schema db meta
-            if (!InfoSchemaDb.isInfoSchemaDb(dbName)) {
-                checksum ^= entry.getKey();
-                db.write(dos);
-            }
-        }
-        return checksum;
+        return getInternalDataSource().saveDb(dos, checksum);
     }
 
     public long saveLoadJob(CountingDataOutputStream dos, long checksum) throws IOException {
@@ -2592,312 +2477,47 @@ public class Catalog {
         return null;
     }
 
-
     // The interface which DdlExecutor needs.
     public void createDb(CreateDbStmt stmt) throws DdlException {
-        final String clusterName = stmt.getClusterName();
-        String fullDbName = stmt.getFullDbName();
-        Map<String, String> properties = stmt.getProperties();
-
-        long id = getNextId();
-        Database db = new Database(id, fullDbName);
-        db.setClusterName(clusterName);
-        // check and analyze database properties before create database
-        db.setDbProperties(new DatabaseProperty(properties).checkAndBuildProperties());
-
-        if (!tryLock(false)) {
-            throw new DdlException("Failed to acquire catalog lock. Try again");
-        }
-        try {
-            if (!nameToCluster.containsKey(clusterName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_SELECT_CLUSTER, clusterName);
-            }
-            if (fullNameToDb.containsKey(fullDbName)) {
-                if (stmt.isSetIfNotExists()) {
-                    LOG.info("create database[{}] which already exists", fullDbName);
-                    return;
-                } else {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, fullDbName);
-                }
-            } else {
-                unprotectCreateDb(db);
-                editLog.logCreateDb(db);
-            }
-        } finally {
-            unlock();
-        }
-        LOG.info("createDb dbName = " + fullDbName + ", id = " + id);
-
-        // create tables in iceberg database
-        if (db.getDbProperties().getIcebergProperty().isExist()) {
-            icebergTableCreationRecordMgr.registerDb(db);
-        }
+        getInternalDataSource().createDb(stmt);
     }
 
     // For replay edit log, need't lock metadata
     public void unprotectCreateDb(Database db) {
-        idToDb.put(db.getId(), db);
-        fullNameToDb.put(db.getFullName(), db);
-        final Cluster cluster = nameToCluster.get(db.getClusterName());
-        cluster.addDb(db.getFullName(), db.getId());
-        globalTransactionMgr.addDatabaseTransactionMgr(db.getId());
+        getInternalDataSource().unprotectCreateDb(db);
     }
 
     // for test
     public void addCluster(Cluster cluster) {
-        nameToCluster.put(cluster.getName(), cluster);
-        idToCluster.put(cluster.getId(), cluster);
+        getInternalDataSource().addCluster(cluster);
     }
 
     public void replayCreateDb(Database db) {
-        tryLock(true);
-        try {
-            unprotectCreateDb(db);
-        } finally {
-            unlock();
-        }
+        getInternalDataSource().replayCreateDb(db);
     }
 
     public void dropDb(DropDbStmt stmt) throws DdlException {
-        String dbName = stmt.getDbName();
-
-        // 1. check if database exists
-        if (!tryLock(false)) {
-            throw new DdlException("Failed to acquire catalog lock. Try again");
-        }
-        try {
-            if (!fullNameToDb.containsKey(dbName)) {
-                if (stmt.isSetIfExists()) {
-                    LOG.info("drop database[{}] which does not exist", dbName);
-                    return;
-                } else {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS, dbName);
-                }
-            }
-
-            // 2. drop tables in db
-            Database db = this.fullNameToDb.get(dbName);
-            db.writeLock();
-            try {
-                if (!stmt.isForceDrop()) {
-                    if (Catalog.getCurrentCatalog().getGlobalTransactionMgr().existCommittedTxns(db.getId(), null, null)) {
-                        throw new DdlException("There are still some transactions in the COMMITTED state waiting to be completed. "
-                                + "The database [" + dbName + "] cannot be dropped. If you want to forcibly drop(cannot be recovered),"
-                                + " please use \"DROP database FORCE\".");
-                    }
-                }
-                if (db.getDbState() == DbState.LINK && dbName.equals(db.getAttachDb())) {
-                    // We try to drop a hard link.
-                    final DropLinkDbAndUpdateDbInfo info = new DropLinkDbAndUpdateDbInfo();
-                    fullNameToDb.remove(db.getAttachDb());
-                    db.setDbState(DbState.NORMAL);
-                    info.setUpdateDbState(DbState.NORMAL);
-                    final Cluster cluster = nameToCluster
-                            .get(ClusterNamespace.getClusterNameFromFullName(db.getAttachDb()));
-                    final BaseParam param = new BaseParam();
-                    param.addStringParam(db.getAttachDb());
-                    param.addLongParam(db.getId());
-                    cluster.removeLinkDb(param);
-                    info.setDropDbCluster(cluster.getName());
-                    info.setDropDbId(db.getId());
-                    info.setDropDbName(db.getAttachDb());
-                    editLog.logDropLinkDb(info);
-                    return;
-                }
-
-                if (db.getDbState() == DbState.LINK && dbName.equals(db.getFullName())) {
-                    // We try to drop a db which other dbs attach to it,
-                    // which is not allowed.
-                    ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DB_STATE_LINK_OR_MIGRATE,
-                            ClusterNamespace.getNameFromFullName(dbName));
-                    return;
-                }
-
-                if (dbName.equals(db.getAttachDb()) && db.getDbState() == DbState.MOVE) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DB_STATE_LINK_OR_MIGRATE,
-                            ClusterNamespace.getNameFromFullName(dbName));
-                    return;
-                }
-
-                // save table names for recycling
-                Set<String> tableNames = db.getTableNamesWithLock();
-                List<Table> tableList = db.getTablesOnIdOrder();
-                MetaLockUtils.writeLockTables(tableList);
-                try {
-                    if (!stmt.isForceDrop()) {
-                        for (Table table : tableList) {
-                            if (table.getType() == TableType.OLAP) {
-                                OlapTable olapTable = (OlapTable) table;
-                                if (olapTable.getState() != OlapTableState.NORMAL) {
-                                    throw new DdlException("The table [" + olapTable.getState() + "]'s state is " + olapTable.getState() + ", cannot be dropped."
-                                            + " please cancel the operation on olap table firstly. If you want to forcibly drop(cannot be recovered),"
-                                            + " please use \"DROP table FORCE\".");
-                                }
-                            }
-                        }
-                    }
-                    unprotectDropDb(db, stmt.isForceDrop(), false);
-                } finally {
-                    MetaLockUtils.writeUnlockTables(tableList);
-                }
-
-                if (!stmt.isForceDrop()) {
-                    Catalog.getCurrentRecycleBin().recycleDatabase(db, tableNames);
-                } else {
-                    Catalog.getCurrentCatalog().eraseDatabase(db.getId(), false);
-                }
-            } finally {
-                db.writeUnlock();
-            }
-
-            // 3. remove db from catalog
-            idToDb.remove(db.getId());
-            fullNameToDb.remove(db.getFullName());
-            final Cluster cluster = nameToCluster.get(db.getClusterName());
-            cluster.removeDb(dbName, db.getId());
-            DropDbInfo info = new DropDbInfo(dbName, stmt.isForceDrop());
-            editLog.logDropDb(info);
-        } finally {
-            unlock();
-        }
-
-        LOG.info("finish drop database[{}], is force : {}", dbName, stmt.isForceDrop());
-    }
-
-    public void unprotectDropDb(Database db, boolean isForeDrop, boolean isReplay) {
-        // drop Iceberg database table creation records
-        if (db.getDbProperties().getIcebergProperty().isExist()) {
-            icebergTableCreationRecordMgr.deregisterDb(db);
-        }
-        for (Table table : db.getTables()) {
-            unprotectDropTable(db, table, isForeDrop, isReplay);
-        }
-        db.markDropped();
+        getInternalDataSource().dropDb(stmt);
     }
 
     public void replayDropLinkDb(DropLinkDbAndUpdateDbInfo info) {
-        tryLock(true);
-        try {
-            final Database db = this.fullNameToDb.remove(info.getDropDbName());
-            db.setDbState(info.getUpdateDbState());
-            final Cluster cluster = nameToCluster
-                    .get(info.getDropDbCluster());
-            final BaseParam param = new BaseParam();
-            param.addStringParam(db.getAttachDb());
-            param.addLongParam(db.getId());
-            cluster.removeLinkDb(param);
-        } finally {
-            unlock();
-        }
+        getInternalDataSource().replayDropLinkDb(info);
     }
 
     public void replayDropDb(String dbName, boolean isForceDrop) throws DdlException {
-        tryLock(true);
-        try {
-            Database db = fullNameToDb.get(dbName);
-            db.writeLock();
-            try {
-                Set<String> tableNames = db.getTableNamesWithLock();
-                List<Table> tableList = db.getTablesOnIdOrder();
-                MetaLockUtils.writeLockTables(tableList);
-                try {
-                    unprotectDropDb(db, isForceDrop, true);
-                } finally {
-                    MetaLockUtils.writeUnlockTables(tableList);
-                }
-                if (!isForceDrop) {
-                    Catalog.getCurrentRecycleBin().recycleDatabase(db, tableNames);
-                } else {
-                    Catalog.getCurrentCatalog().eraseDatabase(db.getId(), false);
-                }
-            } finally {
-                db.writeUnlock();
-            }
-
-            fullNameToDb.remove(dbName);
-            idToDb.remove(db.getId());
-            final Cluster cluster = nameToCluster.get(db.getClusterName());
-            cluster.removeDb(dbName, db.getId());
-        } finally {
-            unlock();
-        }
+        getInternalDataSource().replayDropDb(dbName, isForceDrop);
     }
 
     public void recoverDatabase(RecoverDbStmt recoverStmt) throws DdlException {
-        // check is new db with same name already exist
-        if (getDb(recoverStmt.getDbName()).isPresent()) {
-            throw new DdlException("Database[" + recoverStmt.getDbName() + "] already exist.");
-        }
-
-        Database db = Catalog.getCurrentRecycleBin().recoverDatabase(recoverStmt.getDbName());
-
-        // add db to catalog
-        if (!tryLock(false)) {
-            throw new DdlException("Failed to acquire catalog lock. Try again");
-        }
-        db.writeLock();
-        List<Table> tableList = db.getTablesOnIdOrder();
-        MetaLockUtils.writeLockTables(tableList);
-        try {
-            if (fullNameToDb.containsKey(db.getFullName())) {
-                throw new DdlException("Database[" + db.getFullName() + "] already exist.");
-                // it's ok that we do not put db back to CatalogRecycleBin
-                // cause this db cannot recover any more
-            }
-
-            fullNameToDb.put(db.getFullName(), db);
-            idToDb.put(db.getId(), db);
-            final Cluster cluster = nameToCluster.get(db.getClusterName());
-            cluster.addDb(db.getFullName(), db.getId());
-
-            // log
-            RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L);
-            editLog.logRecoverDb(recoverInfo);
-            db.unmarkDropped();
-        } finally {
-            MetaLockUtils.writeUnlockTables(tableList);
-            db.writeUnlock();
-            unlock();
-        }
-
-        LOG.info("recover database[{}]", db.getId());
+        getInternalDataSource().recoverDatabase(recoverStmt);
     }
 
     public void recoverTable(RecoverTableStmt recoverStmt) throws DdlException {
-        String dbName = recoverStmt.getDbName();
-        String tableName = recoverStmt.getTableName();
-
-        Database db = this.getDbOrDdlException(dbName);
-        db.writeLockOrDdlException();
-        try {
-            if (db.getTable(tableName).isPresent()) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
-            }
-            if (!Catalog.getCurrentRecycleBin().recoverTable(db, tableName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dbName);
-            }
-        } finally {
-            db.writeUnlock();
-        }
+        getInternalDataSource().recoverTable(recoverStmt);
     }
 
     public void recoverPartition(RecoverPartitionStmt recoverStmt) throws DdlException {
-        String dbName = recoverStmt.getDbName();
-        String tableName = recoverStmt.getTableName();
-
-        Database db = this.getDbOrDdlException(dbName);
-        OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
-        olapTable.writeLockOrDdlException();
-        try {
-            String partitionName = recoverStmt.getPartitionName();
-            if (olapTable.getPartition(partitionName) != null) {
-                throw new DdlException("partition[" + partitionName + "] already exist in table[" + tableName + "]");
-            }
-
-            Catalog.getCurrentRecycleBin().recoverPartition(db.getId(), olapTable, partitionName);
-        } finally {
-            olapTable.writeUnlock();
-        }
+        getInternalDataSource().recoverPartition(recoverStmt);
     }
 
     public void replayEraseDatabase(long dbId) throws DdlException {
@@ -2915,104 +2535,19 @@ public class Catalog {
     }
 
     public void alterDatabaseQuota(AlterDatabaseQuotaStmt stmt) throws DdlException {
-        String dbName = stmt.getDbName();
-        Database db = this.getDbOrDdlException(dbName);
-        QuotaType quotaType = stmt.getQuotaType();
-        db.writeLockOrDdlException();
-        try {
-            if (quotaType == QuotaType.DATA) {
-                db.setDataQuota(stmt.getQuota());
-            } else if (quotaType == QuotaType.REPLICA) {
-                db.setReplicaQuota(stmt.getQuota());
-            }
-            long quota = stmt.getQuota();
-            DatabaseInfo dbInfo = new DatabaseInfo(dbName, "", quota, quotaType);
-            editLog.logAlterDb(dbInfo);
-        } finally {
-            db.writeUnlock();
-        }
+        getInternalDataSource().alterDatabaseQuota(stmt);
     }
 
     public void replayAlterDatabaseQuota(String dbName, long quota, QuotaType quotaType) throws MetaNotFoundException {
-        Database db = this.getDbOrMetaException(dbName);
-        db.writeLock();
-        try {
-            if (quotaType == QuotaType.DATA) {
-                db.setDataQuota(quota);
-            } else if (quotaType == QuotaType.REPLICA) {
-                db.setReplicaQuota(quota);
-            }
-        } finally {
-            db.writeUnlock();
-        }
+        getInternalDataSource().replayAlterDatabaseQuota(dbName, quota, quotaType);
     }
 
     public void renameDatabase(AlterDatabaseRename stmt) throws DdlException {
-        String fullDbName = stmt.getDbName();
-        String newFullDbName = stmt.getNewDbName();
-        String clusterName = stmt.getClusterName();
-
-        if (fullDbName.equals(newFullDbName)) {
-            throw new DdlException("Same database name");
-        }
-
-        Database db = null;
-        Cluster cluster = null;
-        if (!tryLock(false)) {
-            throw new DdlException("Failed to acquire catalog lock. Try again");
-        }
-        try {
-            cluster = nameToCluster.get(clusterName);
-            if (cluster == null) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
-            }
-            // check if db exists
-            db = fullNameToDb.get(fullDbName);
-            if (db == null) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, fullDbName);
-            }
-
-            if (db.getDbState() == DbState.LINK || db.getDbState() == DbState.MOVE) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_RENAME_DB_ERR, fullDbName);
-            }
-            // check if name is already used
-            if (fullNameToDb.get(newFullDbName) != null) {
-                throw new DdlException("Database name[" + newFullDbName + "] is already used");
-            }
-
-            cluster.removeDb(db.getFullName(), db.getId());
-            cluster.addDb(newFullDbName, db.getId());
-            // 1. rename db
-            db.setNameWithLock(newFullDbName);
-
-            // 2. add to meta. check again
-            fullNameToDb.remove(fullDbName);
-            fullNameToDb.put(newFullDbName, db);
-
-            DatabaseInfo dbInfo = new DatabaseInfo(fullDbName, newFullDbName, -1L, QuotaType.NONE);
-            editLog.logDatabaseRename(dbInfo);
-        } finally {
-            unlock();
-        }
-
-        LOG.info("rename database[{}] to [{}]", fullDbName, newFullDbName);
+        getInternalDataSource().renameDatabase(stmt);
     }
 
     public void replayRenameDatabase(String dbName, String newDbName) {
-        tryLock(true);
-        try {
-            Database db = fullNameToDb.get(dbName);
-            Cluster cluster = nameToCluster.get(db.getClusterName());
-            cluster.removeDb(db.getFullName(), db.getId());
-            db.setName(newDbName);
-            cluster.addDb(newDbName, db.getId());
-            fullNameToDb.remove(dbName);
-            fullNameToDb.put(newDbName, db);
-        } finally {
-            unlock();
-        }
-
-        LOG.info("replay rename database {} to {}", dbName, newDbName);
+        getInternalDataSource().replayRenameDatabase(dbName, newDbName);
     }
 
     /**
@@ -3035,1142 +2570,39 @@ public class Catalog {
      * 11. add this table to ColocateGroup if necessary
      */
     public void createTable(CreateTableStmt stmt) throws UserException {
-        String engineName = stmt.getEngineName();
-        String dbName = stmt.getDbName();
-        String tableName = stmt.getTableName();
-
-        // check if db exists
-        Database db = this.getDbOrDdlException(dbName);
-
-        // only internal table should check quota and cluster capacity
-        if (!stmt.isExternal()) {
-            // check cluster capacity
-            Catalog.getCurrentSystemInfo().checkClusterCapacity(stmt.getClusterName());
-            // check db quota
-            db.checkQuota();
-        }
-
-        // check if table exists in db
-        if (db.getTable(tableName).isPresent()) {
-            if (stmt.isSetIfNotExists()) {
-                LOG.info("create table[{}] which already exists", tableName);
-                return;
-            } else {
-                ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
-            }
-        }
-
-        if (engineName.equals("olap")) {
-            createOlapTable(db, stmt);
-            return;
-        } else if (engineName.equals("odbc")) {
-            createOdbcTable(db, stmt);
-            return;
-        } else if (engineName.equals("mysql")) {
-            createMysqlTable(db, stmt);
-            return;
-        } else if (engineName.equals("broker")) {
-            createBrokerTable(db, stmt);
-            return;
-        } else if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) {
-            createEsTable(db, stmt);
-            return;
-        } else if (engineName.equalsIgnoreCase("hive")) {
-            createHiveTable(db, stmt);
-            return;
-        } else if (engineName.equalsIgnoreCase("iceberg")) {
-            IcebergCatalogMgr.createIcebergTable(db, stmt);
-            return;
-        } else if (engineName.equalsIgnoreCase("hudi")) {
-            createHudiTable(db, stmt);
-            return;
-        } else {
-            ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, engineName);
-        }
-        Preconditions.checkState(false);
+        getInternalDataSource().createTable(stmt);
     }
 
     public void createTableLike(CreateTableLikeStmt stmt) throws DdlException {
-        try {
-            Database db = Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getExistedDbName());
-            Table table = db.getTableOrDdlException(stmt.getExistedTableName());
-
-            if (table.getType() == TableType.VIEW) {
-                throw new DdlException("Not support create table from a View");
-            }
-
-            List<String> createTableStmt = Lists.newArrayList();
-            table.readLock();
-            try {
-                if (table.getType() == TableType.OLAP) {
-                    if (!CollectionUtils.isEmpty(stmt.getRollupNames())) {
-                        OlapTable olapTable = (OlapTable) table;
-                        for (String rollupIndexName : stmt.getRollupNames()) {
-                            if (!olapTable.hasMaterializedIndex(rollupIndexName)) {
-                                throw new DdlException("Rollup index[" + rollupIndexName + "] not exists in Table[" + olapTable.getName() + "]");
-                            }
-                        }
-                    }
-                } else if (!CollectionUtils.isEmpty(stmt.getRollupNames()) || stmt.isWithAllRollup()) {
-                    throw new DdlException("Table[" + table.getName() + "] is external, not support rollup copy");
-                }
-
-                Catalog.getDdlStmt(stmt, stmt.getDbName(), table, createTableStmt, null, null, false, false, true);
-                if (createTableStmt.isEmpty()) {
-                    ErrorReport.reportDdlException(ErrorCode.ERROR_CREATE_TABLE_LIKE_EMPTY, "CREATE");
-                }
-            } finally {
-                table.readUnlock();
-            }
-            CreateTableStmt parsedCreateTableStmt = (CreateTableStmt) SqlParserUtils.parseAndAnalyzeStmt(createTableStmt.get(0), ConnectContext.get());
-            parsedCreateTableStmt.setTableName(stmt.getTableName());
-            parsedCreateTableStmt.setIfNotExists(stmt.isIfNotExists());
-            createTable(parsedCreateTableStmt);
-        } catch (UserException e) {
-            throw new DdlException("Failed to execute CREATE TABLE LIKE " + stmt.getExistedTableName() + ". Reason: " + e.getMessage());
-        }
+        getInternalDataSource().createTableLike(stmt);
     }
 
     public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlException {
-        try {
-            List<String> columnNames = stmt.getColumnNames();
-            CreateTableStmt createTableStmt = stmt.getCreateTableStmt();
-            QueryStmt queryStmt = stmt.getQueryStmt();
-            ArrayList<Expr> resultExprs = queryStmt.getResultExprs();
-            ArrayList<String> colLabels = queryStmt.getColLabels();
-            int size = resultExprs.size();
-            // Check columnNames
-            int colNameIndex = 0;
-            for (int i = 0; i < size; ++i) {
-                String name;
-                if (columnNames != null) {
-                    // use custom column names
-                    name = columnNames.get(i);
-                } else {
-                    name = colLabels.get(i);
-                }
-                try {
-                    FeNameFormat.checkColumnName(name);
-                } catch (AnalysisException exception) {
-                    name = "_col" + (colNameIndex++);
-                }
-                TypeDef typeDef;
-                Expr resultExpr = resultExprs.get(i);
-                if (resultExpr.getType().isStringType() && resultExpr.getType().getLength() < 0) {
-                    typeDef = new TypeDef(Type.STRING);
-                } else {
-                    typeDef = new TypeDef(resultExpr.getType());
-                }
-                ColumnDef columnDef;
-                if (resultExpr.getSrcSlotRef() == null) {
-                    columnDef = new ColumnDef(name, typeDef, false, null, true, new DefaultValue(false, null), "");
-                } else {
-                    Column column = resultExpr.getSrcSlotRef().getDesc().getColumn();
-                    boolean setDefault = StringUtils.isNotBlank(column.getDefaultValue());
-                    columnDef = new ColumnDef(name, typeDef, column.isKey(),
-                            column.getAggregationType(), column.isAllowNull(),
-                            new DefaultValue(setDefault, column.getDefaultValue()), column.getComment());
-                }
-                createTableStmt.addColumnDef(columnDef);
-                // set first column as default distribution
-                if (createTableStmt.getDistributionDesc() == null && i == 0) {
-                    createTableStmt.setDistributionDesc(new HashDistributionDesc(10, Lists.newArrayList(name)));
-                }
-            }
-            Analyzer dummyRootAnalyzer = new Analyzer(this, ConnectContext.get());
-            createTableStmt.analyze(dummyRootAnalyzer);
-            createTable(createTableStmt);
-        } catch (UserException e) {
-            throw new DdlException("Failed to execute CTAS Reason: " + e.getMessage());
-        }
+        getInternalDataSource().createTableAsSelect(stmt);
     }
 
     public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException {
-        SinglePartitionDesc singlePartitionDesc = addPartitionClause.getSingeRangePartitionDesc();
-        DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc();
-        boolean isTempPartition = addPartitionClause.isTempPartition();
+        getInternalDataSource().addPartition(db, tableName, addPartitionClause);
+    }
 
-        DistributionInfo distributionInfo;
-        Map<Long, MaterializedIndexMeta> indexIdToMeta;
-        Set<String> bfColumns;
-        String partitionName = singlePartitionDesc.getPartitionName();
-
-        // check
-        Table table = db.getOlapTableOrDdlException(tableName);
-        // check state
-        OlapTable olapTable = (OlapTable) table;
-
-        olapTable.readLock();
-        try {
-            if (olapTable.getState() != OlapTableState.NORMAL) {
-                throw new DdlException("Table[" + tableName + "]'s state is not NORMAL");
-            }
-
-            // check partition type
-            PartitionInfo partitionInfo = olapTable.getPartitionInfo();
-            if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) {
-                throw new DdlException("Only support adding partition to range and list partitioned table");
-            }
-
-            // check partition name
-            if (olapTable.checkPartitionNameExist(partitionName)) {
-                if (singlePartitionDesc.isSetIfNotExists()) {
-                    LOG.info("add partition[{}] which already exists", partitionName);
-                    return;
-                } else {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
-                }
-            }
-
-            Map<String, String> properties = singlePartitionDesc.getProperties();
-            // partition properties should inherit table properties
-            ReplicaAllocation replicaAlloc = olapTable.getDefaultReplicaAllocation();
-            if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)
-                    && !properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
-                properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, replicaAlloc.toCreateStmt());
-            }
-            if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)) {
-                properties.put(PropertyAnalyzer.PROPERTIES_INMEMORY, olapTable.isInMemory().toString());
-            }
-
-            singlePartitionDesc.analyze(partitionInfo.getPartitionColumns().size(), properties);
-            partitionInfo.createAndCheckPartitionItem(singlePartitionDesc, isTempPartition);
-
-            // get distributionInfo
-            List<Column> baseSchema = olapTable.getBaseSchema();
-            DistributionInfo defaultDistributionInfo = olapTable.getDefaultDistributionInfo();
-            if (distributionDesc != null) {
-                distributionInfo = distributionDesc.toDistributionInfo(baseSchema);
-                // for now. we only support modify distribution's bucket num
-                if (distributionInfo.getType() != defaultDistributionInfo.getType()) {
-                    throw new DdlException("Cannot assign different distribution type. default is: "
-                            + defaultDistributionInfo.getType());
-                }
-
-                if (distributionInfo.getType() == DistributionInfoType.HASH) {
-                    HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
-                    List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns();
-                    List<Column> defaultDistriCols = ((HashDistributionInfo) defaultDistributionInfo)
-                            .getDistributionColumns();
-                    if (!newDistriCols.equals(defaultDistriCols)) {
-                        throw new DdlException("Cannot assign hash distribution with different distribution cols. "
-                                + "default is: " + defaultDistriCols);
-                    }
-                    if (hashDistributionInfo.getBucketNum() <= 0) {
-                        throw new DdlException("Cannot assign hash distribution buckets less than 1");
-                    }
-                }
-            } else {
-                // make sure partition-dristribution-info is deep copied from default-distribution-info
-                distributionInfo = defaultDistributionInfo.toDistributionDesc().toDistributionInfo(baseSchema);
-            }
-
-            // check colocation
-            if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) {
-                String fullGroupName = db.getId() + "_" + olapTable.getColocateGroup();
-                ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(fullGroupName);
-                Preconditions.checkNotNull(groupSchema);
-                groupSchema.checkDistribution(distributionInfo);
-                groupSchema.checkReplicaAllocation(singlePartitionDesc.getReplicaAlloc());
-            }
-
-            indexIdToMeta = olapTable.getCopiedIndexIdToMeta();
-            bfColumns = olapTable.getCopiedBfColumns();
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        } finally {
-            olapTable.readUnlock();
-        }
-
-        Preconditions.checkNotNull(distributionInfo);
-        Preconditions.checkNotNull(olapTable);
-        Preconditions.checkNotNull(indexIdToMeta);
-
-        // create partition outside db lock
-        DataProperty dataProperty = singlePartitionDesc.getPartitionDataProperty();
-        Preconditions.checkNotNull(dataProperty);
-        // check replica quota if this operation done
-        long indexNum = indexIdToMeta.size();
-        long bucketNum = distributionInfo.getBucketNum();
-        long replicaNum = singlePartitionDesc.getReplicaAlloc().getTotalReplicaNum();
-        long totalReplicaNum = indexNum * bucketNum * replicaNum;
-        if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
-            throw new DdlException("Database " + db.getFullName() + " table " + tableName
-                    + " add partition increasing " + totalReplicaNum
-                    + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
-        }
-        Set<Long> tabletIdSet = new HashSet<Long>();
-        try {
-            long partitionId = getNextId();
-            Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(),
-                    olapTable.getId(),
-                    olapTable.getBaseIndexId(),
-                    partitionId, partitionName,
-                    indexIdToMeta,
-                    distributionInfo,
-                    dataProperty.getStorageMedium(),
-                    singlePartitionDesc.getReplicaAlloc(),
-                    singlePartitionDesc.getVersionInfo(),
-                    bfColumns, olapTable.getBfFpp(),
-                    tabletIdSet, olapTable.getCopiedIndexes(),
-                    singlePartitionDesc.isInMemory(),
-                    olapTable.getStorageFormat(),
-                    singlePartitionDesc.getTabletType(),
-                    olapTable.getCompressionType(),
-                    olapTable.getDataSortInfo()
-            );
-
-            // check again
-            table = db.getOlapTableOrDdlException(tableName);
-            table.writeLockOrDdlException();
-            try {
-                olapTable = (OlapTable) table;
-                if (olapTable.getState() != OlapTableState.NORMAL) {
-                    throw new DdlException("Table[" + tableName + "]'s state is not NORMAL");
-                }
-
-                // check partition name
-                if (olapTable.checkPartitionNameExist(partitionName)) {
-                    if (singlePartitionDesc.isSetIfNotExists()) {
-                        LOG.info("add partition[{}] which already exists", partitionName);
-                        return;
-                    } else {
-                        ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
-                    }
-                }
-
-                // check if meta changed
-                // rollup index may be added or dropped during add partition operation.
-                // schema may be changed during add partition operation.
-                boolean metaChanged = false;
-                if (olapTable.getIndexNameToId().size() != indexIdToMeta.size()) {
-                    metaChanged = true;
-                } else {
-                    // compare schemaHash
-                    for (Map.Entry<Long, MaterializedIndexMeta> entry : olapTable.getIndexIdToMeta().entrySet()) {
-                        long indexId = entry.getKey();
-                        if (!indexIdToMeta.containsKey(indexId)) {
-                            metaChanged = true;
-                            break;
-                        }
-                        if (indexIdToMeta.get(indexId).getSchemaHash() != entry.getValue().getSchemaHash()) {
-                            metaChanged = true;
-                            break;
-                        }
-                    }
-                }
-
-                if (metaChanged) {
-                    throw new DdlException("Table[" + tableName + "]'s meta has been changed. try again.");
-                }
-
-                // check partition type
-                PartitionInfo partitionInfo = olapTable.getPartitionInfo();
-                if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) {
-                    throw new DdlException("Only support adding partition to range and list partitioned table");
-                }
-
-                // update partition info
-                partitionInfo.handleNewSinglePartitionDesc(singlePartitionDesc, partitionId, isTempPartition);
-
-                if (isTempPartition) {
-                    olapTable.addTempPartition(partition);
-                } else {
-                    olapTable.addPartition(partition);
-                }
-
-                // log
-                PartitionPersistInfo info = null;
-                if (partitionInfo.getType() == PartitionType.RANGE) {
-                    info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition,
-                            partitionInfo.getItem(partitionId).getItems(),
-                            ListPartitionItem.DUMMY_ITEM,
-                            dataProperty,
-                            partitionInfo.getReplicaAllocation(partitionId),
-                            partitionInfo.getIsInMemory(partitionId),
-                            isTempPartition);
-                } else if (partitionInfo.getType() == PartitionType.LIST) {
-                    info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition,
-                            RangePartitionItem.DUMMY_ITEM,
-                            partitionInfo.getItem(partitionId), dataProperty,
-                            partitionInfo.getReplicaAllocation(partitionId),
-                            partitionInfo.getIsInMemory(partitionId),
-                            isTempPartition);
-                }
-                editLog.logAddPartition(info);
-
-                LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition);
-            } finally {
-                table.writeUnlock();
-            }
-        } catch (DdlException e) {
-            for (Long tabletId : tabletIdSet) {
-                Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
-            }
-            throw e;
-        }
-    }
-
-    public void replayAddPartition(PartitionPersistInfo info) throws MetaNotFoundException {
-        Database db = this.getDbOrMetaException(info.getDbId());
-        OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
-        olapTable.writeLock();
-        try {
-            Partition partition = info.getPartition();
-            PartitionInfo partitionInfo = olapTable.getPartitionInfo();
-            if (info.isTempPartition()) {
-                olapTable.addTempPartition(partition);
-            } else {
-                olapTable.addPartition(partition);
-            }
-
-            PartitionItem partitionItem = null;
-            if (partitionInfo.getType() == PartitionType.RANGE) {
-                partitionItem = new RangePartitionItem(info.getRange());
-            } else if (partitionInfo.getType() == PartitionType.LIST) {
-                partitionItem = info.getListPartitionItem();
-            }
-
-            partitionInfo.unprotectHandleNewSinglePartitionDesc(partition.getId(), info.isTempPartition(),
-                    partitionItem, info.getDataProperty(), info.getReplicaAlloc(), info.isInMemory());
-
-            if (!isCheckpointThread()) {
-                // add to inverted index
-                TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
-                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
-                    long indexId = index.getId();
-                    int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
-                    TabletMeta tabletMeta = new TabletMeta(info.getDbId(), info.getTableId(), partition.getId(),
-                            index.getId(), schemaHash, info.getDataProperty().getStorageMedium());
-                    for (Tablet tablet : index.getTablets()) {
-                        long tabletId = tablet.getId();
-                        invertedIndex.addTablet(tabletId, tabletMeta);
-                        for (Replica replica : tablet.getReplicas()) {
-                            invertedIndex.addReplica(tabletId, replica);
-                        }
-                    }
-                }
-            }
-        } finally {
-            olapTable.writeUnlock();
-        }
-    }
+    public void replayAddPartition(PartitionPersistInfo info) throws MetaNotFoundException {
+        getInternalDataSource().replayAddPartition(info);
+    }
 
     public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause clause) throws DdlException {
-        Preconditions.checkArgument(olapTable.isWriteLockHeldByCurrentThread());
-
-        String partitionName = clause.getPartitionName();
-        boolean isTempPartition = clause.isTempPartition();
-
-        if (olapTable.getState() != OlapTableState.NORMAL) {
-            throw new DdlException("Table[" + olapTable.getName() + "]'s state is not NORMAL");
-        }
-
-        if (!olapTable.checkPartitionNameExist(partitionName, isTempPartition)) {
-            if (clause.isSetIfExists()) {
-                LOG.info("drop partition[{}] which does not exist", partitionName);
-                return;
-            } else {
-                ErrorReport.reportDdlException(ErrorCode.ERR_DROP_PARTITION_NON_EXISTENT, partitionName);
-            }
-        }
-
-        PartitionInfo partitionInfo = olapTable.getPartitionInfo();
-        if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) {
-            throw new DdlException("Alter table [" + olapTable.getName() + "] failed. Not a partitioned table");
-        }
-
-        // drop
-        if (isTempPartition) {
-            olapTable.dropTempPartition(partitionName, true);
-        } else {
-            if (!clause.isForceDrop()) {
-                Partition partition = olapTable.getPartition(partitionName);
-                if (partition != null) {
-                    if (Catalog.getCurrentCatalog().getGlobalTransactionMgr().existCommittedTxns(db.getId(), olapTable.getId(), partition.getId())) {
-                        throw new DdlException("There are still some transactions in the COMMITTED state waiting to be completed."
-                                + " The partition [" + partitionName + "] cannot be dropped. If you want to forcibly drop(cannot be recovered),"
-                                + " please use \"DROP partition FORCE\".");
-                    }
-                }
-            }
-            olapTable.dropPartition(db.getId(), partitionName, clause.isForceDrop());
-        }
-
-        // log
-        DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionName, isTempPartition, clause.isForceDrop());
-        editLog.logDropPartition(info);
-
-        LOG.info("succeed in dropping partition[{}], is temp : {}, is force : {}", partitionName, isTempPartition, clause.isForceDrop());
+        getInternalDataSource().dropPartition(db, olapTable, clause);
     }
 
     public void replayDropPartition(DropPartitionInfo info) throws MetaNotFoundException {
-        Database db = this.getDbOrMetaException(info.getDbId());
-        OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
-        olapTable.writeLock();
-        try {
-            if (info.isTempPartition()) {
-                olapTable.dropTempPartition(info.getPartitionName(), true);
-            } else {
-                olapTable.dropPartition(info.getDbId(), info.getPartitionName(), info.isForceDrop());
-            }
-        } finally {
-            olapTable.writeUnlock();
-        }
+        getInternalDataSource().replayDropPartition(info);
     }
 
     public void replayErasePartition(long partitionId) {
-        Catalog.getCurrentRecycleBin().replayErasePartition(partitionId);
+        getInternalDataSource().replayErasePartition(partitionId);
     }
 
     public void replayRecoverPartition(RecoverInfo info) throws MetaNotFoundException {
-        Database db = this.getDbOrMetaException(info.getDbId());
-        OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
-        olapTable.writeLock();
-        try {
-            Catalog.getCurrentRecycleBin().replayRecoverPartition(olapTable, info.getPartitionId());
-        } finally {
-            olapTable.writeUnlock();
-        }
-    }
-
-    private Partition createPartitionWithIndices(String clusterName, long dbId, long tableId,
-                                                 long baseIndexId, long partitionId, String partitionName,
-                                                 Map<Long, MaterializedIndexMeta> indexIdToMeta,
-                                                 DistributionInfo distributionInfo,
-                                                 TStorageMedium storageMedium,
-                                                 ReplicaAllocation replicaAlloc,
-                                                 Long versionInfo,
-                                                 Set<String> bfColumns,
-                                                 double bfFpp,
-                                                 Set<Long> tabletIdSet,
-                                                 List<Index> indexes,
-                                                 boolean isInMemory,
-                                                 TStorageFormat storageFormat,
-                                                 TTabletType tabletType,
-                                                 TCompressionType compressionType,
-                                                 DataSortInfo dataSortInfo) throws DdlException {
-        // create base index first.
-        Preconditions.checkArgument(baseIndexId != -1);
-        MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
-
-        // create partition with base index
-        Partition partition = new Partition(partitionId, partitionName, baseIndex, distributionInfo);
-
-        // add to index map
-        Map<Long, MaterializedIndex> indexMap = new HashMap<>();
-        indexMap.put(baseIndexId, baseIndex);
-
-        // create rollup index if has
-        for (long indexId : indexIdToMeta.keySet()) {
-            if (indexId == baseIndexId) {
-                continue;
-            }
-
-            MaterializedIndex rollup = new MaterializedIndex(indexId, IndexState.NORMAL);
-            indexMap.put(indexId, rollup);
-        }
-
-        // version and version hash
-        if (versionInfo != null) {
-            partition.updateVisibleVersion(versionInfo);
-        }
-        long version = partition.getVisibleVersion();
-
-        short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
-        for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
-            long indexId = entry.getKey();
-            MaterializedIndex index = entry.getValue();
-            MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
-
-            // create tablets
-            int schemaHash = indexMeta.getSchemaHash();
-            TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, storageMedium);
-            createTablets(clusterName, index, ReplicaState.NORMAL, distributionInfo, version,
-                    replicaAlloc, tabletMeta, tabletIdSet);
-
-            boolean ok = false;
-            String errMsg = null;
-
-            // add create replica task for olap
-            short shortKeyColumnCount = indexMeta.getShortKeyColumnCount();
-            TStorageType storageType = indexMeta.getStorageType();
-            List<Column> schema = indexMeta.getSchema();
-            KeysType keysType = indexMeta.getKeysType();
-            int totalTaskNum = index.getTablets().size() * totalReplicaNum;
-            MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(totalTaskNum);
-            AgentBatchTask batchTask = new AgentBatchTask();
-            for (Tablet tablet : index.getTablets()) {
-                long tabletId = tablet.getId();
-                for (Replica replica : tablet.getReplicas()) {
-                    long backendId = replica.getBackendId();
-                    countDownLatch.addMark(backendId, tabletId);
-                    CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tableId,
-                            partitionId, indexId, tabletId,
-                            shortKeyColumnCount, schemaHash,
-                            version,
-                            keysType,
-                            storageType, storageMedium,
-                            schema, bfColumns, bfFpp,
-                            countDownLatch,
-                            indexes,
-                            isInMemory,
-                            tabletType,
-                            dataSortInfo,
-                            compressionType);
-                    task.setStorageFormat(storageFormat);
-                    batchTask.addTask(task);
-                    // add to AgentTaskQueue for handling finish report.
-                    // not for resending task
-                    AgentTaskQueue.addTask(task);
-                }
-            }
-            AgentTaskExecutor.submit(batchTask);
-
-            // estimate timeout
-            long timeout = Config.tablet_create_timeout_second * 1000L * totalTaskNum;
-            timeout = Math.min(timeout, Config.max_create_table_timeout_second * 1000);
-            try {
-                ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                LOG.warn("InterruptedException: ", e);
-                ok = false;
-            }
-
-            if (!ok || !countDownLatch.getStatus().ok()) {
-                errMsg = "Failed to create partition[" + partitionName + "]. Timeout.";
-                // clear tasks
-                AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CREATE);
-
-                if (!countDownLatch.getStatus().ok()) {
-                    errMsg += " Error: " + countDownLatch.getStatus().getErrorMsg();
-                } else {
-                    List<Entry<Long, Long>> unfinishedMarks = countDownLatch.getLeftMarks();
-                    // only show at most 3 results
-                    List<Entry<Long, Long>> subList = unfinishedMarks.subList(0, Math.min(unfinishedMarks.size(), 3));
-                    if (!subList.isEmpty()) {
-                        errMsg += " Unfinished mark: " + Joiner.on(", ").join(subList);
-                    }
-                }
-                LOG.warn(errMsg);
-                throw new DdlException(errMsg);
-            }
-
-            if (index.getId() != baseIndexId) {
-                // add rollup index to partition
-                partition.createRollupIndex(index);
-            }
-        } // end for indexMap
-        return partition;
-    }
-
-    // Create olap table and related base index synchronously.
-    private void createOlapTable(Database db, CreateTableStmt stmt) throws UserException {
-        String tableName = stmt.getTableName();
-        LOG.debug("begin create olap table: {}", tableName);
-
-        // create columns
-        List<Column> baseSchema = stmt.getColumns();
-        validateColumns(baseSchema);
-
-        // create partition info
-        PartitionDesc partitionDesc = stmt.getPartitionDesc();
-        PartitionInfo partitionInfo = null;
-        Map<String, Long> partitionNameToId = Maps.newHashMap();
-        if (partitionDesc != null) {
-            // gen partition id first
-            PartitionDesc partDesc = partitionDesc;
-            for (SinglePartitionDesc desc : partDesc.getSinglePartitionDescs()) {
-                long partitionId = getNextId();
-                partitionNameToId.put(desc.getPartitionName(), partitionId);
-            }
-            partitionInfo = partitionDesc.toPartitionInfo(baseSchema, partitionNameToId, false);
-        } else {
-            if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(stmt.getProperties())) {
-                throw new DdlException("Only support dynamic partition properties on range partition table");
-            }
-            long partitionId = getNextId();
-            // use table name as single partition name
-            partitionNameToId.put(tableName, partitionId);
-            partitionInfo = new SinglePartitionInfo();
-        }
-
-        // get keys type
-        KeysDesc keysDesc = stmt.getKeysDesc();
-        Preconditions.checkNotNull(keysDesc);
-        KeysType keysType = keysDesc.getKeysType();
-
-        // create distribution info
-        DistributionDesc distributionDesc = stmt.getDistributionDesc();
-        Preconditions.checkNotNull(distributionDesc);
-        DistributionInfo defaultDistributionInfo = distributionDesc.toDistributionInfo(baseSchema);
-
-        // calc short key column count
-        short shortKeyColumnCount = Catalog.calcShortKeyColumnCount(baseSchema, stmt.getProperties());
-        LOG.debug("create table[{}] short key column count: {}", tableName, shortKeyColumnCount);
-
-        // indexes
-        TableIndexes indexes = new TableIndexes(stmt.getIndexes());
-
-        // create table
-        long tableId = Catalog.getCurrentCatalog().getNextId();
-        OlapTable olapTable = new OlapTable(tableId, tableName, baseSchema, keysType, partitionInfo,
-                defaultDistributionInfo, indexes);
-        olapTable.setComment(stmt.getComment());
-
-        // set base index id
-        long baseIndexId = getNextId();
-        olapTable.setBaseIndexId(baseIndexId);
-
-        // set base index info to table
-        // this should be done before create partition.
-        Map<String, String> properties = stmt.getProperties();
-
-        // get storage format
-        TStorageFormat storageFormat = TStorageFormat.V2; // default is segment v2
-        try {
-            storageFormat = PropertyAnalyzer.analyzeStorageFormat(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        olapTable.setStorageFormat(storageFormat);
-
-        // get compression type
-        TCompressionType compressionType = TCompressionType.LZ4;
-        try {
-            compressionType = PropertyAnalyzer.analyzeCompressionType(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        olapTable.setCompressionType(compressionType);
-
-        // check data sort properties
-        DataSortInfo dataSortInfo = PropertyAnalyzer.analyzeDataSortInfo(properties, keysType,
-                keysDesc.keysColumnSize(), storageFormat);
-        olapTable.setDataSortInfo(dataSortInfo);
-
-        // analyze bloom filter columns
-        Set<String> bfColumns = null;
-        double bfFpp = 0;
-        try {
-            bfColumns = PropertyAnalyzer.analyzeBloomFilterColumns(properties, baseSchema, keysType);
-            if (bfColumns != null && bfColumns.isEmpty()) {
-                bfColumns = null;
-            }
-
-            bfFpp = PropertyAnalyzer.analyzeBloomFilterFpp(properties);
-            if (bfColumns != null && bfFpp == 0) {
-                bfFpp = FeConstants.default_bloom_filter_fpp;
-            } else if (bfColumns == null) {
-                bfFpp = 0;
-            }
-
-            olapTable.setBloomFilterInfo(bfColumns, bfFpp);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-
-        // analyze replica allocation
-        ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
-        if (replicaAlloc.isNotSet()) {
-            replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
-        }
-        olapTable.setReplicationAllocation(replicaAlloc);
-
-        // set in memory
-        boolean isInMemory = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_INMEMORY, false);
-        olapTable.setIsInMemory(isInMemory);
-
-        // set remote storage
-        String resourceName = PropertyAnalyzer.analyzeRemoteStorageResource(properties);
-        olapTable.setRemoteStorageResource(resourceName);
-
-        TTabletType tabletType;
-        try {
-            tabletType = PropertyAnalyzer.analyzeTabletType(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-
-
-        if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
-            // if this is an unpartitioned table, we should analyze data property and replication num here.
-            // if this is a partitioned table, there properties are already analyzed in RangePartitionDesc analyze phase.
-
-            // use table name as this single partition name
-            long partitionId = partitionNameToId.get(tableName);
-            DataProperty dataProperty = null;
-            try {
-                dataProperty = PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(),
-                        DataProperty.DEFAULT_DATA_PROPERTY);
-            } catch (AnalysisException e) {
-                throw new DdlException(e.getMessage());
-            }
-            Preconditions.checkNotNull(dataProperty);
-            partitionInfo.setDataProperty(partitionId, dataProperty);
-            partitionInfo.setReplicaAllocation(partitionId, replicaAlloc);
-            partitionInfo.setIsInMemory(partitionId, isInMemory);
-            partitionInfo.setTabletType(partitionId, tabletType);
-        }
-
-        // check colocation properties
-        try {
-            String colocateGroup = PropertyAnalyzer.analyzeColocate(properties);
-            if (colocateGroup != null) {
-                if (defaultDistributionInfo.getType() == DistributionInfoType.RANDOM) {
-                    throw new AnalysisException("Random distribution for colocate table is unsupported");
-                }
-                String fullGroupName = db.getId() + "_" + colocateGroup;
-                ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(fullGroupName);
-                if (groupSchema != null) {
-                    // group already exist, check if this table can be added to this group
-                    groupSchema.checkColocateSchema(olapTable);
-                }
-                // add table to this group, if group does not exist, create a new one
-                getColocateTableIndex().addTableToGroup(db.getId(), olapTable, colocateGroup,
-                        null /* generate group id inside */);
-                olapTable.setColocateGroup(colocateGroup);
-            }
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-
-        // get base index storage type. default is COLUMN
-        TStorageType baseIndexStorageType = null;
-        try {
-            baseIndexStorageType = PropertyAnalyzer.analyzeStorageType(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        Preconditions.checkNotNull(baseIndexStorageType);
-        // set base index meta
-        int schemaVersion = 0;
-        try {
-            schemaVersion = PropertyAnalyzer.analyzeSchemaVersion(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        int schemaHash = Util.generateSchemaHash();
-        olapTable.setIndexMeta(baseIndexId, tableName, baseSchema, schemaVersion, schemaHash,
-                shortKeyColumnCount, baseIndexStorageType, keysType);
-
-        for (AlterClause alterClause : stmt.getRollupAlterClauseList()) {
-            AddRollupClause addRollupClause = (AddRollupClause) alterClause;
-
-            Long baseRollupIndex = olapTable.getIndexIdByName(tableName);
-
-            // get storage type for rollup index
-            TStorageType rollupIndexStorageType = null;
-            try {
-                rollupIndexStorageType = PropertyAnalyzer.analyzeStorageType(addRollupClause.getProperties());
-            } catch (AnalysisException e) {
-                throw new DdlException(e.getMessage());
-            }
-            Preconditions.checkNotNull(rollupIndexStorageType);
-            // set rollup index meta to olap table
-            List<Column> rollupColumns = getMaterializedViewHandler().checkAndPrepareMaterializedView(addRollupClause,
-                    olapTable, baseRollupIndex, false);
-            short rollupShortKeyColumnCount = Catalog.calcShortKeyColumnCount(rollupColumns, alterClause.getProperties());
-            int rollupSchemaHash = Util.generateSchemaHash();
-            long rollupIndexId = getCurrentCatalog().getNextId();
-            olapTable.setIndexMeta(rollupIndexId, addRollupClause.getRollupName(), rollupColumns, schemaVersion,
-                    rollupSchemaHash, rollupShortKeyColumnCount, rollupIndexStorageType, keysType);
-        }
-
-        // analyse sequence column
-        Type sequenceColType = null;
-        try {
-            sequenceColType = PropertyAnalyzer.analyzeSequenceType(properties, olapTable.getKeysType());
-            if (sequenceColType != null) {
-                olapTable.setSequenceInfo(sequenceColType);
-            }
-        } catch (Exception e) {
-            throw new DdlException(e.getMessage());
-        }
-
-        // analyze version info
-        Long versionInfo = null;
-        try {
-            versionInfo = PropertyAnalyzer.analyzeVersionInfo(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        Preconditions.checkNotNull(versionInfo);
-
-        // a set to record every new tablet created when create table
-        // if failed in any step, use this set to do clear things
-        Set<Long> tabletIdSet = new HashSet<>();
-        // create partition
-        try {
-            if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
-                // this is a 1-level partitioned table
-                // use table name as partition name
-                DistributionInfo partitionDistributionInfo = distributionDesc.toDistributionInfo(baseSchema);
-                String partitionName = tableName;
-                long partitionId = partitionNameToId.get(partitionName);
-
-                // check replica quota if this operation done
-                long indexNum = olapTable.getIndexIdToMeta().size();
-                long bucketNum = partitionDistributionInfo.getBucketNum();
-                long replicaNum = partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum();
-                long totalReplicaNum = indexNum * bucketNum * replicaNum;
-                if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
-                    throw new DdlException("Database " + db.getFullName() + " create unpartitioned table "
-                            + tableName + " increasing " + totalReplicaNum
-                            + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
-                }
-                // create partition
-                Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(),
-                        olapTable.getId(), olapTable.getBaseIndexId(),
-                        partitionId, partitionName,
-                        olapTable.getIndexIdToMeta(),
-                        partitionDistributionInfo,
-                        partitionInfo.getDataProperty(partitionId).getStorageMedium(),
-                        partitionInfo.getReplicaAllocation(partitionId),
-                        versionInfo, bfColumns, bfFpp,
-                        tabletIdSet, olapTable.getCopiedIndexes(),
-                        isInMemory, storageFormat, tabletType, compressionType, olapTable.getDataSortInfo());
-                olapTable.addPartition(partition);
-            } else if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
-                try {
-                    // just for remove entries in stmt.getProperties(),
-                    // and then check if there still has unknown properties
-                    PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(), DataProperty.DEFAULT_DATA_PROPERTY);
-                    if (partitionInfo.getType() == PartitionType.RANGE) {
-                        DynamicPartitionUtil.checkAndSetDynamicPartitionProperty(olapTable, properties);
-
-                    } else if (partitionInfo.getType() == PartitionType.LIST) {
-                        if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) {
-                            throw new DdlException("Only support dynamic partition properties on range partition table");
-
-                        }
-                    }
-
-                    if (properties != null && !properties.isEmpty()) {
-                        // here, all properties should be checked
-                        throw new DdlException("Unknown properties: " + properties);
-                    }
-                } catch (AnalysisException e) {
-                    throw new DdlException(e.getMessage());
-                }
-
-                // check replica quota if this operation done
-                long totalReplicaNum = 0;
-                for (Map.Entry<String, Long> entry : partitionNameToId.entrySet()) {
-                    long indexNum = olapTable.getIndexIdToMeta().size();
-                    long bucketNum = defaultDistributionInfo.getBucketNum();
-                    long replicaNum = partitionInfo.getReplicaAllocation(entry.getValue()).getTotalReplicaNum();
-                    totalReplicaNum += indexNum * bucketNum * replicaNum;
-                }
-                if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
-                    throw new DdlException("Database " + db.getFullName() + " create table "
-                            + tableName + " increasing " + totalReplicaNum
-                            + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
-                }
-
-                // this is a 2-level partitioned tables
-                for (Map.Entry<String, Long> entry : partitionNameToId.entrySet()) {
-                    DataProperty dataProperty = partitionInfo.getDataProperty(entry.getValue());
-                    DistributionInfo partitionDistributionInfo = distributionDesc.toDistributionInfo(baseSchema);
-                    Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
-                            olapTable.getBaseIndexId(), entry.getValue(), entry.getKey(),
-                            olapTable.getIndexIdToMeta(), partitionDistributionInfo,
-                            dataProperty.getStorageMedium(),
-                            partitionInfo.getReplicaAllocation(entry.getValue()),
-                            versionInfo, bfColumns, bfFpp,
-                            tabletIdSet, olapTable.getCopiedIndexes(),
-                            isInMemory, storageFormat,
-                            partitionInfo.getTabletType(entry.getValue()),
-                            compressionType, olapTable.getDataSortInfo());
-                    olapTable.addPartition(partition);
-                }
-            } else {
-                throw new DdlException("Unsupported partition method: " + partitionInfo.getType().name());
-            }
-
-            Pair<Boolean, Boolean> result = db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists());
-            if (!result.first) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
-            }
-
-            if (result.second) {
-                if (getColocateTableIndex().isColocateTable(tableId)) {
-                    // if this is a colocate join table, its table id is already added to colocate group
-                    // so we should remove the tableId here
-                    getColocateTableIndex().removeTable(tableId);
-                }
-                for (Long tabletId : tabletIdSet) {
-                    Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
-                }
-                LOG.info("duplicate create table[{};{}], skip next steps", tableName, tableId);
-            } else {
-                // we have added these index to memory, only need to persist here
-                if (getColocateTableIndex().isColocateTable(tableId)) {
-                    GroupId groupId = getColocateTableIndex().getGroup(tableId);
-                    Map<Tag, List<List<Long>>> backendsPerBucketSeq = getColocateTableIndex().getBackendsPerBucketSeq(groupId);
-                    ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, tableId, backendsPerBucketSeq);
-                    editLog.logColocateAddTable(info);
-                }
-                LOG.info("successfully create table[{};{}]", tableName, tableId);
-                // register or remove table from DynamicPartition after table created
-                DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
-                dynamicPartitionScheduler.createOrUpdateRuntimeInfo(
-                        tableId, DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime());
-            }
-        } catch (DdlException e) {
-            for (Long tabletId : tabletIdSet) {
-                Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
-            }
-            // only remove from memory, because we have not persist it
-            if (getColocateTableIndex().isColocateTable(tableId)) {
-                getColocateTableIndex().removeTable(tableId);
-            }
-
-            throw e;
-        }
-    }
-
-    private void createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException {
-        String tableName = stmt.getTableName();
-
-        List<Column> columns = stmt.getColumns();
-
-        long tableId = Catalog.getCurrentCatalog().getNextId();
-        MysqlTable mysqlTable = new MysqlTable(tableId, tableName, columns, stmt.getProperties());
-        mysqlTable.setComment(stmt.getComment());
-        if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists()).first) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
-        }
-        LOG.info("successfully create table[{}-{}]", tableName, tableId);
-        return;
-    }
-
-    private void createOdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
-        String tableName = stmt.getTableName();
-        List<Column> columns = stmt.getColumns();
-
-        long tableId = Catalog.getCurrentCatalog().getNextId();
-        OdbcTable odbcTable = new OdbcTable(tableId, tableName, columns, stmt.getProperties());
-        odbcTable.setComment(stmt.getComment());
-        if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists()).first) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
-        }
-        LOG.info("successfully create table[{}-{}]", tableName, tableId);
-        return;
-    }
-
-    private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlException {
-        String tableName = stmt.getTableName();
-
-        // create columns
-        List<Column> baseSchema = stmt.getColumns();
-        validateColumns(baseSchema);
-
-        // create partition info
-        PartitionDesc partitionDesc = stmt.getPartitionDesc();
-        PartitionInfo partitionInfo = null;
-        Map<String, Long> partitionNameToId = Maps.newHashMap();
-        if (partitionDesc != null) {
-            partitionInfo = partitionDesc.toPartitionInfo(baseSchema, partitionNameToId, false);
-        } else {
-            long partitionId = getNextId();
-            // use table name as single partition name
-            partitionNameToId.put(tableName, partitionId);
-            partitionInfo = new SinglePartitionInfo();
-        }
-
-        long tableId = Catalog.getCurrentCatalog().getNextId();
-        EsTable esTable = new EsTable(tableId, tableName, baseSchema, stmt.getProperties(), partitionInfo);
-        esTable.setComment(stmt.getComment());
-
-        if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists()).first) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
-        }
-        LOG.info("successfully create table{} with id {}", tableName, tableId);
-        return esTable;
-    }
-
-    private void createBrokerTable(Database db, CreateTableStmt stmt) throws DdlException {
-        String tableName = stmt.getTableName();
-
-        List<Column> columns = stmt.getColumns();
-
-        long tableId = Catalog.getCurrentCatalog().getNextId();
-        BrokerTable brokerTable = new BrokerTable(tableId, tableName, columns, stmt.getProperties());
-        brokerTable.setComment(stmt.getComment());
-        brokerTable.setBrokerProperties(stmt.getExtProperties());
-
-        if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists()).first) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
-        }
-        LOG.info("successfully create table[{}-{}]", tableName, tableId);
-
-        return;
-    }
-
-    private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlException {
-        String tableName = stmt.getTableName();
-        List<Column> columns = stmt.getColumns();
-        long tableId = getNextId();
-        HiveTable hiveTable = new HiveTable(tableId, tableName, columns, stmt.getProperties());
-        hiveTable.setComment(stmt.getComment());
-        // check hive table whether exists in hive database
-        HiveMetaStoreClient hiveMetaStoreClient =
-                HiveMetaStoreClientHelper.getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
-        if (!HiveMetaStoreClientHelper.tableExists(hiveMetaStoreClient, hiveTable.getHiveDb(), hiveTable.getHiveTable())) {
-            throw new DdlException(String.format("Table [%s] dose not exist in Hive.", hiveTable.getHiveDbTable()));
-        }
-        // check hive table if exists in doris database
-        if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists()).first) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
-        }
-        LOG.info("successfully create table[{}-{}]", tableName, tableId);
-    }
-
-    private void createHudiTable(Database db, CreateTableStmt stmt) throws DdlException {
-        String tableName = stmt.getTableName();
-        List<Column> columns = stmt.getColumns();
-        long tableId = getNextId();
-        HudiTable hudiTable = new HudiTable(tableId, tableName, columns, stmt.getProperties());
-        hudiTable.setComment(stmt.getComment());
-        // check hudi properties in create stmt.
-        HudiUtils.validateCreateTable(hudiTable);
-        // check hudi table whether exists in hive database
-        String metastoreUris = hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS);
-        HiveMetaStoreClient hiveMetaStoreClient = HiveMetaStoreClientHelper.getClient(metastoreUris);
-        if (!HiveMetaStoreClientHelper.tableExists(hiveMetaStoreClient,
-                hudiTable.getHmsDatabaseName(), hudiTable.getHmsTableName())) {
-            throw new DdlException(String.format("Table [%s] dose not exist in Hive Metastore.",
-                    hudiTable.getHmsTableIdentifer()));
-        }
-        org.apache.hadoop.hive.metastore.api.Table hiveTable = HiveMetaStoreClientHelper.getTable(
-                hudiTable.getHmsDatabaseName(),
-                hudiTable.getHmsTableName(),
-                metastoreUris);
-        if (!HudiUtils.isHudiTable(hiveTable)) {
-            throw new DdlException(String.format("Table [%s] is not a hudi table.", hudiTable.getHmsTableIdentifer()));
-        }
-        // after support snapshot query for mor, we should remove the check.
-        if (HudiUtils.isHudiRealtimeTable(hiveTable)) {
-            throw new DdlException(String.format("Can not support hudi realtime table.", hudiTable.getHmsTableName()));
-        }
-        // check table's schema when user specify the schema
-        if (!hudiTable.getFullSchema().isEmpty()) {
-            HudiUtils.validateColumns(hudiTable, hiveTable);
-        }
-        switch (hiveTable.getTableType()) {
-            case "EXTERNAL_TABLE":
-            case "MANAGED_TABLE":
-                break;
-            case "VIRTUAL_VIEW":
-            default:
-                throw new DdlException("unsupported hudi table type [" + hiveTable.getTableType() + "].");
-        }
-        // check hive table if exists in doris database
-        if (!db.createTableWithLock(hudiTable, false, stmt.isSetIfNotExists()).first) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
-        }
-        LOG.info("successfully create table[{}-{}]", tableName, tableId);
+        getInternalDataSource().replayRecoverPartition(info);
     }
 
     public static void getDdlStmt(Table table, List<String> createTableStmt, List<String> addPartitionStmt,
@@ -4576,306 +3008,48 @@ public class Catalog {
     }
 
     public void replayCreateTable(String dbName, Table table) throws MetaNotFoundException {
-        Database db = this.fullNameToDb.get(dbName);
-        try {
-            db.createTableWithLock(table, true, false);
-        } catch (DdlException e) {
-            throw new MetaNotFoundException(e.getMessage());
-        }
-        if (!isCheckpointThread()) {
-            // add to inverted index
-            if (table.getType() == TableType.OLAP) {
-                TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
-                OlapTable olapTable = (OlapTable) table;
-                long dbId = db.getId();
-                long tableId = table.getId();
-                for (Partition partition : olapTable.getAllPartitions()) {
-                    long partitionId = partition.getId();
-                    TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(
-                            partitionId).getStorageMedium();
-                    for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
-                        long indexId = mIndex.getId();
-                        int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
-                        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium);
-                        for (Tablet tablet : mIndex.getTablets()) {
-                            long tabletId = tablet.getId();
-                            invertedIndex.addTablet(tabletId, tabletMeta);
-                            for (Replica replica : tablet.getReplicas()) {
-                                invertedIndex.addReplica(tabletId, replica);
-                            }
-                        }
-                    }
-                } // end for partitions
-                DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(dbId, olapTable, true);
-            }
-        }
+        getInternalDataSource().replayCreateTable(dbName, table);
     }
 
     public void replayAlterExternalTableSchema(String dbName, String tableName, List<Column> newSchema) throws MetaNotFoundException {
-        Database db = this.getDbOrMetaException(dbName);
-        Table table = db.getTableOrMetaException(tableName);
-        table.writeLock();
-        try {
-            table.setNewFullSchema(newSchema);
-        } finally {
-            table.writeUnlock();
-        }
-    }
-
-    private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState,
-                               DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc,
-                               TabletMeta tabletMeta, Set<Long> tabletIdSet) throws DdlException {
-        ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
-        Map<Tag, List<List<Long>>> backendsPerBucketSeq = null;
-        GroupId groupId = null;
-        if (colocateIndex.isColocateTable(tabletMeta.getTableId())) {
-            if (distributionInfo.getType() == DistributionInfoType.RANDOM) {
-                throw new DdlException("Random distribution for colocate table is unsupported");
-            }
-            // if this is a colocate table, try to get backend seqs from colocation index.
-            groupId = colocateIndex.getGroup(tabletMeta.getTableId());
-            backendsPerBucketSeq = colocateIndex.getBackendsPerBucketSeq(groupId);
-        }
-
-        // chooseBackendsArbitrary is true, means this may be the first table of colocation group,
-        // or this is just a normal table, and we can choose backends arbitrary.
-        // otherwise, backends should be chosen from backendsPerBucketSeq;
-        boolean chooseBackendsArbitrary = backendsPerBucketSeq == null || backendsPerBucketSeq.isEmpty();
-        if (chooseBackendsArbitrary) {
-            backendsPerBucketSeq = Maps.newHashMap();
-        }
-        for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
-            // create a new tablet with random chosen backends
-            Tablet tablet = new Tablet(getNextId());
-
-            // add tablet to inverted index first
-            index.addTablet(tablet, tabletMeta);
-            tabletIdSet.add(tablet.getId());
-
-            // get BackendIds
-            Map<Tag, List<Long>> chosenBackendIds;
-            if (chooseBackendsArbitrary) {
-                // This is the first colocate table in the group, or just a normal table,
-                // randomly choose backends
-                if (!Config.disable_storage_medium_check) {
-                    chosenBackendIds =
-                            getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, clusterName,
-                            tabletMeta.getStorageMedium());
-                } else {
-                    chosenBackendIds =
-                            getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
-                }
-
-                for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) {
-                    backendsPerBucketSeq.putIfAbsent(entry.getKey(), Lists.newArrayList());
-                    backendsPerBucketSeq.get(entry.getKey()).add(entry.getValue());
-                }
-            } else {
-                // get backends from existing backend sequence
-                chosenBackendIds = Maps.newHashMap();
-                for (Map.Entry<Tag, List<List<Long>>> entry : backendsPerBucketSeq.entrySet()) {
-                    chosenBackendIds.put(entry.getKey(), entry.getValue().get(i));
-                }
-            }
-            // create replicas
-            short totalReplicaNum = (short) 0;
-            for (List<Long> backendIds : chosenBackendIds.values()) {
-                for (long backendId : backendIds) {
-                    long replicaId = getNextId();
-                    Replica replica = new Replica(replicaId, backendId, replicaState, version, tabletMeta.getOldSchemaHash());
-                    tablet.addReplica(replica);
-                    totalReplicaNum++;
-                }
-            }
-            Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum(),
-                    totalReplicaNum + " vs. " + replicaAlloc.getTotalReplicaNum());
-        }
-
-        if (groupId != null && chooseBackendsArbitrary) {
-            colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
-            ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
-            editLog.logColocateBackendsPerBucketSeq(info);
-        }
+        getInternalDataSource().replayAlterExternalTableSchema(dbName, tableName, newSchema);
     }
 
     // Drop table
     public void dropTable(DropTableStmt stmt) throws DdlException {
-        String dbName = stmt.getDbName();
-        String tableName = stmt.getTableName();
-
-        // check database
-        Database db = this.getDbOrDdlException(dbName);
-        db.writeLockOrDdlException();
-        try {
-            Table table = db.getTableNullable(tableName);
-            if (table == null) {
-                if (stmt.isSetIfExists()) {
-                    LOG.info("drop table[{}] which does not exist", tableName);
-                    return;
-                } else {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dbName);
-                }
-            }
-            // Check if a view
-            if (stmt.isView()) {
-                if (!(table instanceof View)) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_OBJECT, dbName, tableName, "VIEW");
-                }
-            } else {
-                if (table instanceof View) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_OBJECT, dbName, tableName, "TABLE");
-                }
-            }
-
-            if (!stmt.isForceDrop()) {
-                if (Catalog.getCurrentCatalog().getGlobalTransactionMgr().existCommittedTxns(db.getId(), table.getId(), null)) {
-                    throw new DdlException("There are still some transactions in the COMMITTED state waiting to be completed. "
-                            + "The table [" + tableName + "] cannot be dropped. If you want to forcibly drop(cannot be recovered),"
-                            + " please use \"DROP table FORCE\".");
-                }
-            }
-            DropInfo info = new DropInfo(db.getId(), table.getId(), -1L, stmt.isForceDrop());
-            table.writeLock();
-            try {
-                if (table instanceof OlapTable && !stmt.isForceDrop()) {
-                    OlapTable olapTable = (OlapTable) table;
-                    if ((olapTable.getState() != OlapTableState.NORMAL)) {
-                        throw new DdlException("The table [" + tableName + "]'s state is " + olapTable.getState() + ", cannot be dropped."
-                                + " please cancel the operation on olap table firstly. If you want to forcibly drop(cannot be recovered),"
-                                + " please use \"DROP table FORCE\".");
-                    }
-                }
-                unprotectDropTable(db, table, stmt.isForceDrop(), false);
-            } finally {
-                table.writeUnlock();
-            }
-            editLog.logDropTable(info);
-        } finally {
-            db.writeUnlock();
-        }
-        LOG.info("finished dropping table: {} from db: {}, is force: {}", tableName, dbName, stmt.isForceDrop());
+        getInternalDataSource().dropTable(stmt);
     }
 
     public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay) {
-        if (table.getType() == TableType.ELASTICSEARCH) {
-            esRepository.deRegisterTable(table.getId());
-        } else if (table.getType() == TableType.OLAP) {
-            // drop all temp partitions of this table, so that there is no temp partitions in recycle bin,
-            // which make things easier.
-            ((OlapTable) table).dropAllTempPartitions();
-        } else if (table.getType() == TableType.ICEBERG) {
-            // drop Iceberg database table creation record
-            icebergTableCreationRecordMgr.deregisterTable(db, (IcebergTable) table);
-        }
-
-        db.dropTable(table.getName());
-        if (!isForceDrop) {
-            Catalog.getCurrentRecycleBin().recycleTable(db.getId(), table);
-        } else {
-            if (table.getType() == TableType.OLAP) {
-                Catalog.getCurrentCatalog().onEraseOlapTable((OlapTable) table, isReplay);
-            }
-        }
-
-        LOG.info("finished dropping table[{}] in db[{}]", table.getName(), db.getFullName());
-        return true;
+        return getInternalDataSource().unprotectDropTable(db, table, isForceDrop, isReplay);
     }
 
     public void replayDropTable(Database db, long tableId, boolean isForceDrop) throws MetaNotFoundException {
-        Table table = db.getTableOrMetaException(tableId);
-        db.writeLock();
-        table.writeLock();
-        try {
-            unprotectDropTable(db, table, isForceDrop, true);
-        } finally {
-            table.writeUnlock();
-            db.writeUnlock();
-        }
+        getInternalDataSource().replayDropTable(db, tableId, isForceDrop);
     }
 
     public void replayEraseTable(long tableId) {
-        Catalog.getCurrentRecycleBin().replayEraseTable(tableId);
+        getInternalDataSource().replayEraseTable(tableId);
     }
 
     public void replayRecoverTable(RecoverInfo info) throws MetaNotFoundException {
-        Database db = this.getDbOrMetaException(info.getDbId());
-        db.writeLock();
-        try {
-            Catalog.getCurrentRecycleBin().replayRecoverTable(db, info.getTableId());
-        } finally {
-            db.writeUnlock();
-        }
-    }
-
-    private void unprotectAddReplica(OlapTable olapTable, ReplicaPersistInfo info) {
-        LOG.debug("replay add a replica {}", info);
-        Partition partition = olapTable.getPartition(info.getPartitionId());
-        MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId());
-        Tablet tablet = materializedIndex.getTablet(info.getTabletId());
-
-        // for compatibility
-        int schemaHash = info.getSchemaHash();
-        if (schemaHash == -1) {
-            schemaHash = olapTable.getSchemaHashByIndexId(info.getIndexId());
-        }
-
-        Replica replica = new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(),
-                schemaHash, info.getDataSize(), info.getRowCount(),
-                ReplicaState.NORMAL,
-                info.getLastFailedVersion(),
-                info.getLastSuccessVersion());
-        tablet.addReplica(replica);
-    }
-
-    private void unprotectUpdateReplica(OlapTable olapTable, ReplicaPersistInfo info) {
-        LOG.debug("replay update a replica {}", info);
-        Partition partition = olapTable.getPartition(info.getPartitionId());
-        MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId());
-        Tablet tablet = materializedIndex.getTablet(info.getTabletId());
-        Replica replica = tablet.getReplicaByBackendId(info.getBackendId());
-        Preconditions.checkNotNull(replica, info);
-        replica.updateVersionInfo(info.getVersion(), info.getDataSize(), info.getRowCount());
-        replica.setBad(false);
+        getInternalDataSource().replayRecoverTable(info);
     }
 
     public void replayAddReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
-        Database db = this.getDbOrMetaException(info.getDbId());
-        OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
-        olapTable.writeLock();
-        try {
-            unprotectAddReplica(olapTable, info);
-        } finally {
-            olapTable.writeUnlock();
-        }
+        getInternalDataSource().replayAddReplica(info);
     }
 
     public void replayUpdateReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
-        Database db = this.getDbOrMetaException(info.getDbId());
-        OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
-        olapTable.writeLock();
-        try {
-            unprotectUpdateReplica(olapTable, info);
-        } finally {
-            olapTable.writeUnlock();
-        }
+        getInternalDataSource().replayUpdateReplica(info);
     }
 
     public void unprotectDeleteReplica(OlapTable olapTable, ReplicaPersistInfo info) {
-        Partition partition = olapTable.getPartition(info.getPartitionId());
-        MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId());
-        Tablet tablet = materializedIndex.getTablet(info.getTabletId());
-        tablet.deleteReplicaByBackendId(info.getBackendId());
+        getInternalDataSource().unprotectDeleteReplica(olapTable, info);
     }
-
-    public void replayDeleteReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
-        Database db = this.getDbOrMetaException(info.getDbId());
-        OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
-        olapTable.writeLock();
-        try {
-            unprotectDeleteReplica(olapTable, info);
-        } finally {
-            olapTable.writeUnlock();
-        }
+
+    public void replayDeleteReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
+        getInternalDataSource().replayDeleteReplica(info);
     }
 
     public void replayAddFrontend(Frontend fe) {
@@ -4940,26 +3114,12 @@ public class Catalog {
 
     @Nullable
     public Database getDbNullable(String dbName) {
-        if (fullNameToDb.containsKey(dbName)) {
-            return fullNameToDb.get(dbName);
-        } else {
-            // This maybe a information_schema db request, and information_schema db name is case insensitive.
-            // So, we first extract db name to check if it is information_schema.
-            // Then we reassemble the origin cluster name with lower case db name,
-            // and finally get information_schema db from the name map.
-            String fullName = ClusterNamespace.getNameFromFullName(dbName);
-            if (fullName.equalsIgnoreCase(InfoSchemaDb.DATABASE_NAME)) {
-                String clusterName = ClusterNamespace.getClusterNameFromFullName(dbName);
-                fullName = ClusterNamespace.getFullName(clusterName, fullName.toLowerCase());
-                return fullNameToDb.get(fullName);
-            }
-        }
-        return null;
+        return (Database) getInternalDataSource().getDbNullable(dbName);
     }
 
     @Nullable
     public Database getDbNullable(long dbId) {
-        return idToDb.get(dbId);
+        return (Database) getInternalDataSource().getDbNullable(dbId);
     }
 
     public Optional<Database> getDb(String dbName) {
@@ -4988,33 +3148,33 @@ public class Catalog {
     }
 
     public Database getDbOrMetaException(String dbName) throws MetaNotFoundException {
-        return getDbOrException(dbName, s -> new MetaNotFoundException("unknown databases, dbName=" + s,
-                ErrorCode.ERR_BAD_DB_ERROR));
+        return getDbOrException(dbName,
+                s -> new MetaNotFoundException("unknown databases, dbName=" + s, ErrorCode.ERR_BAD_DB_ERROR));
     }
 
     public Database getDbOrMetaException(long dbId) throws MetaNotFoundException {
-        return getDbOrException(dbId, s -> new MetaNotFoundException("unknown databases, dbId=" + s,
-                ErrorCode.ERR_BAD_DB_ERROR));
+        return getDbOrException(dbId,
+                s -> new MetaNotFoundException("unknown databases, dbId=" + s, ErrorCode.ERR_BAD_DB_ERROR));
     }
 
     public Database getDbOrDdlException(String dbName) throws DdlException {
-        return getDbOrException(dbName, s -> new DdlException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s),
-                ErrorCode.ERR_BAD_DB_ERROR));
+        return getDbOrException(dbName,
+                s -> new DdlException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
     }
 
     public Database getDbOrDdlException(long dbId) throws DdlException {
-        return getDbOrException(dbId, s -> new DdlException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s),
-                ErrorCode.ERR_BAD_DB_ERROR));
+        return getDbOrException(dbId,
+                s -> new DdlException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
     }
 
     public Database getDbOrAnalysisException(String dbName) throws AnalysisException {
-        return getDbOrException(dbName, s -> new AnalysisException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s),
-                ErrorCode.ERR_BAD_DB_ERROR));
+        return getDbOrException(dbName,
+                s -> new AnalysisException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
     }
 
     public Database getDbOrAnalysisException(long dbId) throws AnalysisException {
-        return getDbOrException(dbId, s -> new AnalysisException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s),
-                ErrorCode.ERR_BAD_DB_ERROR));
+        return getDbOrException(dbId,
+                s -> new AnalysisException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
     }
 
     public EditLog getEditLog() {
@@ -5027,19 +3187,15 @@ public class Catalog {
     }
 
     public List<String> getDbNames() {
-        return Lists.newArrayList(fullNameToDb.keySet());
+        return getInternalDataSource().getDbNames();
     }
 
     public List<String> getClusterDbNames(String clusterName) throws AnalysisException {
-        final Cluster cluster = nameToCluster.get(clusterName);
-        if (cluster == null) {
-            throw new AnalysisException("No cluster selected");
-        }
-        return Lists.newArrayList(cluster.getDbNames());
+        return getInternalDataSource().getClusterDbNames(clusterName);
     }
 
     public List<Long> getDbIds() {
-        return Lists.newArrayList(idToDb.keySet());
+        return getInternalDataSource().getDbIds();
     }
 
     public HashMap<Long, TStorageMedium> getPartitionIdToStorageMediumMap() {
@@ -5196,7 +3352,7 @@ public class Catalog {
     }
 
     public IcebergTableCreationRecordMgr getIcebergTableCreationRecordMgr() {
-        return icebergTableCreationRecordMgr;
+        return getInternalDataSource().getIcebergTableCreationRecordMgr();
     }
 
     public MasterTaskExecutor getPendingLoadTaskScheduler() {
@@ -5302,7 +3458,7 @@ public class Catalog {
     }
 
     public EsRepository getEsRepository() {
-        return this.esRepository;
+        return getInternalDataSource().getEsRepository();
     }
 
     public PolicyMgr getPolicyMgr() {
@@ -5997,32 +4153,6 @@ public class Catalog {
         this.alter.getClusterHandler().cancel(stmt);
     }
 
-    /*
-     * generate and check columns' order and key's existence
-     */
-    private void validateColumns(List<Column> columns) throws DdlException {
-        if (columns.isEmpty()) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_MUST_HAVE_COLUMNS);
-        }
-
-        boolean encounterValue = false;
-        boolean hasKey = false;
-        for (Column column : columns) {
-            if (column.isKey()) {
-                if (encounterValue) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_OLAP_KEY_MUST_BEFORE_VALUE);
-                }
-                hasKey = true;
-            } else {
-                encounterValue = true;
-            }
-        }
-
-        if (!hasKey) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_MUST_HAVE_KEYS);
-        }
-    }
-
     // Change current database of this session.
     public void changeDb(ConnectContext ctx, String qualifiedDb) throws DdlException {
         if (!auth.checkDbPriv(ctx, qualifiedDb, PrivPredicate.SHOW)) {
@@ -6035,12 +4165,7 @@ public class Catalog {
 
     // for test only
     public void clear() {
-        if (SingletonHolder.INSTANCE.idToDb != null) {
-            SingletonHolder.INSTANCE.idToDb.clear();
-        }
-        if (SingletonHolder.INSTANCE.fullNameToDb != null) {
-            SingletonHolder.INSTANCE.fullNameToDb.clear();
-        }
+        getInternalDataSource().clearDbs();
         if (load.getIdToLoadJob() != null) {
             load.getIdToLoadJob().clear();
             // load = null;
@@ -6122,68 +4247,7 @@ public class Catalog {
      * @throws DdlException
      */
     public void createCluster(CreateClusterStmt stmt) throws DdlException {
-        final String clusterName = stmt.getClusterName();
-        if (!tryLock(false)) {
-            throw new DdlException("Failed to acquire catalog lock. Try again");
-        }
-        try {
-            if (nameToCluster.containsKey(clusterName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_HAS_EXIST, clusterName);
-            } else {
-                List<Long> backendList = systemInfo.createCluster(clusterName, stmt.getInstanceNum());
-                // 1: BE returned is less than requested, throws DdlException.
-                // 2: BE returned is more than or equal to 0, succeeds.
-                if (backendList != null || stmt.getInstanceNum() == 0) {
-                    final long id = getNextId();
-                    final Cluster cluster = new Cluster(clusterName, id);
-                    cluster.setBackendIdList(backendList);
-                    unprotectCreateCluster(cluster);
-                    if (clusterName.equals(SystemInfoService.DEFAULT_CLUSTER)) {
-                        for (Database db : idToDb.values()) {
-                            if (db.getClusterName().equals(SystemInfoService.DEFAULT_CLUSTER)) {
-                                cluster.addDb(db.getFullName(), db.getId());
-                            }
-                        }
-                    }
-                    editLog.logCreateCluster(cluster);
-                    LOG.info("finish to create cluster: {}", clusterName);
-                } else {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_BE_NOT_ENOUGH);
-                }
-            }
-        } finally {
-            unlock();
-        }
-
-        // create super user for this cluster
-        UserIdentity adminUser = new UserIdentity(PaloAuth.ADMIN_USER, "%");
-        try {
-            adminUser.analyze(stmt.getClusterName());
-        } catch (AnalysisException e) {
-            LOG.error("should not happen", e);
-        }
-        auth.createUser(new CreateUserStmt(new UserDesc(adminUser, "", true)));
-    }
-
-    private void unprotectCreateCluster(Cluster cluster) {
-        for (Long id : cluster.getBackendIdList()) {
-            final Backend backend = systemInfo.getBackend(id);
-            backend.setOwnerClusterName(cluster.getName());
-            backend.setBackendState(BackendState.using);
-        }
-
-        idToCluster.put(cluster.getId(), cluster);
-        nameToCluster.put(cluster.getName(), cluster);
-
-        // create info schema db
-        final InfoSchemaDb infoDb = new InfoSchemaDb(cluster.getName());
-        infoDb.setClusterName(cluster.getName());
-        unprotectCreateDb(infoDb);
-
-        // only need to create default cluster once.
-        if (cluster.getName().equalsIgnoreCase(SystemInfoService.DEFAULT_CLUSTER)) {
-            isDefaultClusterCreated = true;
-        }
+        getInternalDataSource().createCluster(stmt);
     }
 
     /**
@@ -6192,12 +4256,7 @@ public class Catalog {
      * @param cluster
      */
     public void replayCreateCluster(Cluster cluster) {
-        tryLock(true);
-        try {
-            unprotectCreateCluster(cluster);
-        } finally {
-            unlock();
-        }
+        getInternalDataSource().replayCreateCluster(cluster);
     }
 
     /**
@@ -6207,77 +4266,15 @@ public class Catalog {
      * @throws DdlException
      */
     public void dropCluster(DropClusterStmt stmt) throws DdlException {
-        if (!tryLock(false)) {
-            throw new DdlException("Failed to acquire catalog lock. Try again");
-        }
-        try {
-            final String clusterName = stmt.getClusterName();
-            final Cluster cluster = nameToCluster.get(clusterName);
-            if (cluster == null) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
-            }
-            final List<Backend> backends = systemInfo.getClusterBackends(clusterName);
-            for (Backend backend : backends) {
-                if (backend.isDecommissioned()) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_ALTER_BE_IN_DECOMMISSION, clusterName);
-                }
-            }
-
-            // check if there still have databases undropped, except for information_schema db
-            if (cluster.getDbNames().size() > 1) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DELETE_DB_EXIST, clusterName);
-            }
-
-            systemInfo.releaseBackends(clusterName, false /* is not replay */);
-            final ClusterInfo info = new ClusterInfo(clusterName, cluster.getId());
-            unprotectDropCluster(info, false /* is not replay */);
-            editLog.logDropCluster(info);
-        } finally {
-            unlock();
-        }
-
-        // drop user of this cluster
-        // set is replay to true, not write log
-        auth.dropUserOfCluster(stmt.getClusterName(), true /* is replay */);
-    }
-
-    private void unprotectDropCluster(ClusterInfo info, boolean isReplay) {
-        systemInfo.releaseBackends(info.getClusterName(), isReplay);
-        idToCluster.remove(info.getClusterId());
-        nameToCluster.remove(info.getClusterName());
-        final Database infoSchemaDb = fullNameToDb.get(InfoSchemaDb.getFullInfoSchemaDbName(info.getClusterName()));
-        fullNameToDb.remove(infoSchemaDb.getFullName());
-        idToDb.remove(infoSchemaDb.getId());
+        getInternalDataSource().dropCluster(stmt);
     }
 
     public void replayDropCluster(ClusterInfo info) throws DdlException {
-        tryLock(true);
-        try {
-            unprotectDropCluster(info, true/* is replay */);
-        } finally {
-            unlock();
-        }
-
-        auth.dropUserOfCluster(info.getClusterName(), true /* is replay */);
+        getInternalDataSource().replayDropCluster(info);
     }
 
     public void replayExpandCluster(ClusterInfo info) {
-        tryLock(true);
-        try {
-            final Cluster cluster = nameToCluster.get(info.getClusterName());
-            cluster.addBackends(info.getBackendIdList());
-
-            for (Long beId : info.getBackendIdList()) {
-                Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
-                if (be == null) {
-                    continue;
-                }
-                be.setOwnerClusterName(info.getClusterName());
-                be.setBackendState(BackendState.using);
-            }
-        } finally {
-            unlock();
-        }
+        getInternalDataSource().replayExpandCluster(info);
     }
 
     /**
@@ -6287,72 +4284,7 @@ public class Catalog {
      * @throws DdlException
      */
     public void processModifyCluster(AlterClusterStmt stmt) throws UserException {
-        final String clusterName = stmt.getAlterClusterName();
-        final int newInstanceNum = stmt.getInstanceNum();
-        if (!tryLock(false)) {
-            throw new DdlException("Failed to acquire catalog lock. Try again");
-        }
-        try {
-            Cluster cluster = nameToCluster.get(clusterName);
-            if (cluster == null) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
-            }
-
-            // check if this cluster has backend in decommission
-            final List<Long> backendIdsInCluster = cluster.getBackendIdList();
-            for (Long beId : backendIdsInCluster) {
-                Backend be = systemInfo.getBackend(beId);
-                if (be.isDecommissioned()) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_ALTER_BE_IN_DECOMMISSION, clusterName);
-                }
-            }
-
-            final int oldInstanceNum = backendIdsInCluster.size();
-            if (newInstanceNum > oldInstanceNum) {
-                // expansion
-                final List<Long> expandBackendIds = systemInfo.calculateExpansionBackends(clusterName,
-                        newInstanceNum - oldInstanceNum);
-                if (expandBackendIds == null) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_BE_NOT_ENOUGH);
-                }
-                cluster.addBackends(expandBackendIds);
-                final ClusterInfo info = new ClusterInfo(clusterName, cluster.getId(), expandBackendIds);
-                editLog.logExpandCluster(info);
-            } else if (newInstanceNum < oldInstanceNum) {
-                // shrink
-                final List<Long> decomBackendIds = systemInfo.calculateDecommissionBackends(clusterName,
-                        oldInstanceNum - newInstanceNum);
-                if (decomBackendIds == null || decomBackendIds.size() == 0) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_BACKEND_ERROR);
-                }
-
-                List<String> hostPortList = Lists.newArrayList();
-                for (Long id : decomBackendIds) {
-                    final Backend backend = systemInfo.getBackend(id);
-                    hostPortList.add(new StringBuilder().append(backend.getHost()).append(":")
-                            .append(backend.getHeartbeatPort()).toString());
-                }
-
-                // here we reuse the process of decommission backends. but set backend's decommission type to
-                // ClusterDecommission, which means this backend will not be removed from the system
-                // after decommission is done.
-                final DecommissionBackendClause clause = new DecommissionBackendClause(hostPortList);
-                try {
-                    clause.analyze(null);
-                    clause.setType(DecommissionType.ClusterDecommission);
-                    AlterSystemStmt alterStmt = new AlterSystemStmt(clause);
-                    alterStmt.setClusterName(clusterName);
-                    this.alter.processAlterCluster(alterStmt);
-                } catch (AnalysisException e) {
-                    Preconditions.checkState(false, "should not happened: " + e.getMessage());
-                }
-            } else {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_ALTER_BE_NO_CHANGE, newInstanceNum);
-            }
-
-        } finally {
-            unlock();
-        }
+        getInternalDataSource().processModifyCluster(stmt);
     }
 
     /**
@@ -6361,16 +4293,7 @@ public class Catalog {
      * @throws DdlException
      */
     public void changeCluster(ConnectContext ctx, String clusterName) throws DdlException {
-        if (!Catalog.getCurrentCatalog().getAuth().checkCanEnterCluster(ConnectContext.get(), clusterName)) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_AUTHORITY,
-                    ConnectContext.get().getQualifiedUser(), "enter");
-        }
-
-        if (!nameToCluster.containsKey(clusterName)) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
-        }
-
-        ctx.setCluster(clusterName);
+        getInternalDataSource().changeCluster(ctx, clusterName);
     }
 
     /**
@@ -6380,121 +4303,15 @@ public class Catalog {
      * @throws DdlException
      */
     public void migrateDb(MigrateDbStmt stmt) throws DdlException {
-        final String srcClusterName = stmt.getSrcCluster();
-        final String destClusterName = stmt.getDestCluster();
-        final String srcDbName = stmt.getSrcDb();
-        final String destDbName = stmt.getDestDb();
-
-        if (!tryLock(false)) {
-            throw new DdlException("Failed to acquire catalog lock. Try again");
-        }
-        try {
-            if (!nameToCluster.containsKey(srcClusterName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_CLUSTER_NOT_EXIST, srcClusterName);
-            }
-            if (!nameToCluster.containsKey(destClusterName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DEST_CLUSTER_NOT_EXIST, destClusterName);
-            }
-
-            if (srcClusterName.equals(destClusterName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATE_SAME_CLUSTER);
-            }
-
-            final Cluster srcCluster = this.nameToCluster.get(srcClusterName);
-            if (!srcCluster.containDb(srcDbName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_DB_NOT_EXIST, srcDbName);
-            }
-            final Cluster destCluster = this.nameToCluster.get(destClusterName);
-            if (!destCluster.containLink(destDbName, srcDbName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATION_NO_LINK, srcDbName, destDbName);
-            }
-
-            final Database db = fullNameToDb.get(srcDbName);
-
-            // if the max replication num of the src db is larger then the backends num of the dest cluster,
-            // the migration will not be processed.
-            final int maxReplicationNum = db.getMaxReplicationNum();
-            if (maxReplicationNum > destCluster.getBackendIdList().size()) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATE_BE_NOT_ENOUGH, destClusterName);
-            }
-
-            if (db.getDbState() == DbState.LINK) {
-                final BaseParam param = new BaseParam();
-                param.addStringParam(destDbName);
-                param.addLongParam(db.getId());
-                param.addStringParam(srcDbName);
-                param.addStringParam(destClusterName);
-                param.addStringParam(srcClusterName);
-                fullNameToDb.remove(db.getFullName());
-                srcCluster.removeDb(db.getFullName(), db.getId());
-                destCluster.removeLinkDb(param);
-                destCluster.addDb(destDbName, db.getId());
-                db.writeLock();
-                try {
-                    db.setDbState(DbState.MOVE);
-                    // set cluster to the dest cluster.
-                    // and Clone process will do the migration things.
-                    db.setClusterName(destClusterName);
-                    db.setName(destDbName);
-                    db.setAttachDb(srcDbName);
-                } finally {
-                    db.writeUnlock();
-                }
-                editLog.logMigrateCluster(param);
-            } else {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATION_NO_LINK, srcDbName, destDbName);
-            }
-        } finally {
-            unlock();
-        }
+        getInternalDataSource().migrateDb(stmt);
     }
 
     public void replayMigrateDb(BaseParam param) {
-        final String desDbName = param.getStringParam();
-        final String srcDbName = param.getStringParam(1);
-        final String desClusterName = param.getStringParam(2);
-        final String srcClusterName = param.getStringParam(3);
-        tryLock(true);
-        try {
-            final Cluster desCluster = this.nameToCluster.get(desClusterName);
-            final Cluster srcCluster = this.nameToCluster.get(srcClusterName);
-            final Database db = fullNameToDb.get(srcDbName);
-            if (db.getDbState() == DbState.LINK) {
-                fullNameToDb.remove(db.getFullName());
-                srcCluster.removeDb(db.getFullName(), db.getId());
-                desCluster.removeLinkDb(param);
-                desCluster.addDb(param.getStringParam(), db.getId());
-
-                db.writeLock();
-                db.setName(desDbName);
-                db.setAttachDb(srcDbName);
-                db.setDbState(DbState.MOVE);
-                db.setClusterName(desClusterName);
-                db.writeUnlock();
-            }
-        } finally {
-            unlock();
-        }
+        getInternalDataSource().replayMigrateDb(param);
     }
 
     public void replayLinkDb(BaseParam param) {
-        final String desClusterName = param.getStringParam(2);
-        final String srcDbName = param.getStringParam(1);
-        final String desDbName = param.getStringParam();
-
-        tryLock(true);
-        try {
-            final Cluster desCluster = this.nameToCluster.get(desClusterName);
-            final Database srcDb = fullNameToDb.get(srcDbName);
-            srcDb.writeLock();
-            srcDb.setDbState(DbState.LINK);
-            srcDb.setAttachDb(desDbName);
-            srcDb.writeUnlock();
-            desCluster.addLinkDb(param);
-            fullNameToDb.put(desDbName, srcDb);
-        } finally {
-            unlock();
-        }
+        getInternalDataSource().replayLinkDb(param);
     }
 
     /**
@@ -6504,74 +4321,15 @@ public class Catalog {
      * @throws DdlException
      */
     public void linkDb(LinkDbStmt stmt) throws DdlException {
-        final String srcClusterName = stmt.getSrcCluster();
-        final String destClusterName = stmt.getDestCluster();
-        final String srcDbName = stmt.getSrcDb();
-        final String destDbName = stmt.getDestDb();
-
-        if (!tryLock(false)) {
-            throw new DdlException("Failed to acquire catalog lock. Try again");
-        }
-        try {
-            if (!nameToCluster.containsKey(srcClusterName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_CLUSTER_NOT_EXIST, srcClusterName);
-            }
-
-            if (!nameToCluster.containsKey(destClusterName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DEST_CLUSTER_NOT_EXIST, destClusterName);
-            }
-
-            if (srcClusterName.equals(destClusterName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATE_SAME_CLUSTER);
-            }
-
-            if (fullNameToDb.containsKey(destDbName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, destDbName);
-            }
-
-            final Cluster srcCluster = this.nameToCluster.get(srcClusterName);
-            final Cluster destCluster = this.nameToCluster.get(destClusterName);
-
-            if (!srcCluster.containDb(srcDbName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_DB_NOT_EXIST, srcDbName);
-            }
-            final Database srcDb = fullNameToDb.get(srcDbName);
-
-            if (srcDb.getDbState() != DbState.NORMAL) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DB_STATE_LINK_OR_MIGRATE,
-                        ClusterNamespace.getNameFromFullName(srcDbName));
-            }
-
-            srcDb.writeLock();
-            try {
-                srcDb.setDbState(DbState.LINK);
-                srcDb.setAttachDb(destDbName);
-            } finally {
-                srcDb.writeUnlock();
-            }
-
-            final long id = getNextId();
-            final BaseParam param = new BaseParam();
-            param.addStringParam(destDbName);
-            param.addStringParam(srcDbName);
-            param.addLongParam(id);
-            param.addLongParam(srcDb.getId());
-            param.addStringParam(destClusterName);
-            param.addStringParam(srcClusterName);
-            destCluster.addLinkDb(param);
-            fullNameToDb.put(destDbName, srcDb);
-            editLog.logLinkCluster(param);
-        } finally {
-            unlock();
-        }
+        getInternalDataSource().linkDb(stmt);
     }
 
     public Cluster getCluster(String clusterName) {
-        return nameToCluster.get(clusterName);
+        return getInternalDataSource().getCluster(clusterName);
     }
 
     public List<String> getClusterNames() {
-        return new ArrayList<String>(nameToCluster.keySet());
+        return getInternalDataSource().getClusterNames();
     }
 
     /**
@@ -6580,171 +4338,23 @@ public class Catalog {
      * @return
      */
     public Set<BaseParam> getMigrations() {
-        final Set<BaseParam> infos = Sets.newHashSet();
-        for (Database db : fullNameToDb.values()) {
-            db.readLock();
-            try {
-                if (db.getDbState() == DbState.MOVE) {
-                    int tabletTotal = 0;
-                    int tabletQuorum = 0;
-                    final Set<Long> beIds = Sets.newHashSet(systemInfo.getClusterBackendIds(db.getClusterName()));
-                    final Set<String> tableNames = db.getTableNamesWithLock();
-                    for (String tableName : tableNames) {
-
-                        Table table = db.getTableNullable(tableName);
-                        if (table == null || table.getType() != TableType.OLAP) {
-                            continue;
-                        }
-
-                        OlapTable olapTable = (OlapTable) table;
-                        olapTable.readLock();
-                        try {
-                            for (Partition partition : olapTable.getPartitions()) {
-                                ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo()
-                                        .getReplicaAllocation(partition.getId());
-                                short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
-                                for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
-                                    if (materializedIndex.getState() != IndexState.NORMAL) {
-                                        continue;
-                                    }
-                                    for (Tablet tablet : materializedIndex.getTablets()) {
-                                        int replicaNum = 0;
-                                        int quorum = totalReplicaNum / 2 + 1;
-                                        for (Replica replica : tablet.getReplicas()) {
-                                            if (replica.getState() != ReplicaState.CLONE
-                                                    && beIds.contains(replica.getBackendId())) {
-                                                replicaNum++;
-                                            }
-                                        }
-                                        if (replicaNum > quorum) {
-                                            replicaNum = quorum;
-                                        }
-
-                                        tabletQuorum = tabletQuorum + replicaNum;
-                                        tabletTotal = tabletTotal + quorum;
-                                    }
-                                }
-                            }
-                        } finally {
-                            olapTable.readUnlock();
-                        }
-                    }
-                    final BaseParam info = new BaseParam();
-                    info.addStringParam(db.getClusterName());
-                    info.addStringParam(db.getAttachDb());
-                    info.addStringParam(db.getFullName());
-                    final float percentage = tabletTotal > 0 ? (float) tabletQuorum / (float) tabletTotal : 0f;
-                    info.addFloatParam(percentage);
-                    infos.add(info);
-                }
-            } finally {
-                db.readUnlock();
-            }
-        }
-
-        return infos;
+        return getInternalDataSource().getMigrations();
     }
 
     public long loadCluster(DataInputStream dis, long checksum) throws IOException, DdlException {
-        int clusterCount = dis.readInt();
-        checksum ^= clusterCount;
-        for (long i = 0; i < clusterCount; ++i) {
-            final Cluster cluster = Cluster.read(dis);
-            checksum ^= cluster.getId();
-
-            List<Long> latestBackendIds = systemInfo.getClusterBackendIds(cluster.getName());
-            if (latestBackendIds.size() != cluster.getBackendIdList().size()) {
-                LOG.warn("Cluster:" + cluster.getName() + ", backends in Cluster is "
-                        + cluster.getBackendIdList().size() + ", backends in SystemInfoService is "
-                        + cluster.getBackendIdList().size());
-            }
-            // The number of BE in cluster is not same as in SystemInfoService, when perform 'ALTER
-            // SYSTEM ADD BACKEND TO ...' or 'ALTER SYSTEM ADD BACKEND ...', because both of them are
-            // for adding BE to some Cluster, but loadCluster is after loadBackend.
-            cluster.setBackendIdList(latestBackendIds);
-
-            String dbName = InfoSchemaDb.getFullInfoSchemaDbName(cluster.getName());
-            InfoSchemaDb db;
-            // Use real Catalog instance to avoid InfoSchemaDb id continuously increment
-            // when checkpoint thread load image.
-            if (Catalog.getServingCatalog().getFullNameToDb().containsKey(dbName)) {
-                db = (InfoSchemaDb) Catalog.getServingCatalog().getFullNameToDb().get(dbName);
-            } else {
-                db = new InfoSchemaDb(cluster.getName());
-                db.setClusterName(cluster.getName());
-            }
-            String errMsg = "InfoSchemaDb id shouldn't larger than 10000, please restart your FE server";
-            // Every time we construct the InfoSchemaDb, which id will increment.
-            // When InfoSchemaDb id larger than 10000 and put it to idToDb,
-            // which may be overwrite the normal db meta in idToDb,
-            // so we ensure InfoSchemaDb id less than 10000.
-            Preconditions.checkState(db.getId() < NEXT_ID_INIT_VALUE, errMsg);
-            idToDb.put(db.getId(), db);
-            fullNameToDb.put(db.getFullName(), db);
-            cluster.addDb(dbName, db.getId());
-            idToCluster.put(cluster.getId(), cluster);
-            nameToCluster.put(cluster.getName(), cluster);
-        }
-        LOG.info("finished replay cluster from image");
-        return checksum;
+        return getInternalDataSource().loadCluster(dis, checksum);
     }
 
     public void initDefaultCluster() {
-        final List<Long> backendList = Lists.newArrayList();
-        final List<Backend> defaultClusterBackends = systemInfo.getClusterBackends(SystemInfoService.DEFAULT_CLUSTER);
-        for (Backend backend : defaultClusterBackends) {
-            backendList.add(backend.getId());
-        }
-
-        final long id = getNextId();
-        final Cluster cluster = new Cluster(SystemInfoService.DEFAULT_CLUSTER, id);
-
-        // make sure one host hold only one backend.
-        Set<String> beHost = Sets.newHashSet();
-        for (Backend be : defaultClusterBackends) {
-            if (beHost.contains(be.getHost())) {
-                // we can not handle this situation automatically.
-                LOG.error("found more than one backends in same host: {}", be.getHost());
-                System.exit(-1);
-            } else {
-                beHost.add(be.getHost());
-            }
-        }
-
-        // we create default_cluster to meet the need for ease of use, because
-        // most users have no multi tenant needs.
-        cluster.setBackendIdList(backendList);
-        unprotectCreateCluster(cluster);
-        for (Database db : idToDb.values()) {
-            db.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
-            cluster.addDb(db.getFullName(), db.getId());
-        }
-
-        // no matter default_cluster is created or not,
-        // mark isDefaultClusterCreated as true
-        isDefaultClusterCreated = true;
-        editLog.logCreateCluster(cluster);
+        getInternalDataSource().initDefaultCluster();
     }
 
     public void replayUpdateDb(DatabaseInfo info) {
-        final Database db = fullNameToDb.get(info.getDbName());
-        db.setClusterName(info.getClusterName());
-        db.setDbState(info.getDbState());
+        getInternalDataSource().replayUpdateDb(info);
     }
 
     public long saveCluster(CountingDataOutputStream dos, long checksum) throws IOException {
-        final int clusterCount = idToCluster.size();
-        checksum ^= clusterCount;
-        dos.writeInt(clusterCount);
-        for (Map.Entry<Long, Cluster> entry : idToCluster.entrySet()) {
-            long clusterId = entry.getKey();
-            if (clusterId >= NEXT_ID_INIT_VALUE) {
-                checksum ^= clusterId;
-                final Cluster cluster = entry.getValue();
-                cluster.write(dos);
-            }
-        }
-        return checksum;
+        return getInternalDataSource().saveCluster(dos, checksum);
     }
 
     public long saveBrokers(CountingDataOutputStream dos, long checksum) throws IOException {
@@ -6786,14 +4396,7 @@ public class Catalog {
     }
 
     public void replayUpdateClusterAndBackends(BackendIdsUpdateInfo info) {
-        for (long id : info.getBackendList()) {
-            final Backend backend = systemInfo.getBackend(id);
-            final Cluster cluster = nameToCluster.get(backend.getOwnerClusterName());
-            cluster.removeBackend(id);
-            backend.setDecommissioned(false);
-            backend.clearClusterName();
-            backend.setBackendState(BackendState.free);
-        }
+        getInternalDataSource().replayUpdateClusterAndBackends(info);
     }
 
     public String dumpImage() {
@@ -6858,200 +4461,11 @@ public class Catalog {
      *
      */
     public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlException {
-        TableRef tblRef = truncateTableStmt.getTblRef();
-        TableName dbTbl = tblRef.getName();
-
-        // check, and save some info which need to be checked again later
-        Map<String, Long> origPartitions = Maps.newHashMap();
-        Map<Long, DistributionInfo> partitionsDistributionInfo = Maps.newHashMap();
-        OlapTable copiedTbl;
-
-        boolean truncateEntireTable = tblRef.getPartitionNames() == null;
-
-        Database db = this.getDbOrDdlException(dbTbl.getDb());
-        OlapTable olapTable = db.getOlapTableOrDdlException(dbTbl.getTbl());
-
-        olapTable.readLock();
-        try {
-            if (olapTable.getState() != OlapTableState.NORMAL) {
-                throw new DdlException("Table' state is not NORMAL: " + olapTable.getState());
-            }
-
-            if (!truncateEntireTable) {
-                for (String partName : tblRef.getPartitionNames().getPartitionNames()) {
-                    Partition partition = olapTable.getPartition(partName);
-                    if (partition == null) {
-                        throw new DdlException("Partition " + partName + " does not exist");
-                    }
-                    origPartitions.put(partName, partition.getId());
-                    partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo());
-                }
-            } else {
-                for (Partition partition : olapTable.getPartitions()) {
-                    origPartitions.put(partition.getName(), partition.getId());
-                    partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo());
-                }
-            }
-            copiedTbl = olapTable.selectiveCopy(origPartitions.keySet(), IndexExtState.VISIBLE, false);
-        } finally {
-            olapTable.readUnlock();
-        }
-
-        // 2. use the copied table to create partitions
-        List<Partition> newPartitions = Lists.newArrayList();
-        // tabletIdSet to save all newly created tablet ids.
-        Set<Long> tabletIdSet = Sets.newHashSet();
-        try {
-            for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
-                // the new partition must use new id
-                // If we still use the old partition id, the behavior of current load jobs on this partition
-                // will be undefined.
-                // By using a new id, load job will be aborted(just like partition is dropped),
-                // which is the right behavior.
-                long oldPartitionId = entry.getValue();
-                long newPartitionId = getNextId();
-                Partition newPartition = createPartitionWithIndices(db.getClusterName(),
-                        db.getId(), copiedTbl.getId(), copiedTbl.getBaseIndexId(),
-                        newPartitionId, entry.getKey(),
-                        copiedTbl.getIndexIdToMeta(),
-                        partitionsDistributionInfo.get(oldPartitionId),
-                        copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).getStorageMedium(),
-                        copiedTbl.getPartitionInfo().getReplicaAllocation(oldPartitionId),
-                        null /* version info */,
-                        copiedTbl.getCopiedBfColumns(),
-                        copiedTbl.getBfFpp(),
-                        tabletIdSet,
-                        copiedTbl.getCopiedIndexes(),
-                        copiedTbl.isInMemory(),
-                        copiedTbl.getStorageFormat(),
-                        copiedTbl.getPartitionInfo().getTabletType(oldPartitionId),
-                        copiedTbl.getCompressionType(),
-                        copiedTbl.getDataSortInfo());
-                newPartitions.add(newPartition);
-            }
-        } catch (DdlException e) {
-            // create partition failed, remove all newly created tablets
-            for (Long tabletId : tabletIdSet) {
-                Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
-            }
-            throw e;
-        }
-        Preconditions.checkState(origPartitions.size() == newPartitions.size());
-
-        // all partitions are created successfully, try to replace the old partitions.
-        // before replacing, we need to check again.
-        // Things may be changed outside the table lock.
-        olapTable = (OlapTable) db.getTableOrDdlException(copiedTbl.getId());
-        olapTable.writeLockOrDdlException();
-        try {
-            if (olapTable.getState() != OlapTableState.NORMAL) {
-                throw new DdlException("Table' state is not NORMAL: " + olapTable.getState());
-            }
-
-            // check partitions
-            for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
-                Partition partition = copiedTbl.getPartition(entry.getValue());
-                if (partition == null || !partition.getName().equalsIgnoreCase(entry.getKey())) {
-                    throw new DdlException("Partition [" + entry.getKey() + "] is changed");
-                }
-            }
-
-            // check if meta changed
-            // rollup index may be added or dropped, and schema may be changed during creating partition operation.
-            boolean metaChanged = false;
-            if (olapTable.getIndexNameToId().size() != copiedTbl.getIndexNameToId().size()) {
-                metaChanged = true;
-            } else {
-                // compare schemaHash
-                Map<Long, Integer> copiedIndexIdToSchemaHash = copiedTbl.getIndexIdToSchemaHash();
-                for (Map.Entry<Long, Integer> entry : olapTable.getIndexIdToSchemaHash().entrySet()) {
-                    long indexId = entry.getKey();
-                    if (!copiedIndexIdToSchemaHash.containsKey(indexId)) {
-                        metaChanged = true;
-                        break;
-                    }
-                    if (!copiedIndexIdToSchemaHash.get(indexId).equals(entry.getValue())) {
-                        metaChanged = true;
-                        break;
-                    }
-                }
-            }
-
-            if (metaChanged) {
-                throw new DdlException("Table[" + copiedTbl.getName() + "]'s meta has been changed. try again.");
-            }
-
-            // replace
-            truncateTableInternal(olapTable, newPartitions, truncateEntireTable);
-
-            // write edit log
-            TruncateTableInfo info = new TruncateTableInfo(db.getId(), olapTable.getId(), newPartitions,
-                    truncateEntireTable);
-            editLog.logTruncateTable(info);
-        } finally {
-            olapTable.writeUnlock();
-        }
-
-        LOG.info("finished to truncate table {}, partitions: {}",
-                tblRef.getName().toSql(), tblRef.getPartitionNames());
-    }
-
-    private void truncateTableInternal(OlapTable olapTable, List<Partition> newPartitions, boolean isEntireTable) {
-        // use new partitions to replace the old ones.
-        Set<Long> oldTabletIds = Sets.newHashSet();
-        for (Partition newPartition : newPartitions) {
-            Partition oldPartition = olapTable.replacePartition(newPartition);
-            // save old tablets to be removed
-            for (MaterializedIndex index : oldPartition.getMaterializedIndices(IndexExtState.ALL)) {
-                index.getTablets().stream().forEach(t -> {
-                    oldTabletIds.add(t.getId());
-                });
-            }
-        }
-
-        if (isEntireTable) {
-            // drop all temp partitions
-            olapTable.dropAllTempPartitions();
-        }
-
-        // remove the tablets in old partitions
-        for (Long tabletId : oldTabletIds) {
-            Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
-        }
+        getInternalDataSource().truncateTable(truncateTableStmt);
     }
 
     public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundException {
-        Database db = this.getDbOrMetaException(info.getDbId());
-        OlapTable olapTable = db.getTableOrMetaException(info.getTblId(), TableType.OLAP);
-        olapTable.writeLock();
-        try {
-            truncateTableInternal(olapTable, info.getPartitions(), info.isEntireTable());
-
-            if (!Catalog.isCheckpointThread()) {
-                // add tablet to inverted index
-                TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
-                for (Partition partition : info.getPartitions()) {
-                    long partitionId = partition.getId();
-                    TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(
-                            partitionId).getStorageMedium();
-                    for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
-                        long indexId = mIndex.getId();
-                        int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
-                        TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(),
-                                partitionId, indexId, schemaHash, medium);
-                        for (Tablet tablet : mIndex.getTablets()) {
-                            long tabletId = tablet.getId();
-                            invertedIndex.addTablet(tabletId, tabletMeta);
-                            for (Replica replica : tablet.getReplicas()) {
-                                invertedIndex.addReplica(tabletId, replica);
-                            }
-                        }
-                    }
-                }
-            }
-        } finally {
-            olapTable.writeUnlock();
-        }
+        getInternalDataSource().replayTruncateTable(info);
     }
 
     public void createFunction(CreateFunctionStmt stmt) throws UserException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index 945315d391..bffcb41263 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -71,7 +71,7 @@ import javax.annotation.Nullable;
  * if the table has never been loaded * if the table loading failed on the
  * previous attempt
  */
-public class Database extends MetaObject implements Writable {
+public class Database extends MetaObject implements Writable, DatabaseIf {
     private static final Logger LOG = LogManager.getLogger(Database.class);
 
     private long id;
@@ -314,7 +314,7 @@ public class Database extends MetaObject implements Writable {
         checkReplicaQuota();
     }
 
-    private boolean isTableExist(String tableName) {
+    public boolean isTableExist(String tableName) {
         if (Catalog.isTableNamesCaseInsensitive()) {
             tableName = lowerCaseToTableName.get(tableName.toLowerCase());
             if (tableName == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
index 060f96bcff..a34912150c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
@@ -48,8 +48,7 @@ public interface DatabaseIf {
 
     boolean writeLockIfExist();
 
-    <E extends Exception>
-    void writeLockOrException(E e) throws E;
+    <E extends Exception> void writeLockOrException(E e) throws E;
 
     void writeLockOrDdlException() throws DdlException;
 
@@ -79,23 +78,21 @@ public interface DatabaseIf {
 
     Optional<Table> getTable(long tableId);
 
-    <E extends Exception>
-    Table getTableOrException(String tableName, java.util.function.Function<String, E> e) throws E;
+    <E extends Exception> Table getTableOrException(String tableName, java.util.function.Function<String, E> e)
+            throws E;
 
-    <E extends Exception>
-    Table getTableOrException(long tableId, java.util.function.Function<Long, E> e) throws E;
+    <E extends Exception> Table getTableOrException(long tableId, java.util.function.Function<Long, E> e) throws E;
 
     Table getTableOrMetaException(String tableName) throws MetaNotFoundException;
 
     Table getTableOrMetaException(long tableId) throws MetaNotFoundException;
 
     @SuppressWarnings("unchecked")
-    <T extends Table>
-    T getTableOrMetaException(String tableName, Table.TableType tableType) throws MetaNotFoundException;
+    <T extends Table> T getTableOrMetaException(String tableName, Table.TableType tableType)
+            throws MetaNotFoundException;
 
     @SuppressWarnings("unchecked")
-    <T extends Table>
-    T getTableOrMetaException(long tableId, Table.TableType tableType) throws MetaNotFoundException;
+    <T extends Table> T getTableOrMetaException(long tableId, Table.TableType tableType) throws MetaNotFoundException;
 
     Table getTableOrDdlException(String tableName) throws DdlException;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index e45f384f87..1657e03b1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1646,4 +1646,11 @@ public class Config extends ConfigBase {
     @ConfField(mutable = false, masterOnly = true)
     public static int backend_rpc_timeout_ms = 60000; // 1 min
 
+    /**
+     * Temp config for multi catalog feature.
+     * Should be removed when this feature is ready.
+     */
+    @ConfField(mutable = false, masterOnly = true)
+    public static boolean enable_multi_catalog = false; // 1 min
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java b/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java
index f285b4d0a5..bbe474f0df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java
@@ -79,7 +79,7 @@ public class MetaReader {
             checksum = Catalog.getCurrentSystemInfo().loadBackends(dis, checksum);
             checksum = catalog.loadDb(dis, checksum);
             // ATTN: this should be done after load Db, and before loadAlterJob
-            catalog.recreateTabletInvertIndex();
+            catalog.getInternalDataSource().recreateTabletInvertIndex();
             // rebuild es state state
             catalog.getEsRepository().loadTableFromCatalog();
             checksum = catalog.loadLoadJob(dis, checksum);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceIf.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceIf.java
index 384c6e3099..fa1ba1352b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceIf.java
@@ -17,11 +17,12 @@
 
 package org.apache.doris.datasource;
 
-import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 
+import java.util.List;
 import java.util.Optional;
 import javax.annotation.Nullable;
 
@@ -33,32 +34,36 @@ public interface DataSourceIf {
     // Type of this data source
     String getType();
 
+    long getId();
+
     // Name of this data source
     String getName();
 
+    List<String> getDbNames();
+
     @Nullable
-    Database getDbNullable(String dbName);
+    DatabaseIf getDbNullable(String dbName);
 
     @Nullable
-    Database getDbNullable(long dbId);
+    DatabaseIf getDbNullable(long dbId);
 
-    Optional<Database> getDb(String dbName);
+    Optional<DatabaseIf> getDb(String dbName);
 
-    Optional<Database> getDb(long dbId);
+    Optional<DatabaseIf> getDb(long dbId);
 
-    <E extends Exception> Database getDbOrException(String dbName, java.util.function.Function<String, E> e) throws E;
+    <E extends Exception> DatabaseIf getDbOrException(String dbName, java.util.function.Function<String, E> e) throws E;
 
-    <E extends Exception> Database getDbOrException(long dbId, java.util.function.Function<Long, E> e) throws E;
+    <E extends Exception> DatabaseIf getDbOrException(long dbId, java.util.function.Function<Long, E> e) throws E;
 
-    Database getDbOrMetaException(String dbName) throws MetaNotFoundException;
+    DatabaseIf getDbOrMetaException(String dbName) throws MetaNotFoundException;
 
-    Database getDbOrMetaException(long dbId) throws MetaNotFoundException;
+    DatabaseIf getDbOrMetaException(long dbId) throws MetaNotFoundException;
 
-    Database getDbOrDdlException(String dbName) throws DdlException;
+    DatabaseIf getDbOrDdlException(String dbName) throws DdlException;
 
-    Database getDbOrDdlException(long dbId) throws DdlException;
+    DatabaseIf getDbOrDdlException(long dbId) throws DdlException;
 
-    Database getDbOrAnalysisException(String dbName) throws AnalysisException;
+    DatabaseIf getDbOrAnalysisException(String dbName) throws AnalysisException;
 
-    Database getDbOrAnalysisException(long dbId) throws AnalysisException;
+    DatabaseIf getDbOrAnalysisException(long dbId) throws AnalysisException;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
index b735413070..4d5c5be1c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
@@ -17,47 +17,83 @@
 
 package org.apache.doris.datasource;
 
-import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.io.Writable;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.Map;
 
 /**
  * DataSourceMgr will loaded all data sources at FE startup,
  * and save them in maps mapping with id and name.
  */
-public class DataSourceMgr {
+public class DataSourceMgr implements Writable {
+    private static final Logger LOG = LogManager.getLogger(DataSourceMgr.class);
 
-    private Map<Long, ExternalDataSource> idToDataSource = Maps.newConcurrentMap();
-    private Map<String, ExternalDataSource> nameToDataSource = Maps.newConcurrentMap();
+    private Map<Long, DataSourceIf> idToDataSource = Maps.newConcurrentMap();
+    private Map<String, DataSourceIf> nameToDataSource = Maps.newConcurrentMap();
+    private DataSourceMgrProperty dsMgrProperty = new DataSourceMgrProperty();
+
+    // Use a separate instance to facilitate access.
+    // internalDataSource still exists in idToDataSource and nameToDataSource
+    private InternalDataSource internalDataSource;
 
     public DataSourceMgr() {
-        loadDataSources();
+        initInternalDataSource();
+    }
+
+    private void initInternalDataSource() {
+        internalDataSource = new InternalDataSource();
+        idToDataSource.put(internalDataSource.getId(), internalDataSource);
+        nameToDataSource.put(internalDataSource.getName(), internalDataSource);
     }
 
-    public void loadDataSources() {
-        // TODO: Actually, we should initialize the data source object where user executing "create catalog" cmd,
-        //       not loaded from config file.
+    private void registerNewDataSource(ExternalDataSource ds) {
+        // TODO
     }
 
-    private void registerDataSource(ExternalDataSource ds) {
-        ds.setId(Catalog.getCurrentCatalog().getNextId());
-        idToDataSource.put(ds.getId(), ds);
-        nameToDataSource.put(ds.getName(), ds);
+    public InternalDataSource getInternalDataSource() {
+        return internalDataSource;
     }
 
-    public <E extends MetaNotFoundException> ExternalDataSource getDataSourceOrException(long id, java.util.function.Function<Long, E> e) throws E {
-        ExternalDataSource ds = idToDataSource.get(id);
+    /**
+     * get data source by id.
+     *
+     * @param id
+     * @param e
+     * @param <E>
+     * @return
+     * @throws E
+     */
+    public <E extends MetaNotFoundException> DataSourceIf getDataSourceOrException(long id,
+            java.util.function.Function<Long, E> e) throws E {
+        DataSourceIf ds = idToDataSource.get(id);
         if (ds == null) {
             throw e.apply(id);
         }
         return ds;
     }
 
-    public <E extends MetaNotFoundException> ExternalDataSource getDataSourceOrException(String name, java.util.function.Function<String, E> e) throws E {
-        ExternalDataSource ds = nameToDataSource.get(name);
+    /**
+     * get data source by name.
+     *
+     * @param name
+     * @param e
+     * @param <E>
+     * @return
+     * @throws E
+     */
+    public <E extends MetaNotFoundException> DataSourceIf getDataSourceOrException(String name,
+            java.util.function.Function<String, E> e) throws E {
+        DataSourceIf ds = nameToDataSource.get(name);
         if (ds == null) {
             throw e.apply(name);
         }
@@ -67,4 +103,52 @@ public class DataSourceMgr {
     public boolean hasDataSource(String name) {
         return nameToDataSource.containsKey(name);
     }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        if (Config.disable_cluster_feature) {
+            return;
+        }
+        Preconditions.checkState(false, "Do not call this until multi catalog feature is ready");
+        int size = idToDataSource.size();
+        if (idToDataSource.get(InternalDataSource.INTERNAL_DS_ID) != null) {
+            // No need to persis internal data source
+            size -= 1;
+        }
+        out.writeInt(size);
+        for (DataSourceIf ds : idToDataSource.values()) {
+            if (ds.getId() == InternalDataSource.INTERNAL_DS_ID) {
+                continue;
+            }
+            ExternalDataSource extDs = (ExternalDataSource) ds;
+            extDs.write(out);
+        }
+        dsMgrProperty.write(out);
+    }
+
+    /**
+     * read from image.
+     *
+     * @param in
+     * @return
+     * @throws IOException
+     */
+    public static DataSourceMgr read(DataInput in) throws IOException {
+        if (Config.disable_cluster_feature) {
+            return null;
+        }
+        DataSourceMgr mgr = new DataSourceMgr();
+        mgr.readFields(in);
+        return mgr;
+    }
+
+    private void readFields(DataInput in) throws IOException {
+        int size = in.readInt();
+        for (int i = 0; i < size; ++i) {
+            ExternalDataSource extDs = ExternalDataSource.read(in);
+            idToDataSource.put(extDs.getId(), extDs);
+            nameToDataSource.put(extDs.getName(), extDs);
+        }
+        dsMgrProperty = DataSourceMgrProperty.read(in);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgrProperty.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgrProperty.java
new file mode 100644
index 0000000000..bcba2d8a35
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgrProperty.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Save the properties of a data source.
+ */
+public class DataSourceMgrProperty implements Writable {
+    @SerializedName(value = "properties")
+    private Map<String, String> properties = Maps.newHashMap();
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static DataSourceMgrProperty read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, DataSourceMgrProperty.class);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java
new file mode 100644
index 0000000000..43bc87fb7d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+public class DataSourceProperty implements Writable {
+    @SerializedName(value = "properties")
+    private Map<String, String> properties = Maps.newHashMap();
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static DataSourceProperty read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, DataSourceProperty.class);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
index 983e4dd6fd..37e16708e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
@@ -27,16 +27,6 @@ import java.util.List;
  */
 public class EsExternalDataSource extends ExternalDataSource {
 
-    @Override
-    public String getType() {
-        return "es";
-    }
-
-    @Override
-    public String getName() {
-        return "es";
-    }
-
     @Override
     public List<String> listDatabaseNames(SessionContext ctx) {
         return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java
index f75409f30b..a4c6c6390f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java
@@ -18,54 +18,39 @@
 package org.apache.doris.datasource;
 
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
 import org.apache.doris.external.ExternalScanRange;
+import org.apache.doris.persist.gson.GsonUtils;
 
-import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
 import org.jetbrains.annotations.Nullable;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
 
 /**
  * The abstract class for all types of external data sources.
  */
-public abstract class ExternalDataSource implements DataSourceIf {
-
+public abstract class ExternalDataSource implements DataSourceIf, Writable {
     // Unique id of this data source, will be assigned after data source is loaded.
+    @SerializedName(value = "id")
     private long id;
-
+    @SerializedName(value = "name")
+    private String name;
+    @SerializedName(value = "type")
+    private String type;
     // save properties of this data source, such as hive meta store url.
-    private Map<String, String> properties = Maps.newHashMap();
-
-    private Map<Long, ExternalDatabase> idToDbs = Maps.newConcurrentMap();
-    private Map<String, ExternalDatabase> nameToDbs = Maps.newConcurrentMap();
-
-    public void setId(long id) {
-        this.id = id;
-    }
-
-    public long getId() {
-        return id;
-    }
-
-    public String getProperty(String key) {
-        return properties.get(key);
-    }
-
-    public String getPropertyOrException(String key) throws DataSourceException {
-        if (properties.containsKey(key)) {
-            return properties.get(key);
-        }
-        throw new DataSourceException("Not found property " + key + " in data source " + getName());
-    }
-
+    @SerializedName(value = "dsProperty")
+    private DataSourceProperty dsProperty = new DataSourceProperty();
 
     /**
      * @return names of database in this data source.
@@ -101,75 +86,96 @@ public abstract class ExternalDataSource implements DataSourceIf {
      */
     public abstract List<ExternalScanRange> getExternalScanRanges(SessionContext ctx);
 
+
     @Override
-    public String getType() {
-        return null;
+    public long getId() {
+        return id;
     }
 
     @Override
     public String getName() {
+        return name;
+    }
+
+    @Override
+    public String getType() {
+        return type;
+    }
+
+    @Override
+    public List<String> getDbNames() {
         return null;
     }
 
     @Nullable
     @Override
-    public Database getDbNullable(String dbName) {
+    public DatabaseIf getDbNullable(String dbName) {
         return null;
     }
 
     @Nullable
     @Override
-    public Database getDbNullable(long dbId) {
+    public DatabaseIf getDbNullable(long dbId) {
         return null;
     }
 
     @Override
-    public Optional<Database> getDb(String dbName) {
+    public Optional<DatabaseIf> getDb(String dbName) {
         return Optional.empty();
     }
 
     @Override
-    public Optional<Database> getDb(long dbId) {
+    public Optional<DatabaseIf> getDb(long dbId) {
         return Optional.empty();
     }
 
     @Override
-    public <E extends Exception> Database getDbOrException(String dbName, Function<String, E> e) throws E {
+    public <E extends Exception> DatabaseIf getDbOrException(String dbName, Function<String, E> e) throws E {
         return null;
     }
 
     @Override
-    public <E extends Exception> Database getDbOrException(long dbId, Function<Long, E> e) throws E {
+    public <E extends Exception> DatabaseIf getDbOrException(long dbId, Function<Long, E> e) throws E {
         return null;
     }
 
     @Override
-    public Database getDbOrMetaException(String dbName) throws MetaNotFoundException {
+    public DatabaseIf getDbOrMetaException(String dbName) throws MetaNotFoundException {
         return null;
     }
 
     @Override
-    public Database getDbOrMetaException(long dbId) throws MetaNotFoundException {
+    public DatabaseIf getDbOrMetaException(long dbId) throws MetaNotFoundException {
         return null;
     }
 
     @Override
-    public Database getDbOrDdlException(String dbName) throws DdlException {
+    public DatabaseIf getDbOrDdlException(String dbName) throws DdlException {
         return null;
     }
 
     @Override
-    public Database getDbOrDdlException(long dbId) throws DdlException {
+    public DatabaseIf getDbOrDdlException(long dbId) throws DdlException {
         return null;
     }
 
     @Override
-    public Database getDbOrAnalysisException(String dbName) throws AnalysisException {
+    public DatabaseIf getDbOrAnalysisException(String dbName) throws AnalysisException {
         return null;
     }
 
     @Override
-    public Database getDbOrAnalysisException(long dbId) throws AnalysisException {
+    public DatabaseIf getDbOrAnalysisException(long dbId) throws AnalysisException {
         return null;
     }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static ExternalDataSource read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, ExternalDataSource.class);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java
index 50aebbacef..8b2ada2946 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java
@@ -30,21 +30,9 @@ import java.util.List;
  */
 public class HMSExternalDataSource extends ExternalDataSource {
     private static final Logger LOG = LogManager.getLogger(HMSExternalDataSource.class);
-
     public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
 
     public HMSExternalDataSource() {
-
-    }
-
-    @Override
-    public String getType() {
-        return "hms";
-    }
-
-    @Override
-    public String getName() {
-        return "hms";
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
index 17831cacd3..f421c59e9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
@@ -17,22 +17,209 @@
 
 package org.apache.doris.datasource;
 
+import org.apache.doris.alter.DecommissionType;
+import org.apache.doris.analysis.AddPartitionClause;
+import org.apache.doris.analysis.AddRollupClause;
+import org.apache.doris.analysis.AlterClause;
+import org.apache.doris.analysis.AlterClusterStmt;
+import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
+import org.apache.doris.analysis.AlterDatabaseQuotaStmt.QuotaType;
+import org.apache.doris.analysis.AlterDatabaseRename;
+import org.apache.doris.analysis.AlterSystemStmt;
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.ColumnDef;
+import org.apache.doris.analysis.ColumnDef.DefaultValue;
+import org.apache.doris.analysis.CreateClusterStmt;
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableAsSelectStmt;
+import org.apache.doris.analysis.CreateTableLikeStmt;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.CreateUserStmt;
+import org.apache.doris.analysis.DataSortInfo;
+import org.apache.doris.analysis.DecommissionBackendClause;
+import org.apache.doris.analysis.DistributionDesc;
+import org.apache.doris.analysis.DropClusterStmt;
+import org.apache.doris.analysis.DropDbStmt;
+import org.apache.doris.analysis.DropPartitionClause;
+import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.HashDistributionDesc;
+import org.apache.doris.analysis.KeysDesc;
+import org.apache.doris.analysis.LinkDbStmt;
+import org.apache.doris.analysis.MigrateDbStmt;
+import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.analysis.QueryStmt;
+import org.apache.doris.analysis.RecoverDbStmt;
+import org.apache.doris.analysis.RecoverPartitionStmt;
+import org.apache.doris.analysis.RecoverTableStmt;
+import org.apache.doris.analysis.SinglePartitionDesc;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.TableRef;
+import org.apache.doris.analysis.TruncateTableStmt;
+import org.apache.doris.analysis.TypeDef;
+import org.apache.doris.analysis.UserDesc;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.BrokerTable;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.ColocateGroupSchema;
+import org.apache.doris.catalog.ColocateTableIndex;
+import org.apache.doris.catalog.ColocateTableIndex.GroupId;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DataProperty;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Database.DbState;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.DatabaseProperty;
+import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
+import org.apache.doris.catalog.EsTable;
+import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.HiveTable;
+import org.apache.doris.catalog.IcebergTable;
+import org.apache.doris.catalog.Index;
+import org.apache.doris.catalog.InfoSchemaDb;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.MaterializedIndex.IndexState;
+import org.apache.doris.catalog.MaterializedIndexMeta;
+import org.apache.doris.catalog.MysqlTable;
+import org.apache.doris.catalog.OdbcTable;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.OlapTable.OlapTableState;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.RangePartitionItem;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Replica.ReplicaState;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.SinglePartitionInfo;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Table.TableType;
+import org.apache.doris.catalog.TableIndexes;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.View;
+import org.apache.doris.clone.DynamicPartitionScheduler;
+import org.apache.doris.cluster.BaseParam;
+import org.apache.doris.cluster.Cluster;
+import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.io.CountingDataOutputStream;
+import org.apache.doris.common.util.DynamicPartitionUtil;
+import org.apache.doris.common.util.MetaLockUtils;
+import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.common.util.QueryableReentrantLock;
+import org.apache.doris.common.util.SqlParserUtils;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.external.elasticsearch.EsRepository;
+import org.apache.doris.external.hudi.HudiProperty;
+import org.apache.doris.external.hudi.HudiTable;
+import org.apache.doris.external.hudi.HudiUtils;
+import org.apache.doris.external.iceberg.IcebergCatalogMgr;
+import org.apache.doris.external.iceberg.IcebergTableCreationRecordMgr;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.persist.BackendIdsUpdateInfo;
+import org.apache.doris.persist.ClusterInfo;
+import org.apache.doris.persist.ColocatePersistInfo;
+import org.apache.doris.persist.DatabaseInfo;
+import org.apache.doris.persist.DropDbInfo;
+import org.apache.doris.persist.DropInfo;
+import org.apache.doris.persist.DropLinkDbAndUpdateDbInfo;
+import org.apache.doris.persist.DropPartitionInfo;
+import org.apache.doris.persist.PartitionPersistInfo;
+import org.apache.doris.persist.RecoverInfo;
+import org.apache.doris.persist.ReplicaPersistInfo;
+import org.apache.doris.persist.TruncateTableInfo;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.Backend.BackendState;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.CreateReplicaTask;
+import org.apache.doris.thrift.TCompressionType;
+import org.apache.doris.thrift.TStorageFormat;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.thrift.TStorageType;
+import org.apache.doris.thrift.TTabletType;
+import org.apache.doris.thrift.TTaskType;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.Getter;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.Nullable;
 
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 /**
  * The Internal data source will manage all self-managed meta object in a Doris cluster.
  * Such as Database, tables, etc.
- * There is only one internal data source in a cluster.
+ * There is only one internal data source in a cluster. And its id is always 0.
  */
 public class InternalDataSource implements DataSourceIf {
+    public static final String INTERNAL_DS_NAME = "__internal";
+    public static final long INTERNAL_DS_ID = 0L;
+
+    private static final Logger LOG = LogManager.getLogger(InternalDataSource.class);
+
+    private QueryableReentrantLock lock = new QueryableReentrantLock(true);
+    private ConcurrentHashMap<Long, Database> idToDb = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<String, Database> fullNameToDb = new ConcurrentHashMap<>();
+
+    private ConcurrentHashMap<Long, Cluster> idToCluster = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<String, Cluster> nameToCluster = new ConcurrentHashMap<>();
+
+    @Getter
+    private EsRepository esRepository = new EsRepository();
+    @Getter
+    private IcebergTableCreationRecordMgr icebergTableCreationRecordMgr = new IcebergTableCreationRecordMgr();
+
+    @Override
+    public long getId() {
+        return 0;
+    }
+
     @Override
     public String getType() {
         return "internal";
@@ -40,68 +227,2988 @@ public class InternalDataSource implements DataSourceIf {
 
     @Override
     public String getName() {
-        return "_internal_";
+        return INTERNAL_DS_NAME;
+    }
+
+
+    @Override
+    public List<String> getDbNames() {
+        return Lists.newArrayList(fullNameToDb.keySet());
     }
 
     @Nullable
     @Override
-    public Database getDbNullable(String dbName) {
+    public DatabaseIf getDbNullable(String dbName) {
+        if (fullNameToDb.containsKey(dbName)) {
+            return fullNameToDb.get(dbName);
+        } else {
+            // This maybe a information_schema db request, and information_schema db name is case insensitive.
+            // So, we first extract db name to check if it is information_schema.
+            // Then we reassemble the origin cluster name with lower case db name,
+            // and finally get information_schema db from the name map.
+            String fullName = ClusterNamespace.getNameFromFullName(dbName);
+            if (fullName.equalsIgnoreCase(InfoSchemaDb.DATABASE_NAME)) {
+                String clusterName = ClusterNamespace.getClusterNameFromFullName(dbName);
+                fullName = ClusterNamespace.getFullName(clusterName, fullName.toLowerCase());
+                return fullNameToDb.get(fullName);
+            }
+        }
         return null;
     }
 
     @Nullable
     @Override
-    public Database getDbNullable(long dbId) {
-        return null;
+    public DatabaseIf getDbNullable(long dbId) {
+        return idToDb.get(dbId);
     }
 
     @Override
-    public Optional<Database> getDb(String dbName) {
-        return Optional.empty();
+    public Optional<DatabaseIf> getDb(String dbName) {
+        return Optional.ofNullable(getDbNullable(dbName));
     }
 
     @Override
-    public Optional<Database> getDb(long dbId) {
-        return Optional.empty();
+    public Optional<DatabaseIf> getDb(long dbId) {
+        return Optional.ofNullable(getDbNullable(dbId));
     }
 
     @Override
-    public <E extends Exception> Database getDbOrException(String dbName, Function<String, E> e) throws E {
-        return null;
+    public <E extends Exception> DatabaseIf getDbOrException(String dbName, Function<String, E> e) throws E {
+        DatabaseIf db = getDbNullable(dbName);
+        if (db == null) {
+            throw e.apply(dbName);
+        }
+        return db;
     }
 
     @Override
-    public <E extends Exception> Database getDbOrException(long dbId, Function<Long, E> e) throws E {
-        return null;
+    public <E extends Exception> DatabaseIf getDbOrException(long dbId, Function<Long, E> e) throws E {
+        DatabaseIf db = getDbNullable(dbId);
+        if (db == null) {
+            throw e.apply(dbId);
+        }
+        return db;
     }
 
     @Override
-    public Database getDbOrMetaException(String dbName) throws MetaNotFoundException {
-        return null;
+    public DatabaseIf getDbOrMetaException(String dbName) throws MetaNotFoundException {
+        return getDbOrException(dbName,
+                s -> new MetaNotFoundException("unknown databases, dbName=" + s, ErrorCode.ERR_BAD_DB_ERROR));
     }
 
     @Override
-    public Database getDbOrMetaException(long dbId) throws MetaNotFoundException {
-        return null;
+    public DatabaseIf getDbOrMetaException(long dbId) throws MetaNotFoundException {
+        return getDbOrException(dbId,
+                s -> new MetaNotFoundException("unknown databases, dbId=" + s, ErrorCode.ERR_BAD_DB_ERROR));
     }
 
     @Override
-    public Database getDbOrDdlException(String dbName) throws DdlException {
-        return null;
+    public DatabaseIf getDbOrDdlException(String dbName) throws DdlException {
+        return getDbOrException(dbName,
+                s -> new DdlException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
     }
 
     @Override
-    public Database getDbOrDdlException(long dbId) throws DdlException {
-        return null;
+    public DatabaseIf getDbOrDdlException(long dbId) throws DdlException {
+        return getDbOrException(dbId,
+                s -> new DdlException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
     }
 
     @Override
-    public Database getDbOrAnalysisException(String dbName) throws AnalysisException {
-        return null;
+    public DatabaseIf getDbOrAnalysisException(String dbName) throws AnalysisException {
+        return getDbOrException(dbName,
+                s -> new AnalysisException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
     }
 
     @Override
-    public Database getDbOrAnalysisException(long dbId) throws AnalysisException {
-        return null;
+    public DatabaseIf getDbOrAnalysisException(long dbId) throws AnalysisException {
+        return getDbOrException(dbId,
+                s -> new AnalysisException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
+    }
+
+    // Use tryLock to avoid potential dead lock
+    private boolean tryLock(boolean mustLock) {
+        while (true) {
+            try {
+                if (!lock.tryLock(Config.catalog_try_lock_timeout_ms, TimeUnit.MILLISECONDS)) {
+                    if (LOG.isDebugEnabled()) {
+                        // to see which thread held this lock for long time.
+                        Thread owner = lock.getOwner();
+                        if (owner != null) {
+                            LOG.debug("catalog lock is held by: {}", Util.dumpThread(owner, 10));
+                        }
+                    }
+
+                    if (mustLock) {
+                        continue;
+                    } else {
+                        return false;
+                    }
+                }
+                return true;
+            } catch (InterruptedException e) {
+                LOG.warn("got exception while getting catalog lock", e);
+                if (mustLock) {
+                    continue;
+                } else {
+                    return lock.isHeldByCurrentThread();
+                }
+            }
+        }
+    }
+
+    public List<Long> getDbIds() {
+        return Lists.newArrayList(idToDb.keySet());
+    }
+
+    private void unlock() {
+        if (lock.isHeldByCurrentThread()) {
+            this.lock.unlock();
+        }
+    }
+
+    /**
+     * create the tablet inverted index from metadata.
+     */
+    public void recreateTabletInvertIndex() {
+        if (Catalog.isCheckpointThread()) {
+            return;
+        }
+
+        // create inverted index
+        TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
+        for (Database db : this.fullNameToDb.values()) {
+            long dbId = db.getId();
+            for (Table table : db.getTables()) {
+                if (table.getType() != TableType.OLAP) {
+                    continue;
+                }
+
+                OlapTable olapTable = (OlapTable) table;
+                long tableId = olapTable.getId();
+                Collection<Partition> allPartitions = olapTable.getAllPartitions();
+                for (Partition partition : allPartitions) {
+                    long partitionId = partition.getId();
+                    TStorageMedium medium =
+                            olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
+                    for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
+                        long indexId = index.getId();
+                        int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
+                        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium);
+                        for (Tablet tablet : index.getTablets()) {
+                            long tabletId = tablet.getId();
+                            invertedIndex.addTablet(tabletId, tabletMeta);
+                            for (Replica replica : tablet.getReplicas()) {
+                                invertedIndex.addReplica(tabletId, replica);
+                            }
+                        }
+                    } // end for indices
+                } // end for partitions
+            } // end for tables
+        } // end for dbs
+    }
+
+    /**
+     * Entry of creating a database.
+     *
+     * @param stmt
+     * @throws DdlException
+     */
+    public void createDb(CreateDbStmt stmt) throws DdlException {
+        final String clusterName = stmt.getClusterName();
+        String fullDbName = stmt.getFullDbName();
+        Map<String, String> properties = stmt.getProperties();
+
+        long id = Catalog.getCurrentCatalog().getNextId();
+        Database db = new Database(id, fullDbName);
+        db.setClusterName(clusterName);
+        // check and analyze database properties before create database
+        db.setDbProperties(new DatabaseProperty(properties).checkAndBuildProperties());
+
+        if (!tryLock(false)) {
+            throw new DdlException("Failed to acquire catalog lock. Try again");
+        }
+        try {
+            if (!nameToCluster.containsKey(clusterName)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_SELECT_CLUSTER, clusterName);
+            }
+            if (fullNameToDb.containsKey(fullDbName)) {
+                if (stmt.isSetIfNotExists()) {
+                    LOG.info("create database[{}] which already exists", fullDbName);
+                    return;
+                } else {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, fullDbName);
+                }
+            } else {
+                unprotectCreateDb(db);
+                Catalog.getCurrentCatalog().getEditLog().logCreateDb(db);
+            }
+        } finally {
+            unlock();
+        }
+        LOG.info("createDb dbName = " + fullDbName + ", id = " + id);
+
+        // create tables in iceberg database
+        if (db.getDbProperties().getIcebergProperty().isExist()) {
+            icebergTableCreationRecordMgr.registerDb(db);
+        }
+    }
+
+    /**
+     * For replaying creating database.
+     *
+     * @param db
+     */
+    public void unprotectCreateDb(Database db) {
+        idToDb.put(db.getId(), db);
+        fullNameToDb.put(db.getFullName(), db);
+        final Cluster cluster = nameToCluster.get(db.getClusterName());
+        cluster.addDb(db.getFullName(), db.getId());
+        Catalog.getCurrentGlobalTransactionMgr().addDatabaseTransactionMgr(db.getId());
+    }
+
+    // for test
+    public void addCluster(Cluster cluster) {
+        nameToCluster.put(cluster.getName(), cluster);
+        idToCluster.put(cluster.getId(), cluster);
+    }
+
+    /**
+     * replayCreateDb.
+     *
+     * @param db
+     */
+    public void replayCreateDb(Database db) {
+        tryLock(true);
+        try {
+            unprotectCreateDb(db);
+        } finally {
+            unlock();
+        }
+    }
+
+    public void dropDb(DropDbStmt stmt) throws DdlException {
+        String dbName = stmt.getDbName();
+
+        // 1. check if database exists
+        if (!tryLock(false)) {
+            throw new DdlException("Failed to acquire catalog lock. Try again");
+        }
+        try {
+            if (!fullNameToDb.containsKey(dbName)) {
+                if (stmt.isSetIfExists()) {
+                    LOG.info("drop database[{}] which does not exist", dbName);
+                    return;
+                } else {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS, dbName);
+                }
+            }
+
+            // 2. drop tables in db
+            Database db = this.fullNameToDb.get(dbName);
+            db.writeLock();
+            try {
+                if (!stmt.isForceDrop()) {
+                    if (Catalog.getCurrentCatalog().getGlobalTransactionMgr()
+                            .existCommittedTxns(db.getId(), null, null)) {
+                        throw new DdlException(
+                                "There are still some transactions in the COMMITTED state waiting to be completed. "
+                                        + "The database [" + dbName
+                                        + "] cannot be dropped. If you want to forcibly drop(cannot be recovered),"
+                                        + " please use \"DROP database FORCE\".");
+                    }
+                }
+                if (db.getDbState() == DbState.LINK && dbName.equals(db.getAttachDb())) {
+                    // We try to drop a hard link.
+                    final DropLinkDbAndUpdateDbInfo info = new DropLinkDbAndUpdateDbInfo();
+                    fullNameToDb.remove(db.getAttachDb());
+                    db.setDbState(DbState.NORMAL);
+                    info.setUpdateDbState(DbState.NORMAL);
+                    final Cluster cluster =
+                            nameToCluster.get(ClusterNamespace.getClusterNameFromFullName(db.getAttachDb()));
+                    final BaseParam param = new BaseParam();
+                    param.addStringParam(db.getAttachDb());
+                    param.addLongParam(db.getId());
+                    cluster.removeLinkDb(param);
+                    info.setDropDbCluster(cluster.getName());
+                    info.setDropDbId(db.getId());
+                    info.setDropDbName(db.getAttachDb());
+                    Catalog.getCurrentCatalog().getEditLog().logDropLinkDb(info);
+                    return;
+                }
+
+                if (db.getDbState() == DbState.LINK && dbName.equals(db.getFullName())) {
+                    // We try to drop a db which other dbs attach to it,
+                    // which is not allowed.
+                    ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DB_STATE_LINK_OR_MIGRATE,
+                            ClusterNamespace.getNameFromFullName(dbName));
+                    return;
+                }
+
+                if (dbName.equals(db.getAttachDb()) && db.getDbState() == DbState.MOVE) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DB_STATE_LINK_OR_MIGRATE,
+                            ClusterNamespace.getNameFromFullName(dbName));
+                    return;
+                }
+
+                // save table names for recycling
+                Set<String> tableNames = db.getTableNamesWithLock();
+                List<Table> tableList = db.getTablesOnIdOrder();
+                MetaLockUtils.writeLockTables(tableList);
+                try {
+                    if (!stmt.isForceDrop()) {
+                        for (Table table : tableList) {
+                            if (table.getType() == TableType.OLAP) {
+                                OlapTable olapTable = (OlapTable) table;
+                                if (olapTable.getState() != OlapTableState.NORMAL) {
+                                    throw new DdlException("The table [" + olapTable.getState() + "]'s state is "
+                                            + olapTable.getState() + ", cannot be dropped."
+                                            + " please cancel the operation on olap table firstly. If you want to forcibly drop(cannot be recovered),"
+                                            + " please use \"DROP table FORCE\".");
+                                }
+                            }
+                        }
+                    }
+                    unprotectDropDb(db, stmt.isForceDrop(), false);
+                } finally {
+                    MetaLockUtils.writeUnlockTables(tableList);
+                }
+
+                if (!stmt.isForceDrop()) {
+                    Catalog.getCurrentRecycleBin().recycleDatabase(db, tableNames);
+                } else {
+                    Catalog.getCurrentCatalog().eraseDatabase(db.getId(), false);
+                }
+            } finally {
+                db.writeUnlock();
+            }
+
+            // 3. remove db from catalog
+            idToDb.remove(db.getId());
+            fullNameToDb.remove(db.getFullName());
+            final Cluster cluster = nameToCluster.get(db.getClusterName());
+            cluster.removeDb(dbName, db.getId());
+            DropDbInfo info = new DropDbInfo(dbName, stmt.isForceDrop());
+            Catalog.getCurrentCatalog().getEditLog().logDropDb(info);
+        } finally {
+            unlock();
+        }
+
+        LOG.info("finish drop database[{}], is force : {}", dbName, stmt.isForceDrop());
+    }
+
+    public void unprotectDropDb(Database db, boolean isForeDrop, boolean isReplay) {
+        // drop Iceberg database table creation records
+        if (db.getDbProperties().getIcebergProperty().isExist()) {
+            icebergTableCreationRecordMgr.deregisterDb(db);
+        }
+        for (Table table : db.getTables()) {
+            unprotectDropTable(db, table, isForeDrop, isReplay);
+        }
+        db.markDropped();
+    }
+
+    public void replayDropLinkDb(DropLinkDbAndUpdateDbInfo info) {
+        tryLock(true);
+        try {
+            final Database db = this.fullNameToDb.remove(info.getDropDbName());
+            db.setDbState(info.getUpdateDbState());
+            final Cluster cluster = nameToCluster.get(info.getDropDbCluster());
+            final BaseParam param = new BaseParam();
+            param.addStringParam(db.getAttachDb());
+            param.addLongParam(db.getId());
+            cluster.removeLinkDb(param);
+        } finally {
+            unlock();
+        }
+    }
+
+    public void replayDropDb(String dbName, boolean isForceDrop) throws DdlException {
+        tryLock(true);
+        try {
+            Database db = fullNameToDb.get(dbName);
+            db.writeLock();
+            try {
+                Set<String> tableNames = db.getTableNamesWithLock();
+                List<Table> tableList = db.getTablesOnIdOrder();
+                MetaLockUtils.writeLockTables(tableList);
+                try {
+                    unprotectDropDb(db, isForceDrop, true);
+                } finally {
+                    MetaLockUtils.writeUnlockTables(tableList);
+                }
+                if (!isForceDrop) {
+                    Catalog.getCurrentRecycleBin().recycleDatabase(db, tableNames);
+                } else {
+                    Catalog.getCurrentCatalog().eraseDatabase(db.getId(), false);
+                }
+            } finally {
+                db.writeUnlock();
+            }
+
+            fullNameToDb.remove(dbName);
+            idToDb.remove(db.getId());
+            final Cluster cluster = nameToCluster.get(db.getClusterName());
+            cluster.removeDb(dbName, db.getId());
+        } finally {
+            unlock();
+        }
+    }
+
+    public void recoverDatabase(RecoverDbStmt recoverStmt) throws DdlException {
+        // check is new db with same name already exist
+        if (getDb(recoverStmt.getDbName()).isPresent()) {
+            throw new DdlException("Database[" + recoverStmt.getDbName() + "] already exist.");
+        }
+
+        Database db = Catalog.getCurrentRecycleBin().recoverDatabase(recoverStmt.getDbName());
+
+        // add db to catalog
+        if (!tryLock(false)) {
+            throw new DdlException("Failed to acquire catalog lock. Try again");
+        }
+        db.writeLock();
+        List<Table> tableList = db.getTablesOnIdOrder();
+        MetaLockUtils.writeLockTables(tableList);
+        try {
+            if (fullNameToDb.containsKey(db.getFullName())) {
+                throw new DdlException("Database[" + db.getFullName() + "] already exist.");
+                // it's ok that we do not put db back to CatalogRecycleBin
+                // cause this db cannot recover any more
+            }
+
+            fullNameToDb.put(db.getFullName(), db);
+            idToDb.put(db.getId(), db);
+            final Cluster cluster = nameToCluster.get(db.getClusterName());
+            cluster.addDb(db.getFullName(), db.getId());
+
+            // log
+            RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L);
+            Catalog.getCurrentCatalog().getEditLog().logRecoverDb(recoverInfo);
+            db.unmarkDropped();
+        } finally {
+            MetaLockUtils.writeUnlockTables(tableList);
+            db.writeUnlock();
+            unlock();
+        }
+
+        LOG.info("recover database[{}]", db.getId());
+    }
+
+    public void recoverTable(RecoverTableStmt recoverStmt) throws DdlException {
+        String dbName = recoverStmt.getDbName();
+        String tableName = recoverStmt.getTableName();
+
+        Database db = (Database) getDbOrDdlException(dbName);
+        db.writeLockOrDdlException();
+        try {
+            if (db.getTable(tableName).isPresent()) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+            }
+            if (!Catalog.getCurrentRecycleBin().recoverTable(db, tableName)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dbName);
+            }
+        } finally {
+            db.writeUnlock();
+        }
+    }
+
+    public void recoverPartition(RecoverPartitionStmt recoverStmt) throws DdlException {
+        String dbName = recoverStmt.getDbName();
+        String tableName = recoverStmt.getTableName();
+
+        Database db = (Database) getDbOrDdlException(dbName);
+        OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
+        olapTable.writeLockOrDdlException();
+        try {
+            String partitionName = recoverStmt.getPartitionName();
+            if (olapTable.getPartition(partitionName) != null) {
+                throw new DdlException("partition[" + partitionName + "] already exist in table[" + tableName + "]");
+            }
+
+            Catalog.getCurrentRecycleBin().recoverPartition(db.getId(), olapTable, partitionName);
+        } finally {
+            olapTable.writeUnlock();
+        }
+    }
+
+    public void replayEraseDatabase(long dbId) throws DdlException {
+        Catalog.getCurrentRecycleBin().replayEraseDatabase(dbId);
+    }
+
+    public void replayRecoverDatabase(RecoverInfo info) {
+        long dbId = info.getDbId();
+        Database db = Catalog.getCurrentRecycleBin().replayRecoverDatabase(dbId);
+
+        // add db to catalog
+        replayCreateDb(db);
+
+        LOG.info("replay recover db[{}]", dbId);
+    }
+
+    public void alterDatabaseQuota(AlterDatabaseQuotaStmt stmt) throws DdlException {
+        String dbName = stmt.getDbName();
+        Database db = (Database) getDbOrDdlException(dbName);
+        QuotaType quotaType = stmt.getQuotaType();
+        db.writeLockOrDdlException();
+        try {
+            if (quotaType == QuotaType.DATA) {
+                db.setDataQuota(stmt.getQuota());
+            } else if (quotaType == QuotaType.REPLICA) {
+                db.setReplicaQuota(stmt.getQuota());
+            }
+            long quota = stmt.getQuota();
+            DatabaseInfo dbInfo = new DatabaseInfo(dbName, "", quota, quotaType);
+            Catalog.getCurrentCatalog().getEditLog().logAlterDb(dbInfo);
+        } finally {
+            db.writeUnlock();
+        }
+    }
+
+    public void replayAlterDatabaseQuota(String dbName, long quota, QuotaType quotaType) throws MetaNotFoundException {
+        Database db = (Database) getDbOrMetaException(dbName);
+        db.writeLock();
+        try {
+            if (quotaType == QuotaType.DATA) {
+                db.setDataQuota(quota);
+            } else if (quotaType == QuotaType.REPLICA) {
+                db.setReplicaQuota(quota);
+            }
+        } finally {
+            db.writeUnlock();
+        }
+    }
+
+    public void renameDatabase(AlterDatabaseRename stmt) throws DdlException {
+        String fullDbName = stmt.getDbName();
+        String newFullDbName = stmt.getNewDbName();
+        String clusterName = stmt.getClusterName();
+
+        if (fullDbName.equals(newFullDbName)) {
+            throw new DdlException("Same database name");
+        }
+
+        Database db = null;
+        Cluster cluster = null;
+        if (!tryLock(false)) {
+            throw new DdlException("Failed to acquire catalog lock. Try again");
+        }
+        try {
+            cluster = nameToCluster.get(clusterName);
+            if (cluster == null) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
+            }
+            // check if db exists
+            db = fullNameToDb.get(fullDbName);
+            if (db == null) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, fullDbName);
+            }
+
+            if (db.getDbState() == DbState.LINK || db.getDbState() == DbState.MOVE) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_RENAME_DB_ERR, fullDbName);
+            }
+            // check if name is already used
+            if (fullNameToDb.get(newFullDbName) != null) {
+                throw new DdlException("Database name[" + newFullDbName + "] is already used");
+            }
+
+            cluster.removeDb(db.getFullName(), db.getId());
+            cluster.addDb(newFullDbName, db.getId());
+            // 1. rename db
+            db.setNameWithLock(newFullDbName);
+
+            // 2. add to meta. check again
+            fullNameToDb.remove(fullDbName);
+            fullNameToDb.put(newFullDbName, db);
+
+            DatabaseInfo dbInfo = new DatabaseInfo(fullDbName, newFullDbName, -1L, QuotaType.NONE);
+            Catalog.getCurrentCatalog().getEditLog().logDatabaseRename(dbInfo);
+        } finally {
+            unlock();
+        }
+
+        LOG.info("rename database[{}] to [{}]", fullDbName, newFullDbName);
+    }
+
+    public void replayRenameDatabase(String dbName, String newDbName) {
+        tryLock(true);
+        try {
+            Database db = fullNameToDb.get(dbName);
+            Cluster cluster = nameToCluster.get(db.getClusterName());
+            cluster.removeDb(db.getFullName(), db.getId());
+            db.setName(newDbName);
+            cluster.addDb(newDbName, db.getId());
+            fullNameToDb.remove(dbName);
+            fullNameToDb.put(newDbName, db);
+        } finally {
+            unlock();
+        }
+
+        LOG.info("replay rename database {} to {}", dbName, newDbName);
+    }
+
+    // Drop table
+    public void dropTable(DropTableStmt stmt) throws DdlException {
+        String dbName = stmt.getDbName();
+        String tableName = stmt.getTableName();
+
+        // check database
+        Database db = (Database) getDbOrDdlException(dbName);
+        db.writeLockOrDdlException();
+        try {
+            Table table = db.getTableNullable(tableName);
+            if (table == null) {
+                if (stmt.isSetIfExists()) {
+                    LOG.info("drop table[{}] which does not exist", tableName);
+                    return;
+                } else {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dbName);
+                }
+            }
+            // Check if a view
+            if (stmt.isView()) {
+                if (!(table instanceof View)) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_OBJECT, dbName, tableName, "VIEW");
+                }
+            } else {
+                if (table instanceof View) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_OBJECT, dbName, tableName, "TABLE");
+                }
+            }
+
+            if (!stmt.isForceDrop()) {
+                if (Catalog.getCurrentCatalog().getGlobalTransactionMgr()
+                        .existCommittedTxns(db.getId(), table.getId(), null)) {
+                    throw new DdlException(
+                            "There are still some transactions in the COMMITTED state waiting to be completed. "
+                                    + "The table [" + tableName
+                                    + "] cannot be dropped. If you want to forcibly drop(cannot be recovered),"
+                                    + " please use \"DROP table FORCE\".");
+                }
+            }
+            DropInfo info = new DropInfo(db.getId(), table.getId(), -1L, stmt.isForceDrop());
+            table.writeLock();
+            try {
+                if (table instanceof OlapTable && !stmt.isForceDrop()) {
+                    OlapTable olapTable = (OlapTable) table;
+                    if ((olapTable.getState() != OlapTableState.NORMAL)) {
+                        throw new DdlException("The table [" + tableName + "]'s state is " + olapTable.getState()
+                                + ", cannot be dropped."
+                                + " please cancel the operation on olap table firstly. If you want to forcibly drop(cannot be recovered),"
+                                + " please use \"DROP table FORCE\".");
+                    }
+                }
+                unprotectDropTable(db, table, stmt.isForceDrop(), false);
+            } finally {
+                table.writeUnlock();
+            }
+            Catalog.getCurrentCatalog().getEditLog().logDropTable(info);
+        } finally {
+            db.writeUnlock();
+        }
+        LOG.info("finished dropping table: {} from db: {}, is force: {}", tableName, dbName, stmt.isForceDrop());
+    }
+
+    public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay) {
+        if (table.getType() == TableType.ELASTICSEARCH) {
+            esRepository.deRegisterTable(table.getId());
+        } else if (table.getType() == TableType.OLAP) {
+            // drop all temp partitions of this table, so that there is no temp partitions in recycle bin,
+            // which make things easier.
+            ((OlapTable) table).dropAllTempPartitions();
+        } else if (table.getType() == TableType.ICEBERG) {
+            // drop Iceberg database table creation record
+            icebergTableCreationRecordMgr.deregisterTable(db, (IcebergTable) table);
+        }
+
+        db.dropTable(table.getName());
+        if (!isForceDrop) {
+            Catalog.getCurrentRecycleBin().recycleTable(db.getId(), table);
+        } else {
+            if (table.getType() == TableType.OLAP) {
+                Catalog.getCurrentCatalog().onEraseOlapTable((OlapTable) table, isReplay);
+            }
+        }
+
+        LOG.info("finished dropping table[{}] in db[{}]", table.getName(), db.getFullName());
+        return true;
+    }
+
+    public void replayDropTable(Database db, long tableId, boolean isForceDrop) throws MetaNotFoundException {
+        Table table = db.getTableOrMetaException(tableId);
+        db.writeLock();
+        table.writeLock();
+        try {
+            unprotectDropTable(db, table, isForceDrop, true);
+        } finally {
+            table.writeUnlock();
+            db.writeUnlock();
+        }
+    }
+
+    public void replayEraseTable(long tableId) {
+        Catalog.getCurrentRecycleBin().replayEraseTable(tableId);
+    }
+
+    public void replayRecoverTable(RecoverInfo info) throws MetaNotFoundException {
+        Database db = (Database) getDbOrMetaException(info.getDbId());
+        db.writeLock();
+        try {
+            Catalog.getCurrentRecycleBin().replayRecoverTable(db, info.getTableId());
+        } finally {
+            db.writeUnlock();
+        }
+    }
+
+    private void unprotectAddReplica(OlapTable olapTable, ReplicaPersistInfo info) {
+        LOG.debug("replay add a replica {}", info);
+        Partition partition = olapTable.getPartition(info.getPartitionId());
+        MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId());
+        Tablet tablet = materializedIndex.getTablet(info.getTabletId());
+
+        // for compatibility
+        int schemaHash = info.getSchemaHash();
+        if (schemaHash == -1) {
+            schemaHash = olapTable.getSchemaHashByIndexId(info.getIndexId());
+        }
+
+        Replica replica =
+                new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(), schemaHash, info.getDataSize(),
+                        info.getRowCount(), ReplicaState.NORMAL, info.getLastFailedVersion(),
+                        info.getLastSuccessVersion());
+        tablet.addReplica(replica);
+    }
+
+    private void unprotectUpdateReplica(OlapTable olapTable, ReplicaPersistInfo info) {
+        LOG.debug("replay update a replica {}", info);
+        Partition partition = olapTable.getPartition(info.getPartitionId());
+        MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId());
+        Tablet tablet = materializedIndex.getTablet(info.getTabletId());
+        Replica replica = tablet.getReplicaByBackendId(info.getBackendId());
+        Preconditions.checkNotNull(replica, info);
+        replica.updateVersionInfo(info.getVersion(), info.getDataSize(), info.getRowCount());
+        replica.setBad(false);
+    }
+
+    public void replayAddReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
+        Database db = (Database) getDbOrMetaException(info.getDbId());
+        OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
+        olapTable.writeLock();
+        try {
+            unprotectAddReplica(olapTable, info);
+        } finally {
+            olapTable.writeUnlock();
+        }
+    }
+
+    public void replayUpdateReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
+        Database db = (Database) getDbOrMetaException(info.getDbId());
+        OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
+        olapTable.writeLock();
+        try {
+            unprotectUpdateReplica(olapTable, info);
+        } finally {
+            olapTable.writeUnlock();
+        }
+    }
+
+    public void unprotectDeleteReplica(OlapTable olapTable, ReplicaPersistInfo info) {
+        Partition partition = olapTable.getPartition(info.getPartitionId());
+        MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId());
+        Tablet tablet = materializedIndex.getTablet(info.getTabletId());
+        tablet.deleteReplicaByBackendId(info.getBackendId());
+    }
+
+    public void replayDeleteReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
+        Database db = (Database) getDbOrMetaException(info.getDbId());
+        OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
+        olapTable.writeLock();
+        try {
+            unprotectDeleteReplica(olapTable, info);
+        } finally {
+            olapTable.writeUnlock();
+        }
+    }
+
+    /**
+     * Following is the step to create an olap table:
+     * 1. create columns
+     * 2. create partition info
+     * 3. create distribution info
+     * 4. set table id and base index id
+     * 5. set bloom filter columns
+     * 6. set and build TableProperty includes:
+     * 6.1. dynamicProperty
+     * 6.2. replicationNum
+     * 6.3. inMemory
+     * 6.4. storageFormat
+     * 6.5. compressionType
+     * 7. set index meta
+     * 8. check colocation properties
+     * 9. create tablet in BE
+     * 10. add this table to FE's meta
+     * 11. add this table to ColocateGroup if necessary
+     */
+    public void createTable(CreateTableStmt stmt) throws UserException {
+        String engineName = stmt.getEngineName();
+        String dbName = stmt.getDbName();
+        String tableName = stmt.getTableName();
+
+        // check if db exists
+        Database db = (Database) getDbOrDdlException(dbName);
+
+        // only internal table should check quota and cluster capacity
+        if (!stmt.isExternal()) {
+            // check cluster capacity
+            Catalog.getCurrentSystemInfo().checkClusterCapacity(stmt.getClusterName());
+            // check db quota
+            db.checkQuota();
+        }
+
+        // check if table exists in db
+        if (db.getTable(tableName).isPresent()) {
+            if (stmt.isSetIfNotExists()) {
+                LOG.info("create table[{}] which already exists", tableName);
+                return;
+            } else {
+                ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+            }
+        }
+
+        if (engineName.equals("olap")) {
+            createOlapTable(db, stmt);
+            return;
+        } else if (engineName.equals("odbc")) {
+            createOdbcTable(db, stmt);
+            return;
+        } else if (engineName.equals("mysql")) {
+            createMysqlTable(db, stmt);
+            return;
+        } else if (engineName.equals("broker")) {
+            createBrokerTable(db, stmt);
+            return;
+        } else if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) {
+            createEsTable(db, stmt);
+            return;
+        } else if (engineName.equalsIgnoreCase("hive")) {
+            createHiveTable(db, stmt);
+            return;
+        } else if (engineName.equalsIgnoreCase("iceberg")) {
+            IcebergCatalogMgr.createIcebergTable(db, stmt);
+            return;
+        } else if (engineName.equalsIgnoreCase("hudi")) {
+            createHudiTable(db, stmt);
+            return;
+        } else {
+            ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, engineName);
+        }
+        Preconditions.checkState(false);
+    }
+
+    public void createTableLike(CreateTableLikeStmt stmt) throws DdlException {
+        try {
+            Database db = Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getExistedDbName());
+            Table table = db.getTableOrDdlException(stmt.getExistedTableName());
+
+            if (table.getType() == TableType.VIEW) {
+                throw new DdlException("Not support create table from a View");
+            }
+
+            List<String> createTableStmt = Lists.newArrayList();
+            table.readLock();
+            try {
+                if (table.getType() == TableType.OLAP) {
+                    if (!CollectionUtils.isEmpty(stmt.getRollupNames())) {
+                        OlapTable olapTable = (OlapTable) table;
+                        for (String rollupIndexName : stmt.getRollupNames()) {
+                            if (!olapTable.hasMaterializedIndex(rollupIndexName)) {
+                                throw new DdlException("Rollup index[" + rollupIndexName + "] not exists in Table["
+                                        + olapTable.getName() + "]");
+                            }
+                        }
+                    }
+                } else if (!CollectionUtils.isEmpty(stmt.getRollupNames()) || stmt.isWithAllRollup()) {
+                    throw new DdlException("Table[" + table.getName() + "] is external, not support rollup copy");
+                }
+
+                Catalog.getDdlStmt(stmt, stmt.getDbName(), table, createTableStmt, null, null, false, false, true);
+                if (createTableStmt.isEmpty()) {
+                    ErrorReport.reportDdlException(ErrorCode.ERROR_CREATE_TABLE_LIKE_EMPTY, "CREATE");
+                }
+            } finally {
+                table.readUnlock();
+            }
+            CreateTableStmt parsedCreateTableStmt =
+                    (CreateTableStmt) SqlParserUtils.parseAndAnalyzeStmt(createTableStmt.get(0), ConnectContext.get());
+            parsedCreateTableStmt.setTableName(stmt.getTableName());
+            parsedCreateTableStmt.setIfNotExists(stmt.isIfNotExists());
+            createTable(parsedCreateTableStmt);
+        } catch (UserException e) {
+            throw new DdlException("Failed to execute CREATE TABLE LIKE " + stmt.getExistedTableName() + ". Reason: "
+                    + e.getMessage());
+        }
+    }
+
+    public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlException {
+        try {
+            List<String> columnNames = stmt.getColumnNames();
+            CreateTableStmt createTableStmt = stmt.getCreateTableStmt();
+            QueryStmt queryStmt = stmt.getQueryStmt();
+            ArrayList<Expr> resultExprs = queryStmt.getResultExprs();
+            ArrayList<String> colLabels = queryStmt.getColLabels();
+            int size = resultExprs.size();
+            // Check columnNames
+            int colNameIndex = 0;
+            for (int i = 0; i < size; ++i) {
+                String name;
+                if (columnNames != null) {
+                    // use custom column names
+                    name = columnNames.get(i);
+                } else {
+                    name = colLabels.get(i);
+                }
+                try {
+                    FeNameFormat.checkColumnName(name);
+                } catch (AnalysisException exception) {
+                    name = "_col" + (colNameIndex++);
+                }
+                TypeDef typeDef;
+                Expr resultExpr = resultExprs.get(i);
+                if (resultExpr.getType().isStringType() && resultExpr.getType().getLength() < 0) {
+                    typeDef = new TypeDef(Type.STRING);
+                } else {
+                    typeDef = new TypeDef(resultExpr.getType());
+                }
+                ColumnDef columnDef;
+                if (resultExpr.getSrcSlotRef() == null) {
+                    columnDef = new ColumnDef(name, typeDef, false, null, true, new DefaultValue(false, null), "");
+                } else {
+                    Column column = resultExpr.getSrcSlotRef().getDesc().getColumn();
+                    boolean setDefault = StringUtils.isNotBlank(column.getDefaultValue());
+                    columnDef = new ColumnDef(name, typeDef, column.isKey(), column.getAggregationType(),
+                            column.isAllowNull(), new DefaultValue(setDefault, column.getDefaultValue()),
+                            column.getComment());
+                }
+                createTableStmt.addColumnDef(columnDef);
+                // set first column as default distribution
+                if (createTableStmt.getDistributionDesc() == null && i == 0) {
+                    createTableStmt.setDistributionDesc(new HashDistributionDesc(10, Lists.newArrayList(name)));
+                }
+            }
+            Analyzer dummyRootAnalyzer = new Analyzer(Catalog.getCurrentCatalog(), ConnectContext.get());
+            createTableStmt.analyze(dummyRootAnalyzer);
+            createTable(createTableStmt);
+        } catch (UserException e) {
+            throw new DdlException("Failed to execute CTAS Reason: " + e.getMessage());
+        }
+    }
+
+    public void replayCreateTable(String dbName, Table table) throws MetaNotFoundException {
+        Database db = this.fullNameToDb.get(dbName);
+        try {
+            db.createTableWithLock(table, true, false);
+        } catch (DdlException e) {
+            throw new MetaNotFoundException(e.getMessage());
+        }
+        if (!Catalog.isCheckpointThread()) {
+            // add to inverted index
+            if (table.getType() == TableType.OLAP) {
+                TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
+                OlapTable olapTable = (OlapTable) table;
+                long dbId = db.getId();
+                long tableId = table.getId();
+                for (Partition partition : olapTable.getAllPartitions()) {
+                    long partitionId = partition.getId();
+                    TStorageMedium medium =
+                            olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
+                    for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
+                        long indexId = mIndex.getId();
+                        int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
+                        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium);
+                        for (Tablet tablet : mIndex.getTablets()) {
+                            long tabletId = tablet.getId();
+                            invertedIndex.addTablet(tabletId, tabletMeta);
+                            for (Replica replica : tablet.getReplicas()) {
+                                invertedIndex.addReplica(tabletId, replica);
+                            }
+                        }
+                    }
+                } // end for partitions
+                DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(dbId, olapTable, true);
+            }
+        }
+    }
+
+    public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException {
+        SinglePartitionDesc singlePartitionDesc = addPartitionClause.getSingeRangePartitionDesc();
+        DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc();
+        boolean isTempPartition = addPartitionClause.isTempPartition();
+
+        DistributionInfo distributionInfo;
+        Map<Long, MaterializedIndexMeta> indexIdToMeta;
+        Set<String> bfColumns;
+        String partitionName = singlePartitionDesc.getPartitionName();
+
+        // check
+        Table table = db.getOlapTableOrDdlException(tableName);
+        // check state
+        OlapTable olapTable = (OlapTable) table;
+
+        olapTable.readLock();
+        try {
+            if (olapTable.getState() != OlapTableState.NORMAL) {
+                throw new DdlException("Table[" + tableName + "]'s state is not NORMAL");
+            }
+
+            // check partition type
+            PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+            if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) {
+                throw new DdlException("Only support adding partition to range and list partitioned table");
+            }
+
+            // check partition name
+            if (olapTable.checkPartitionNameExist(partitionName)) {
+                if (singlePartitionDesc.isSetIfNotExists()) {
+                    LOG.info("add partition[{}] which already exists", partitionName);
+                    return;
+                } else {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
+                }
+            }
+
+            Map<String, String> properties = singlePartitionDesc.getProperties();
+            // partition properties should inherit table properties
+            ReplicaAllocation replicaAlloc = olapTable.getDefaultReplicaAllocation();
+            if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM) && !properties.containsKey(
+                    PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
+                properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, replicaAlloc.toCreateStmt());
+            }
+            if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)) {
+                properties.put(PropertyAnalyzer.PROPERTIES_INMEMORY, olapTable.isInMemory().toString());
+            }
+
+            singlePartitionDesc.analyze(partitionInfo.getPartitionColumns().size(), properties);
+            partitionInfo.createAndCheckPartitionItem(singlePartitionDesc, isTempPartition);
+
+            // get distributionInfo
+            List<Column> baseSchema = olapTable.getBaseSchema();
+            DistributionInfo defaultDistributionInfo = olapTable.getDefaultDistributionInfo();
+            if (distributionDesc != null) {
+                distributionInfo = distributionDesc.toDistributionInfo(baseSchema);
+                // for now. we only support modify distribution's bucket num
+                if (distributionInfo.getType() != defaultDistributionInfo.getType()) {
+                    throw new DdlException("Cannot assign different distribution type. default is: "
+                            + defaultDistributionInfo.getType());
+                }
+
+                if (distributionInfo.getType() == DistributionInfoType.HASH) {
+                    HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
+                    List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns();
+                    List<Column> defaultDistriCols =
+                            ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns();
+                    if (!newDistriCols.equals(defaultDistriCols)) {
+                        throw new DdlException(
+                                "Cannot assign hash distribution with different distribution cols. " + "default is: "
+                                        + defaultDistriCols);
+                    }
+                    if (hashDistributionInfo.getBucketNum() <= 0) {
+                        throw new DdlException("Cannot assign hash distribution buckets less than 1");
+                    }
+                }
+            } else {
+                // make sure partition-dristribution-info is deep copied from default-distribution-info
+                distributionInfo = defaultDistributionInfo.toDistributionDesc().toDistributionInfo(baseSchema);
+            }
+
+            // check colocation
+            if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) {
+                String fullGroupName = db.getId() + "_" + olapTable.getColocateGroup();
+                ColocateGroupSchema groupSchema = Catalog.getCurrentColocateIndex().getGroupSchema(fullGroupName);
+                Preconditions.checkNotNull(groupSchema);
+                groupSchema.checkDistribution(distributionInfo);
+                groupSchema.checkReplicaAllocation(singlePartitionDesc.getReplicaAlloc());
+            }
+
+            indexIdToMeta = olapTable.getCopiedIndexIdToMeta();
+            bfColumns = olapTable.getCopiedBfColumns();
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        } finally {
+            olapTable.readUnlock();
+        }
+
+        Preconditions.checkNotNull(distributionInfo);
+        Preconditions.checkNotNull(olapTable);
+        Preconditions.checkNotNull(indexIdToMeta);
+
+        // create partition outside db lock
+        DataProperty dataProperty = singlePartitionDesc.getPartitionDataProperty();
+        Preconditions.checkNotNull(dataProperty);
+        // check replica quota if this operation done
+        long indexNum = indexIdToMeta.size();
+        long bucketNum = distributionInfo.getBucketNum();
+        long replicaNum = singlePartitionDesc.getReplicaAlloc().getTotalReplicaNum();
+        long totalReplicaNum = indexNum * bucketNum * replicaNum;
+        if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
+            throw new DdlException("Database " + db.getFullName() + " table " + tableName + " add partition increasing "
+                    + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
+        }
+        Set<Long> tabletIdSet = new HashSet<Long>();
+        try {
+            long partitionId = Catalog.getCurrentCatalog().getNextId();
+            Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
+                    olapTable.getBaseIndexId(), partitionId, partitionName, indexIdToMeta, distributionInfo,
+                    dataProperty.getStorageMedium(), singlePartitionDesc.getReplicaAlloc(),
+                    singlePartitionDesc.getVersionInfo(), bfColumns, olapTable.getBfFpp(), tabletIdSet,
+                    olapTable.getCopiedIndexes(), singlePartitionDesc.isInMemory(), olapTable.getStorageFormat(),
+                    singlePartitionDesc.getTabletType(), olapTable.getCompressionType(), olapTable.getDataSortInfo());
+
+            // check again
+            table = db.getOlapTableOrDdlException(tableName);
+            table.writeLockOrDdlException();
+            try {
+                olapTable = (OlapTable) table;
+                if (olapTable.getState() != OlapTableState.NORMAL) {
+                    throw new DdlException("Table[" + tableName + "]'s state is not NORMAL");
+                }
+
+                // check partition name
+                if (olapTable.checkPartitionNameExist(partitionName)) {
+                    if (singlePartitionDesc.isSetIfNotExists()) {
+                        LOG.info("add partition[{}] which already exists", partitionName);
+                        return;
+                    } else {
+                        ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
+                    }
+                }
+
+                // check if meta changed
+                // rollup index may be added or dropped during add partition operation.
+                // schema may be changed during add partition operation.
+                boolean metaChanged = false;
+                if (olapTable.getIndexNameToId().size() != indexIdToMeta.size()) {
+                    metaChanged = true;
+                } else {
+                    // compare schemaHash
+                    for (Map.Entry<Long, MaterializedIndexMeta> entry : olapTable.getIndexIdToMeta().entrySet()) {
+                        long indexId = entry.getKey();
+                        if (!indexIdToMeta.containsKey(indexId)) {
+                            metaChanged = true;
+                            break;
+                        }
+                        if (indexIdToMeta.get(indexId).getSchemaHash() != entry.getValue().getSchemaHash()) {
+                            metaChanged = true;
+                            break;
+                        }
+                    }
+                }
+
+                if (metaChanged) {
+                    throw new DdlException("Table[" + tableName + "]'s meta has been changed. try again.");
+                }
+
+                // check partition type
+                PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+                if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) {
+                    throw new DdlException("Only support adding partition to range and list partitioned table");
+                }
+
+                // update partition info
+                partitionInfo.handleNewSinglePartitionDesc(singlePartitionDesc, partitionId, isTempPartition);
+
+                if (isTempPartition) {
+                    olapTable.addTempPartition(partition);
+                } else {
+                    olapTable.addPartition(partition);
+                }
+
+                // log
+                PartitionPersistInfo info = null;
+                if (partitionInfo.getType() == PartitionType.RANGE) {
+                    info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition,
+                            partitionInfo.getItem(partitionId).getItems(), ListPartitionItem.DUMMY_ITEM, dataProperty,
+                            partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId),
+                            isTempPartition);
+                } else if (partitionInfo.getType() == PartitionType.LIST) {
+                    info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition,
+                            RangePartitionItem.DUMMY_ITEM, partitionInfo.getItem(partitionId), dataProperty,
+                            partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId),
+                            isTempPartition);
+                }
+                Catalog.getCurrentCatalog().getEditLog().logAddPartition(info);
+
+                LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition);
+            } finally {
+                table.writeUnlock();
+            }
+        } catch (DdlException e) {
+            for (Long tabletId : tabletIdSet) {
+                Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
+            }
+            throw e;
+        }
+    }
+
+    public void replayAddPartition(PartitionPersistInfo info) throws MetaNotFoundException {
+        Database db = (Database) getDbOrMetaException(info.getDbId());
+        OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
+        olapTable.writeLock();
+        try {
+            Partition partition = info.getPartition();
+            PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+            if (info.isTempPartition()) {
+                olapTable.addTempPartition(partition);
+            } else {
+                olapTable.addPartition(partition);
+            }
+
+            PartitionItem partitionItem = null;
+            if (partitionInfo.getType() == PartitionType.RANGE) {
+                partitionItem = new RangePartitionItem(info.getRange());
+            } else if (partitionInfo.getType() == PartitionType.LIST) {
+                partitionItem = info.getListPartitionItem();
+            }
+
+            partitionInfo.unprotectHandleNewSinglePartitionDesc(partition.getId(), info.isTempPartition(),
+                    partitionItem, info.getDataProperty(), info.getReplicaAlloc(), info.isInMemory());
+
+            if (!Catalog.isCheckpointThread()) {
+                // add to inverted index
+                TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
+                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
+                    long indexId = index.getId();
+                    int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
+                    TabletMeta tabletMeta =
+                            new TabletMeta(info.getDbId(), info.getTableId(), partition.getId(), index.getId(),
+                                    schemaHash, info.getDataProperty().getStorageMedium());
+                    for (Tablet tablet : index.getTablets()) {
+                        long tabletId = tablet.getId();
+                        invertedIndex.addTablet(tabletId, tabletMeta);
+                        for (Replica replica : tablet.getReplicas()) {
+                            invertedIndex.addReplica(tabletId, replica);
+                        }
+                    }
+                }
+            }
+        } finally {
+            olapTable.writeUnlock();
+        }
+    }
+
+    public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause clause) throws DdlException {
+        Preconditions.checkArgument(olapTable.isWriteLockHeldByCurrentThread());
+
+        String partitionName = clause.getPartitionName();
+        boolean isTempPartition = clause.isTempPartition();
+
+        if (olapTable.getState() != OlapTableState.NORMAL) {
+            throw new DdlException("Table[" + olapTable.getName() + "]'s state is not NORMAL");
+        }
+
+        if (!olapTable.checkPartitionNameExist(partitionName, isTempPartition)) {
+            if (clause.isSetIfExists()) {
+                LOG.info("drop partition[{}] which does not exist", partitionName);
+                return;
+            } else {
+                ErrorReport.reportDdlException(ErrorCode.ERR_DROP_PARTITION_NON_EXISTENT, partitionName);
+            }
+        }
+
+        PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+        if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) {
+            throw new DdlException("Alter table [" + olapTable.getName() + "] failed. Not a partitioned table");
+        }
+
+        // drop
+        if (isTempPartition) {
+            olapTable.dropTempPartition(partitionName, true);
+        } else {
+            if (!clause.isForceDrop()) {
+                Partition partition = olapTable.getPartition(partitionName);
+                if (partition != null) {
+                    if (Catalog.getCurrentCatalog().getGlobalTransactionMgr()
+                            .existCommittedTxns(db.getId(), olapTable.getId(), partition.getId())) {
+                        throw new DdlException(
+                                "There are still some transactions in the COMMITTED state waiting to be completed."
+                                        + " The partition [" + partitionName
+                                        + "] cannot be dropped. If you want to forcibly drop(cannot be recovered),"
+                                        + " please use \"DROP partition FORCE\".");
+                    }
+                }
+            }
+            olapTable.dropPartition(db.getId(), partitionName, clause.isForceDrop());
+        }
+
+        // log
+        DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionName, isTempPartition,
+                clause.isForceDrop());
+        Catalog.getCurrentCatalog().getEditLog().logDropPartition(info);
+
+        LOG.info("succeed in dropping partition[{}], is temp : {}, is force : {}", partitionName, isTempPartition,
+                clause.isForceDrop());
+    }
+
+    public void replayDropPartition(DropPartitionInfo info) throws MetaNotFoundException {
+        Database db = (Database) getDbOrMetaException(info.getDbId());
+        OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
+        olapTable.writeLock();
+        try {
+            if (info.isTempPartition()) {
+                olapTable.dropTempPartition(info.getPartitionName(), true);
+            } else {
+                olapTable.dropPartition(info.getDbId(), info.getPartitionName(), info.isForceDrop());
+            }
+        } finally {
+            olapTable.writeUnlock();
+        }
+    }
+
+    public void replayErasePartition(long partitionId) {
+        Catalog.getCurrentRecycleBin().replayErasePartition(partitionId);
+    }
+
+    public void replayRecoverPartition(RecoverInfo info) throws MetaNotFoundException {
+        Database db = (Database) getDbOrMetaException(info.getDbId());
+        OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
+        olapTable.writeLock();
+        try {
+            Catalog.getCurrentRecycleBin().replayRecoverPartition(olapTable, info.getPartitionId());
+        } finally {
+            olapTable.writeUnlock();
+        }
+    }
+
+    private Partition createPartitionWithIndices(String clusterName, long dbId, long tableId, long baseIndexId,
+            long partitionId, String partitionName, Map<Long, MaterializedIndexMeta> indexIdToMeta,
+            DistributionInfo distributionInfo, TStorageMedium storageMedium, ReplicaAllocation replicaAlloc,
+            Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long> tabletIdSet, List<Index> indexes,
+            boolean isInMemory, TStorageFormat storageFormat, TTabletType tabletType, TCompressionType compressionType,
+            DataSortInfo dataSortInfo) throws DdlException {
+        // create base index first.
+        Preconditions.checkArgument(baseIndexId != -1);
+        MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
+
+        // create partition with base index
+        Partition partition = new Partition(partitionId, partitionName, baseIndex, distributionInfo);
+
+        // add to index map
+        Map<Long, MaterializedIndex> indexMap = new HashMap<>();
+        indexMap.put(baseIndexId, baseIndex);
+
+        // create rollup index if has
+        for (long indexId : indexIdToMeta.keySet()) {
+            if (indexId == baseIndexId) {
+                continue;
+            }
+
+            MaterializedIndex rollup = new MaterializedIndex(indexId, IndexState.NORMAL);
+            indexMap.put(indexId, rollup);
+        }
+
+        // version and version hash
+        if (versionInfo != null) {
+            partition.updateVisibleVersion(versionInfo);
+        }
+        long version = partition.getVisibleVersion();
+
+        short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
+        for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
+            long indexId = entry.getKey();
+            MaterializedIndex index = entry.getValue();
+            MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
+
+            // create tablets
+            int schemaHash = indexMeta.getSchemaHash();
+            TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, storageMedium);
+            createTablets(clusterName, index, ReplicaState.NORMAL, distributionInfo, version, replicaAlloc, tabletMeta,
+                    tabletIdSet);
+
+            boolean ok = false;
+            String errMsg = null;
+
+            // add create replica task for olap
+            short shortKeyColumnCount = indexMeta.getShortKeyColumnCount();
+            TStorageType storageType = indexMeta.getStorageType();
+            List<Column> schema = indexMeta.getSchema();
+            KeysType keysType = indexMeta.getKeysType();
+            int totalTaskNum = index.getTablets().size() * totalReplicaNum;
+            MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(totalTaskNum);
+            AgentBatchTask batchTask = new AgentBatchTask();
+            for (Tablet tablet : index.getTablets()) {
+                long tabletId = tablet.getId();
+                for (Replica replica : tablet.getReplicas()) {
+                    long backendId = replica.getBackendId();
+                    countDownLatch.addMark(backendId, tabletId);
+                    CreateReplicaTask task =
+                            new CreateReplicaTask(backendId, dbId, tableId, partitionId, indexId, tabletId,
+                                    shortKeyColumnCount, schemaHash, version, keysType, storageType, storageMedium,
+                                    schema, bfColumns, bfFpp, countDownLatch, indexes, isInMemory, tabletType,
+                                    dataSortInfo, compressionType);
+                    task.setStorageFormat(storageFormat);
+                    batchTask.addTask(task);
+                    // add to AgentTaskQueue for handling finish report.
+                    // not for resending task
+                    AgentTaskQueue.addTask(task);
+                }
+            }
+            AgentTaskExecutor.submit(batchTask);
+
+            // estimate timeout
+            long timeout = Config.tablet_create_timeout_second * 1000L * totalTaskNum;
+            timeout = Math.min(timeout, Config.max_create_table_timeout_second * 1000);
+            try {
+                ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                LOG.warn("InterruptedException: ", e);
+                ok = false;
+            }
+
+            if (!ok || !countDownLatch.getStatus().ok()) {
+                errMsg = "Failed to create partition[" + partitionName + "]. Timeout.";
+                // clear tasks
+                AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CREATE);
+
+                if (!countDownLatch.getStatus().ok()) {
+                    errMsg += " Error: " + countDownLatch.getStatus().getErrorMsg();
+                } else {
+                    List<Entry<Long, Long>> unfinishedMarks = countDownLatch.getLeftMarks();
+                    // only show at most 3 results
+                    List<Entry<Long, Long>> subList = unfinishedMarks.subList(0, Math.min(unfinishedMarks.size(), 3));
+                    if (!subList.isEmpty()) {
+                        errMsg += " Unfinished mark: " + Joiner.on(", ").join(subList);
+                    }
+                }
+                LOG.warn(errMsg);
+                throw new DdlException(errMsg);
+            }
+
+            if (index.getId() != baseIndexId) {
+                // add rollup index to partition
+                partition.createRollupIndex(index);
+            }
+        } // end for indexMap
+        return partition;
+    }
+
+    // Create olap table and related base index synchronously.
+    private void createOlapTable(Database db, CreateTableStmt stmt) throws UserException {
+        String tableName = stmt.getTableName();
+        LOG.debug("begin create olap table: {}", tableName);
+
+        // create columns
+        List<Column> baseSchema = stmt.getColumns();
+        validateColumns(baseSchema);
+
+        // create partition info
+        PartitionDesc partitionDesc = stmt.getPartitionDesc();
+        PartitionInfo partitionInfo = null;
+        Map<String, Long> partitionNameToId = Maps.newHashMap();
+        if (partitionDesc != null) {
+            // gen partition id first
+            PartitionDesc partDesc = partitionDesc;
+            for (SinglePartitionDesc desc : partDesc.getSinglePartitionDescs()) {
+                long partitionId = Catalog.getCurrentCatalog().getNextId();
+                partitionNameToId.put(desc.getPartitionName(), partitionId);
+            }
+            partitionInfo = partitionDesc.toPartitionInfo(baseSchema, partitionNameToId, false);
+        } else {
+            if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(stmt.getProperties())) {
+                throw new DdlException("Only support dynamic partition properties on range partition table");
+            }
+            long partitionId = Catalog.getCurrentCatalog().getNextId();
+            // use table name as single partition name
+            partitionNameToId.put(tableName, partitionId);
+            partitionInfo = new SinglePartitionInfo();
+        }
+
+        // get keys type
+        KeysDesc keysDesc = stmt.getKeysDesc();
+        Preconditions.checkNotNull(keysDesc);
+        KeysType keysType = keysDesc.getKeysType();
+
+        // create distribution info
+        DistributionDesc distributionDesc = stmt.getDistributionDesc();
+        Preconditions.checkNotNull(distributionDesc);
+        DistributionInfo defaultDistributionInfo = distributionDesc.toDistributionInfo(baseSchema);
+
+        // calc short key column count
+        short shortKeyColumnCount = Catalog.calcShortKeyColumnCount(baseSchema, stmt.getProperties());
+        LOG.debug("create table[{}] short key column count: {}", tableName, shortKeyColumnCount);
+
+        // indexes
+        TableIndexes indexes = new TableIndexes(stmt.getIndexes());
+
+        // create table
+        long tableId = Catalog.getCurrentCatalog().getNextId();
+        OlapTable olapTable =
+                new OlapTable(tableId, tableName, baseSchema, keysType, partitionInfo, defaultDistributionInfo,
+                        indexes);
+        olapTable.setComment(stmt.getComment());
+
+        // set base index id
+        long baseIndexId = Catalog.getCurrentCatalog().getNextId();
+        olapTable.setBaseIndexId(baseIndexId);
+
+        // set base index info to table
+        // this should be done before create partition.
+        Map<String, String> properties = stmt.getProperties();
+
+        // get storage format
+        TStorageFormat storageFormat = TStorageFormat.V2; // default is segment v2
+        try {
+            storageFormat = PropertyAnalyzer.analyzeStorageFormat(properties);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+        olapTable.setStorageFormat(storageFormat);
+
+        // get compression type
+        TCompressionType compressionType = TCompressionType.LZ4;
+        try {
+            compressionType = PropertyAnalyzer.analyzeCompressionType(properties);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+        olapTable.setCompressionType(compressionType);
+
+        // check data sort properties
+        DataSortInfo dataSortInfo =
+                PropertyAnalyzer.analyzeDataSortInfo(properties, keysType, keysDesc.keysColumnSize(), storageFormat);
+        olapTable.setDataSortInfo(dataSortInfo);
+
+        // analyze bloom filter columns
+        Set<String> bfColumns = null;
+        double bfFpp = 0;
+        try {
+            bfColumns = PropertyAnalyzer.analyzeBloomFilterColumns(properties, baseSchema, keysType);
+            if (bfColumns != null && bfColumns.isEmpty()) {
+                bfColumns = null;
+            }
+
+            bfFpp = PropertyAnalyzer.analyzeBloomFilterFpp(properties);
+            if (bfColumns != null && bfFpp == 0) {
+                bfFpp = FeConstants.default_bloom_filter_fpp;
+            } else if (bfColumns == null) {
+                bfFpp = 0;
+            }
+
+            olapTable.setBloomFilterInfo(bfColumns, bfFpp);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+
+        // analyze replica allocation
+        ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
+        if (replicaAlloc.isNotSet()) {
+            replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
+        }
+        olapTable.setReplicationAllocation(replicaAlloc);
+
+        // set in memory
+        boolean isInMemory =
+                PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_INMEMORY, false);
+        olapTable.setIsInMemory(isInMemory);
+
+        // set remote storage
+        String resourceName = PropertyAnalyzer.analyzeRemoteStorageResource(properties);
+        olapTable.setRemoteStorageResource(resourceName);
+
+        TTabletType tabletType;
+        try {
+            tabletType = PropertyAnalyzer.analyzeTabletType(properties);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+
+
+        if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
+            // if this is an unpartitioned table, we should analyze data property and replication num here.
+            // if this is a partitioned table, there properties are already analyzed in RangePartitionDesc analyze phase.
+
+            // use table name as this single partition name
+            long partitionId = partitionNameToId.get(tableName);
+            DataProperty dataProperty = null;
+            try {
+                dataProperty =
+                        PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(), DataProperty.DEFAULT_DATA_PROPERTY);
+            } catch (AnalysisException e) {
+                throw new DdlException(e.getMessage());
+            }
+            Preconditions.checkNotNull(dataProperty);
+            partitionInfo.setDataProperty(partitionId, dataProperty);
+            partitionInfo.setReplicaAllocation(partitionId, replicaAlloc);
+            partitionInfo.setIsInMemory(partitionId, isInMemory);
+            partitionInfo.setTabletType(partitionId, tabletType);
+        }
+
+        // check colocation properties
+        try {
+            String colocateGroup = PropertyAnalyzer.analyzeColocate(properties);
+            if (colocateGroup != null) {
+                if (defaultDistributionInfo.getType() == DistributionInfoType.RANDOM) {
+                    throw new AnalysisException("Random distribution for colocate table is unsupported");
+                }
+                String fullGroupName = db.getId() + "_" + colocateGroup;
+                ColocateGroupSchema groupSchema = Catalog.getCurrentColocateIndex().getGroupSchema(fullGroupName);
+                if (groupSchema != null) {
+                    // group already exist, check if this table can be added to this group
+                    groupSchema.checkColocateSchema(olapTable);
+                }
+                // add table to this group, if group does not exist, create a new one
+                Catalog.getCurrentColocateIndex()
+                        .addTableToGroup(db.getId(), olapTable, colocateGroup, null /* generate group id inside */);
+                olapTable.setColocateGroup(colocateGroup);
+            }
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+
+        // get base index storage type. default is COLUMN
+        TStorageType baseIndexStorageType = null;
+        try {
+            baseIndexStorageType = PropertyAnalyzer.analyzeStorageType(properties);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+        Preconditions.checkNotNull(baseIndexStorageType);
+        // set base index meta
+        int schemaVersion = 0;
+        try {
+            schemaVersion = PropertyAnalyzer.analyzeSchemaVersion(properties);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+        int schemaHash = Util.generateSchemaHash();
+        olapTable.setIndexMeta(baseIndexId, tableName, baseSchema, schemaVersion, schemaHash, shortKeyColumnCount,
+                baseIndexStorageType, keysType);
+
+        for (AlterClause alterClause : stmt.getRollupAlterClauseList()) {
+            AddRollupClause addRollupClause = (AddRollupClause) alterClause;
+
+            Long baseRollupIndex = olapTable.getIndexIdByName(tableName);
+
+            // get storage type for rollup index
+            TStorageType rollupIndexStorageType = null;
+            try {
+                rollupIndexStorageType = PropertyAnalyzer.analyzeStorageType(addRollupClause.getProperties());
+            } catch (AnalysisException e) {
+                throw new DdlException(e.getMessage());
+            }
+            Preconditions.checkNotNull(rollupIndexStorageType);
+            // set rollup index meta to olap table
+            List<Column> rollupColumns = Catalog.getCurrentCatalog().getMaterializedViewHandler()
+                    .checkAndPrepareMaterializedView(addRollupClause, olapTable, baseRollupIndex, false);
+            short rollupShortKeyColumnCount =
+                    Catalog.calcShortKeyColumnCount(rollupColumns, alterClause.getProperties());
+            int rollupSchemaHash = Util.generateSchemaHash();
+            long rollupIndexId = Catalog.getCurrentCatalog().getNextId();
+            olapTable.setIndexMeta(rollupIndexId, addRollupClause.getRollupName(), rollupColumns, schemaVersion,
+                    rollupSchemaHash, rollupShortKeyColumnCount, rollupIndexStorageType, keysType);
+        }
+
+        // analyse sequence column
+        Type sequenceColType = null;
+        try {
+            sequenceColType = PropertyAnalyzer.analyzeSequenceType(properties, olapTable.getKeysType());
+            if (sequenceColType != null) {
+                olapTable.setSequenceInfo(sequenceColType);
+            }
+        } catch (Exception e) {
+            throw new DdlException(e.getMessage());
+        }
+
+        // analyze version info
+        Long versionInfo = null;
+        try {
+            versionInfo = PropertyAnalyzer.analyzeVersionInfo(properties);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+        Preconditions.checkNotNull(versionInfo);
+
+        // a set to record every new tablet created when create table
+        // if failed in any step, use this set to do clear things
+        Set<Long> tabletIdSet = new HashSet<>();
+        // create partition
+        try {
+            if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
+                // this is a 1-level partitioned table
+                // use table name as partition name
+                DistributionInfo partitionDistributionInfo = distributionDesc.toDistributionInfo(baseSchema);
+                String partitionName = tableName;
+                long partitionId = partitionNameToId.get(partitionName);
+
+                // check replica quota if this operation done
+                long indexNum = olapTable.getIndexIdToMeta().size();
+                long bucketNum = partitionDistributionInfo.getBucketNum();
+                long replicaNum = partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum();
+                long totalReplicaNum = indexNum * bucketNum * replicaNum;
+                if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
+                    throw new DdlException(
+                            "Database " + db.getFullName() + " create unpartitioned table " + tableName + " increasing "
+                                    + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
+                }
+                // create partition
+                Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
+                        olapTable.getBaseIndexId(), partitionId, partitionName, olapTable.getIndexIdToMeta(),
+                        partitionDistributionInfo, partitionInfo.getDataProperty(partitionId).getStorageMedium(),
+                        partitionInfo.getReplicaAllocation(partitionId), versionInfo, bfColumns, bfFpp, tabletIdSet,
+                        olapTable.getCopiedIndexes(), isInMemory, storageFormat, tabletType, compressionType,
+                        olapTable.getDataSortInfo());
+                olapTable.addPartition(partition);
+            } else if (partitionInfo.getType() == PartitionType.RANGE
+                    || partitionInfo.getType() == PartitionType.LIST) {
+                try {
+                    // just for remove entries in stmt.getProperties(),
+                    // and then check if there still has unknown properties
+                    PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(), DataProperty.DEFAULT_DATA_PROPERTY);
+                    if (partitionInfo.getType() == PartitionType.RANGE) {
+                        DynamicPartitionUtil.checkAndSetDynamicPartitionProperty(olapTable, properties);
+
+                    } else if (partitionInfo.getType() == PartitionType.LIST) {
+                        if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) {
+                            throw new DdlException(
+                                    "Only support dynamic partition properties on range partition table");
+
+                        }
+                    }
+
+                    if (properties != null && !properties.isEmpty()) {
+                        // here, all properties should be checked
+                        throw new DdlException("Unknown properties: " + properties);
+                    }
+                } catch (AnalysisException e) {
+                    throw new DdlException(e.getMessage());
+                }
+
+                // check replica quota if this operation done
+                long totalReplicaNum = 0;
+                for (Map.Entry<String, Long> entry : partitionNameToId.entrySet()) {
+                    long indexNum = olapTable.getIndexIdToMeta().size();
+                    long bucketNum = defaultDistributionInfo.getBucketNum();
+                    long replicaNum = partitionInfo.getReplicaAllocation(entry.getValue()).getTotalReplicaNum();
+                    totalReplicaNum += indexNum * bucketNum * replicaNum;
+                }
+                if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
+                    throw new DdlException(
+                            "Database " + db.getFullName() + " create table " + tableName + " increasing "
+                                    + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
+                }
+
+                // this is a 2-level partitioned tables
+                for (Map.Entry<String, Long> entry : partitionNameToId.entrySet()) {
+                    DataProperty dataProperty = partitionInfo.getDataProperty(entry.getValue());
+                    DistributionInfo partitionDistributionInfo = distributionDesc.toDistributionInfo(baseSchema);
+                    Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
+                            olapTable.getBaseIndexId(), entry.getValue(), entry.getKey(), olapTable.getIndexIdToMeta(),
+                            partitionDistributionInfo, dataProperty.getStorageMedium(),
+                            partitionInfo.getReplicaAllocation(entry.getValue()), versionInfo, bfColumns, bfFpp,
+                            tabletIdSet, olapTable.getCopiedIndexes(), isInMemory, storageFormat,
+                            partitionInfo.getTabletType(entry.getValue()), compressionType,
+                            olapTable.getDataSortInfo());
+                    olapTable.addPartition(partition);
+                }
+            } else {
+                throw new DdlException("Unsupported partition method: " + partitionInfo.getType().name());
+            }
+
+            Pair<Boolean, Boolean> result = db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists());
+            if (!result.first) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+            }
+
+            if (result.second) {
+                if (Catalog.getCurrentColocateIndex().isColocateTable(tableId)) {
+                    // if this is a colocate join table, its table id is already added to colocate group
+                    // so we should remove the tableId here
+                    Catalog.getCurrentColocateIndex().removeTable(tableId);
+                }
+                for (Long tabletId : tabletIdSet) {
+                    Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
+                }
+                LOG.info("duplicate create table[{};{}], skip next steps", tableName, tableId);
+            } else {
+                // we have added these index to memory, only need to persist here
+                if (Catalog.getCurrentColocateIndex().isColocateTable(tableId)) {
+                    GroupId groupId = Catalog.getCurrentColocateIndex().getGroup(tableId);
+                    Map<Tag, List<List<Long>>> backendsPerBucketSeq =
+                            Catalog.getCurrentColocateIndex().getBackendsPerBucketSeq(groupId);
+                    ColocatePersistInfo info =
+                            ColocatePersistInfo.createForAddTable(groupId, tableId, backendsPerBucketSeq);
+                    Catalog.getCurrentCatalog().getEditLog().logColocateAddTable(info);
+                }
+                LOG.info("successfully create table[{};{}]", tableName, tableId);
+                // register or remove table from DynamicPartition after table created
+                DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
+                Catalog.getCurrentCatalog().getDynamicPartitionScheduler()
+                        .createOrUpdateRuntimeInfo(tableId, DynamicPartitionScheduler.LAST_UPDATE_TIME,
+                                TimeUtils.getCurrentFormatTime());
+            }
+        } catch (DdlException e) {
+            for (Long tabletId : tabletIdSet) {
+                Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
+            }
+            // only remove from memory, because we have not persist it
+            if (Catalog.getCurrentColocateIndex().isColocateTable(tableId)) {
+                Catalog.getCurrentColocateIndex().removeTable(tableId);
+            }
+
+            throw e;
+        }
+    }
+
+    private void createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException {
+        String tableName = stmt.getTableName();
+
+        List<Column> columns = stmt.getColumns();
+
+        long tableId = Catalog.getCurrentCatalog().getNextId();
+        MysqlTable mysqlTable = new MysqlTable(tableId, tableName, columns, stmt.getProperties());
+        mysqlTable.setComment(stmt.getComment());
+        if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists()).first) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+        }
+        LOG.info("successfully create table[{}-{}]", tableName, tableId);
+        return;
+    }
+
+    private void createOdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
+        String tableName = stmt.getTableName();
+        List<Column> columns = stmt.getColumns();
+
+        long tableId = Catalog.getCurrentCatalog().getNextId();
+        OdbcTable odbcTable = new OdbcTable(tableId, tableName, columns, stmt.getProperties());
+        odbcTable.setComment(stmt.getComment());
+        if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists()).first) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+        }
+        LOG.info("successfully create table[{}-{}]", tableName, tableId);
+        return;
+    }
+
+    private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlException {
+        String tableName = stmt.getTableName();
+
+        // create columns
+        List<Column> baseSchema = stmt.getColumns();
+        validateColumns(baseSchema);
+
+        // create partition info
+        PartitionDesc partitionDesc = stmt.getPartitionDesc();
+        PartitionInfo partitionInfo = null;
+        Map<String, Long> partitionNameToId = Maps.newHashMap();
+        if (partitionDesc != null) {
+            partitionInfo = partitionDesc.toPartitionInfo(baseSchema, partitionNameToId, false);
+        } else {
+            long partitionId = Catalog.getCurrentCatalog().getNextId();
+            // use table name as single partition name
+            partitionNameToId.put(tableName, partitionId);
+            partitionInfo = new SinglePartitionInfo();
+        }
+
+        long tableId = Catalog.getCurrentCatalog().getNextId();
+        EsTable esTable = new EsTable(tableId, tableName, baseSchema, stmt.getProperties(), partitionInfo);
+        esTable.setComment(stmt.getComment());
+
+        if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists()).first) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+        }
+        LOG.info("successfully create table{} with id {}", tableName, tableId);
+        return esTable;
+    }
+
+    private void createBrokerTable(Database db, CreateTableStmt stmt) throws DdlException {
+        String tableName = stmt.getTableName();
+
+        List<Column> columns = stmt.getColumns();
+
+        long tableId = Catalog.getCurrentCatalog().getNextId();
+        BrokerTable brokerTable = new BrokerTable(tableId, tableName, columns, stmt.getProperties());
+        brokerTable.setComment(stmt.getComment());
+        brokerTable.setBrokerProperties(stmt.getExtProperties());
+
+        if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists()).first) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+        }
+        LOG.info("successfully create table[{}-{}]", tableName, tableId);
+
+        return;
+    }
+
+    private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlException {
+        String tableName = stmt.getTableName();
+        List<Column> columns = stmt.getColumns();
+        long tableId = Catalog.getCurrentCatalog().getNextId();
+        HiveTable hiveTable = new HiveTable(tableId, tableName, columns, stmt.getProperties());
+        hiveTable.setComment(stmt.getComment());
+        // check hive table whether exists in hive database
+        HiveMetaStoreClient hiveMetaStoreClient =
+                HiveMetaStoreClientHelper.getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
+        if (!HiveMetaStoreClientHelper.tableExists(hiveMetaStoreClient, hiveTable.getHiveDb(),
+                hiveTable.getHiveTable())) {
+            throw new DdlException(String.format("Table [%s] dose not exist in Hive.", hiveTable.getHiveDbTable()));
+        }
+        // check hive table if exists in doris database
+        if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists()).first) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+        }
+        LOG.info("successfully create table[{}-{}]", tableName, tableId);
+    }
+
+    private void createHudiTable(Database db, CreateTableStmt stmt) throws DdlException {
+        String tableName = stmt.getTableName();
+        List<Column> columns = stmt.getColumns();
+        long tableId = Catalog.getCurrentCatalog().getNextId();
+        HudiTable hudiTable = new HudiTable(tableId, tableName, columns, stmt.getProperties());
+        hudiTable.setComment(stmt.getComment());
+        // check hudi properties in create stmt.
+        HudiUtils.validateCreateTable(hudiTable);
+        // check hudi table whether exists in hive database
+        String metastoreUris = hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS);
+        HiveMetaStoreClient hiveMetaStoreClient = HiveMetaStoreClientHelper.getClient(metastoreUris);
+        if (!HiveMetaStoreClientHelper.tableExists(hiveMetaStoreClient, hudiTable.getHmsDatabaseName(),
+                hudiTable.getHmsTableName())) {
+            throw new DdlException(
+                    String.format("Table [%s] dose not exist in Hive Metastore.", hudiTable.getHmsTableIdentifer()));
+        }
+        org.apache.hadoop.hive.metastore.api.Table hiveTable =
+                HiveMetaStoreClientHelper.getTable(hudiTable.getHmsDatabaseName(), hudiTable.getHmsTableName(),
+                        metastoreUris);
+        if (!HudiUtils.isHudiTable(hiveTable)) {
+            throw new DdlException(String.format("Table [%s] is not a hudi table.", hudiTable.getHmsTableIdentifer()));
+        }
+        // after support snapshot query for mor, we should remove the check.
+        if (HudiUtils.isHudiRealtimeTable(hiveTable)) {
+            throw new DdlException(String.format("Can not support hudi realtime table.", hudiTable.getHmsTableName()));
+        }
+        // check table's schema when user specify the schema
+        if (!hudiTable.getFullSchema().isEmpty()) {
+            HudiUtils.validateColumns(hudiTable, hiveTable);
+        }
+        switch (hiveTable.getTableType()) {
+            case "EXTERNAL_TABLE":
+            case "MANAGED_TABLE":
+                break;
+            case "VIRTUAL_VIEW":
+            default:
+                throw new DdlException("unsupported hudi table type [" + hiveTable.getTableType() + "].");
+        }
+        // check hive table if exists in doris database
+        if (!db.createTableWithLock(hudiTable, false, stmt.isSetIfNotExists()).first) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+        }
+        LOG.info("successfully create table[{}-{}]", tableName, tableId);
+    }
+
+    private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState,
+            DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta,
+            Set<Long> tabletIdSet) throws DdlException {
+        ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
+        Map<Tag, List<List<Long>>> backendsPerBucketSeq = null;
+        GroupId groupId = null;
+        if (colocateIndex.isColocateTable(tabletMeta.getTableId())) {
+            if (distributionInfo.getType() == DistributionInfoType.RANDOM) {
+                throw new DdlException("Random distribution for colocate table is unsupported");
+            }
+            // if this is a colocate table, try to get backend seqs from colocation index.
+            groupId = colocateIndex.getGroup(tabletMeta.getTableId());
+            backendsPerBucketSeq = colocateIndex.getBackendsPerBucketSeq(groupId);
+        }
+
+        // chooseBackendsArbitrary is true, means this may be the first table of colocation group,
+        // or this is just a normal table, and we can choose backends arbitrary.
+        // otherwise, backends should be chosen from backendsPerBucketSeq;
+        boolean chooseBackendsArbitrary = backendsPerBucketSeq == null || backendsPerBucketSeq.isEmpty();
+        if (chooseBackendsArbitrary) {
+            backendsPerBucketSeq = Maps.newHashMap();
+        }
+        for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
+            // create a new tablet with random chosen backends
+            Tablet tablet = new Tablet(Catalog.getCurrentCatalog().getNextId());
+
+            // add tablet to inverted index first
+            index.addTablet(tablet, tabletMeta);
+            tabletIdSet.add(tablet.getId());
+
+            // get BackendIds
+            Map<Tag, List<Long>> chosenBackendIds;
+            if (chooseBackendsArbitrary) {
+                // This is the first colocate table in the group, or just a normal table,
+                // randomly choose backends
+                if (!Config.disable_storage_medium_check) {
+                    chosenBackendIds = Catalog.getCurrentSystemInfo()
+                            .selectBackendIdsForReplicaCreation(replicaAlloc, clusterName,
+                                    tabletMeta.getStorageMedium());
+                } else {
+                    chosenBackendIds = Catalog.getCurrentSystemInfo()
+                            .selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
+                }
+
+                for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) {
+                    backendsPerBucketSeq.putIfAbsent(entry.getKey(), Lists.newArrayList());
+                    backendsPerBucketSeq.get(entry.getKey()).add(entry.getValue());
+                }
+            } else {
+                // get backends from existing backend sequence
+                chosenBackendIds = Maps.newHashMap();
+                for (Map.Entry<Tag, List<List<Long>>> entry : backendsPerBucketSeq.entrySet()) {
+                    chosenBackendIds.put(entry.getKey(), entry.getValue().get(i));
+                }
+            }
+            // create replicas
+            short totalReplicaNum = (short) 0;
+            for (List<Long> backendIds : chosenBackendIds.values()) {
+                for (long backendId : backendIds) {
+                    long replicaId = Catalog.getCurrentCatalog().getNextId();
+                    Replica replica =
+                            new Replica(replicaId, backendId, replicaState, version, tabletMeta.getOldSchemaHash());
+                    tablet.addReplica(replica);
+                    totalReplicaNum++;
+                }
+            }
+            Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum(),
+                    totalReplicaNum + " vs. " + replicaAlloc.getTotalReplicaNum());
+        }
+
+        if (groupId != null && chooseBackendsArbitrary) {
+            colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
+            ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
+            Catalog.getCurrentCatalog().getEditLog().logColocateBackendsPerBucketSeq(info);
+        }
+    }
+
+    /*
+     * generate and check columns' order and key's existence
+     */
+    private void validateColumns(List<Column> columns) throws DdlException {
+        if (columns.isEmpty()) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_MUST_HAVE_COLUMNS);
+        }
+
+        boolean encounterValue = false;
+        boolean hasKey = false;
+        for (Column column : columns) {
+            if (column.isKey()) {
+                if (encounterValue) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_OLAP_KEY_MUST_BEFORE_VALUE);
+                }
+                hasKey = true;
+            } else {
+                encounterValue = true;
+            }
+        }
+
+        if (!hasKey) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_MUST_HAVE_KEYS);
+        }
+    }
+
+    /*
+     * Truncate specified table or partitions.
+     * The main idea is:
+     *
+     * 1. using the same schema to create new table(partitions)
+     * 2. use the new created table(partitions) to replace the old ones.
+     *
+     * if no partition specified, it will truncate all partitions of this table, including all temp partitions,
+     * otherwise, it will only truncate those specified partitions.
+     *
+     */
+    public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlException {
+        TableRef tblRef = truncateTableStmt.getTblRef();
+        TableName dbTbl = tblRef.getName();
+
+        // check, and save some info which need to be checked again later
+        Map<String, Long> origPartitions = Maps.newHashMap();
+        Map<Long, DistributionInfo> partitionsDistributionInfo = Maps.newHashMap();
+        OlapTable copiedTbl;
+
+        boolean truncateEntireTable = tblRef.getPartitionNames() == null;
+
+        Database db = (Database) getDbOrDdlException(dbTbl.getDb());
+        OlapTable olapTable = db.getOlapTableOrDdlException(dbTbl.getTbl());
+
+        olapTable.readLock();
+        try {
+            if (olapTable.getState() != OlapTableState.NORMAL) {
+                throw new DdlException("Table' state is not NORMAL: " + olapTable.getState());
+            }
+
+            if (!truncateEntireTable) {
+                for (String partName : tblRef.getPartitionNames().getPartitionNames()) {
+                    Partition partition = olapTable.getPartition(partName);
+                    if (partition == null) {
+                        throw new DdlException("Partition " + partName + " does not exist");
+                    }
+                    origPartitions.put(partName, partition.getId());
+                    partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo());
+                }
+            } else {
+                for (Partition partition : olapTable.getPartitions()) {
+                    origPartitions.put(partition.getName(), partition.getId());
+                    partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo());
+                }
+            }
+            copiedTbl = olapTable.selectiveCopy(origPartitions.keySet(), IndexExtState.VISIBLE, false);
+        } finally {
+            olapTable.readUnlock();
+        }
+
+        // 2. use the copied table to create partitions
+        List<Partition> newPartitions = Lists.newArrayList();
+        // tabletIdSet to save all newly created tablet ids.
+        Set<Long> tabletIdSet = Sets.newHashSet();
+        try {
+            for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
+                // the new partition must use new id
+                // If we still use the old partition id, the behavior of current load jobs on this partition
+                // will be undefined.
+                // By using a new id, load job will be aborted(just like partition is dropped),
+                // which is the right behavior.
+                long oldPartitionId = entry.getValue();
+                long newPartitionId = Catalog.getCurrentCatalog().getNextId();
+                Partition newPartition = createPartitionWithIndices(db.getClusterName(), db.getId(), copiedTbl.getId(),
+                        copiedTbl.getBaseIndexId(), newPartitionId, entry.getKey(), copiedTbl.getIndexIdToMeta(),
+                        partitionsDistributionInfo.get(oldPartitionId),
+                        copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).getStorageMedium(),
+                        copiedTbl.getPartitionInfo().getReplicaAllocation(oldPartitionId), null /* version info */,
+                        copiedTbl.getCopiedBfColumns(), copiedTbl.getBfFpp(), tabletIdSet, copiedTbl.getCopiedIndexes(),
+                        copiedTbl.isInMemory(), copiedTbl.getStorageFormat(),
+                        copiedTbl.getPartitionInfo().getTabletType(oldPartitionId), copiedTbl.getCompressionType(),
+                        copiedTbl.getDataSortInfo());
+                newPartitions.add(newPartition);
+            }
+        } catch (DdlException e) {
+            // create partition failed, remove all newly created tablets
+            for (Long tabletId : tabletIdSet) {
+                Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
+            }
+            throw e;
+        }
+        Preconditions.checkState(origPartitions.size() == newPartitions.size());
+
+        // all partitions are created successfully, try to replace the old partitions.
+        // before replacing, we need to check again.
+        // Things may be changed outside the table lock.
+        olapTable = (OlapTable) db.getTableOrDdlException(copiedTbl.getId());
+        olapTable.writeLockOrDdlException();
+        try {
+            if (olapTable.getState() != OlapTableState.NORMAL) {
+                throw new DdlException("Table' state is not NORMAL: " + olapTable.getState());
+            }
+
+            // check partitions
+            for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
+                Partition partition = copiedTbl.getPartition(entry.getValue());
+                if (partition == null || !partition.getName().equalsIgnoreCase(entry.getKey())) {
+                    throw new DdlException("Partition [" + entry.getKey() + "] is changed");
+                }
+            }
+
+            // check if meta changed
+            // rollup index may be added or dropped, and schema may be changed during creating partition operation.
+            boolean metaChanged = false;
+            if (olapTable.getIndexNameToId().size() != copiedTbl.getIndexNameToId().size()) {
+                metaChanged = true;
+            } else {
+                // compare schemaHash
+                Map<Long, Integer> copiedIndexIdToSchemaHash = copiedTbl.getIndexIdToSchemaHash();
+                for (Map.Entry<Long, Integer> entry : olapTable.getIndexIdToSchemaHash().entrySet()) {
+                    long indexId = entry.getKey();
+                    if (!copiedIndexIdToSchemaHash.containsKey(indexId)) {
+                        metaChanged = true;
+                        break;
+                    }
+                    if (!copiedIndexIdToSchemaHash.get(indexId).equals(entry.getValue())) {
+                        metaChanged = true;
+                        break;
+                    }
+                }
+            }
+
+            if (metaChanged) {
+                throw new DdlException("Table[" + copiedTbl.getName() + "]'s meta has been changed. try again.");
+            }
+
+            // replace
+            truncateTableInternal(olapTable, newPartitions, truncateEntireTable);
+
+            // write edit log
+            TruncateTableInfo info =
+                    new TruncateTableInfo(db.getId(), olapTable.getId(), newPartitions, truncateEntireTable);
+            Catalog.getCurrentCatalog().getEditLog().logTruncateTable(info);
+        } finally {
+            olapTable.writeUnlock();
+        }
+
+        LOG.info("finished to truncate table {}, partitions: {}", tblRef.getName().toSql(), tblRef.getPartitionNames());
+    }
+
+    private void truncateTableInternal(OlapTable olapTable, List<Partition> newPartitions, boolean isEntireTable) {
+        // use new partitions to replace the old ones.
+        Set<Long> oldTabletIds = Sets.newHashSet();
+        for (Partition newPartition : newPartitions) {
+            Partition oldPartition = olapTable.replacePartition(newPartition);
+            // save old tablets to be removed
+            for (MaterializedIndex index : oldPartition.getMaterializedIndices(IndexExtState.ALL)) {
+                index.getTablets().stream().forEach(t -> {
+                    oldTabletIds.add(t.getId());
+                });
+            }
+        }
+
+        if (isEntireTable) {
+            // drop all temp partitions
+            olapTable.dropAllTempPartitions();
+        }
+
+        // remove the tablets in old partitions
+        for (Long tabletId : oldTabletIds) {
+            Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
+        }
+    }
+
+    public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundException {
+        Database db = (Database) getDbOrMetaException(info.getDbId());
+        OlapTable olapTable = db.getTableOrMetaException(info.getTblId(), TableType.OLAP);
+        olapTable.writeLock();
+        try {
+            truncateTableInternal(olapTable, info.getPartitions(), info.isEntireTable());
+
+            if (!Catalog.isCheckpointThread()) {
+                // add tablet to inverted index
+                TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
+                for (Partition partition : info.getPartitions()) {
+                    long partitionId = partition.getId();
+                    TStorageMedium medium =
+                            olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
+                    for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
+                        long indexId = mIndex.getId();
+                        int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
+                        TabletMeta tabletMeta =
+                                new TabletMeta(db.getId(), olapTable.getId(), partitionId, indexId, schemaHash, medium);
+                        for (Tablet tablet : mIndex.getTablets()) {
+                            long tabletId = tablet.getId();
+                            invertedIndex.addTablet(tabletId, tabletMeta);
+                            for (Replica replica : tablet.getReplicas()) {
+                                invertedIndex.addReplica(tabletId, replica);
+                            }
+                        }
+                    }
+                }
+            }
+        } finally {
+            olapTable.writeUnlock();
+        }
+    }
+
+    public void replayAlterExternalTableSchema(String dbName, String tableName, List<Column> newSchema)
+            throws MetaNotFoundException {
+        Database db = (Database) getDbOrMetaException(dbName);
+        Table table = db.getTableOrMetaException(tableName);
+        table.writeLock();
+        try {
+            table.setNewFullSchema(newSchema);
+        } finally {
+            table.writeUnlock();
+        }
+    }
+
+    // for test only
+    public void clearDbs() {
+        if (idToDb != null) {
+            idToDb.clear();
+        }
+        if (fullNameToDb != null) {
+            fullNameToDb.clear();
+        }
+    }
+
+    /**
+     * create cluster
+     *
+     * @param stmt
+     * @throws DdlException
+     */
+    public void createCluster(CreateClusterStmt stmt) throws DdlException {
+        final String clusterName = stmt.getClusterName();
+        if (!tryLock(false)) {
+            throw new DdlException("Failed to acquire catalog lock. Try again");
+        }
+        try {
+            if (nameToCluster.containsKey(clusterName)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_HAS_EXIST, clusterName);
+            } else {
+                List<Long> backendList =
+                        Catalog.getCurrentSystemInfo().createCluster(clusterName, stmt.getInstanceNum());
+                // 1: BE returned is less than requested, throws DdlException.
+                // 2: BE returned is more than or equal to 0, succeeds.
+                if (backendList != null || stmt.getInstanceNum() == 0) {
+                    final long id = Catalog.getCurrentCatalog().getNextId();
+                    final Cluster cluster = new Cluster(clusterName, id);
+                    cluster.setBackendIdList(backendList);
+                    unprotectCreateCluster(cluster);
+                    if (clusterName.equals(SystemInfoService.DEFAULT_CLUSTER)) {
+                        for (Database db : idToDb.values()) {
+                            if (db.getClusterName().equals(SystemInfoService.DEFAULT_CLUSTER)) {
+                                cluster.addDb(db.getFullName(), db.getId());
+                            }
+                        }
+                    }
+                    Catalog.getCurrentCatalog().getEditLog().logCreateCluster(cluster);
+                    LOG.info("finish to create cluster: {}", clusterName);
+                } else {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_BE_NOT_ENOUGH);
+                }
+            }
+        } finally {
+            unlock();
+        }
+
+        // create super user for this cluster
+        UserIdentity adminUser = new UserIdentity(PaloAuth.ADMIN_USER, "%");
+        try {
+            adminUser.analyze(stmt.getClusterName());
+        } catch (AnalysisException e) {
+            LOG.error("should not happen", e);
+        }
+        Catalog.getCurrentCatalog().getAuth().createUser(new CreateUserStmt(new UserDesc(adminUser, "", true)));
+    }
+
+    private void unprotectCreateCluster(Cluster cluster) {
+        for (Long id : cluster.getBackendIdList()) {
+            final Backend backend = Catalog.getCurrentSystemInfo().getBackend(id);
+            backend.setOwnerClusterName(cluster.getName());
+            backend.setBackendState(BackendState.using);
+        }
+
+        idToCluster.put(cluster.getId(), cluster);
+        nameToCluster.put(cluster.getName(), cluster);
+
+        // create info schema db
+        final InfoSchemaDb infoDb = new InfoSchemaDb(cluster.getName());
+        infoDb.setClusterName(cluster.getName());
+        unprotectCreateDb(infoDb);
+
+        // only need to create default cluster once.
+        if (cluster.getName().equalsIgnoreCase(SystemInfoService.DEFAULT_CLUSTER)) {
+            Catalog.getCurrentCatalog().setDefaultClusterCreated(true);
+        }
+    }
+
+    /**
+     * replay create cluster
+     *
+     * @param cluster
+     */
+    public void replayCreateCluster(Cluster cluster) {
+        tryLock(true);
+        try {
+            unprotectCreateCluster(cluster);
+        } finally {
+            unlock();
+        }
+    }
+
+    /**
+     * drop cluster and cluster's db must be have deleted
+     *
+     * @param stmt
+     * @throws DdlException
+     */
+    public void dropCluster(DropClusterStmt stmt) throws DdlException {
+        if (!tryLock(false)) {
+            throw new DdlException("Failed to acquire catalog lock. Try again");
+        }
+        try {
+            final String clusterName = stmt.getClusterName();
+            final Cluster cluster = nameToCluster.get(clusterName);
+            if (cluster == null) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
+            }
+            final List<Backend> backends = Catalog.getCurrentSystemInfo().getClusterBackends(clusterName);
+            for (Backend backend : backends) {
+                if (backend.isDecommissioned()) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_ALTER_BE_IN_DECOMMISSION, clusterName);
+                }
+            }
+
+            // check if there still have databases undropped, except for information_schema db
+            if (cluster.getDbNames().size() > 1) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DELETE_DB_EXIST, clusterName);
+            }
+
+            Catalog.getCurrentSystemInfo().releaseBackends(clusterName, false /* is not replay */);
+            final ClusterInfo info = new ClusterInfo(clusterName, cluster.getId());
+            unprotectDropCluster(info, false /* is not replay */);
+            Catalog.getCurrentCatalog().getEditLog().logDropCluster(info);
+        } finally {
+            unlock();
+        }
+
+        // drop user of this cluster
+        // set is replay to true, not write log
+        Catalog.getCurrentCatalog().getAuth().dropUserOfCluster(stmt.getClusterName(), true /* is replay */);
+    }
+
+    private void unprotectDropCluster(ClusterInfo info, boolean isReplay) {
+        Catalog.getCurrentSystemInfo().releaseBackends(info.getClusterName(), isReplay);
+        idToCluster.remove(info.getClusterId());
+        nameToCluster.remove(info.getClusterName());
+        final Database infoSchemaDb = fullNameToDb.get(InfoSchemaDb.getFullInfoSchemaDbName(info.getClusterName()));
+        fullNameToDb.remove(infoSchemaDb.getFullName());
+        idToDb.remove(infoSchemaDb.getId());
+    }
+
+    public void replayDropCluster(ClusterInfo info) throws DdlException {
+        tryLock(true);
+        try {
+            unprotectDropCluster(info, true/* is replay */);
+        } finally {
+            unlock();
+        }
+
+        Catalog.getCurrentCatalog().getAuth().dropUserOfCluster(info.getClusterName(), true /* is replay */);
+    }
+
+    public void replayExpandCluster(ClusterInfo info) {
+        tryLock(true);
+        try {
+            final Cluster cluster = nameToCluster.get(info.getClusterName());
+            cluster.addBackends(info.getBackendIdList());
+
+            for (Long beId : info.getBackendIdList()) {
+                Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
+                if (be == null) {
+                    continue;
+                }
+                be.setOwnerClusterName(info.getClusterName());
+                be.setBackendState(BackendState.using);
+            }
+        } finally {
+            unlock();
+        }
+    }
+
+    /**
+     * modify cluster: Expansion or shrink
+     *
+     * @param stmt
+     * @throws DdlException
+     */
+    public void processModifyCluster(AlterClusterStmt stmt) throws UserException {
+        final String clusterName = stmt.getAlterClusterName();
+        final int newInstanceNum = stmt.getInstanceNum();
+        if (!tryLock(false)) {
+            throw new DdlException("Failed to acquire catalog lock. Try again");
+        }
+        try {
+            Cluster cluster = nameToCluster.get(clusterName);
+            if (cluster == null) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
+            }
+
+            // check if this cluster has backend in decommission
+            final List<Long> backendIdsInCluster = cluster.getBackendIdList();
+            for (Long beId : backendIdsInCluster) {
+                Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
+                if (be.isDecommissioned()) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_ALTER_BE_IN_DECOMMISSION, clusterName);
+                }
+            }
+
+            final int oldInstanceNum = backendIdsInCluster.size();
+            if (newInstanceNum > oldInstanceNum) {
+                // expansion
+                final List<Long> expandBackendIds = Catalog.getCurrentSystemInfo()
+                        .calculateExpansionBackends(clusterName, newInstanceNum - oldInstanceNum);
+                if (expandBackendIds == null) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_BE_NOT_ENOUGH);
+                }
+                cluster.addBackends(expandBackendIds);
+                final ClusterInfo info = new ClusterInfo(clusterName, cluster.getId(), expandBackendIds);
+                Catalog.getCurrentCatalog().getEditLog().logExpandCluster(info);
+            } else if (newInstanceNum < oldInstanceNum) {
+                // shrink
+                final List<Long> decomBackendIds = Catalog.getCurrentSystemInfo()
+                        .calculateDecommissionBackends(clusterName, oldInstanceNum - newInstanceNum);
+                if (decomBackendIds == null || decomBackendIds.size() == 0) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_BACKEND_ERROR);
+                }
+
+                List<String> hostPortList = Lists.newArrayList();
+                for (Long id : decomBackendIds) {
+                    final Backend backend = Catalog.getCurrentSystemInfo().getBackend(id);
+                    hostPortList.add(
+                            new StringBuilder().append(backend.getHost()).append(":").append(backend.getHeartbeatPort())
+                                    .toString());
+                }
+
+                // here we reuse the process of decommission backends. but set backend's decommission type to
+                // ClusterDecommission, which means this backend will not be removed from the system
+                // after decommission is done.
+                final DecommissionBackendClause clause = new DecommissionBackendClause(hostPortList);
+                try {
+                    clause.analyze(null);
+                    clause.setType(DecommissionType.ClusterDecommission);
+                    AlterSystemStmt alterStmt = new AlterSystemStmt(clause);
+                    alterStmt.setClusterName(clusterName);
+                    Catalog.getCurrentCatalog().getAlterInstance().processAlterCluster(alterStmt);
+                } catch (AnalysisException e) {
+                    Preconditions.checkState(false, "should not happened: " + e.getMessage());
+                }
+            } else {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_ALTER_BE_NO_CHANGE, newInstanceNum);
+            }
+
+        } finally {
+            unlock();
+        }
+    }
+
+    /**
+     * @param ctx
+     * @param clusterName
+     * @throws DdlException
+     */
+    public void changeCluster(ConnectContext ctx, String clusterName) throws DdlException {
+        if (!Catalog.getCurrentCatalog().getAuth().checkCanEnterCluster(ConnectContext.get(), clusterName)) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_AUTHORITY, ConnectContext.get().getQualifiedUser(),
+                    "enter");
+        }
+
+        if (!nameToCluster.containsKey(clusterName)) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
+        }
+
+        ctx.setCluster(clusterName);
+    }
+
+    /**
+     * migrate db to link dest cluster
+     *
+     * @param stmt
+     * @throws DdlException
+     */
+    public void migrateDb(MigrateDbStmt stmt) throws DdlException {
+        final String srcClusterName = stmt.getSrcCluster();
+        final String destClusterName = stmt.getDestCluster();
+        final String srcDbName = stmt.getSrcDb();
+        final String destDbName = stmt.getDestDb();
+
+        if (!tryLock(false)) {
+            throw new DdlException("Failed to acquire catalog lock. Try again");
+        }
+        try {
+            if (!nameToCluster.containsKey(srcClusterName)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_CLUSTER_NOT_EXIST, srcClusterName);
+            }
+            if (!nameToCluster.containsKey(destClusterName)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DEST_CLUSTER_NOT_EXIST, destClusterName);
+            }
+
+            if (srcClusterName.equals(destClusterName)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATE_SAME_CLUSTER);
+            }
+
+            final Cluster srcCluster = this.nameToCluster.get(srcClusterName);
+            if (!srcCluster.containDb(srcDbName)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_DB_NOT_EXIST, srcDbName);
+            }
+            final Cluster destCluster = this.nameToCluster.get(destClusterName);
+            if (!destCluster.containLink(destDbName, srcDbName)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATION_NO_LINK, srcDbName, destDbName);
+            }
+
+            final Database db = fullNameToDb.get(srcDbName);
+
+            // if the max replication num of the src db is larger then the backends num of the dest cluster,
+            // the migration will not be processed.
+            final int maxReplicationNum = db.getMaxReplicationNum();
+            if (maxReplicationNum > destCluster.getBackendIdList().size()) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATE_BE_NOT_ENOUGH, destClusterName);
+            }
+
+            if (db.getDbState() == DbState.LINK) {
+                final BaseParam param = new BaseParam();
+                param.addStringParam(destDbName);
+                param.addLongParam(db.getId());
+                param.addStringParam(srcDbName);
+                param.addStringParam(destClusterName);
+                param.addStringParam(srcClusterName);
+                fullNameToDb.remove(db.getFullName());
+                srcCluster.removeDb(db.getFullName(), db.getId());
+                destCluster.removeLinkDb(param);
+                destCluster.addDb(destDbName, db.getId());
+                db.writeLock();
+                try {
+                    db.setDbState(DbState.MOVE);
+                    // set cluster to the dest cluster.
+                    // and Clone process will do the migration things.
+                    db.setClusterName(destClusterName);
+                    db.setName(destDbName);
+                    db.setAttachDb(srcDbName);
+                } finally {
+                    db.writeUnlock();
+                }
+                Catalog.getCurrentCatalog().getEditLog().logMigrateCluster(param);
+            } else {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATION_NO_LINK, srcDbName, destDbName);
+            }
+        } finally {
+            unlock();
+        }
+    }
+
+    public void replayMigrateDb(BaseParam param) {
+        final String desDbName = param.getStringParam();
+        final String srcDbName = param.getStringParam(1);
+        final String desClusterName = param.getStringParam(2);
+        final String srcClusterName = param.getStringParam(3);
+        tryLock(true);
+        try {
+            final Cluster desCluster = this.nameToCluster.get(desClusterName);
+            final Cluster srcCluster = this.nameToCluster.get(srcClusterName);
+            final Database db = fullNameToDb.get(srcDbName);
+            if (db.getDbState() == DbState.LINK) {
+                fullNameToDb.remove(db.getFullName());
+                srcCluster.removeDb(db.getFullName(), db.getId());
+                desCluster.removeLinkDb(param);
+                desCluster.addDb(param.getStringParam(), db.getId());
+
+                db.writeLock();
+                db.setName(desDbName);
+                db.setAttachDb(srcDbName);
+                db.setDbState(DbState.MOVE);
+                db.setClusterName(desClusterName);
+                db.writeUnlock();
+            }
+        } finally {
+            unlock();
+        }
+    }
+
+    public void replayLinkDb(BaseParam param) {
+        final String desClusterName = param.getStringParam(2);
+        final String srcDbName = param.getStringParam(1);
+        final String desDbName = param.getStringParam();
+
+        tryLock(true);
+        try {
+            final Cluster desCluster = this.nameToCluster.get(desClusterName);
+            final Database srcDb = fullNameToDb.get(srcDbName);
+            srcDb.writeLock();
+            srcDb.setDbState(DbState.LINK);
+            srcDb.setAttachDb(desDbName);
+            srcDb.writeUnlock();
+            desCluster.addLinkDb(param);
+            fullNameToDb.put(desDbName, srcDb);
+        } finally {
+            unlock();
+        }
+    }
+
+    /**
+     * link src db to dest db. we use java's quotation Mechanism to realize db hard links
+     *
+     * @param stmt
+     * @throws DdlException
+     */
+    public void linkDb(LinkDbStmt stmt) throws DdlException {
+        final String srcClusterName = stmt.getSrcCluster();
+        final String destClusterName = stmt.getDestCluster();
+        final String srcDbName = stmt.getSrcDb();
+        final String destDbName = stmt.getDestDb();
+
+        if (!tryLock(false)) {
+            throw new DdlException("Failed to acquire catalog lock. Try again");
+        }
+        try {
+            if (!nameToCluster.containsKey(srcClusterName)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_CLUSTER_NOT_EXIST, srcClusterName);
+            }
+
+            if (!nameToCluster.containsKey(destClusterName)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DEST_CLUSTER_NOT_EXIST, destClusterName);
+            }
+
+            if (srcClusterName.equals(destClusterName)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATE_SAME_CLUSTER);
+            }
+
+            if (fullNameToDb.containsKey(destDbName)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, destDbName);
+            }
+
+            final Cluster srcCluster = this.nameToCluster.get(srcClusterName);
+            final Cluster destCluster = this.nameToCluster.get(destClusterName);
+
+            if (!srcCluster.containDb(srcDbName)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_DB_NOT_EXIST, srcDbName);
+            }
+            final Database srcDb = fullNameToDb.get(srcDbName);
+
+            if (srcDb.getDbState() != DbState.NORMAL) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DB_STATE_LINK_OR_MIGRATE,
+                        ClusterNamespace.getNameFromFullName(srcDbName));
+            }
+
+            srcDb.writeLock();
+            try {
+                srcDb.setDbState(DbState.LINK);
+                srcDb.setAttachDb(destDbName);
+            } finally {
+                srcDb.writeUnlock();
+            }
+
+            final long id = Catalog.getCurrentCatalog().getNextId();
+            final BaseParam param = new BaseParam();
+            param.addStringParam(destDbName);
+            param.addStringParam(srcDbName);
+            param.addLongParam(id);
+            param.addLongParam(srcDb.getId());
+            param.addStringParam(destClusterName);
+            param.addStringParam(srcClusterName);
+            destCluster.addLinkDb(param);
+            fullNameToDb.put(destDbName, srcDb);
+            Catalog.getCurrentCatalog().getEditLog().logLinkCluster(param);
+        } finally {
+            unlock();
+        }
+    }
+
+    public Cluster getCluster(String clusterName) {
+        return nameToCluster.get(clusterName);
+    }
+
+    public List<String> getClusterNames() {
+        return new ArrayList<String>(nameToCluster.keySet());
+    }
+
+    /**
+     * get migrate progress , when finish migration, next cloneCheck will reset dbState
+     *
+     * @return
+     */
+    public Set<BaseParam> getMigrations() {
+        final Set<BaseParam> infos = Sets.newHashSet();
+        for (Database db : fullNameToDb.values()) {
+            db.readLock();
+            try {
+                if (db.getDbState() == DbState.MOVE) {
+                    int tabletTotal = 0;
+                    int tabletQuorum = 0;
+                    final Set<Long> beIds =
+                            Sets.newHashSet(Catalog.getCurrentSystemInfo().getClusterBackendIds(db.getClusterName()));
+                    final Set<String> tableNames = db.getTableNamesWithLock();
+                    for (String tableName : tableNames) {
+
+                        Table table = db.getTableNullable(tableName);
+                        if (table == null || table.getType() != TableType.OLAP) {
+                            continue;
+                        }
+
+                        OlapTable olapTable = (OlapTable) table;
+                        olapTable.readLock();
+                        try {
+                            for (Partition partition : olapTable.getPartitions()) {
+                                ReplicaAllocation replicaAlloc =
+                                        olapTable.getPartitionInfo().getReplicaAllocation(partition.getId());
+                                short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
+                                for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(
+                                        IndexExtState.ALL)) {
+                                    if (materializedIndex.getState() != IndexState.NORMAL) {
+                                        continue;
+                                    }
+                                    for (Tablet tablet : materializedIndex.getTablets()) {
+                                        int replicaNum = 0;
+                                        int quorum = totalReplicaNum / 2 + 1;
+                                        for (Replica replica : tablet.getReplicas()) {
+                                            if (replica.getState() != ReplicaState.CLONE && beIds.contains(
+                                                    replica.getBackendId())) {
+                                                replicaNum++;
+                                            }
+                                        }
+                                        if (replicaNum > quorum) {
+                                            replicaNum = quorum;
+                                        }
+
+                                        tabletQuorum = tabletQuorum + replicaNum;
+                                        tabletTotal = tabletTotal + quorum;
+                                    }
+                                }
+                            }
+                        } finally {
+                            olapTable.readUnlock();
+                        }
+                    }
+                    final BaseParam info = new BaseParam();
+                    info.addStringParam(db.getClusterName());
+                    info.addStringParam(db.getAttachDb());
+                    info.addStringParam(db.getFullName());
+                    final float percentage = tabletTotal > 0 ? (float) tabletQuorum / (float) tabletTotal : 0f;
+                    info.addFloatParam(percentage);
+                    infos.add(info);
+                }
+            } finally {
+                db.readUnlock();
+            }
+        }
+
+        return infos;
+    }
+
+    public long loadCluster(DataInputStream dis, long checksum) throws IOException, DdlException {
+        int clusterCount = dis.readInt();
+        checksum ^= clusterCount;
+        for (long i = 0; i < clusterCount; ++i) {
+            final Cluster cluster = Cluster.read(dis);
+            checksum ^= cluster.getId();
+
+            List<Long> latestBackendIds = Catalog.getCurrentSystemInfo().getClusterBackendIds(cluster.getName());
+            if (latestBackendIds.size() != cluster.getBackendIdList().size()) {
+                LOG.warn(
+                        "Cluster:" + cluster.getName() + ", backends in Cluster is " + cluster.getBackendIdList().size()
+                                + ", backends in SystemInfoService is " + cluster.getBackendIdList().size());
+            }
+            // The number of BE in cluster is not same as in SystemInfoService, when perform 'ALTER
+            // SYSTEM ADD BACKEND TO ...' or 'ALTER SYSTEM ADD BACKEND ...', because both of them are
+            // for adding BE to some Cluster, but loadCluster is after loadBackend.
+            cluster.setBackendIdList(latestBackendIds);
+
+            String dbName = InfoSchemaDb.getFullInfoSchemaDbName(cluster.getName());
+            // Use real Catalog instance to avoid InfoSchemaDb id continuously increment
+            // when checkpoint thread load image.
+            InfoSchemaDb db = (InfoSchemaDb) Catalog.getServingCatalog().getDbNullable(dbName);
+            if (db == null) {
+                db = new InfoSchemaDb(cluster.getName());
+                db.setClusterName(cluster.getName());
+            }
+            String errMsg = "InfoSchemaDb id shouldn't larger than 10000, please restart your FE server";
+            // Every time we construct the InfoSchemaDb, which id will increment.
+            // When InfoSchemaDb id larger than 10000 and put it to idToDb,
+            // which may be overwrite the normal db meta in idToDb,
+            // so we ensure InfoSchemaDb id less than 10000.
+            Preconditions.checkState(db.getId() < Catalog.NEXT_ID_INIT_VALUE, errMsg);
+            idToDb.put(db.getId(), db);
+            fullNameToDb.put(db.getFullName(), db);
+            cluster.addDb(dbName, db.getId());
+            idToCluster.put(cluster.getId(), cluster);
+            nameToCluster.put(cluster.getName(), cluster);
+        }
+        LOG.info("finished replay cluster from image");
+        return checksum;
+    }
+
+    public void initDefaultCluster() {
+        final List<Long> backendList = Lists.newArrayList();
+        final List<Backend> defaultClusterBackends =
+                Catalog.getCurrentSystemInfo().getClusterBackends(SystemInfoService.DEFAULT_CLUSTER);
+        for (Backend backend : defaultClusterBackends) {
+            backendList.add(backend.getId());
+        }
+
+        final long id = Catalog.getCurrentCatalog().getNextId();
+        final Cluster cluster = new Cluster(SystemInfoService.DEFAULT_CLUSTER, id);
+
+        // make sure one host hold only one backend.
+        Set<String> beHost = Sets.newHashSet();
+        for (Backend be : defaultClusterBackends) {
+            if (beHost.contains(be.getHost())) {
+                // we can not handle this situation automatically.
+                LOG.error("found more than one backends in same host: {}", be.getHost());
+                System.exit(-1);
+            } else {
+                beHost.add(be.getHost());
+            }
+        }
+
+        // we create default_cluster to meet the need for ease of use, because
+        // most users have no multi tenant needs.
+        cluster.setBackendIdList(backendList);
+        unprotectCreateCluster(cluster);
+        for (Database db : idToDb.values()) {
+            db.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
+            cluster.addDb(db.getFullName(), db.getId());
+        }
+
+        // no matter default_cluster is created or not,
+        // mark isDefaultClusterCreated as true
+        Catalog.getCurrentCatalog().setDefaultClusterCreated(true);
+        Catalog.getCurrentCatalog().getEditLog().logCreateCluster(cluster);
+    }
+
+    public void replayUpdateDb(DatabaseInfo info) {
+        final Database db = fullNameToDb.get(info.getDbName());
+        db.setClusterName(info.getClusterName());
+        db.setDbState(info.getDbState());
+    }
+
+    public long saveCluster(CountingDataOutputStream dos, long checksum) throws IOException {
+        final int clusterCount = idToCluster.size();
+        checksum ^= clusterCount;
+        dos.writeInt(clusterCount);
+        for (Map.Entry<Long, Cluster> entry : idToCluster.entrySet()) {
+            long clusterId = entry.getKey();
+            if (clusterId >= Catalog.NEXT_ID_INIT_VALUE) {
+                checksum ^= clusterId;
+                final Cluster cluster = entry.getValue();
+                cluster.write(dos);
+            }
+        }
+        return checksum;
+    }
+
+    public void replayUpdateClusterAndBackends(BackendIdsUpdateInfo info) {
+        for (long id : info.getBackendList()) {
+            final Backend backend = Catalog.getCurrentSystemInfo().getBackend(id);
+            final Cluster cluster = nameToCluster.get(backend.getOwnerClusterName());
+            cluster.removeBackend(id);
+            backend.setDecommissioned(false);
+            backend.clearClusterName();
+            backend.setBackendState(BackendState.free);
+        }
+    }
+
+    public List<String> getClusterDbNames(String clusterName) throws AnalysisException {
+        final Cluster cluster = nameToCluster.get(clusterName);
+        if (cluster == null) {
+            throw new AnalysisException("No cluster selected");
+        }
+        return Lists.newArrayList(cluster.getDbNames());
+    }
+
+    public long saveDb(CountingDataOutputStream dos, long checksum) throws IOException {
+        int dbCount = idToDb.size() - nameToCluster.keySet().size();
+        checksum ^= dbCount;
+        dos.writeInt(dbCount);
+        for (Map.Entry<Long, Database> entry : idToDb.entrySet()) {
+            Database db = entry.getValue();
+            String dbName = db.getFullName();
+            // Don't write information_schema db meta
+            if (!InfoSchemaDb.isInfoSchemaDb(dbName)) {
+                checksum ^= entry.getKey();
+                db.write(dos);
+            }
+        }
+        return checksum;
+    }
+
+    public long loadDb(DataInputStream dis, long checksum) throws IOException, DdlException {
+        int dbCount = dis.readInt();
+        long newChecksum = checksum ^ dbCount;
+        for (long i = 0; i < dbCount; ++i) {
+            Database db = new Database();
+            db.readFields(dis);
+            newChecksum ^= db.getId();
+            idToDb.put(db.getId(), db);
+            fullNameToDb.put(db.getFullName(), db);
+            if (db.getDbState() == DbState.LINK) {
+                fullNameToDb.put(db.getAttachDb(), db);
+            }
+            Catalog.getCurrentGlobalTransactionMgr().addDatabaseTransactionMgr(db.getId());
+        }
+        LOG.info("finished replay databases from image");
+        return newChecksum;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java
index 09d3893cd3..6b9b7f6818 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java
@@ -50,7 +50,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
@@ -168,19 +167,19 @@ public class ShowAction extends RestBaseController {
         Map<String, Long> oneEntry = Maps.newHashMap();
 
         String dbName = request.getParameter(DB_KEY);
-        ConcurrentHashMap<String, Database> fullNameToDb = Catalog.getCurrentCatalog().getFullNameToDb();
         long totalSize = 0;
         if (dbName != null) {
             String fullDbName = getFullDbName(dbName);
-            Database db = fullNameToDb.get(fullDbName);
+            Database db = Catalog.getCurrentCatalog().getDbNullable(fullDbName);
             if (db == null) {
                 return ResponseEntityBuilder.okWithCommonError("database " + fullDbName + " not found.");
             }
             totalSize = getDataSizeOfDatabase(db);
             oneEntry.put(fullDbName, totalSize);
         } else {
-            for (Database db : fullNameToDb.values()) {
-                if (db.isInfoSchemaDb()) {
+            for (long dbId : Catalog.getCurrentCatalog().getDbIds()) {
+                Database db = Catalog.getCurrentCatalog().getDbNullable(dbId);
+                if (db == null && db.isInfoSchemaDb()) {
                     continue;
                 }
                 totalSize += getDataSizeOfDatabase(db);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
index 6fb0abbf4f..bffcc9c4f5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
@@ -41,7 +41,6 @@ public class FakeEditLog extends MockUp<EditLog> {
     @Mock
     public void $init(String nodeName) { // CHECKSTYLE IGNORE THIS LINE
         // do nothing
-        System.out.println("abc");
     }
 
     @Mock


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org