Posted to commits@doris.apache.org by yi...@apache.org on 2022/06/08 14:02:28 UTC
[incubator-doris] branch master updated: [feature-wip](multi-catalog)(step2) Introduce Internal Data Source (#9953)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5f56e17ef2 [feature-wip](multi-catalog)(step2) Introduce Internal Data Source (#9953)
5f56e17ef2 is described below
commit 5f56e17ef215e932990da880182d8ad421bc62c1
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Jun 8 22:02:22 2022 +0800
[feature-wip](multi-catalog)(step2) Introduce Internal Data Source (#9953)
---
.../java/org/apache/doris/catalog/Catalog.java | 2784 +----------------
.../java/org/apache/doris/catalog/Database.java | 4 +-
.../java/org/apache/doris/catalog/DatabaseIf.java | 17 +-
.../main/java/org/apache/doris/common/Config.java | 7 +
.../java/org/apache/doris/common/MetaReader.java | 2 +-
.../org/apache/doris/datasource/DataSourceIf.java | 31 +-
.../org/apache/doris/datasource/DataSourceMgr.java | 116 +-
.../doris/datasource/DataSourceMgrProperty.java | 48 +
.../doris/datasource/DataSourceProperty.java | 45 +
.../doris/datasource/EsExternalDataSource.java | 10 -
.../doris/datasource/ExternalDataSource.java | 96 +-
.../doris/datasource/HMSExternalDataSource.java | 12 -
.../doris/datasource/InternalDataSource.java | 3157 +++++++++++++++++++-
.../org/apache/doris/httpv2/rest/ShowAction.java | 9 +-
.../java/org/apache/doris/catalog/FakeEditLog.java | 1 -
15 files changed, 3514 insertions(+), 2825 deletions(-)
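The bulk of the diff converts Catalog from the owner of database and cluster state into a thin facade over InternalDataSource. A minimal sketch of the resulting delegation pattern, with simplified signatures (the real entry points take analyzed statement objects and do much more validation):

    // Sketch of the facade-style delegation this commit applies to most
    // Catalog DDL entry points; locking and validation omitted.
    public class Catalog {
        private final DataSourceMgr dataSourceMgr = new DataSourceMgr();

        public InternalDataSource getInternalDataSource() {
            return dataSourceMgr.getInternalDataSource();
        }

        public void createDb(CreateDbStmt stmt) throws DdlException {
            // Catalog no longer touches idToDb/fullNameToDb directly.
            getInternalDataSource().createDb(stmt);
        }
    }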
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 94c42a4786..a42add1079 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -20,19 +20,16 @@ package org.apache.doris.catalog;
import org.apache.doris.alter.Alter;
import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.alter.AlterJobV2.JobType;
-import org.apache.doris.alter.DecommissionType;
import org.apache.doris.alter.MaterializedViewHandler;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.alter.SystemHandler;
import org.apache.doris.analysis.AddPartitionClause;
-import org.apache.doris.analysis.AddRollupClause;
import org.apache.doris.analysis.AdminCheckTabletsStmt;
import org.apache.doris.analysis.AdminCheckTabletsStmt.CheckType;
import org.apache.doris.analysis.AdminCleanTrashStmt;
import org.apache.doris.analysis.AdminCompactTableStmt;
import org.apache.doris.analysis.AdminSetConfigStmt;
import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
-import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.AlterClusterStmt;
import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
import org.apache.doris.analysis.AlterDatabaseQuotaStmt.QuotaType;
@@ -40,13 +37,10 @@ import org.apache.doris.analysis.AlterDatabaseRename;
import org.apache.doris.analysis.AlterSystemStmt;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.AlterViewStmt;
-import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BackupStmt;
import org.apache.doris.analysis.CancelAlterSystemStmt;
import org.apache.doris.analysis.CancelAlterTableStmt;
import org.apache.doris.analysis.CancelBackupStmt;
-import org.apache.doris.analysis.ColumnDef;
-import org.apache.doris.analysis.ColumnDef.DefaultValue;
import org.apache.doris.analysis.ColumnRenameClause;
import org.apache.doris.analysis.CreateClusterStmt;
import org.apache.doris.analysis.CreateDbStmt;
@@ -55,11 +49,8 @@ import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.CreateTableAsSelectStmt;
import org.apache.doris.analysis.CreateTableLikeStmt;
import org.apache.doris.analysis.CreateTableStmt;
-import org.apache.doris.analysis.CreateUserStmt;
import org.apache.doris.analysis.CreateViewStmt;
-import org.apache.doris.analysis.DataSortInfo;
import org.apache.doris.analysis.DdlStmt;
-import org.apache.doris.analysis.DecommissionBackendClause;
import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.DropClusterStmt;
import org.apache.doris.analysis.DropDbStmt;
@@ -67,17 +58,12 @@ import org.apache.doris.analysis.DropFunctionStmt;
import org.apache.doris.analysis.DropMaterializedViewStmt;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.DropTableStmt;
-import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionName;
-import org.apache.doris.analysis.HashDistributionDesc;
import org.apache.doris.analysis.InstallPluginStmt;
-import org.apache.doris.analysis.KeysDesc;
import org.apache.doris.analysis.LinkDbStmt;
import org.apache.doris.analysis.MigrateDbStmt;
import org.apache.doris.analysis.ModifyDistributionClause;
-import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.PartitionRenameClause;
-import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.RecoverDbStmt;
import org.apache.doris.analysis.RecoverPartitionStmt;
import org.apache.doris.analysis.RecoverTableStmt;
@@ -85,24 +71,15 @@ import org.apache.doris.analysis.ReplacePartitionClause;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.analysis.RollupRenameClause;
import org.apache.doris.analysis.ShowAlterStmt.AlterType;
-import org.apache.doris.analysis.SinglePartitionDesc;
-import org.apache.doris.analysis.TableName;
-import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.TableRenameClause;
import org.apache.doris.analysis.TruncateTableStmt;
-import org.apache.doris.analysis.TypeDef;
import org.apache.doris.analysis.UninstallPluginStmt;
-import org.apache.doris.analysis.UserDesc;
-import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.backup.BackupHandler;
import org.apache.doris.blockrule.SqlBlockRuleMgr;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
-import org.apache.doris.catalog.Database.DbState;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
-import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.catalog.OlapTable.OlapTableState;
-import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.Replica.ReplicaStatus;
import org.apache.doris.catalog.Table.TableType;
import org.apache.doris.clone.ColocateTableCheckerAndBalancer;
@@ -112,7 +89,6 @@ import org.apache.doris.clone.TabletScheduler;
import org.apache.doris.clone.TabletSchedulerStat;
import org.apache.doris.cluster.BaseParam;
import org.apache.doris.cluster.Cluster;
-import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
@@ -122,8 +98,6 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
-import org.apache.doris.common.FeNameFormat;
-import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaHeader;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.MetaReader;
@@ -141,19 +115,17 @@ import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.QueryableReentrantLock;
import org.apache.doris.common.util.SmallFileMgr;
-import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.consistency.ConsistencyChecker;
+import org.apache.doris.datasource.DataSourceMgr;
+import org.apache.doris.datasource.InternalDataSource;
import org.apache.doris.deploy.DeployManager;
import org.apache.doris.deploy.impl.AmbariDeployManager;
import org.apache.doris.deploy.impl.K8sDeployManager;
import org.apache.doris.deploy.impl.LocalFileDeployManager;
import org.apache.doris.external.elasticsearch.EsRepository;
-import org.apache.doris.external.hudi.HudiProperty;
import org.apache.doris.external.hudi.HudiTable;
-import org.apache.doris.external.hudi.HudiUtils;
-import org.apache.doris.external.iceberg.IcebergCatalogMgr;
import org.apache.doris.external.iceberg.IcebergTableCreationRecordMgr;
import org.apache.doris.ha.BDBHA;
import org.apache.doris.ha.FrontendNodeType;
@@ -194,10 +166,7 @@ import org.apache.doris.persist.BackendIdsUpdateInfo;
import org.apache.doris.persist.BackendReplicasInfo;
import org.apache.doris.persist.BackendTabletsInfo;
import org.apache.doris.persist.ClusterInfo;
-import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.persist.DatabaseInfo;
-import org.apache.doris.persist.DropDbInfo;
-import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.DropLinkDbAndUpdateDbInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.EditLog;
@@ -233,25 +202,18 @@ import org.apache.doris.statistics.StatisticsJobScheduler;
import org.apache.doris.statistics.StatisticsManager;
import org.apache.doris.statistics.StatisticsTaskScheduler;
import org.apache.doris.system.Backend;
-import org.apache.doris.system.Backend.BackendState;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.HeartbeatMgr;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
-import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.CompactionTask;
-import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.task.DropReplicaTask;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TCompressionType;
import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
-import org.apache.doris.thrift.TStorageType;
-import org.apache.doris.thrift.TTabletType;
-import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.PublishVersionDaemon;
@@ -264,13 +226,11 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
+import lombok.Setter;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
@@ -287,7 +247,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -327,16 +286,10 @@ public class Catalog {
// Using QueryableReentrantLock to print owner thread in debug mode.
private QueryableReentrantLock lock;
- private ConcurrentHashMap<Long, Database> idToDb;
- private ConcurrentHashMap<String, Database> fullNameToDb;
-
- private ConcurrentHashMap<Long, Cluster> idToCluster;
- private ConcurrentHashMap<String, Cluster> nameToCluster;
-
+ private DataSourceMgr dataSourceMgr;
private Load load;
private LoadManager loadManager;
private StreamLoadRecordMgr streamLoadRecordMgr;
- private IcebergTableCreationRecordMgr icebergTableCreationRecordMgr;
private RoutineLoadManager routineLoadManager;
private SqlBlockRuleMgr sqlBlockRuleMgr;
private ExportMgr exportMgr;
@@ -355,7 +308,6 @@ public class Catalog {
private Daemon replayer;
private Daemon timePrinter;
private Daemon listener;
- private EsRepository esRepository; // it is a daemon, so add it here
private boolean isFirstTimeStartUp = false;
private boolean isElectable;
@@ -369,6 +321,7 @@ public class Catalog {
private BlockingQueue<FrontendNodeType> typeTransferQueue;
// false if default_cluster is not created.
+ @Setter
private boolean isDefaultClusterCreated = false;
// node name is used for bdbje NodeName.
@@ -522,6 +475,14 @@ public class Catalog {
return this.dynamicPartitionScheduler;
}
+ public DataSourceMgr getDataSourceMgr() {
+ return dataSourceMgr;
+ }
+
+ public InternalDataSource getInternalDataSource() {
+ return dataSourceMgr.getInternalDataSource();
+ }
+
private static class SingletonHolder {
private static final Catalog INSTANCE = new Catalog();
}
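The SingletonHolder above is the initialization-on-demand holder idiom: the JVM loads the holder class, and thus constructs INSTANCE, exactly once on first use, giving lazy, thread-safe initialization without explicit synchronization. A generic example of the idiom (not Doris code):

    // Initialization-on-demand holder idiom; class loading guarantees
    // INSTANCE is built once, lazily, with no locks on the read path.
    public final class Service {
        private Service() {}

        private static class Holder {
            private static final Service INSTANCE = new Service();
        }

        public static Service getInstance() {
            return Holder.INSTANCE;
        }
    }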
@@ -532,8 +493,7 @@ public class Catalog {
// if isCheckpointCatalog is true, it means that we should not collect thread pool metric
private Catalog(boolean isCheckpointCatalog) {
- this.idToDb = new ConcurrentHashMap<>();
- this.fullNameToDb = new ConcurrentHashMap<>();
+ this.dataSourceMgr = new DataSourceMgr();
this.load = new Load();
this.routineLoadManager = new RoutineLoadManager();
this.sqlBlockRuleMgr = new SqlBlockRuleMgr();
@@ -575,9 +535,6 @@ public class Catalog {
this.metaReplayState = new MetaReplayState();
- this.idToCluster = new ConcurrentHashMap<>();
- this.nameToCluster = new ConcurrentHashMap<>();
-
this.isDefaultClusterCreated = false;
this.brokerMgr = new BrokerMgr();
@@ -595,8 +552,6 @@ public class Catalog {
this.auth = new PaloAuth();
this.domainResolver = new DomainResolver(auth);
- this.esRepository = new EsRepository();
-
this.metaContext = new MetaContext();
this.metaContext.setThreadLocalInfo();
@@ -617,7 +572,6 @@ public class Catalog {
this.loadJobScheduler = new LoadJobScheduler();
this.loadManager = new LoadManager(loadJobScheduler);
this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager", Config.fetch_stream_load_record_interval_second * 1000);
- this.icebergTableCreationRecordMgr = new IcebergTableCreationRecordMgr();
this.loadEtlChecker = new LoadEtlChecker(loadManager);
this.loadLoadingChecker = new LoadLoadingChecker(loadManager);
this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadManager);
@@ -696,10 +650,6 @@ public class Catalog {
return tabletChecker;
}
- public ConcurrentHashMap<String, Database> getFullNameToDb() {
- return fullNameToDb;
- }
-
public AuditEventProcessor getAuditEventProcessor() {
return auditEventProcessor;
}
@@ -766,7 +716,6 @@ public class Catalog {
return statisticsTaskScheduler;
}
-
// Use tryLock to avoid potential deadlock
private boolean tryLock(boolean mustLock) {
while (true) {
@@ -1408,7 +1357,7 @@ public class Catalog {
// start daemon thread to update global partition in memory information periodically
partitionInMemoryInfoCollector.start();
streamLoadRecordMgr.start();
- icebergTableCreationRecordMgr.start();
+ getInternalDataSource().getIcebergTableCreationRecordMgr().start();
}
// start threads that should running on all FE
@@ -1416,8 +1365,8 @@ public class Catalog {
tabletStatMgr.start();
// load and export job label cleaner thread
labelCleaner.start();
- // ES state store
- esRepository.start();
+ // es repository
+ getInternalDataSource().getEsRepository().start();
// domain resolver
domainResolver.start();
}
@@ -1627,44 +1576,6 @@ public class Catalog {
MetaReader.read(curFile, this);
}
- public void recreateTabletInvertIndex() {
- if (isCheckpointThread()) {
- return;
- }
-
- // create inverted index
- TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
- for (Database db : this.fullNameToDb.values()) {
- long dbId = db.getId();
- for (Table table : db.getTables()) {
- if (table.getType() != TableType.OLAP) {
- continue;
- }
-
- OlapTable olapTable = (OlapTable) table;
- long tableId = olapTable.getId();
- Collection<Partition> allPartitions = olapTable.getAllPartitions();
- for (Partition partition : allPartitions) {
- long partitionId = partition.getId();
- TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(
- partitionId).getStorageMedium();
- for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
- long indexId = index.getId();
- int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium);
- for (Tablet tablet : index.getTablets()) {
- long tabletId = tablet.getId();
- invertedIndex.addTablet(tabletId, tabletMeta);
- for (Replica replica : tablet.getReplicas()) {
- invertedIndex.addReplica(tabletId, replica);
- }
- }
- } // end for indices
- } // end for partitions
- } // end for tables
- } // end for dbs
- }
-
public long loadHeader(DataInputStream dis, MetaHeader metaHeader, long checksum) throws IOException, DdlException {
switch (metaHeader.getMetaFormat()) {
case COR1:
@@ -1721,21 +1632,7 @@ public class Catalog {
}
public long loadDb(DataInputStream dis, long checksum) throws IOException, DdlException {
- int dbCount = dis.readInt();
- long newChecksum = checksum ^ dbCount;
- for (long i = 0; i < dbCount; ++i) {
- Database db = new Database();
- db.readFields(dis);
- newChecksum ^= db.getId();
- idToDb.put(db.getId(), db);
- fullNameToDb.put(db.getFullName(), db);
- if (db.getDbState() == DbState.LINK) {
- fullNameToDb.put(db.getAttachDb(), db);
- }
- globalTransactionMgr.addDatabaseTransactionMgr(db.getId());
- }
- LOG.info("finished replay databases from image");
- return newChecksum;
+ return getInternalDataSource().loadDb(dis, checksum);
}
public long loadLoadJob(DataInputStream dis, long checksum) throws IOException, DdlException {
@@ -2044,19 +1941,7 @@ public class Catalog {
}
public long saveDb(CountingDataOutputStream dos, long checksum) throws IOException {
- int dbCount = idToDb.size() - nameToCluster.keySet().size();
- checksum ^= dbCount;
- dos.writeInt(dbCount);
- for (Map.Entry<Long, Database> entry : idToDb.entrySet()) {
- Database db = entry.getValue();
- String dbName = db.getFullName();
- // Don't write information_schema db meta
- if (!InfoSchemaDb.isInfoSchemaDb(dbName)) {
- checksum ^= entry.getKey();
- db.write(dos);
- }
- }
- return checksum;
+ return getInternalDataSource().saveDb(dos, checksum);
}
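Both bodies removed above take part in the rolling image checksum: every load/save stage XORs the counts and ids it reads or writes into a running value, and a mismatch at the end signals a corrupted or divergent metadata image. Condensed from the removed loadDb body (the real code also registers each db in the id/name maps and the transaction manager):

    // XOR-accumulating checksum while reading databases from the image.
    int dbCount = dis.readInt();
    long newChecksum = checksum ^ dbCount;      // fold in the count
    for (long i = 0; i < dbCount; ++i) {
        Database db = new Database();
        db.readFields(dis);
        newChecksum ^= db.getId();              // fold in each database id
    }
    return newChecksum;                         // must match the saved value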
public long saveLoadJob(CountingDataOutputStream dos, long checksum) throws IOException {
@@ -2592,312 +2477,47 @@ public class Catalog {
return null;
}
-
// The interface which DdlExecutor needs.
public void createDb(CreateDbStmt stmt) throws DdlException {
- final String clusterName = stmt.getClusterName();
- String fullDbName = stmt.getFullDbName();
- Map<String, String> properties = stmt.getProperties();
-
- long id = getNextId();
- Database db = new Database(id, fullDbName);
- db.setClusterName(clusterName);
- // check and analyze database properties before create database
- db.setDbProperties(new DatabaseProperty(properties).checkAndBuildProperties());
-
- if (!tryLock(false)) {
- throw new DdlException("Failed to acquire catalog lock. Try again");
- }
- try {
- if (!nameToCluster.containsKey(clusterName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_SELECT_CLUSTER, clusterName);
- }
- if (fullNameToDb.containsKey(fullDbName)) {
- if (stmt.isSetIfNotExists()) {
- LOG.info("create database[{}] which already exists", fullDbName);
- return;
- } else {
- ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, fullDbName);
- }
- } else {
- unprotectCreateDb(db);
- editLog.logCreateDb(db);
- }
- } finally {
- unlock();
- }
- LOG.info("createDb dbName = " + fullDbName + ", id = " + id);
-
- // create tables in iceberg database
- if (db.getDbProperties().getIcebergProperty().isExist()) {
- icebergTableCreationRecordMgr.registerDb(db);
- }
+ getInternalDataSource().createDb(stmt);
}
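The removed createDb body above shows the frontend's standard DDL sequence, which InternalDataSource now carries out: acquire the catalog lock with tryLock, validate, mutate in-memory state through an unprotect* helper, write an edit-log record so that followers and crash replay repeat the same mutation, and unlock. Condensed:

    // Condensed from the removed body; validation details omitted.
    if (!tryLock(false)) {
        throw new DdlException("Failed to acquire catalog lock. Try again");
    }
    try {
        unprotectCreateDb(db);    // in-memory mutation only
        editLog.logCreateDb(db);  // durable record for replay/followers
    } finally {
        unlock();
    }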
// For replay edit log, needn't lock metadata
public void unprotectCreateDb(Database db) {
- idToDb.put(db.getId(), db);
- fullNameToDb.put(db.getFullName(), db);
- final Cluster cluster = nameToCluster.get(db.getClusterName());
- cluster.addDb(db.getFullName(), db.getId());
- globalTransactionMgr.addDatabaseTransactionMgr(db.getId());
+ getInternalDataSource().unprotectCreateDb(db);
}
// for test
public void addCluster(Cluster cluster) {
- nameToCluster.put(cluster.getName(), cluster);
- idToCluster.put(cluster.getId(), cluster);
+ getInternalDataSource().addCluster(cluster);
}
public void replayCreateDb(Database db) {
- tryLock(true);
- try {
- unprotectCreateDb(db);
- } finally {
- unlock();
- }
+ getInternalDataSource().replayCreateDb(db);
}
public void dropDb(DropDbStmt stmt) throws DdlException {
- String dbName = stmt.getDbName();
-
- // 1. check if database exists
- if (!tryLock(false)) {
- throw new DdlException("Failed to acquire catalog lock. Try again");
- }
- try {
- if (!fullNameToDb.containsKey(dbName)) {
- if (stmt.isSetIfExists()) {
- LOG.info("drop database[{}] which does not exist", dbName);
- return;
- } else {
- ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS, dbName);
- }
- }
-
- // 2. drop tables in db
- Database db = this.fullNameToDb.get(dbName);
- db.writeLock();
- try {
- if (!stmt.isForceDrop()) {
- if (Catalog.getCurrentCatalog().getGlobalTransactionMgr().existCommittedTxns(db.getId(), null, null)) {
- throw new DdlException("There are still some transactions in the COMMITTED state waiting to be completed. "
- + "The database [" + dbName + "] cannot be dropped. If you want to forcibly drop(cannot be recovered),"
- + " please use \"DROP database FORCE\".");
- }
- }
- if (db.getDbState() == DbState.LINK && dbName.equals(db.getAttachDb())) {
- // We try to drop a hard link.
- final DropLinkDbAndUpdateDbInfo info = new DropLinkDbAndUpdateDbInfo();
- fullNameToDb.remove(db.getAttachDb());
- db.setDbState(DbState.NORMAL);
- info.setUpdateDbState(DbState.NORMAL);
- final Cluster cluster = nameToCluster
- .get(ClusterNamespace.getClusterNameFromFullName(db.getAttachDb()));
- final BaseParam param = new BaseParam();
- param.addStringParam(db.getAttachDb());
- param.addLongParam(db.getId());
- cluster.removeLinkDb(param);
- info.setDropDbCluster(cluster.getName());
- info.setDropDbId(db.getId());
- info.setDropDbName(db.getAttachDb());
- editLog.logDropLinkDb(info);
- return;
- }
-
- if (db.getDbState() == DbState.LINK && dbName.equals(db.getFullName())) {
- // We try to drop a db which other dbs attach to it,
- // which is not allowed.
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DB_STATE_LINK_OR_MIGRATE,
- ClusterNamespace.getNameFromFullName(dbName));
- return;
- }
-
- if (dbName.equals(db.getAttachDb()) && db.getDbState() == DbState.MOVE) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DB_STATE_LINK_OR_MIGRATE,
- ClusterNamespace.getNameFromFullName(dbName));
- return;
- }
-
- // save table names for recycling
- Set<String> tableNames = db.getTableNamesWithLock();
- List<Table> tableList = db.getTablesOnIdOrder();
- MetaLockUtils.writeLockTables(tableList);
- try {
- if (!stmt.isForceDrop()) {
- for (Table table : tableList) {
- if (table.getType() == TableType.OLAP) {
- OlapTable olapTable = (OlapTable) table;
- if (olapTable.getState() != OlapTableState.NORMAL) {
- throw new DdlException("The table [" + olapTable.getState() + "]'s state is " + olapTable.getState() + ", cannot be dropped."
- + " please cancel the operation on olap table firstly. If you want to forcibly drop(cannot be recovered),"
- + " please use \"DROP table FORCE\".");
- }
- }
- }
- }
- unprotectDropDb(db, stmt.isForceDrop(), false);
- } finally {
- MetaLockUtils.writeUnlockTables(tableList);
- }
-
- if (!stmt.isForceDrop()) {
- Catalog.getCurrentRecycleBin().recycleDatabase(db, tableNames);
- } else {
- Catalog.getCurrentCatalog().eraseDatabase(db.getId(), false);
- }
- } finally {
- db.writeUnlock();
- }
-
- // 3. remove db from catalog
- idToDb.remove(db.getId());
- fullNameToDb.remove(db.getFullName());
- final Cluster cluster = nameToCluster.get(db.getClusterName());
- cluster.removeDb(dbName, db.getId());
- DropDbInfo info = new DropDbInfo(dbName, stmt.isForceDrop());
- editLog.logDropDb(info);
- } finally {
- unlock();
- }
-
- LOG.info("finish drop database[{}], is force : {}", dbName, stmt.isForceDrop());
- }
-
- public void unprotectDropDb(Database db, boolean isForeDrop, boolean isReplay) {
- // drop Iceberg database table creation records
- if (db.getDbProperties().getIcebergProperty().isExist()) {
- icebergTableCreationRecordMgr.deregisterDb(db);
- }
- for (Table table : db.getTables()) {
- unprotectDropTable(db, table, isForeDrop, isReplay);
- }
- db.markDropped();
+ getInternalDataSource().dropDb(stmt);
}
public void replayDropLinkDb(DropLinkDbAndUpdateDbInfo info) {
- tryLock(true);
- try {
- final Database db = this.fullNameToDb.remove(info.getDropDbName());
- db.setDbState(info.getUpdateDbState());
- final Cluster cluster = nameToCluster
- .get(info.getDropDbCluster());
- final BaseParam param = new BaseParam();
- param.addStringParam(db.getAttachDb());
- param.addLongParam(db.getId());
- cluster.removeLinkDb(param);
- } finally {
- unlock();
- }
+ getInternalDataSource().replayDropLinkDb(info);
}
public void replayDropDb(String dbName, boolean isForceDrop) throws DdlException {
- tryLock(true);
- try {
- Database db = fullNameToDb.get(dbName);
- db.writeLock();
- try {
- Set<String> tableNames = db.getTableNamesWithLock();
- List<Table> tableList = db.getTablesOnIdOrder();
- MetaLockUtils.writeLockTables(tableList);
- try {
- unprotectDropDb(db, isForceDrop, true);
- } finally {
- MetaLockUtils.writeUnlockTables(tableList);
- }
- if (!isForceDrop) {
- Catalog.getCurrentRecycleBin().recycleDatabase(db, tableNames);
- } else {
- Catalog.getCurrentCatalog().eraseDatabase(db.getId(), false);
- }
- } finally {
- db.writeUnlock();
- }
-
- fullNameToDb.remove(dbName);
- idToDb.remove(db.getId());
- final Cluster cluster = nameToCluster.get(db.getClusterName());
- cluster.removeDb(dbName, db.getId());
- } finally {
- unlock();
- }
+ getInternalDataSource().replayDropDb(dbName, isForceDrop);
}
public void recoverDatabase(RecoverDbStmt recoverStmt) throws DdlException {
- // check is new db with same name already exist
- if (getDb(recoverStmt.getDbName()).isPresent()) {
- throw new DdlException("Database[" + recoverStmt.getDbName() + "] already exist.");
- }
-
- Database db = Catalog.getCurrentRecycleBin().recoverDatabase(recoverStmt.getDbName());
-
- // add db to catalog
- if (!tryLock(false)) {
- throw new DdlException("Failed to acquire catalog lock. Try again");
- }
- db.writeLock();
- List<Table> tableList = db.getTablesOnIdOrder();
- MetaLockUtils.writeLockTables(tableList);
- try {
- if (fullNameToDb.containsKey(db.getFullName())) {
- throw new DdlException("Database[" + db.getFullName() + "] already exist.");
- // it's ok that we do not put db back to CatalogRecycleBin
- // because this db cannot be recovered any more
- }
-
- fullNameToDb.put(db.getFullName(), db);
- idToDb.put(db.getId(), db);
- final Cluster cluster = nameToCluster.get(db.getClusterName());
- cluster.addDb(db.getFullName(), db.getId());
-
- // log
- RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L);
- editLog.logRecoverDb(recoverInfo);
- db.unmarkDropped();
- } finally {
- MetaLockUtils.writeUnlockTables(tableList);
- db.writeUnlock();
- unlock();
- }
-
- LOG.info("recover database[{}]", db.getId());
+ getInternalDataSource().recoverDatabase(recoverStmt);
}
public void recoverTable(RecoverTableStmt recoverStmt) throws DdlException {
- String dbName = recoverStmt.getDbName();
- String tableName = recoverStmt.getTableName();
-
- Database db = this.getDbOrDdlException(dbName);
- db.writeLockOrDdlException();
- try {
- if (db.getTable(tableName).isPresent()) {
- ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
- }
- if (!Catalog.getCurrentRecycleBin().recoverTable(db, tableName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dbName);
- }
- } finally {
- db.writeUnlock();
- }
+ getInternalDataSource().recoverTable(recoverStmt);
}
public void recoverPartition(RecoverPartitionStmt recoverStmt) throws DdlException {
- String dbName = recoverStmt.getDbName();
- String tableName = recoverStmt.getTableName();
-
- Database db = this.getDbOrDdlException(dbName);
- OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
- olapTable.writeLockOrDdlException();
- try {
- String partitionName = recoverStmt.getPartitionName();
- if (olapTable.getPartition(partitionName) != null) {
- throw new DdlException("partition[" + partitionName + "] already exist in table[" + tableName + "]");
- }
-
- Catalog.getCurrentRecycleBin().recoverPartition(db.getId(), olapTable, partitionName);
- } finally {
- olapTable.writeUnlock();
- }
+ getInternalDataSource().recoverPartition(recoverStmt);
}
public void replayEraseDatabase(long dbId) throws DdlException {
@@ -2915,104 +2535,19 @@ public class Catalog {
}
public void alterDatabaseQuota(AlterDatabaseQuotaStmt stmt) throws DdlException {
- String dbName = stmt.getDbName();
- Database db = this.getDbOrDdlException(dbName);
- QuotaType quotaType = stmt.getQuotaType();
- db.writeLockOrDdlException();
- try {
- if (quotaType == QuotaType.DATA) {
- db.setDataQuota(stmt.getQuota());
- } else if (quotaType == QuotaType.REPLICA) {
- db.setReplicaQuota(stmt.getQuota());
- }
- long quota = stmt.getQuota();
- DatabaseInfo dbInfo = new DatabaseInfo(dbName, "", quota, quotaType);
- editLog.logAlterDb(dbInfo);
- } finally {
- db.writeUnlock();
- }
+ getInternalDataSource().alterDatabaseQuota(stmt);
}
public void replayAlterDatabaseQuota(String dbName, long quota, QuotaType quotaType) throws MetaNotFoundException {
- Database db = this.getDbOrMetaException(dbName);
- db.writeLock();
- try {
- if (quotaType == QuotaType.DATA) {
- db.setDataQuota(quota);
- } else if (quotaType == QuotaType.REPLICA) {
- db.setReplicaQuota(quota);
- }
- } finally {
- db.writeUnlock();
- }
+ getInternalDataSource().replayAlterDatabaseQuota(dbName, quota, quotaType);
}
public void renameDatabase(AlterDatabaseRename stmt) throws DdlException {
- String fullDbName = stmt.getDbName();
- String newFullDbName = stmt.getNewDbName();
- String clusterName = stmt.getClusterName();
-
- if (fullDbName.equals(newFullDbName)) {
- throw new DdlException("Same database name");
- }
-
- Database db = null;
- Cluster cluster = null;
- if (!tryLock(false)) {
- throw new DdlException("Failed to acquire catalog lock. Try again");
- }
- try {
- cluster = nameToCluster.get(clusterName);
- if (cluster == null) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
- }
- // check if db exists
- db = fullNameToDb.get(fullDbName);
- if (db == null) {
- ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, fullDbName);
- }
-
- if (db.getDbState() == DbState.LINK || db.getDbState() == DbState.MOVE) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_RENAME_DB_ERR, fullDbName);
- }
- // check if name is already used
- if (fullNameToDb.get(newFullDbName) != null) {
- throw new DdlException("Database name[" + newFullDbName + "] is already used");
- }
-
- cluster.removeDb(db.getFullName(), db.getId());
- cluster.addDb(newFullDbName, db.getId());
- // 1. rename db
- db.setNameWithLock(newFullDbName);
-
- // 2. add to meta. check again
- fullNameToDb.remove(fullDbName);
- fullNameToDb.put(newFullDbName, db);
-
- DatabaseInfo dbInfo = new DatabaseInfo(fullDbName, newFullDbName, -1L, QuotaType.NONE);
- editLog.logDatabaseRename(dbInfo);
- } finally {
- unlock();
- }
-
- LOG.info("rename database[{}] to [{}]", fullDbName, newFullDbName);
+ getInternalDataSource().renameDatabase(stmt);
}
public void replayRenameDatabase(String dbName, String newDbName) {
- tryLock(true);
- try {
- Database db = fullNameToDb.get(dbName);
- Cluster cluster = nameToCluster.get(db.getClusterName());
- cluster.removeDb(db.getFullName(), db.getId());
- db.setName(newDbName);
- cluster.addDb(newDbName, db.getId());
- fullNameToDb.remove(dbName);
- fullNameToDb.put(newDbName, db);
- } finally {
- unlock();
- }
-
- LOG.info("replay rename database {} to {}", dbName, newDbName);
+ getInternalDataSource().replayRenameDatabase(dbName, newDbName);
}
/**
@@ -3035,1142 +2570,39 @@ public class Catalog {
* 11. add this table to ColocateGroup if necessary
*/
public void createTable(CreateTableStmt stmt) throws UserException {
- String engineName = stmt.getEngineName();
- String dbName = stmt.getDbName();
- String tableName = stmt.getTableName();
-
- // check if db exists
- Database db = this.getDbOrDdlException(dbName);
-
- // only internal table should check quota and cluster capacity
- if (!stmt.isExternal()) {
- // check cluster capacity
- Catalog.getCurrentSystemInfo().checkClusterCapacity(stmt.getClusterName());
- // check db quota
- db.checkQuota();
- }
-
- // check if table exists in db
- if (db.getTable(tableName).isPresent()) {
- if (stmt.isSetIfNotExists()) {
- LOG.info("create table[{}] which already exists", tableName);
- return;
- } else {
- ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
- }
- }
-
- if (engineName.equals("olap")) {
- createOlapTable(db, stmt);
- return;
- } else if (engineName.equals("odbc")) {
- createOdbcTable(db, stmt);
- return;
- } else if (engineName.equals("mysql")) {
- createMysqlTable(db, stmt);
- return;
- } else if (engineName.equals("broker")) {
- createBrokerTable(db, stmt);
- return;
- } else if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) {
- createEsTable(db, stmt);
- return;
- } else if (engineName.equalsIgnoreCase("hive")) {
- createHiveTable(db, stmt);
- return;
- } else if (engineName.equalsIgnoreCase("iceberg")) {
- IcebergCatalogMgr.createIcebergTable(db, stmt);
- return;
- } else if (engineName.equalsIgnoreCase("hudi")) {
- createHudiTable(db, stmt);
- return;
- } else {
- ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, engineName);
- }
- Preconditions.checkState(false);
+ getInternalDataSource().createTable(stmt);
}
public void createTableLike(CreateTableLikeStmt stmt) throws DdlException {
- try {
- Database db = Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getExistedDbName());
- Table table = db.getTableOrDdlException(stmt.getExistedTableName());
-
- if (table.getType() == TableType.VIEW) {
- throw new DdlException("Not support create table from a View");
- }
-
- List<String> createTableStmt = Lists.newArrayList();
- table.readLock();
- try {
- if (table.getType() == TableType.OLAP) {
- if (!CollectionUtils.isEmpty(stmt.getRollupNames())) {
- OlapTable olapTable = (OlapTable) table;
- for (String rollupIndexName : stmt.getRollupNames()) {
- if (!olapTable.hasMaterializedIndex(rollupIndexName)) {
- throw new DdlException("Rollup index[" + rollupIndexName + "] not exists in Table[" + olapTable.getName() + "]");
- }
- }
- }
- } else if (!CollectionUtils.isEmpty(stmt.getRollupNames()) || stmt.isWithAllRollup()) {
- throw new DdlException("Table[" + table.getName() + "] is external, not support rollup copy");
- }
-
- Catalog.getDdlStmt(stmt, stmt.getDbName(), table, createTableStmt, null, null, false, false, true);
- if (createTableStmt.isEmpty()) {
- ErrorReport.reportDdlException(ErrorCode.ERROR_CREATE_TABLE_LIKE_EMPTY, "CREATE");
- }
- } finally {
- table.readUnlock();
- }
- CreateTableStmt parsedCreateTableStmt = (CreateTableStmt) SqlParserUtils.parseAndAnalyzeStmt(createTableStmt.get(0), ConnectContext.get());
- parsedCreateTableStmt.setTableName(stmt.getTableName());
- parsedCreateTableStmt.setIfNotExists(stmt.isIfNotExists());
- createTable(parsedCreateTableStmt);
- } catch (UserException e) {
- throw new DdlException("Failed to execute CREATE TABLE LIKE " + stmt.getExistedTableName() + ". Reason: " + e.getMessage());
- }
+ getInternalDataSource().createTableLike(stmt);
}
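The removed body above shows how CREATE TABLE LIKE is implemented: render the existing table back into CREATE TABLE DDL text, re-parse and re-analyze that text, swap in the new table name, and run it through the normal createTable path. Condensed, with locking and error handling omitted:

    List<String> createTableStmt = Lists.newArrayList();
    Catalog.getDdlStmt(stmt, stmt.getDbName(), table, createTableStmt,
            null, null, false, false, true);
    CreateTableStmt parsed = (CreateTableStmt) SqlParserUtils
            .parseAndAnalyzeStmt(createTableStmt.get(0), ConnectContext.get());
    parsed.setTableName(stmt.getTableName());
    parsed.setIfNotExists(stmt.isIfNotExists());
    createTable(parsed);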
public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlException {
- try {
- List<String> columnNames = stmt.getColumnNames();
- CreateTableStmt createTableStmt = stmt.getCreateTableStmt();
- QueryStmt queryStmt = stmt.getQueryStmt();
- ArrayList<Expr> resultExprs = queryStmt.getResultExprs();
- ArrayList<String> colLabels = queryStmt.getColLabels();
- int size = resultExprs.size();
- // Check columnNames
- int colNameIndex = 0;
- for (int i = 0; i < size; ++i) {
- String name;
- if (columnNames != null) {
- // use custom column names
- name = columnNames.get(i);
- } else {
- name = colLabels.get(i);
- }
- try {
- FeNameFormat.checkColumnName(name);
- } catch (AnalysisException exception) {
- name = "_col" + (colNameIndex++);
- }
- TypeDef typeDef;
- Expr resultExpr = resultExprs.get(i);
- if (resultExpr.getType().isStringType() && resultExpr.getType().getLength() < 0) {
- typeDef = new TypeDef(Type.STRING);
- } else {
- typeDef = new TypeDef(resultExpr.getType());
- }
- ColumnDef columnDef;
- if (resultExpr.getSrcSlotRef() == null) {
- columnDef = new ColumnDef(name, typeDef, false, null, true, new DefaultValue(false, null), "");
- } else {
- Column column = resultExpr.getSrcSlotRef().getDesc().getColumn();
- boolean setDefault = StringUtils.isNotBlank(column.getDefaultValue());
- columnDef = new ColumnDef(name, typeDef, column.isKey(),
- column.getAggregationType(), column.isAllowNull(),
- new DefaultValue(setDefault, column.getDefaultValue()), column.getComment());
- }
- createTableStmt.addColumnDef(columnDef);
- // set first column as default distribution
- if (createTableStmt.getDistributionDesc() == null && i == 0) {
- createTableStmt.setDistributionDesc(new HashDistributionDesc(10, Lists.newArrayList(name)));
- }
- }
- Analyzer dummyRootAnalyzer = new Analyzer(this, ConnectContext.get());
- createTableStmt.analyze(dummyRootAnalyzer);
- createTable(createTableStmt);
- } catch (UserException e) {
- throw new DdlException("Failed to execute CTAS Reason: " + e.getMessage());
- }
+ getInternalDataSource().createTableAsSelect(stmt);
}
public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException {
- SinglePartitionDesc singlePartitionDesc = addPartitionClause.getSingeRangePartitionDesc();
- DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc();
- boolean isTempPartition = addPartitionClause.isTempPartition();
+ getInternalDataSource().addPartition(db, tableName, addPartitionClause);
+ }
- DistributionInfo distributionInfo;
- Map<Long, MaterializedIndexMeta> indexIdToMeta;
- Set<String> bfColumns;
- String partitionName = singlePartitionDesc.getPartitionName();
-
- // check
- Table table = db.getOlapTableOrDdlException(tableName);
- // check state
- OlapTable olapTable = (OlapTable) table;
-
- olapTable.readLock();
- try {
- if (olapTable.getState() != OlapTableState.NORMAL) {
- throw new DdlException("Table[" + tableName + "]'s state is not NORMAL");
- }
-
- // check partition type
- PartitionInfo partitionInfo = olapTable.getPartitionInfo();
- if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) {
- throw new DdlException("Only support adding partition to range and list partitioned table");
- }
-
- // check partition name
- if (olapTable.checkPartitionNameExist(partitionName)) {
- if (singlePartitionDesc.isSetIfNotExists()) {
- LOG.info("add partition[{}] which already exists", partitionName);
- return;
- } else {
- ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
- }
- }
-
- Map<String, String> properties = singlePartitionDesc.getProperties();
- // partition properties should inherit table properties
- ReplicaAllocation replicaAlloc = olapTable.getDefaultReplicaAllocation();
- if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)
- && !properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
- properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, replicaAlloc.toCreateStmt());
- }
- if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)) {
- properties.put(PropertyAnalyzer.PROPERTIES_INMEMORY, olapTable.isInMemory().toString());
- }
-
- singlePartitionDesc.analyze(partitionInfo.getPartitionColumns().size(), properties);
- partitionInfo.createAndCheckPartitionItem(singlePartitionDesc, isTempPartition);
-
- // get distributionInfo
- List<Column> baseSchema = olapTable.getBaseSchema();
- DistributionInfo defaultDistributionInfo = olapTable.getDefaultDistributionInfo();
- if (distributionDesc != null) {
- distributionInfo = distributionDesc.toDistributionInfo(baseSchema);
- // for now, we only support modifying distribution's bucket num
- if (distributionInfo.getType() != defaultDistributionInfo.getType()) {
- throw new DdlException("Cannot assign different distribution type. default is: "
- + defaultDistributionInfo.getType());
- }
-
- if (distributionInfo.getType() == DistributionInfoType.HASH) {
- HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
- List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns();
- List<Column> defaultDistriCols = ((HashDistributionInfo) defaultDistributionInfo)
- .getDistributionColumns();
- if (!newDistriCols.equals(defaultDistriCols)) {
- throw new DdlException("Cannot assign hash distribution with different distribution cols. "
- + "default is: " + defaultDistriCols);
- }
- if (hashDistributionInfo.getBucketNum() <= 0) {
- throw new DdlException("Cannot assign hash distribution buckets less than 1");
- }
- }
- } else {
- // make sure partition-distribution-info is deep copied from default-distribution-info
- distributionInfo = defaultDistributionInfo.toDistributionDesc().toDistributionInfo(baseSchema);
- }
-
- // check colocation
- if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) {
- String fullGroupName = db.getId() + "_" + olapTable.getColocateGroup();
- ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(fullGroupName);
- Preconditions.checkNotNull(groupSchema);
- groupSchema.checkDistribution(distributionInfo);
- groupSchema.checkReplicaAllocation(singlePartitionDesc.getReplicaAlloc());
- }
-
- indexIdToMeta = olapTable.getCopiedIndexIdToMeta();
- bfColumns = olapTable.getCopiedBfColumns();
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- } finally {
- olapTable.readUnlock();
- }
-
- Preconditions.checkNotNull(distributionInfo);
- Preconditions.checkNotNull(olapTable);
- Preconditions.checkNotNull(indexIdToMeta);
-
- // create partition outside db lock
- DataProperty dataProperty = singlePartitionDesc.getPartitionDataProperty();
- Preconditions.checkNotNull(dataProperty);
- // check replica quota if this operation done
- long indexNum = indexIdToMeta.size();
- long bucketNum = distributionInfo.getBucketNum();
- long replicaNum = singlePartitionDesc.getReplicaAlloc().getTotalReplicaNum();
- long totalReplicaNum = indexNum * bucketNum * replicaNum;
- if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
- throw new DdlException("Database " + db.getFullName() + " table " + tableName
- + " add partition increasing " + totalReplicaNum
- + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
- }
- Set<Long> tabletIdSet = new HashSet<Long>();
- try {
- long partitionId = getNextId();
- Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(),
- olapTable.getId(),
- olapTable.getBaseIndexId(),
- partitionId, partitionName,
- indexIdToMeta,
- distributionInfo,
- dataProperty.getStorageMedium(),
- singlePartitionDesc.getReplicaAlloc(),
- singlePartitionDesc.getVersionInfo(),
- bfColumns, olapTable.getBfFpp(),
- tabletIdSet, olapTable.getCopiedIndexes(),
- singlePartitionDesc.isInMemory(),
- olapTable.getStorageFormat(),
- singlePartitionDesc.getTabletType(),
- olapTable.getCompressionType(),
- olapTable.getDataSortInfo()
- );
-
- // check again
- table = db.getOlapTableOrDdlException(tableName);
- table.writeLockOrDdlException();
- try {
- olapTable = (OlapTable) table;
- if (olapTable.getState() != OlapTableState.NORMAL) {
- throw new DdlException("Table[" + tableName + "]'s state is not NORMAL");
- }
-
- // check partition name
- if (olapTable.checkPartitionNameExist(partitionName)) {
- if (singlePartitionDesc.isSetIfNotExists()) {
- LOG.info("add partition[{}] which already exists", partitionName);
- return;
- } else {
- ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
- }
- }
-
- // check if meta changed
- // rollup index may be added or dropped during add partition operation.
- // schema may be changed during add partition operation.
- boolean metaChanged = false;
- if (olapTable.getIndexNameToId().size() != indexIdToMeta.size()) {
- metaChanged = true;
- } else {
- // compare schemaHash
- for (Map.Entry<Long, MaterializedIndexMeta> entry : olapTable.getIndexIdToMeta().entrySet()) {
- long indexId = entry.getKey();
- if (!indexIdToMeta.containsKey(indexId)) {
- metaChanged = true;
- break;
- }
- if (indexIdToMeta.get(indexId).getSchemaHash() != entry.getValue().getSchemaHash()) {
- metaChanged = true;
- break;
- }
- }
- }
-
- if (metaChanged) {
- throw new DdlException("Table[" + tableName + "]'s meta has been changed. try again.");
- }
-
- // check partition type
- PartitionInfo partitionInfo = olapTable.getPartitionInfo();
- if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) {
- throw new DdlException("Only support adding partition to range and list partitioned table");
- }
-
- // update partition info
- partitionInfo.handleNewSinglePartitionDesc(singlePartitionDesc, partitionId, isTempPartition);
-
- if (isTempPartition) {
- olapTable.addTempPartition(partition);
- } else {
- olapTable.addPartition(partition);
- }
-
- // log
- PartitionPersistInfo info = null;
- if (partitionInfo.getType() == PartitionType.RANGE) {
- info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition,
- partitionInfo.getItem(partitionId).getItems(),
- ListPartitionItem.DUMMY_ITEM,
- dataProperty,
- partitionInfo.getReplicaAllocation(partitionId),
- partitionInfo.getIsInMemory(partitionId),
- isTempPartition);
- } else if (partitionInfo.getType() == PartitionType.LIST) {
- info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition,
- RangePartitionItem.DUMMY_ITEM,
- partitionInfo.getItem(partitionId), dataProperty,
- partitionInfo.getReplicaAllocation(partitionId),
- partitionInfo.getIsInMemory(partitionId),
- isTempPartition);
- }
- editLog.logAddPartition(info);
-
- LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition);
- } finally {
- table.writeUnlock();
- }
- } catch (DdlException e) {
- for (Long tabletId : tabletIdSet) {
- Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
- }
- throw e;
- }
- }
-
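Note the replica-quota check in the removed addPartition body: the prospective replica count is indexNum * bucketNum * replicaNum, so a partition with 2 materialized indexes, 10 buckets, and 3 replicas per tablet would add 60 replicas, and the operation is rejected if that meets or exceeds the database's remaining replica quota.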
- public void replayAddPartition(PartitionPersistInfo info) throws MetaNotFoundException {
- Database db = this.getDbOrMetaException(info.getDbId());
- OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
- olapTable.writeLock();
- try {
- Partition partition = info.getPartition();
- PartitionInfo partitionInfo = olapTable.getPartitionInfo();
- if (info.isTempPartition()) {
- olapTable.addTempPartition(partition);
- } else {
- olapTable.addPartition(partition);
- }
-
- PartitionItem partitionItem = null;
- if (partitionInfo.getType() == PartitionType.RANGE) {
- partitionItem = new RangePartitionItem(info.getRange());
- } else if (partitionInfo.getType() == PartitionType.LIST) {
- partitionItem = info.getListPartitionItem();
- }
-
- partitionInfo.unprotectHandleNewSinglePartitionDesc(partition.getId(), info.isTempPartition(),
- partitionItem, info.getDataProperty(), info.getReplicaAlloc(), info.isInMemory());
-
- if (!isCheckpointThread()) {
- // add to inverted index
- TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
- for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
- long indexId = index.getId();
- int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
- TabletMeta tabletMeta = new TabletMeta(info.getDbId(), info.getTableId(), partition.getId(),
- index.getId(), schemaHash, info.getDataProperty().getStorageMedium());
- for (Tablet tablet : index.getTablets()) {
- long tabletId = tablet.getId();
- invertedIndex.addTablet(tabletId, tabletMeta);
- for (Replica replica : tablet.getReplicas()) {
- invertedIndex.addReplica(tabletId, replica);
- }
- }
- }
- }
- } finally {
- olapTable.writeUnlock();
- }
- }
+ public void replayAddPartition(PartitionPersistInfo info) throws MetaNotFoundException {
+ getInternalDataSource().replayAddPartition(info);
+ }
public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause clause) throws DdlException {
- Preconditions.checkArgument(olapTable.isWriteLockHeldByCurrentThread());
-
- String partitionName = clause.getPartitionName();
- boolean isTempPartition = clause.isTempPartition();
-
- if (olapTable.getState() != OlapTableState.NORMAL) {
- throw new DdlException("Table[" + olapTable.getName() + "]'s state is not NORMAL");
- }
-
- if (!olapTable.checkPartitionNameExist(partitionName, isTempPartition)) {
- if (clause.isSetIfExists()) {
- LOG.info("drop partition[{}] which does not exist", partitionName);
- return;
- } else {
- ErrorReport.reportDdlException(ErrorCode.ERR_DROP_PARTITION_NON_EXISTENT, partitionName);
- }
- }
-
- PartitionInfo partitionInfo = olapTable.getPartitionInfo();
- if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) {
- throw new DdlException("Alter table [" + olapTable.getName() + "] failed. Not a partitioned table");
- }
-
- // drop
- if (isTempPartition) {
- olapTable.dropTempPartition(partitionName, true);
- } else {
- if (!clause.isForceDrop()) {
- Partition partition = olapTable.getPartition(partitionName);
- if (partition != null) {
- if (Catalog.getCurrentCatalog().getGlobalTransactionMgr().existCommittedTxns(db.getId(), olapTable.getId(), partition.getId())) {
- throw new DdlException("There are still some transactions in the COMMITTED state waiting to be completed."
- + " The partition [" + partitionName + "] cannot be dropped. If you want to forcibly drop(cannot be recovered),"
- + " please use \"DROP partition FORCE\".");
- }
- }
- }
- olapTable.dropPartition(db.getId(), partitionName, clause.isForceDrop());
- }
-
- // log
- DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionName, isTempPartition, clause.isForceDrop());
- editLog.logDropPartition(info);
-
- LOG.info("succeed in dropping partition[{}], is temp : {}, is force : {}", partitionName, isTempPartition, clause.isForceDrop());
+ getInternalDataSource().dropPartition(db, olapTable, clause);
}
public void replayDropPartition(DropPartitionInfo info) throws MetaNotFoundException {
- Database db = this.getDbOrMetaException(info.getDbId());
- OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
- olapTable.writeLock();
- try {
- if (info.isTempPartition()) {
- olapTable.dropTempPartition(info.getPartitionName(), true);
- } else {
- olapTable.dropPartition(info.getDbId(), info.getPartitionName(), info.isForceDrop());
- }
- } finally {
- olapTable.writeUnlock();
- }
+ getInternalDataSource().replayDropPartition(info);
}
public void replayErasePartition(long partitionId) {
- Catalog.getCurrentRecycleBin().replayErasePartition(partitionId);
+ getInternalDataSource().replayErasePartition(partitionId);
}
public void replayRecoverPartition(RecoverInfo info) throws MetaNotFoundException {
- Database db = this.getDbOrMetaException(info.getDbId());
- OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
- olapTable.writeLock();
- try {
- Catalog.getCurrentRecycleBin().replayRecoverPartition(olapTable, info.getPartitionId());
- } finally {
- olapTable.writeUnlock();
- }
- }
-
- private Partition createPartitionWithIndices(String clusterName, long dbId, long tableId,
- long baseIndexId, long partitionId, String partitionName,
- Map<Long, MaterializedIndexMeta> indexIdToMeta,
- DistributionInfo distributionInfo,
- TStorageMedium storageMedium,
- ReplicaAllocation replicaAlloc,
- Long versionInfo,
- Set<String> bfColumns,
- double bfFpp,
- Set<Long> tabletIdSet,
- List<Index> indexes,
- boolean isInMemory,
- TStorageFormat storageFormat,
- TTabletType tabletType,
- TCompressionType compressionType,
- DataSortInfo dataSortInfo) throws DdlException {
- // create base index first.
- Preconditions.checkArgument(baseIndexId != -1);
- MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
-
- // create partition with base index
- Partition partition = new Partition(partitionId, partitionName, baseIndex, distributionInfo);
-
- // add to index map
- Map<Long, MaterializedIndex> indexMap = new HashMap<>();
- indexMap.put(baseIndexId, baseIndex);
-
- // create rollup index if has
- for (long indexId : indexIdToMeta.keySet()) {
- if (indexId == baseIndexId) {
- continue;
- }
-
- MaterializedIndex rollup = new MaterializedIndex(indexId, IndexState.NORMAL);
- indexMap.put(indexId, rollup);
- }
-
- // version and version hash
- if (versionInfo != null) {
- partition.updateVisibleVersion(versionInfo);
- }
- long version = partition.getVisibleVersion();
-
- short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
- for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
- long indexId = entry.getKey();
- MaterializedIndex index = entry.getValue();
- MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
-
- // create tablets
- int schemaHash = indexMeta.getSchemaHash();
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, storageMedium);
- createTablets(clusterName, index, ReplicaState.NORMAL, distributionInfo, version,
- replicaAlloc, tabletMeta, tabletIdSet);
-
- boolean ok = false;
- String errMsg = null;
-
- // add create replica task for olap
- short shortKeyColumnCount = indexMeta.getShortKeyColumnCount();
- TStorageType storageType = indexMeta.getStorageType();
- List<Column> schema = indexMeta.getSchema();
- KeysType keysType = indexMeta.getKeysType();
- int totalTaskNum = index.getTablets().size() * totalReplicaNum;
- MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(totalTaskNum);
- AgentBatchTask batchTask = new AgentBatchTask();
- for (Tablet tablet : index.getTablets()) {
- long tabletId = tablet.getId();
- for (Replica replica : tablet.getReplicas()) {
- long backendId = replica.getBackendId();
- countDownLatch.addMark(backendId, tabletId);
- CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tableId,
- partitionId, indexId, tabletId,
- shortKeyColumnCount, schemaHash,
- version,
- keysType,
- storageType, storageMedium,
- schema, bfColumns, bfFpp,
- countDownLatch,
- indexes,
- isInMemory,
- tabletType,
- dataSortInfo,
- compressionType);
- task.setStorageFormat(storageFormat);
- batchTask.addTask(task);
- // add to AgentTaskQueue for handling finish report.
- // not for resending task
- AgentTaskQueue.addTask(task);
- }
- }
- AgentTaskExecutor.submit(batchTask);
-
- // estimate timeout
- long timeout = Config.tablet_create_timeout_second * 1000L * totalTaskNum;
- timeout = Math.min(timeout, Config.max_create_table_timeout_second * 1000);
- try {
- ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOG.warn("InterruptedException: ", e);
- ok = false;
- }
-
- if (!ok || !countDownLatch.getStatus().ok()) {
- errMsg = "Failed to create partition[" + partitionName + "]. Timeout.";
- // clear tasks
- AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CREATE);
-
- if (!countDownLatch.getStatus().ok()) {
- errMsg += " Error: " + countDownLatch.getStatus().getErrorMsg();
- } else {
- List<Entry<Long, Long>> unfinishedMarks = countDownLatch.getLeftMarks();
- // only show at most 3 results
- List<Entry<Long, Long>> subList = unfinishedMarks.subList(0, Math.min(unfinishedMarks.size(), 3));
- if (!subList.isEmpty()) {
- errMsg += " Unfinished mark: " + Joiner.on(", ").join(subList);
- }
- }
- LOG.warn(errMsg);
- throw new DdlException(errMsg);
- }
-
- if (index.getId() != baseIndexId) {
- // add rollup index to partition
- partition.createRollupIndex(index);
- }
- } // end for indexMap
- return partition;
- }
-
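createPartitionWithIndices (removed above) fans CreateReplicaTasks out to backends through AgentBatchTask, then blocks on a MarkedCountDownLatch keyed by (backendId, tabletId) until every replica reports success or a timeout expires. The waiting pattern, condensed from the removed body:

    // Timeout scales with task count and is capped by a global maximum.
    long timeout = Config.tablet_create_timeout_second * 1000L * totalTaskNum;
    timeout = Math.min(timeout, Config.max_create_table_timeout_second * 1000);
    boolean ok;
    try {
        ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        ok = false;
    }
    if (!ok || !countDownLatch.getStatus().ok()) {
        // drop queued tasks so they are not resent, then fail the DDL
        AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CREATE);
        throw new DdlException("Failed to create partition[" + partitionName + "]. Timeout.");
    }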
- // Create olap table and related base index synchronously.
- private void createOlapTable(Database db, CreateTableStmt stmt) throws UserException {
- String tableName = stmt.getTableName();
- LOG.debug("begin create olap table: {}", tableName);
-
- // create columns
- List<Column> baseSchema = stmt.getColumns();
- validateColumns(baseSchema);
-
- // create partition info
- PartitionDesc partitionDesc = stmt.getPartitionDesc();
- PartitionInfo partitionInfo = null;
- Map<String, Long> partitionNameToId = Maps.newHashMap();
- if (partitionDesc != null) {
- // gen partition id first
- PartitionDesc partDesc = partitionDesc;
- for (SinglePartitionDesc desc : partDesc.getSinglePartitionDescs()) {
- long partitionId = getNextId();
- partitionNameToId.put(desc.getPartitionName(), partitionId);
- }
- partitionInfo = partitionDesc.toPartitionInfo(baseSchema, partitionNameToId, false);
- } else {
- if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(stmt.getProperties())) {
- throw new DdlException("Only support dynamic partition properties on range partition table");
- }
- long partitionId = getNextId();
- // use table name as single partition name
- partitionNameToId.put(tableName, partitionId);
- partitionInfo = new SinglePartitionInfo();
- }
-
- // get keys type
- KeysDesc keysDesc = stmt.getKeysDesc();
- Preconditions.checkNotNull(keysDesc);
- KeysType keysType = keysDesc.getKeysType();
-
- // create distribution info
- DistributionDesc distributionDesc = stmt.getDistributionDesc();
- Preconditions.checkNotNull(distributionDesc);
- DistributionInfo defaultDistributionInfo = distributionDesc.toDistributionInfo(baseSchema);
-
- // calc short key column count
- short shortKeyColumnCount = Catalog.calcShortKeyColumnCount(baseSchema, stmt.getProperties());
- LOG.debug("create table[{}] short key column count: {}", tableName, shortKeyColumnCount);
-
- // indexes
- TableIndexes indexes = new TableIndexes(stmt.getIndexes());
-
- // create table
- long tableId = Catalog.getCurrentCatalog().getNextId();
- OlapTable olapTable = new OlapTable(tableId, tableName, baseSchema, keysType, partitionInfo,
- defaultDistributionInfo, indexes);
- olapTable.setComment(stmt.getComment());
-
- // set base index id
- long baseIndexId = getNextId();
- olapTable.setBaseIndexId(baseIndexId);
-
- // set base index info to table
- // this should be done before creating partitions.
- Map<String, String> properties = stmt.getProperties();
-
- // get storage format
- TStorageFormat storageFormat = TStorageFormat.V2; // default is segment v2
- try {
- storageFormat = PropertyAnalyzer.analyzeStorageFormat(properties);
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- }
- olapTable.setStorageFormat(storageFormat);
-
- // get compression type
- TCompressionType compressionType = TCompressionType.LZ4;
- try {
- compressionType = PropertyAnalyzer.analyzeCompressionType(properties);
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- }
- olapTable.setCompressionType(compressionType);
-
- // check data sort properties
- DataSortInfo dataSortInfo = PropertyAnalyzer.analyzeDataSortInfo(properties, keysType,
- keysDesc.keysColumnSize(), storageFormat);
- olapTable.setDataSortInfo(dataSortInfo);
-
- // analyze bloom filter columns
- Set<String> bfColumns = null;
- double bfFpp = 0;
- try {
- bfColumns = PropertyAnalyzer.analyzeBloomFilterColumns(properties, baseSchema, keysType);
- if (bfColumns != null && bfColumns.isEmpty()) {
- bfColumns = null;
- }
-
- bfFpp = PropertyAnalyzer.analyzeBloomFilterFpp(properties);
- if (bfColumns != null && bfFpp == 0) {
- bfFpp = FeConstants.default_bloom_filter_fpp;
- } else if (bfColumns == null) {
- bfFpp = 0;
- }
-
- olapTable.setBloomFilterInfo(bfColumns, bfFpp);
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- }
-
- // analyze replica allocation
- ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
- if (replicaAlloc.isNotSet()) {
- replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
- }
- olapTable.setReplicationAllocation(replicaAlloc);
-
- // set in memory
- boolean isInMemory = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_INMEMORY, false);
- olapTable.setIsInMemory(isInMemory);
-
- // set remote storage
- String resourceName = PropertyAnalyzer.analyzeRemoteStorageResource(properties);
- olapTable.setRemoteStorageResource(resourceName);
-
- TTabletType tabletType;
- try {
- tabletType = PropertyAnalyzer.analyzeTabletType(properties);
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- }
-
-
- if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
- // if this is an unpartitioned table, we should analyze data property and replication num here.
- // if this is a partitioned table, these properties are already analyzed in the RangePartitionDesc analyze phase.
-
- // use table name as this single partition name
- long partitionId = partitionNameToId.get(tableName);
- DataProperty dataProperty = null;
- try {
- dataProperty = PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(),
- DataProperty.DEFAULT_DATA_PROPERTY);
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- }
- Preconditions.checkNotNull(dataProperty);
- partitionInfo.setDataProperty(partitionId, dataProperty);
- partitionInfo.setReplicaAllocation(partitionId, replicaAlloc);
- partitionInfo.setIsInMemory(partitionId, isInMemory);
- partitionInfo.setTabletType(partitionId, tabletType);
- }
-
- // check colocation properties
- try {
- String colocateGroup = PropertyAnalyzer.analyzeColocate(properties);
- if (colocateGroup != null) {
- if (defaultDistributionInfo.getType() == DistributionInfoType.RANDOM) {
- throw new AnalysisException("Random distribution for colocate table is unsupported");
- }
- String fullGroupName = db.getId() + "_" + colocateGroup;
- ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(fullGroupName);
- if (groupSchema != null) {
- // group already exist, check if this table can be added to this group
- groupSchema.checkColocateSchema(olapTable);
- }
- // add table to this group, if group does not exist, create a new one
- getColocateTableIndex().addTableToGroup(db.getId(), olapTable, colocateGroup,
- null /* generate group id inside */);
- olapTable.setColocateGroup(colocateGroup);
- }
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- }
-
- // get base index storage type. default is COLUMN
- TStorageType baseIndexStorageType = null;
- try {
- baseIndexStorageType = PropertyAnalyzer.analyzeStorageType(properties);
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- }
- Preconditions.checkNotNull(baseIndexStorageType);
- // set base index meta
- int schemaVersion = 0;
- try {
- schemaVersion = PropertyAnalyzer.analyzeSchemaVersion(properties);
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- }
- int schemaHash = Util.generateSchemaHash();
- olapTable.setIndexMeta(baseIndexId, tableName, baseSchema, schemaVersion, schemaHash,
- shortKeyColumnCount, baseIndexStorageType, keysType);
-
- for (AlterClause alterClause : stmt.getRollupAlterClauseList()) {
- AddRollupClause addRollupClause = (AddRollupClause) alterClause;
-
- Long baseRollupIndex = olapTable.getIndexIdByName(tableName);
-
- // get storage type for rollup index
- TStorageType rollupIndexStorageType = null;
- try {
- rollupIndexStorageType = PropertyAnalyzer.analyzeStorageType(addRollupClause.getProperties());
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- }
- Preconditions.checkNotNull(rollupIndexStorageType);
- // set rollup index meta to olap table
- List<Column> rollupColumns = getMaterializedViewHandler().checkAndPrepareMaterializedView(addRollupClause,
- olapTable, baseRollupIndex, false);
- short rollupShortKeyColumnCount = Catalog.calcShortKeyColumnCount(rollupColumns, alterClause.getProperties());
- int rollupSchemaHash = Util.generateSchemaHash();
- long rollupIndexId = getCurrentCatalog().getNextId();
- olapTable.setIndexMeta(rollupIndexId, addRollupClause.getRollupName(), rollupColumns, schemaVersion,
- rollupSchemaHash, rollupShortKeyColumnCount, rollupIndexStorageType, keysType);
- }
-
- // analyze sequence column
- Type sequenceColType = null;
- try {
- sequenceColType = PropertyAnalyzer.analyzeSequenceType(properties, olapTable.getKeysType());
- if (sequenceColType != null) {
- olapTable.setSequenceInfo(sequenceColType);
- }
- } catch (Exception e) {
- throw new DdlException(e.getMessage());
- }
-
- // analyze version info
- Long versionInfo = null;
- try {
- versionInfo = PropertyAnalyzer.analyzeVersionInfo(properties);
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- }
- Preconditions.checkNotNull(versionInfo);
-
- // a set to record every new tablet created during table creation.
- // if any step fails, use this set to clean up.
- Set<Long> tabletIdSet = new HashSet<>();
- // create partition
- try {
- if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
- // this is a 1-level partitioned table
- // use table name as partition name
- DistributionInfo partitionDistributionInfo = distributionDesc.toDistributionInfo(baseSchema);
- String partitionName = tableName;
- long partitionId = partitionNameToId.get(partitionName);
-
- // check whether the replica quota would be exceeded once this operation is done
- long indexNum = olapTable.getIndexIdToMeta().size();
- long bucketNum = partitionDistributionInfo.getBucketNum();
- long replicaNum = partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum();
- long totalReplicaNum = indexNum * bucketNum * replicaNum;
- if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
- throw new DdlException("Database " + db.getFullName() + " create unpartitioned table "
- + tableName + " increasing " + totalReplicaNum
- + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
- }
- // create partition
- Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(),
- olapTable.getId(), olapTable.getBaseIndexId(),
- partitionId, partitionName,
- olapTable.getIndexIdToMeta(),
- partitionDistributionInfo,
- partitionInfo.getDataProperty(partitionId).getStorageMedium(),
- partitionInfo.getReplicaAllocation(partitionId),
- versionInfo, bfColumns, bfFpp,
- tabletIdSet, olapTable.getCopiedIndexes(),
- isInMemory, storageFormat, tabletType, compressionType, olapTable.getDataSortInfo());
- olapTable.addPartition(partition);
- } else if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
- try {
- // just to remove entries from stmt.getProperties(),
- // and then check whether any unknown properties remain
- PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(), DataProperty.DEFAULT_DATA_PROPERTY);
- if (partitionInfo.getType() == PartitionType.RANGE) {
- DynamicPartitionUtil.checkAndSetDynamicPartitionProperty(olapTable, properties);
-
- } else if (partitionInfo.getType() == PartitionType.LIST) {
- if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) {
- throw new DdlException("Only support dynamic partition properties on range partition table");
-
- }
- }
-
- if (properties != null && !properties.isEmpty()) {
- // here, all properties should be checked
- throw new DdlException("Unknown properties: " + properties);
- }
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- }
-
- // check whether the replica quota would be exceeded once this operation is done
- long totalReplicaNum = 0;
- for (Map.Entry<String, Long> entry : partitionNameToId.entrySet()) {
- long indexNum = olapTable.getIndexIdToMeta().size();
- long bucketNum = defaultDistributionInfo.getBucketNum();
- long replicaNum = partitionInfo.getReplicaAllocation(entry.getValue()).getTotalReplicaNum();
- totalReplicaNum += indexNum * bucketNum * replicaNum;
- }
- if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
- throw new DdlException("Database " + db.getFullName() + " create table "
- + tableName + " increasing " + totalReplicaNum
- + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
- }
-
- // this is a 2-level partitioned table
- for (Map.Entry<String, Long> entry : partitionNameToId.entrySet()) {
- DataProperty dataProperty = partitionInfo.getDataProperty(entry.getValue());
- DistributionInfo partitionDistributionInfo = distributionDesc.toDistributionInfo(baseSchema);
- Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
- olapTable.getBaseIndexId(), entry.getValue(), entry.getKey(),
- olapTable.getIndexIdToMeta(), partitionDistributionInfo,
- dataProperty.getStorageMedium(),
- partitionInfo.getReplicaAllocation(entry.getValue()),
- versionInfo, bfColumns, bfFpp,
- tabletIdSet, olapTable.getCopiedIndexes(),
- isInMemory, storageFormat,
- partitionInfo.getTabletType(entry.getValue()),
- compressionType, olapTable.getDataSortInfo());
- olapTable.addPartition(partition);
- }
- } else {
- throw new DdlException("Unsupported partition method: " + partitionInfo.getType().name());
- }
-
- Pair<Boolean, Boolean> result = db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists());
- if (!result.first) {
- ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
- }
-
- if (result.second) {
- if (getColocateTableIndex().isColocateTable(tableId)) {
- // if this is a colocate join table, its table id is already added to colocate group
- // so we should remove the tableId here
- getColocateTableIndex().removeTable(tableId);
- }
- for (Long tabletId : tabletIdSet) {
- Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
- }
- LOG.info("duplicate create table[{};{}], skip next steps", tableName, tableId);
- } else {
- // we have added these index to memory, only need to persist here
- if (getColocateTableIndex().isColocateTable(tableId)) {
- GroupId groupId = getColocateTableIndex().getGroup(tableId);
- Map<Tag, List<List<Long>>> backendsPerBucketSeq = getColocateTableIndex().getBackendsPerBucketSeq(groupId);
- ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, tableId, backendsPerBucketSeq);
- editLog.logColocateAddTable(info);
- }
- LOG.info("successfully create table[{};{}]", tableName, tableId);
- // register or remove table from DynamicPartition after table created
- DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
- dynamicPartitionScheduler.createOrUpdateRuntimeInfo(
- tableId, DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime());
- }
- } catch (DdlException e) {
- for (Long tabletId : tabletIdSet) {
- Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
- }
- // only remove from memory, because we have not persist it
- if (getColocateTableIndex().isColocateTable(tableId)) {
- getColocateTableIndex().removeTable(tableId);
- }
-
- throw e;
- }
- }
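Worth making the quota guard above concrete: each partition contributes indexNum * bucketNum * replicaNum new replicas, and the DDL is rejected once that total reaches the database's remaining replica quota. A sketch with invented numbers:

    public class ReplicaQuotaSketch {
        public static void main(String[] args) {
            long indexNum = 2;   // base index plus one rollup
            long bucketNum = 32; // from the distribution info
            long replicaNum = 3; // total replicas per tablet
            long totalReplicaNum = indexNum * bucketNum * replicaNum; // 192 new replicas
            long replicaQuotaLeft = 100;                              // what the database still allows
            if (totalReplicaNum >= replicaQuotaLeft) {
                System.out.println("reject: would exceed replica quota"); // mirrors the DdlException above
            }
        }
    }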
-
- private void createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException {
- String tableName = stmt.getTableName();
-
- List<Column> columns = stmt.getColumns();
-
- long tableId = Catalog.getCurrentCatalog().getNextId();
- MysqlTable mysqlTable = new MysqlTable(tableId, tableName, columns, stmt.getProperties());
- mysqlTable.setComment(stmt.getComment());
- if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists()).first) {
- ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
- }
- LOG.info("successfully create table[{}-{}]", tableName, tableId);
- return;
- }
-
- private void createOdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
- String tableName = stmt.getTableName();
- List<Column> columns = stmt.getColumns();
-
- long tableId = Catalog.getCurrentCatalog().getNextId();
- OdbcTable odbcTable = new OdbcTable(tableId, tableName, columns, stmt.getProperties());
- odbcTable.setComment(stmt.getComment());
- if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists()).first) {
- ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
- }
- LOG.info("successfully create table[{}-{}]", tableName, tableId);
- return;
- }
-
- private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlException {
- String tableName = stmt.getTableName();
-
- // create columns
- List<Column> baseSchema = stmt.getColumns();
- validateColumns(baseSchema);
-
- // create partition info
- PartitionDesc partitionDesc = stmt.getPartitionDesc();
- PartitionInfo partitionInfo = null;
- Map<String, Long> partitionNameToId = Maps.newHashMap();
- if (partitionDesc != null) {
- partitionInfo = partitionDesc.toPartitionInfo(baseSchema, partitionNameToId, false);
- } else {
- long partitionId = getNextId();
- // use table name as single partition name
- partitionNameToId.put(tableName, partitionId);
- partitionInfo = new SinglePartitionInfo();
- }
-
- long tableId = Catalog.getCurrentCatalog().getNextId();
- EsTable esTable = new EsTable(tableId, tableName, baseSchema, stmt.getProperties(), partitionInfo);
- esTable.setComment(stmt.getComment());
-
- if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists()).first) {
- ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
- }
- LOG.info("successfully create table{} with id {}", tableName, tableId);
- return esTable;
- }
-
- private void createBrokerTable(Database db, CreateTableStmt stmt) throws DdlException {
- String tableName = stmt.getTableName();
-
- List<Column> columns = stmt.getColumns();
-
- long tableId = Catalog.getCurrentCatalog().getNextId();
- BrokerTable brokerTable = new BrokerTable(tableId, tableName, columns, stmt.getProperties());
- brokerTable.setComment(stmt.getComment());
- brokerTable.setBrokerProperties(stmt.getExtProperties());
-
- if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists()).first) {
- ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
- }
- LOG.info("successfully create table[{}-{}]", tableName, tableId);
-
- return;
- }
-
- private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlException {
- String tableName = stmt.getTableName();
- List<Column> columns = stmt.getColumns();
- long tableId = getNextId();
- HiveTable hiveTable = new HiveTable(tableId, tableName, columns, stmt.getProperties());
- hiveTable.setComment(stmt.getComment());
- // check whether the hive table exists in the hive database
- HiveMetaStoreClient hiveMetaStoreClient =
- HiveMetaStoreClientHelper.getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
- if (!HiveMetaStoreClientHelper.tableExists(hiveMetaStoreClient, hiveTable.getHiveDb(), hiveTable.getHiveTable())) {
- throw new DdlException(String.format("Table [%s] dose not exist in Hive.", hiveTable.getHiveDbTable()));
- }
- // check whether the table already exists in the doris database
- if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists()).first) {
- ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
- }
- LOG.info("successfully create table[{}-{}]", tableName, tableId);
- }
-
- private void createHudiTable(Database db, CreateTableStmt stmt) throws DdlException {
- String tableName = stmt.getTableName();
- List<Column> columns = stmt.getColumns();
- long tableId = getNextId();
- HudiTable hudiTable = new HudiTable(tableId, tableName, columns, stmt.getProperties());
- hudiTable.setComment(stmt.getComment());
- // check hudi properties in create stmt.
- HudiUtils.validateCreateTable(hudiTable);
- // check whether the hudi table exists in the hive metastore
- String metastoreUris = hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS);
- HiveMetaStoreClient hiveMetaStoreClient = HiveMetaStoreClientHelper.getClient(metastoreUris);
- if (!HiveMetaStoreClientHelper.tableExists(hiveMetaStoreClient,
- hudiTable.getHmsDatabaseName(), hudiTable.getHmsTableName())) {
- throw new DdlException(String.format("Table [%s] dose not exist in Hive Metastore.",
- hudiTable.getHmsTableIdentifer()));
- }
- org.apache.hadoop.hive.metastore.api.Table hiveTable = HiveMetaStoreClientHelper.getTable(
- hudiTable.getHmsDatabaseName(),
- hudiTable.getHmsTableName(),
- metastoreUris);
- if (!HudiUtils.isHudiTable(hiveTable)) {
- throw new DdlException(String.format("Table [%s] is not a hudi table.", hudiTable.getHmsTableIdentifer()));
- }
- // after snapshot queries on MOR tables are supported, we should remove this check.
- if (HudiUtils.isHudiRealtimeTable(hiveTable)) {
- throw new DdlException(String.format("Can not support hudi realtime table [%s].", hudiTable.getHmsTableName()));
- }
- // check the table's schema when the user specifies one
- if (!hudiTable.getFullSchema().isEmpty()) {
- HudiUtils.validateColumns(hudiTable, hiveTable);
- }
- switch (hiveTable.getTableType()) {
- case "EXTERNAL_TABLE":
- case "MANAGED_TABLE":
- break;
- case "VIRTUAL_VIEW":
- default:
- throw new DdlException("unsupported hudi table type [" + hiveTable.getTableType() + "].");
- }
- // check whether the hudi table already exists in the doris database
- if (!db.createTableWithLock(hudiTable, false, stmt.isSetIfNotExists()).first) {
- ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
- }
- LOG.info("successfully create table[{}-{}]", tableName, tableId);
+ getInternalDataSource().replayRecoverPartition(info);
}
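The hunk above shows the shape of the whole commit: Catalog keeps its public entry points and forwards each body to InternalDataSource. A minimal sketch of that facade pattern, with hypothetical class shapes (the real classes carry far more state):

    public class DelegationSketch {
        static class InternalDataSource {
            void dropTable(String tableName) {
                System.out.println("dropping " + tableName); // the moved implementation lives here now
            }
        }

        static class Catalog {
            private final InternalDataSource internalDataSource = new InternalDataSource();

            InternalDataSource getInternalDataSource() {
                return internalDataSource;
            }

            public void dropTable(String tableName) {
                getInternalDataSource().dropTable(tableName); // thin pass-through, public API unchanged
            }
        }

        public static void main(String[] args) {
            new Catalog().dropTable("t1");
        }
    }

Keeping the old signatures on Catalog means callers and edit-log replay paths compile unchanged while the implementation migrates.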
public static void getDdlStmt(Table table, List<String> createTableStmt, List<String> addPartitionStmt,
@@ -4576,306 +3008,48 @@ public class Catalog {
}
public void replayCreateTable(String dbName, Table table) throws MetaNotFoundException {
- Database db = this.fullNameToDb.get(dbName);
- try {
- db.createTableWithLock(table, true, false);
- } catch (DdlException e) {
- throw new MetaNotFoundException(e.getMessage());
- }
- if (!isCheckpointThread()) {
- // add to inverted index
- if (table.getType() == TableType.OLAP) {
- TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
- OlapTable olapTable = (OlapTable) table;
- long dbId = db.getId();
- long tableId = table.getId();
- for (Partition partition : olapTable.getAllPartitions()) {
- long partitionId = partition.getId();
- TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(
- partitionId).getStorageMedium();
- for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
- long indexId = mIndex.getId();
- int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium);
- for (Tablet tablet : mIndex.getTablets()) {
- long tabletId = tablet.getId();
- invertedIndex.addTablet(tabletId, tabletMeta);
- for (Replica replica : tablet.getReplicas()) {
- invertedIndex.addReplica(tabletId, replica);
- }
- }
- }
- } // end for partitions
- DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(dbId, olapTable, true);
- }
- }
+ getInternalDataSource().replayCreateTable(dbName, table);
}
public void replayAlterExternalTableSchema(String dbName, String tableName, List<Column> newSchema) throws MetaNotFoundException {
- Database db = this.getDbOrMetaException(dbName);
- Table table = db.getTableOrMetaException(tableName);
- table.writeLock();
- try {
- table.setNewFullSchema(newSchema);
- } finally {
- table.writeUnlock();
- }
- }
-
- private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState,
- DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc,
- TabletMeta tabletMeta, Set<Long> tabletIdSet) throws DdlException {
- ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
- Map<Tag, List<List<Long>>> backendsPerBucketSeq = null;
- GroupId groupId = null;
- if (colocateIndex.isColocateTable(tabletMeta.getTableId())) {
- if (distributionInfo.getType() == DistributionInfoType.RANDOM) {
- throw new DdlException("Random distribution for colocate table is unsupported");
- }
- // if this is a colocate table, try to get backend seqs from colocation index.
- groupId = colocateIndex.getGroup(tabletMeta.getTableId());
- backendsPerBucketSeq = colocateIndex.getBackendsPerBucketSeq(groupId);
- }
-
- // If chooseBackendsArbitrary is true, this may be the first table of a colocation group,
- // or just a normal table, and we can choose backends arbitrarily;
- // otherwise, backends must be chosen from backendsPerBucketSeq.
- boolean chooseBackendsArbitrary = backendsPerBucketSeq == null || backendsPerBucketSeq.isEmpty();
- if (chooseBackendsArbitrary) {
- backendsPerBucketSeq = Maps.newHashMap();
- }
- for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
- // create a new tablet with random chosen backends
- Tablet tablet = new Tablet(getNextId());
-
- // add tablet to inverted index first
- index.addTablet(tablet, tabletMeta);
- tabletIdSet.add(tablet.getId());
-
- // get BackendIds
- Map<Tag, List<Long>> chosenBackendIds;
- if (chooseBackendsArbitrary) {
- // This is the first colocate table in the group, or just a normal table,
- // randomly choose backends
- if (!Config.disable_storage_medium_check) {
- chosenBackendIds =
- getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, clusterName,
- tabletMeta.getStorageMedium());
- } else {
- chosenBackendIds =
- getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
- }
-
- for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) {
- backendsPerBucketSeq.putIfAbsent(entry.getKey(), Lists.newArrayList());
- backendsPerBucketSeq.get(entry.getKey()).add(entry.getValue());
- }
- } else {
- // get backends from existing backend sequence
- chosenBackendIds = Maps.newHashMap();
- for (Map.Entry<Tag, List<List<Long>>> entry : backendsPerBucketSeq.entrySet()) {
- chosenBackendIds.put(entry.getKey(), entry.getValue().get(i));
- }
- }
- // create replicas
- short totalReplicaNum = (short) 0;
- for (List<Long> backendIds : chosenBackendIds.values()) {
- for (long backendId : backendIds) {
- long replicaId = getNextId();
- Replica replica = new Replica(replicaId, backendId, replicaState, version, tabletMeta.getOldSchemaHash());
- tablet.addReplica(replica);
- totalReplicaNum++;
- }
- }
- Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum(),
- totalReplicaNum + " vs. " + replicaAlloc.getTotalReplicaNum());
- }
-
- if (groupId != null && chooseBackendsArbitrary) {
- colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
- ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
- editLog.logColocateBackendsPerBucketSeq(info);
- }
+ getInternalDataSource().replayAlterExternalTableSchema(dbName, tableName, newSchema);
}
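The removed createTablets body encodes the colocation invariant: the i-th bucket of every table in a group must land on the same backend sequence, so only the first table chooses backends arbitrarily and later tables replay the recorded sequence. A toy sketch of that replay lookup (backend ids are invented):

    import java.util.Arrays;
    import java.util.List;

    public class BucketSeqSketch {
        public static void main(String[] args) {
            // per bucket index: the backend ids recorded by the first table in the group
            List<List<Long>> backendsPerBucketSeq = Arrays.asList(
                    Arrays.asList(1001L, 1002L, 1003L),
                    Arrays.asList(1002L, 1003L, 1004L));
            for (int i = 0; i < backendsPerBucketSeq.size(); i++) {
                List<Long> chosenBackendIds = backendsPerBucketSeq.get(i); // reuse, never re-choose
                System.out.println("bucket " + i + " -> " + chosenBackendIds);
            }
        }
    }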
// Drop table
public void dropTable(DropTableStmt stmt) throws DdlException {
- String dbName = stmt.getDbName();
- String tableName = stmt.getTableName();
-
- // check database
- Database db = this.getDbOrDdlException(dbName);
- db.writeLockOrDdlException();
- try {
- Table table = db.getTableNullable(tableName);
- if (table == null) {
- if (stmt.isSetIfExists()) {
- LOG.info("drop table[{}] which does not exist", tableName);
- return;
- } else {
- ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dbName);
- }
- }
- // Check if a view
- if (stmt.isView()) {
- if (!(table instanceof View)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_OBJECT, dbName, tableName, "VIEW");
- }
- } else {
- if (table instanceof View) {
- ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_OBJECT, dbName, tableName, "TABLE");
- }
- }
-
- if (!stmt.isForceDrop()) {
- if (Catalog.getCurrentCatalog().getGlobalTransactionMgr().existCommittedTxns(db.getId(), table.getId(), null)) {
- throw new DdlException("There are still some transactions in the COMMITTED state waiting to be completed. "
- + "The table [" + tableName + "] cannot be dropped. If you want to forcibly drop(cannot be recovered),"
- + " please use \"DROP table FORCE\".");
- }
- }
- DropInfo info = new DropInfo(db.getId(), table.getId(), -1L, stmt.isForceDrop());
- table.writeLock();
- try {
- if (table instanceof OlapTable && !stmt.isForceDrop()) {
- OlapTable olapTable = (OlapTable) table;
- if ((olapTable.getState() != OlapTableState.NORMAL)) {
- throw new DdlException("The table [" + tableName + "]'s state is " + olapTable.getState() + ", cannot be dropped."
- + " please cancel the operation on olap table firstly. If you want to forcibly drop(cannot be recovered),"
- + " please use \"DROP table FORCE\".");
- }
- }
- unprotectDropTable(db, table, stmt.isForceDrop(), false);
- } finally {
- table.writeUnlock();
- }
- editLog.logDropTable(info);
- } finally {
- db.writeUnlock();
- }
- LOG.info("finished dropping table: {} from db: {}, is force: {}", tableName, dbName, stmt.isForceDrop());
+ getInternalDataSource().dropTable(stmt);
}
public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay) {
- if (table.getType() == TableType.ELASTICSEARCH) {
- esRepository.deRegisterTable(table.getId());
- } else if (table.getType() == TableType.OLAP) {
- // drop all temp partitions of this table, so that there are no temp partitions in the recycle bin,
- // which makes things easier.
- ((OlapTable) table).dropAllTempPartitions();
- } else if (table.getType() == TableType.ICEBERG) {
- // drop Iceberg database table creation record
- icebergTableCreationRecordMgr.deregisterTable(db, (IcebergTable) table);
- }
-
- db.dropTable(table.getName());
- if (!isForceDrop) {
- Catalog.getCurrentRecycleBin().recycleTable(db.getId(), table);
- } else {
- if (table.getType() == TableType.OLAP) {
- Catalog.getCurrentCatalog().onEraseOlapTable((OlapTable) table, isReplay);
- }
- }
-
- LOG.info("finished dropping table[{}] in db[{}]", table.getName(), db.getFullName());
- return true;
+ return getInternalDataSource().unprotectDropTable(db, table, isForceDrop, isReplay);
}
public void replayDropTable(Database db, long tableId, boolean isForceDrop) throws MetaNotFoundException {
- Table table = db.getTableOrMetaException(tableId);
- db.writeLock();
- table.writeLock();
- try {
- unprotectDropTable(db, table, isForceDrop, true);
- } finally {
- table.writeUnlock();
- db.writeUnlock();
- }
+ getInternalDataSource().replayDropTable(db, tableId, isForceDrop);
}
public void replayEraseTable(long tableId) {
- Catalog.getCurrentRecycleBin().replayEraseTable(tableId);
+ getInternalDataSource().replayEraseTable(tableId);
}
public void replayRecoverTable(RecoverInfo info) throws MetaNotFoundException {
- Database db = this.getDbOrMetaException(info.getDbId());
- db.writeLock();
- try {
- Catalog.getCurrentRecycleBin().replayRecoverTable(db, info.getTableId());
- } finally {
- db.writeUnlock();
- }
- }
-
- private void unprotectAddReplica(OlapTable olapTable, ReplicaPersistInfo info) {
- LOG.debug("replay add a replica {}", info);
- Partition partition = olapTable.getPartition(info.getPartitionId());
- MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId());
- Tablet tablet = materializedIndex.getTablet(info.getTabletId());
-
- // for compatibility
- int schemaHash = info.getSchemaHash();
- if (schemaHash == -1) {
- schemaHash = olapTable.getSchemaHashByIndexId(info.getIndexId());
- }
-
- Replica replica = new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(),
- schemaHash, info.getDataSize(), info.getRowCount(),
- ReplicaState.NORMAL,
- info.getLastFailedVersion(),
- info.getLastSuccessVersion());
- tablet.addReplica(replica);
- }
-
- private void unprotectUpdateReplica(OlapTable olapTable, ReplicaPersistInfo info) {
- LOG.debug("replay update a replica {}", info);
- Partition partition = olapTable.getPartition(info.getPartitionId());
- MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId());
- Tablet tablet = materializedIndex.getTablet(info.getTabletId());
- Replica replica = tablet.getReplicaByBackendId(info.getBackendId());
- Preconditions.checkNotNull(replica, info);
- replica.updateVersionInfo(info.getVersion(), info.getDataSize(), info.getRowCount());
- replica.setBad(false);
+ getInternalDataSource().replayRecoverTable(info);
}
public void replayAddReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
- Database db = this.getDbOrMetaException(info.getDbId());
- OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
- olapTable.writeLock();
- try {
- unprotectAddReplica(olapTable, info);
- } finally {
- olapTable.writeUnlock();
- }
+ getInternalDataSource().replayAddReplica(info);
}
public void replayUpdateReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
- Database db = this.getDbOrMetaException(info.getDbId());
- OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
- olapTable.writeLock();
- try {
- unprotectUpdateReplica(olapTable, info);
- } finally {
- olapTable.writeUnlock();
- }
+ getInternalDataSource().replayUpdateReplica(info);
}
public void unprotectDeleteReplica(OlapTable olapTable, ReplicaPersistInfo info) {
- Partition partition = olapTable.getPartition(info.getPartitionId());
- MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId());
- Tablet tablet = materializedIndex.getTablet(info.getTabletId());
- tablet.deleteReplicaByBackendId(info.getBackendId());
+ getInternalDataSource().unprotectDeleteReplica(olapTable, info);
}
-
- public void replayDeleteReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
- Database db = this.getDbOrMetaException(info.getDbId());
- OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
- olapTable.writeLock();
- try {
- unprotectDeleteReplica(olapTable, info);
- } finally {
- olapTable.writeUnlock();
- }
+
+ public void replayDeleteReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
+ getInternalDataSource().replayDeleteReplica(info);
}
public void replayAddFrontend(Frontend fe) {
@@ -4940,26 +3114,12 @@ public class Catalog {
@Nullable
public Database getDbNullable(String dbName) {
- if (fullNameToDb.containsKey(dbName)) {
- return fullNameToDb.get(dbName);
- } else {
- // This may be an information_schema db request, and the information_schema db name is case-insensitive.
- // So we first extract the db name to check whether it is information_schema,
- // then reassemble the original cluster name with the lower-case db name,
- // and finally get the information_schema db from the name map.
- String fullName = ClusterNamespace.getNameFromFullName(dbName);
- if (fullName.equalsIgnoreCase(InfoSchemaDb.DATABASE_NAME)) {
- String clusterName = ClusterNamespace.getClusterNameFromFullName(dbName);
- fullName = ClusterNamespace.getFullName(clusterName, fullName.toLowerCase());
- return fullNameToDb.get(fullName);
- }
- }
- return null;
+ return (Database) getInternalDataSource().getDbNullable(dbName);
}
@Nullable
public Database getDbNullable(long dbId) {
- return idToDb.get(dbId);
+ return (Database) getInternalDataSource().getDbNullable(dbId);
}
public Optional<Database> getDb(String dbName) {
@@ -4988,33 +3148,33 @@ public class Catalog {
}
public Database getDbOrMetaException(String dbName) throws MetaNotFoundException {
- return getDbOrException(dbName, s -> new MetaNotFoundException("unknown databases, dbName=" + s,
- ErrorCode.ERR_BAD_DB_ERROR));
+ return getDbOrException(dbName,
+ s -> new MetaNotFoundException("unknown databases, dbName=" + s, ErrorCode.ERR_BAD_DB_ERROR));
}
public Database getDbOrMetaException(long dbId) throws MetaNotFoundException {
- return getDbOrException(dbId, s -> new MetaNotFoundException("unknown databases, dbId=" + s,
- ErrorCode.ERR_BAD_DB_ERROR));
+ return getDbOrException(dbId,
+ s -> new MetaNotFoundException("unknown databases, dbId=" + s, ErrorCode.ERR_BAD_DB_ERROR));
}
public Database getDbOrDdlException(String dbName) throws DdlException {
- return getDbOrException(dbName, s -> new DdlException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s),
- ErrorCode.ERR_BAD_DB_ERROR));
+ return getDbOrException(dbName,
+ s -> new DdlException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
}
public Database getDbOrDdlException(long dbId) throws DdlException {
- return getDbOrException(dbId, s -> new DdlException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s),
- ErrorCode.ERR_BAD_DB_ERROR));
+ return getDbOrException(dbId,
+ s -> new DdlException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
}
public Database getDbOrAnalysisException(String dbName) throws AnalysisException {
- return getDbOrException(dbName, s -> new AnalysisException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s),
- ErrorCode.ERR_BAD_DB_ERROR));
+ return getDbOrException(dbName,
+ s -> new AnalysisException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
}
public Database getDbOrAnalysisException(long dbId) throws AnalysisException {
- return getDbOrException(dbId, s -> new AnalysisException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s),
- ErrorCode.ERR_BAD_DB_ERROR));
+ return getDbOrException(dbId,
+ s -> new AnalysisException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
}
public EditLog getEditLog() {
@@ -5027,19 +3187,15 @@ public class Catalog {
}
public List<String> getDbNames() {
- return Lists.newArrayList(fullNameToDb.keySet());
+ return getInternalDataSource().getDbNames();
}
public List<String> getClusterDbNames(String clusterName) throws AnalysisException {
- final Cluster cluster = nameToCluster.get(clusterName);
- if (cluster == null) {
- throw new AnalysisException("No cluster selected");
- }
- return Lists.newArrayList(cluster.getDbNames());
+ return getInternalDataSource().getClusterDbNames(clusterName);
}
public List<Long> getDbIds() {
- return Lists.newArrayList(idToDb.keySet());
+ return getInternalDataSource().getDbIds();
}
public HashMap<Long, TStorageMedium> getPartitionIdToStorageMediumMap() {
@@ -5196,7 +3352,7 @@ public class Catalog {
}
public IcebergTableCreationRecordMgr getIcebergTableCreationRecordMgr() {
- return icebergTableCreationRecordMgr;
+ return getInternalDataSource().getIcebergTableCreationRecordMgr();
}
public MasterTaskExecutor getPendingLoadTaskScheduler() {
@@ -5302,7 +3458,7 @@ public class Catalog {
}
public EsRepository getEsRepository() {
- return this.esRepository;
+ return getInternalDataSource().getEsRepository();
}
public PolicyMgr getPolicyMgr() {
@@ -5997,32 +4153,6 @@ public class Catalog {
this.alter.getClusterHandler().cancel(stmt);
}
- /*
- * generate and check columns' order and key's existence
- */
- private void validateColumns(List<Column> columns) throws DdlException {
- if (columns.isEmpty()) {
- ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_MUST_HAVE_COLUMNS);
- }
-
- boolean encounterValue = false;
- boolean hasKey = false;
- for (Column column : columns) {
- if (column.isKey()) {
- if (encounterValue) {
- ErrorReport.reportDdlException(ErrorCode.ERR_OLAP_KEY_MUST_BEFORE_VALUE);
- }
- hasKey = true;
- } else {
- encounterValue = true;
- }
- }
-
- if (!hasKey) {
- ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_MUST_HAVE_KEYS);
- }
- }
-
// Change current database of this session.
public void changeDb(ConnectContext ctx, String qualifiedDb) throws DdlException {
if (!auth.checkDbPriv(ctx, qualifiedDb, PrivPredicate.SHOW)) {
@@ -6035,12 +4165,7 @@ public class Catalog {
// for test only
public void clear() {
- if (SingletonHolder.INSTANCE.idToDb != null) {
- SingletonHolder.INSTANCE.idToDb.clear();
- }
- if (SingletonHolder.INSTANCE.fullNameToDb != null) {
- SingletonHolder.INSTANCE.fullNameToDb.clear();
- }
+ getInternalDataSource().clearDbs();
if (load.getIdToLoadJob() != null) {
load.getIdToLoadJob().clear();
// load = null;
@@ -6122,68 +4247,7 @@ public class Catalog {
* @throws DdlException
*/
public void createCluster(CreateClusterStmt stmt) throws DdlException {
- final String clusterName = stmt.getClusterName();
- if (!tryLock(false)) {
- throw new DdlException("Failed to acquire catalog lock. Try again");
- }
- try {
- if (nameToCluster.containsKey(clusterName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_HAS_EXIST, clusterName);
- } else {
- List<Long> backendList = systemInfo.createCluster(clusterName, stmt.getInstanceNum());
- // 1: if the requested number of BEs cannot be provided, backendList is null and we throw a DdlException.
- // 2: if backendList is non-null, or zero instances were requested, creation succeeds.
- if (backendList != null || stmt.getInstanceNum() == 0) {
- final long id = getNextId();
- final Cluster cluster = new Cluster(clusterName, id);
- cluster.setBackendIdList(backendList);
- unprotectCreateCluster(cluster);
- if (clusterName.equals(SystemInfoService.DEFAULT_CLUSTER)) {
- for (Database db : idToDb.values()) {
- if (db.getClusterName().equals(SystemInfoService.DEFAULT_CLUSTER)) {
- cluster.addDb(db.getFullName(), db.getId());
- }
- }
- }
- editLog.logCreateCluster(cluster);
- LOG.info("finish to create cluster: {}", clusterName);
- } else {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_BE_NOT_ENOUGH);
- }
- }
- } finally {
- unlock();
- }
-
- // create super user for this cluster
- UserIdentity adminUser = new UserIdentity(PaloAuth.ADMIN_USER, "%");
- try {
- adminUser.analyze(stmt.getClusterName());
- } catch (AnalysisException e) {
- LOG.error("should not happen", e);
- }
- auth.createUser(new CreateUserStmt(new UserDesc(adminUser, "", true)));
- }
-
- private void unprotectCreateCluster(Cluster cluster) {
- for (Long id : cluster.getBackendIdList()) {
- final Backend backend = systemInfo.getBackend(id);
- backend.setOwnerClusterName(cluster.getName());
- backend.setBackendState(BackendState.using);
- }
-
- idToCluster.put(cluster.getId(), cluster);
- nameToCluster.put(cluster.getName(), cluster);
-
- // create info schema db
- final InfoSchemaDb infoDb = new InfoSchemaDb(cluster.getName());
- infoDb.setClusterName(cluster.getName());
- unprotectCreateDb(infoDb);
-
- // only need to create default cluster once.
- if (cluster.getName().equalsIgnoreCase(SystemInfoService.DEFAULT_CLUSTER)) {
- isDefaultClusterCreated = true;
- }
+ getInternalDataSource().createCluster(stmt);
}
/**
@@ -6192,12 +4256,7 @@ public class Catalog {
* @param cluster
*/
public void replayCreateCluster(Cluster cluster) {
- tryLock(true);
- try {
- unprotectCreateCluster(cluster);
- } finally {
- unlock();
- }
+ getInternalDataSource().replayCreateCluster(cluster);
}
/**
@@ -6207,77 +4266,15 @@ public class Catalog {
* @throws DdlException
*/
public void dropCluster(DropClusterStmt stmt) throws DdlException {
- if (!tryLock(false)) {
- throw new DdlException("Failed to acquire catalog lock. Try again");
- }
- try {
- final String clusterName = stmt.getClusterName();
- final Cluster cluster = nameToCluster.get(clusterName);
- if (cluster == null) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
- }
- final List<Backend> backends = systemInfo.getClusterBackends(clusterName);
- for (Backend backend : backends) {
- if (backend.isDecommissioned()) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_ALTER_BE_IN_DECOMMISSION, clusterName);
- }
- }
-
- // check whether there are still undropped databases, other than the information_schema db
- if (cluster.getDbNames().size() > 1) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DELETE_DB_EXIST, clusterName);
- }
-
- systemInfo.releaseBackends(clusterName, false /* is not replay */);
- final ClusterInfo info = new ClusterInfo(clusterName, cluster.getId());
- unprotectDropCluster(info, false /* is not replay */);
- editLog.logDropCluster(info);
- } finally {
- unlock();
- }
-
- // drop user of this cluster
- // set is replay to true, not write log
- auth.dropUserOfCluster(stmt.getClusterName(), true /* is replay */);
- }
-
- private void unprotectDropCluster(ClusterInfo info, boolean isReplay) {
- systemInfo.releaseBackends(info.getClusterName(), isReplay);
- idToCluster.remove(info.getClusterId());
- nameToCluster.remove(info.getClusterName());
- final Database infoSchemaDb = fullNameToDb.get(InfoSchemaDb.getFullInfoSchemaDbName(info.getClusterName()));
- fullNameToDb.remove(infoSchemaDb.getFullName());
- idToDb.remove(infoSchemaDb.getId());
+ getInternalDataSource().dropCluster(stmt);
}
public void replayDropCluster(ClusterInfo info) throws DdlException {
- tryLock(true);
- try {
- unprotectDropCluster(info, true/* is replay */);
- } finally {
- unlock();
- }
-
- auth.dropUserOfCluster(info.getClusterName(), true /* is replay */);
+ getInternalDataSource().replayDropCluster(info);
}
public void replayExpandCluster(ClusterInfo info) {
- tryLock(true);
- try {
- final Cluster cluster = nameToCluster.get(info.getClusterName());
- cluster.addBackends(info.getBackendIdList());
-
- for (Long beId : info.getBackendIdList()) {
- Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
- if (be == null) {
- continue;
- }
- be.setOwnerClusterName(info.getClusterName());
- be.setBackendState(BackendState.using);
- }
- } finally {
- unlock();
- }
+ getInternalDataSource().replayExpandCluster(info);
}
/**
@@ -6287,72 +4284,7 @@ public class Catalog {
* @throws DdlException
*/
public void processModifyCluster(AlterClusterStmt stmt) throws UserException {
- final String clusterName = stmt.getAlterClusterName();
- final int newInstanceNum = stmt.getInstanceNum();
- if (!tryLock(false)) {
- throw new DdlException("Failed to acquire catalog lock. Try again");
- }
- try {
- Cluster cluster = nameToCluster.get(clusterName);
- if (cluster == null) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
- }
-
- // check if this cluster has backend in decommission
- final List<Long> backendIdsInCluster = cluster.getBackendIdList();
- for (Long beId : backendIdsInCluster) {
- Backend be = systemInfo.getBackend(beId);
- if (be.isDecommissioned()) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_ALTER_BE_IN_DECOMMISSION, clusterName);
- }
- }
-
- final int oldInstanceNum = backendIdsInCluster.size();
- if (newInstanceNum > oldInstanceNum) {
- // expansion
- final List<Long> expandBackendIds = systemInfo.calculateExpansionBackends(clusterName,
- newInstanceNum - oldInstanceNum);
- if (expandBackendIds == null) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_BE_NOT_ENOUGH);
- }
- cluster.addBackends(expandBackendIds);
- final ClusterInfo info = new ClusterInfo(clusterName, cluster.getId(), expandBackendIds);
- editLog.logExpandCluster(info);
- } else if (newInstanceNum < oldInstanceNum) {
- // shrink
- final List<Long> decomBackendIds = systemInfo.calculateDecommissionBackends(clusterName,
- oldInstanceNum - newInstanceNum);
- if (decomBackendIds == null || decomBackendIds.size() == 0) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_BACKEND_ERROR);
- }
-
- List<String> hostPortList = Lists.newArrayList();
- for (Long id : decomBackendIds) {
- final Backend backend = systemInfo.getBackend(id);
- hostPortList.add(new StringBuilder().append(backend.getHost()).append(":")
- .append(backend.getHeartbeatPort()).toString());
- }
-
- // here we reuse the decommission-backends process, but set the backend's decommission type to
- // ClusterDecommission, which means this backend will not be removed from the system
- // after decommission is done.
- final DecommissionBackendClause clause = new DecommissionBackendClause(hostPortList);
- try {
- clause.analyze(null);
- clause.setType(DecommissionType.ClusterDecommission);
- AlterSystemStmt alterStmt = new AlterSystemStmt(clause);
- alterStmt.setClusterName(clusterName);
- this.alter.processAlterCluster(alterStmt);
- } catch (AnalysisException e) {
- Preconditions.checkState(false, "should not happened: " + e.getMessage());
- }
- } else {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_ALTER_BE_NO_CHANGE, newInstanceNum);
- }
-
- } finally {
- unlock();
- }
+ getInternalDataSource().processModifyCluster(stmt);
}
/**
@@ -6361,16 +4293,7 @@ public class Catalog {
* @throws DdlException
*/
public void changeCluster(ConnectContext ctx, String clusterName) throws DdlException {
- if (!Catalog.getCurrentCatalog().getAuth().checkCanEnterCluster(ConnectContext.get(), clusterName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_AUTHORITY,
- ConnectContext.get().getQualifiedUser(), "enter");
- }
-
- if (!nameToCluster.containsKey(clusterName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
- }
-
- ctx.setCluster(clusterName);
+ getInternalDataSource().changeCluster(ctx, clusterName);
}
/**
@@ -6380,121 +4303,15 @@ public class Catalog {
* @throws DdlException
*/
public void migrateDb(MigrateDbStmt stmt) throws DdlException {
- final String srcClusterName = stmt.getSrcCluster();
- final String destClusterName = stmt.getDestCluster();
- final String srcDbName = stmt.getSrcDb();
- final String destDbName = stmt.getDestDb();
-
- if (!tryLock(false)) {
- throw new DdlException("Failed to acquire catalog lock. Try again");
- }
- try {
- if (!nameToCluster.containsKey(srcClusterName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_CLUSTER_NOT_EXIST, srcClusterName);
- }
- if (!nameToCluster.containsKey(destClusterName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DEST_CLUSTER_NOT_EXIST, destClusterName);
- }
-
- if (srcClusterName.equals(destClusterName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATE_SAME_CLUSTER);
- }
-
- final Cluster srcCluster = this.nameToCluster.get(srcClusterName);
- if (!srcCluster.containDb(srcDbName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_DB_NOT_EXIST, srcDbName);
- }
- final Cluster destCluster = this.nameToCluster.get(destClusterName);
- if (!destCluster.containLink(destDbName, srcDbName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATION_NO_LINK, srcDbName, destDbName);
- }
-
- final Database db = fullNameToDb.get(srcDbName);
-
- // if the max replication num of the src db is larger than the backend num of the dest cluster,
- // the migration will not be processed.
- final int maxReplicationNum = db.getMaxReplicationNum();
- if (maxReplicationNum > destCluster.getBackendIdList().size()) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATE_BE_NOT_ENOUGH, destClusterName);
- }
-
- if (db.getDbState() == DbState.LINK) {
- final BaseParam param = new BaseParam();
- param.addStringParam(destDbName);
- param.addLongParam(db.getId());
- param.addStringParam(srcDbName);
- param.addStringParam(destClusterName);
- param.addStringParam(srcClusterName);
- fullNameToDb.remove(db.getFullName());
- srcCluster.removeDb(db.getFullName(), db.getId());
- destCluster.removeLinkDb(param);
- destCluster.addDb(destDbName, db.getId());
- db.writeLock();
- try {
- db.setDbState(DbState.MOVE);
- // set cluster to the dest cluster.
- // and Clone process will do the migration things.
- db.setClusterName(destClusterName);
- db.setName(destDbName);
- db.setAttachDb(srcDbName);
- } finally {
- db.writeUnlock();
- }
- editLog.logMigrateCluster(param);
- } else {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATION_NO_LINK, srcDbName, destDbName);
- }
- } finally {
- unlock();
- }
+ getInternalDataSource().migrateDb(stmt);
}
public void replayMigrateDb(BaseParam param) {
- final String desDbName = param.getStringParam();
- final String srcDbName = param.getStringParam(1);
- final String desClusterName = param.getStringParam(2);
- final String srcClusterName = param.getStringParam(3);
- tryLock(true);
- try {
- final Cluster desCluster = this.nameToCluster.get(desClusterName);
- final Cluster srcCluster = this.nameToCluster.get(srcClusterName);
- final Database db = fullNameToDb.get(srcDbName);
- if (db.getDbState() == DbState.LINK) {
- fullNameToDb.remove(db.getFullName());
- srcCluster.removeDb(db.getFullName(), db.getId());
- desCluster.removeLinkDb(param);
- desCluster.addDb(param.getStringParam(), db.getId());
-
- db.writeLock();
- db.setName(desDbName);
- db.setAttachDb(srcDbName);
- db.setDbState(DbState.MOVE);
- db.setClusterName(desClusterName);
- db.writeUnlock();
- }
- } finally {
- unlock();
- }
+ getInternalDataSource().replayMigrateDb(param);
}
public void replayLinkDb(BaseParam param) {
- final String desClusterName = param.getStringParam(2);
- final String srcDbName = param.getStringParam(1);
- final String desDbName = param.getStringParam();
-
- tryLock(true);
- try {
- final Cluster desCluster = this.nameToCluster.get(desClusterName);
- final Database srcDb = fullNameToDb.get(srcDbName);
- srcDb.writeLock();
- srcDb.setDbState(DbState.LINK);
- srcDb.setAttachDb(desDbName);
- srcDb.writeUnlock();
- desCluster.addLinkDb(param);
- fullNameToDb.put(desDbName, srcDb);
- } finally {
- unlock();
- }
+ getInternalDataSource().replayLinkDb(param);
}
/**
@@ -6504,74 +4321,15 @@ public class Catalog {
* @throws DdlException
*/
public void linkDb(LinkDbStmt stmt) throws DdlException {
- final String srcClusterName = stmt.getSrcCluster();
- final String destClusterName = stmt.getDestCluster();
- final String srcDbName = stmt.getSrcDb();
- final String destDbName = stmt.getDestDb();
-
- if (!tryLock(false)) {
- throw new DdlException("Failed to acquire catalog lock. Try again");
- }
- try {
- if (!nameToCluster.containsKey(srcClusterName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_CLUSTER_NOT_EXIST, srcClusterName);
- }
-
- if (!nameToCluster.containsKey(destClusterName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DEST_CLUSTER_NOT_EXIST, destClusterName);
- }
-
- if (srcClusterName.equals(destClusterName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATE_SAME_CLUSTER);
- }
-
- if (fullNameToDb.containsKey(destDbName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, destDbName);
- }
-
- final Cluster srcCluster = this.nameToCluster.get(srcClusterName);
- final Cluster destCluster = this.nameToCluster.get(destClusterName);
-
- if (!srcCluster.containDb(srcDbName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_DB_NOT_EXIST, srcDbName);
- }
- final Database srcDb = fullNameToDb.get(srcDbName);
-
- if (srcDb.getDbState() != DbState.NORMAL) {
- ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DB_STATE_LINK_OR_MIGRATE,
- ClusterNamespace.getNameFromFullName(srcDbName));
- }
-
- srcDb.writeLock();
- try {
- srcDb.setDbState(DbState.LINK);
- srcDb.setAttachDb(destDbName);
- } finally {
- srcDb.writeUnlock();
- }
-
- final long id = getNextId();
- final BaseParam param = new BaseParam();
- param.addStringParam(destDbName);
- param.addStringParam(srcDbName);
- param.addLongParam(id);
- param.addLongParam(srcDb.getId());
- param.addStringParam(destClusterName);
- param.addStringParam(srcClusterName);
- destCluster.addLinkDb(param);
- fullNameToDb.put(destDbName, srcDb);
- editLog.logLinkCluster(param);
- } finally {
- unlock();
- }
+ getInternalDataSource().linkDb(stmt);
}
public Cluster getCluster(String clusterName) {
- return nameToCluster.get(clusterName);
+ return getInternalDataSource().getCluster(clusterName);
}
public List<String> getClusterNames() {
- return new ArrayList<String>(nameToCluster.keySet());
+ return getInternalDataSource().getClusterNames();
}
/**
@@ -6580,171 +4338,23 @@ public class Catalog {
* @return
*/
public Set<BaseParam> getMigrations() {
- final Set<BaseParam> infos = Sets.newHashSet();
- for (Database db : fullNameToDb.values()) {
- db.readLock();
- try {
- if (db.getDbState() == DbState.MOVE) {
- int tabletTotal = 0;
- int tabletQuorum = 0;
- final Set<Long> beIds = Sets.newHashSet(systemInfo.getClusterBackendIds(db.getClusterName()));
- final Set<String> tableNames = db.getTableNamesWithLock();
- for (String tableName : tableNames) {
-
- Table table = db.getTableNullable(tableName);
- if (table == null || table.getType() != TableType.OLAP) {
- continue;
- }
-
- OlapTable olapTable = (OlapTable) table;
- olapTable.readLock();
- try {
- for (Partition partition : olapTable.getPartitions()) {
- ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo()
- .getReplicaAllocation(partition.getId());
- short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
- for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
- if (materializedIndex.getState() != IndexState.NORMAL) {
- continue;
- }
- for (Tablet tablet : materializedIndex.getTablets()) {
- int replicaNum = 0;
- int quorum = totalReplicaNum / 2 + 1;
- for (Replica replica : tablet.getReplicas()) {
- if (replica.getState() != ReplicaState.CLONE
- && beIds.contains(replica.getBackendId())) {
- replicaNum++;
- }
- }
- if (replicaNum > quorum) {
- replicaNum = quorum;
- }
-
- tabletQuorum = tabletQuorum + replicaNum;
- tabletTotal = tabletTotal + quorum;
- }
- }
- }
- } finally {
- olapTable.readUnlock();
- }
- }
- final BaseParam info = new BaseParam();
- info.addStringParam(db.getClusterName());
- info.addStringParam(db.getAttachDb());
- info.addStringParam(db.getFullName());
- final float percentage = tabletTotal > 0 ? (float) tabletQuorum / (float) tabletTotal : 0f;
- info.addFloatParam(percentage);
- infos.add(info);
- }
- } finally {
- db.readUnlock();
- }
- }
-
- return infos;
+ return getInternalDataSource().getMigrations();
}
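(For reference on the moved logic: migration progress is the ratio tabletQuorum / tabletTotal. Each tablet adds its quorum, totalReplicaNum / 2 + 1, to the denominator, and the number of non-CLONE replicas already on the cluster's backends, capped at that quorum, to the numerator. With 3 replicas the quorum is 2, so a tablet with 2 or more migrated replicas counts as fully migrated.)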
public long loadCluster(DataInputStream dis, long checksum) throws IOException, DdlException {
- int clusterCount = dis.readInt();
- checksum ^= clusterCount;
- for (long i = 0; i < clusterCount; ++i) {
- final Cluster cluster = Cluster.read(dis);
- checksum ^= cluster.getId();
-
- List<Long> latestBackendIds = systemInfo.getClusterBackendIds(cluster.getName());
- if (latestBackendIds.size() != cluster.getBackendIdList().size()) {
- LOG.warn("Cluster:" + cluster.getName() + ", backends in Cluster is "
- + cluster.getBackendIdList().size() + ", backends in SystemInfoService is "
- + cluster.getBackendIdList().size());
- }
- // The number of BE in cluster is not same as in SystemInfoService, when perform 'ALTER
- // SYSTEM ADD BACKEND TO ...' or 'ALTER SYSTEM ADD BACKEND ...', because both of them are
- // for adding BE to some Cluster, but loadCluster is after loadBackend.
- cluster.setBackendIdList(latestBackendIds);
-
- String dbName = InfoSchemaDb.getFullInfoSchemaDbName(cluster.getName());
- InfoSchemaDb db;
- // Use real Catalog instance to avoid InfoSchemaDb id continuously increment
- // when checkpoint thread load image.
- if (Catalog.getServingCatalog().getFullNameToDb().containsKey(dbName)) {
- db = (InfoSchemaDb) Catalog.getServingCatalog().getFullNameToDb().get(dbName);
- } else {
- db = new InfoSchemaDb(cluster.getName());
- db.setClusterName(cluster.getName());
- }
- String errMsg = "InfoSchemaDb id shouldn't larger than 10000, please restart your FE server";
- // Every time we construct the InfoSchemaDb, which id will increment.
- // When InfoSchemaDb id larger than 10000 and put it to idToDb,
- // which may be overwrite the normal db meta in idToDb,
- // so we ensure InfoSchemaDb id less than 10000.
- Preconditions.checkState(db.getId() < NEXT_ID_INIT_VALUE, errMsg);
- idToDb.put(db.getId(), db);
- fullNameToDb.put(db.getFullName(), db);
- cluster.addDb(dbName, db.getId());
- idToCluster.put(cluster.getId(), cluster);
- nameToCluster.put(cluster.getName(), cluster);
- }
- LOG.info("finished replay cluster from image");
- return checksum;
+ return getInternalDataSource().loadCluster(dis, checksum);
}
public void initDefaultCluster() {
- final List<Long> backendList = Lists.newArrayList();
- final List<Backend> defaultClusterBackends = systemInfo.getClusterBackends(SystemInfoService.DEFAULT_CLUSTER);
- for (Backend backend : defaultClusterBackends) {
- backendList.add(backend.getId());
- }
-
- final long id = getNextId();
- final Cluster cluster = new Cluster(SystemInfoService.DEFAULT_CLUSTER, id);
-
- // make sure one host hold only one backend.
- Set<String> beHost = Sets.newHashSet();
- for (Backend be : defaultClusterBackends) {
- if (beHost.contains(be.getHost())) {
- // we can not handle this situation automatically.
- LOG.error("found more than one backends in same host: {}", be.getHost());
- System.exit(-1);
- } else {
- beHost.add(be.getHost());
- }
- }
-
- // we create default_cluster to meet the need for ease of use, because
- // most users have no multi tenant needs.
- cluster.setBackendIdList(backendList);
- unprotectCreateCluster(cluster);
- for (Database db : idToDb.values()) {
- db.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
- cluster.addDb(db.getFullName(), db.getId());
- }
-
- // no matter default_cluster is created or not,
- // mark isDefaultClusterCreated as true
- isDefaultClusterCreated = true;
- editLog.logCreateCluster(cluster);
+ getInternalDataSource().initDefaultCluster();
}
public void replayUpdateDb(DatabaseInfo info) {
- final Database db = fullNameToDb.get(info.getDbName());
- db.setClusterName(info.getClusterName());
- db.setDbState(info.getDbState());
+ getInternalDataSource().replayUpdateDb(info);
}
public long saveCluster(CountingDataOutputStream dos, long checksum) throws IOException {
- final int clusterCount = idToCluster.size();
- checksum ^= clusterCount;
- dos.writeInt(clusterCount);
- for (Map.Entry<Long, Cluster> entry : idToCluster.entrySet()) {
- long clusterId = entry.getKey();
- if (clusterId >= NEXT_ID_INIT_VALUE) {
- checksum ^= clusterId;
- final Cluster cluster = entry.getValue();
- cluster.write(dos);
- }
- }
- return checksum;
+ return getInternalDataSource().saveCluster(dos, checksum);
}
public long saveBrokers(CountingDataOutputStream dos, long checksum) throws IOException {
@@ -6786,14 +4396,7 @@ public class Catalog {
}
public void replayUpdateClusterAndBackends(BackendIdsUpdateInfo info) {
- for (long id : info.getBackendList()) {
- final Backend backend = systemInfo.getBackend(id);
- final Cluster cluster = nameToCluster.get(backend.getOwnerClusterName());
- cluster.removeBackend(id);
- backend.setDecommissioned(false);
- backend.clearClusterName();
- backend.setBackendState(BackendState.free);
- }
+ getInternalDataSource().replayUpdateClusterAndBackends(info);
}
public String dumpImage() {
@@ -6858,200 +4461,11 @@ public class Catalog {
*
*/
public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlException {
- TableRef tblRef = truncateTableStmt.getTblRef();
- TableName dbTbl = tblRef.getName();
-
- // check, and save some info which need to be checked again later
- Map<String, Long> origPartitions = Maps.newHashMap();
- Map<Long, DistributionInfo> partitionsDistributionInfo = Maps.newHashMap();
- OlapTable copiedTbl;
-
- boolean truncateEntireTable = tblRef.getPartitionNames() == null;
-
- Database db = this.getDbOrDdlException(dbTbl.getDb());
- OlapTable olapTable = db.getOlapTableOrDdlException(dbTbl.getTbl());
-
- olapTable.readLock();
- try {
- if (olapTable.getState() != OlapTableState.NORMAL) {
- throw new DdlException("Table' state is not NORMAL: " + olapTable.getState());
- }
-
- if (!truncateEntireTable) {
- for (String partName : tblRef.getPartitionNames().getPartitionNames()) {
- Partition partition = olapTable.getPartition(partName);
- if (partition == null) {
- throw new DdlException("Partition " + partName + " does not exist");
- }
- origPartitions.put(partName, partition.getId());
- partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo());
- }
- } else {
- for (Partition partition : olapTable.getPartitions()) {
- origPartitions.put(partition.getName(), partition.getId());
- partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo());
- }
- }
- copiedTbl = olapTable.selectiveCopy(origPartitions.keySet(), IndexExtState.VISIBLE, false);
- } finally {
- olapTable.readUnlock();
- }
-
- // 2. use the copied table to create partitions
- List<Partition> newPartitions = Lists.newArrayList();
- // tabletIdSet to save all newly created tablet ids.
- Set<Long> tabletIdSet = Sets.newHashSet();
- try {
- for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
- // the new partition must use new id
- // If we still use the old partition id, the behavior of current load jobs on this partition
- // will be undefined.
- // By using a new id, load job will be aborted(just like partition is dropped),
- // which is the right behavior.
- long oldPartitionId = entry.getValue();
- long newPartitionId = getNextId();
- Partition newPartition = createPartitionWithIndices(db.getClusterName(),
- db.getId(), copiedTbl.getId(), copiedTbl.getBaseIndexId(),
- newPartitionId, entry.getKey(),
- copiedTbl.getIndexIdToMeta(),
- partitionsDistributionInfo.get(oldPartitionId),
- copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).getStorageMedium(),
- copiedTbl.getPartitionInfo().getReplicaAllocation(oldPartitionId),
- null /* version info */,
- copiedTbl.getCopiedBfColumns(),
- copiedTbl.getBfFpp(),
- tabletIdSet,
- copiedTbl.getCopiedIndexes(),
- copiedTbl.isInMemory(),
- copiedTbl.getStorageFormat(),
- copiedTbl.getPartitionInfo().getTabletType(oldPartitionId),
- copiedTbl.getCompressionType(),
- copiedTbl.getDataSortInfo());
- newPartitions.add(newPartition);
- }
- } catch (DdlException e) {
- // create partition failed, remove all newly created tablets
- for (Long tabletId : tabletIdSet) {
- Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
- }
- throw e;
- }
- Preconditions.checkState(origPartitions.size() == newPartitions.size());
-
- // all partitions are created successfully, try to replace the old partitions.
- // before replacing, we need to check again.
- // Things may be changed outside the table lock.
- olapTable = (OlapTable) db.getTableOrDdlException(copiedTbl.getId());
- olapTable.writeLockOrDdlException();
- try {
- if (olapTable.getState() != OlapTableState.NORMAL) {
- throw new DdlException("Table' state is not NORMAL: " + olapTable.getState());
- }
-
- // check partitions
- for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
- Partition partition = copiedTbl.getPartition(entry.getValue());
- if (partition == null || !partition.getName().equalsIgnoreCase(entry.getKey())) {
- throw new DdlException("Partition [" + entry.getKey() + "] is changed");
- }
- }
-
- // check if meta changed
- // rollup index may be added or dropped, and schema may be changed during creating partition operation.
- boolean metaChanged = false;
- if (olapTable.getIndexNameToId().size() != copiedTbl.getIndexNameToId().size()) {
- metaChanged = true;
- } else {
- // compare schemaHash
- Map<Long, Integer> copiedIndexIdToSchemaHash = copiedTbl.getIndexIdToSchemaHash();
- for (Map.Entry<Long, Integer> entry : olapTable.getIndexIdToSchemaHash().entrySet()) {
- long indexId = entry.getKey();
- if (!copiedIndexIdToSchemaHash.containsKey(indexId)) {
- metaChanged = true;
- break;
- }
- if (!copiedIndexIdToSchemaHash.get(indexId).equals(entry.getValue())) {
- metaChanged = true;
- break;
- }
- }
- }
-
- if (metaChanged) {
- throw new DdlException("Table[" + copiedTbl.getName() + "]'s meta has been changed. try again.");
- }
-
- // replace
- truncateTableInternal(olapTable, newPartitions, truncateEntireTable);
-
- // write edit log
- TruncateTableInfo info = new TruncateTableInfo(db.getId(), olapTable.getId(), newPartitions,
- truncateEntireTable);
- editLog.logTruncateTable(info);
- } finally {
- olapTable.writeUnlock();
- }
-
- LOG.info("finished to truncate table {}, partitions: {}",
- tblRef.getName().toSql(), tblRef.getPartitionNames());
- }
-
- private void truncateTableInternal(OlapTable olapTable, List<Partition> newPartitions, boolean isEntireTable) {
- // use new partitions to replace the old ones.
- Set<Long> oldTabletIds = Sets.newHashSet();
- for (Partition newPartition : newPartitions) {
- Partition oldPartition = olapTable.replacePartition(newPartition);
- // save old tablets to be removed
- for (MaterializedIndex index : oldPartition.getMaterializedIndices(IndexExtState.ALL)) {
- index.getTablets().stream().forEach(t -> {
- oldTabletIds.add(t.getId());
- });
- }
- }
-
- if (isEntireTable) {
- // drop all temp partitions
- olapTable.dropAllTempPartitions();
- }
-
- // remove the tablets in old partitions
- for (Long tabletId : oldTabletIds) {
- Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
- }
+ getInternalDataSource().truncateTable(truncateTableStmt);
}
public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundException {
- Database db = this.getDbOrMetaException(info.getDbId());
- OlapTable olapTable = db.getTableOrMetaException(info.getTblId(), TableType.OLAP);
- olapTable.writeLock();
- try {
- truncateTableInternal(olapTable, info.getPartitions(), info.isEntireTable());
-
- if (!Catalog.isCheckpointThread()) {
- // add tablet to inverted index
- TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
- for (Partition partition : info.getPartitions()) {
- long partitionId = partition.getId();
- TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(
- partitionId).getStorageMedium();
- for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
- long indexId = mIndex.getId();
- int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
- TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(),
- partitionId, indexId, schemaHash, medium);
- for (Tablet tablet : mIndex.getTablets()) {
- long tabletId = tablet.getId();
- invertedIndex.addTablet(tabletId, tabletMeta);
- for (Replica replica : tablet.getReplicas()) {
- invertedIndex.addReplica(tabletId, replica);
- }
- }
- }
- }
- }
- } finally {
- olapTable.writeUnlock();
- }
+ getInternalDataSource().replayTruncateTable(info);
}
public void createFunction(CreateFunctionStmt stmt) throws UserException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index 945315d391..bffcb41263 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -71,7 +71,7 @@ import javax.annotation.Nullable;
* if the table has never been loaded
* if the table loading failed on the previous attempt
*/
-public class Database extends MetaObject implements Writable {
+public class Database extends MetaObject implements Writable, DatabaseIf {
private static final Logger LOG = LogManager.getLogger(Database.class);
private long id;
@@ -314,7 +314,7 @@ public class Database extends MetaObject implements Writable {
checkReplicaQuota();
}
- private boolean isTableExist(String tableName) {
+ public boolean isTableExist(String tableName) {
if (Catalog.isTableNamesCaseInsensitive()) {
tableName = lowerCaseToTableName.get(tableName.toLowerCase());
if (tableName == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
index 060f96bcff..a34912150c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
@@ -48,8 +48,7 @@ public interface DatabaseIf {
boolean writeLockIfExist();
- <E extends Exception>
- void writeLockOrException(E e) throws E;
+ <E extends Exception> void writeLockOrException(E e) throws E;
void writeLockOrDdlException() throws DdlException;
@@ -79,23 +78,21 @@ public interface DatabaseIf {
Optional<Table> getTable(long tableId);
- <E extends Exception>
- Table getTableOrException(String tableName, java.util.function.Function<String, E> e) throws E;
+ <E extends Exception> Table getTableOrException(String tableName, java.util.function.Function<String, E> e)
+ throws E;
- <E extends Exception>
- Table getTableOrException(long tableId, java.util.function.Function<Long, E> e) throws E;
+ <E extends Exception> Table getTableOrException(long tableId, java.util.function.Function<Long, E> e) throws E;
Table getTableOrMetaException(String tableName) throws MetaNotFoundException;
Table getTableOrMetaException(long tableId) throws MetaNotFoundException;
@SuppressWarnings("unchecked")
- <T extends Table>
- T getTableOrMetaException(String tableName, Table.TableType tableType) throws MetaNotFoundException;
+ <T extends Table> T getTableOrMetaException(String tableName, Table.TableType tableType)
+ throws MetaNotFoundException;
@SuppressWarnings("unchecked")
- <T extends Table>
- T getTableOrMetaException(long tableId, Table.TableType tableType) throws MetaNotFoundException;
+ <T extends Table> T getTableOrMetaException(long tableId, Table.TableType tableType) throws MetaNotFoundException;
Table getTableOrDdlException(String tableName) throws DdlException;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index e45f384f87..1657e03b1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1646,4 +1646,11 @@ public class Config extends ConfigBase {
@ConfField(mutable = false, masterOnly = true)
public static int backend_rpc_timeout_ms = 60000; // 1 min
+ /**
+ * Temp config for multi catalog feature.
+ * Should be removed when this feature is ready.
+ */
+ @ConfField(mutable = false, masterOnly = true)
+ public static boolean enable_multi_catalog = false;
+
}
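Since the new flag is a plain static field on Config, gating an in-progress code path on it is a one-line check. A minimal sketch (the helper class below is hypothetical, not part of this patch):
import org.apache.doris.common.Config;
public class MultiCatalogGateSketch {
    // Hypothetical guard: new multi-catalog code paths consult the temp flag
    // and fall back to the existing single-catalog behavior while it is off.
    public static boolean multiCatalogEnabled() {
        return Config.enable_multi_catalog;
    }
}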
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java b/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java
index f285b4d0a5..bbe474f0df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java
@@ -79,7 +79,7 @@ public class MetaReader {
checksum = Catalog.getCurrentSystemInfo().loadBackends(dis, checksum);
checksum = catalog.loadDb(dis, checksum);
// ATTN: this should be done after load Db, and before loadAlterJob
- catalog.recreateTabletInvertIndex();
+ catalog.getInternalDataSource().recreateTabletInvertIndex();
// rebuild es state state
catalog.getEsRepository().loadTableFromCatalog();
checksum = catalog.loadLoadJob(dis, checksum);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceIf.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceIf.java
index 384c6e3099..fa1ba1352b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceIf.java
@@ -17,11 +17,12 @@
package org.apache.doris.datasource;
-import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
+import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
@@ -33,32 +34,36 @@ public interface DataSourceIf {
// Type of this data source
String getType();
+ long getId();
+
// Name of this data source
String getName();
+ List<String> getDbNames();
+
@Nullable
- Database getDbNullable(String dbName);
+ DatabaseIf getDbNullable(String dbName);
@Nullable
- Database getDbNullable(long dbId);
+ DatabaseIf getDbNullable(long dbId);
- Optional<Database> getDb(String dbName);
+ Optional<DatabaseIf> getDb(String dbName);
- Optional<Database> getDb(long dbId);
+ Optional<DatabaseIf> getDb(long dbId);
- <E extends Exception> Database getDbOrException(String dbName, java.util.function.Function<String, E> e) throws E;
+ <E extends Exception> DatabaseIf getDbOrException(String dbName, java.util.function.Function<String, E> e) throws E;
- <E extends Exception> Database getDbOrException(long dbId, java.util.function.Function<Long, E> e) throws E;
+ <E extends Exception> DatabaseIf getDbOrException(long dbId, java.util.function.Function<Long, E> e) throws E;
- Database getDbOrMetaException(String dbName) throws MetaNotFoundException;
+ DatabaseIf getDbOrMetaException(String dbName) throws MetaNotFoundException;
- Database getDbOrMetaException(long dbId) throws MetaNotFoundException;
+ DatabaseIf getDbOrMetaException(long dbId) throws MetaNotFoundException;
- Database getDbOrDdlException(String dbName) throws DdlException;
+ DatabaseIf getDbOrDdlException(String dbName) throws DdlException;
- Database getDbOrDdlException(long dbId) throws DdlException;
+ DatabaseIf getDbOrDdlException(long dbId) throws DdlException;
- Database getDbOrAnalysisException(String dbName) throws AnalysisException;
+ DatabaseIf getDbOrAnalysisException(String dbName) throws AnalysisException;
- Database getDbOrAnalysisException(long dbId) throws AnalysisException;
+ DatabaseIf getDbOrAnalysisException(long dbId) throws AnalysisException;
}
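The getDbOrException overloads take an exception factory, which is how the *OrMetaException, *OrDdlException, and *OrAnalysisException variants can share a single lookup path. A usage sketch (the lookup helper is illustrative; the two-argument MetaNotFoundException constructor matches the one used later in this patch):
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.datasource.DataSourceIf;
public class DbLookupSketch {
    public static DatabaseIf lookup(DataSourceIf ds, String dbName) throws MetaNotFoundException {
        // The factory lambda is only invoked when the database is missing.
        return ds.getDbOrException(dbName,
                s -> new MetaNotFoundException("unknown database, dbName=" + s, ErrorCode.ERR_BAD_DB_ERROR));
    }
}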
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
index b735413070..4d5c5be1c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
@@ -17,47 +17,83 @@
package org.apache.doris.datasource;
-import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.io.Writable;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.Map;
/**
* DataSourceMgr loads all data sources at FE startup
* and saves them in maps keyed by id and name.
*/
-public class DataSourceMgr {
+public class DataSourceMgr implements Writable {
+ private static final Logger LOG = LogManager.getLogger(DataSourceMgr.class);
- private Map<Long, ExternalDataSource> idToDataSource = Maps.newConcurrentMap();
- private Map<String, ExternalDataSource> nameToDataSource = Maps.newConcurrentMap();
+ private Map<Long, DataSourceIf> idToDataSource = Maps.newConcurrentMap();
+ private Map<String, DataSourceIf> nameToDataSource = Maps.newConcurrentMap();
+ private DataSourceMgrProperty dsMgrProperty = new DataSourceMgrProperty();
+
+ // Use a separate instance to facilitate access.
+ // internalDataSource still exists in idToDataSource and nameToDataSource
+ private InternalDataSource internalDataSource;
public DataSourceMgr() {
- loadDataSources();
+ initInternalDataSource();
+ }
+
+ private void initInternalDataSource() {
+ internalDataSource = new InternalDataSource();
+ idToDataSource.put(internalDataSource.getId(), internalDataSource);
+ nameToDataSource.put(internalDataSource.getName(), internalDataSource);
}
- public void loadDataSources() {
- // TODO: Actually, we should initialize the data source object where user executing "create catalog" cmd,
- // not loaded from config file.
+ private void registerNewDataSource(ExternalDataSource ds) {
+ // TODO
}
- private void registerDataSource(ExternalDataSource ds) {
- ds.setId(Catalog.getCurrentCatalog().getNextId());
- idToDataSource.put(ds.getId(), ds);
- nameToDataSource.put(ds.getName(), ds);
+ public InternalDataSource getInternalDataSource() {
+ return internalDataSource;
}
- public <E extends MetaNotFoundException> ExternalDataSource getDataSourceOrException(long id, java.util.function.Function<Long, E> e) throws E {
- ExternalDataSource ds = idToDataSource.get(id);
+ /**
+ * get data source by id.
+ *
+ * @param id
+ * @param e
+ * @param <E>
+ * @return
+ * @throws E
+ */
+ public <E extends MetaNotFoundException> DataSourceIf getDataSourceOrException(long id,
+ java.util.function.Function<Long, E> e) throws E {
+ DataSourceIf ds = idToDataSource.get(id);
if (ds == null) {
throw e.apply(id);
}
return ds;
}
- public <E extends MetaNotFoundException> ExternalDataSource getDataSourceOrException(String name, java.util.function.Function<String, E> e) throws E {
- ExternalDataSource ds = nameToDataSource.get(name);
+ /**
+ * get data source by name.
+ *
+ * @param name
+ * @param e
+ * @param <E>
+ * @return
+ * @throws E
+ */
+ public <E extends MetaNotFoundException> DataSourceIf getDataSourceOrException(String name,
+ java.util.function.Function<String, E> e) throws E {
+ DataSourceIf ds = nameToDataSource.get(name);
if (ds == null) {
throw e.apply(name);
}
@@ -67,4 +103,52 @@ public class DataSourceMgr {
public boolean hasDataSource(String name) {
return nameToDataSource.containsKey(name);
}
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ if (Config.disable_cluster_feature) {
+ return;
+ }
+ Preconditions.checkState(false, "Do not call this until multi catalog feature is ready");
+ int size = idToDataSource.size();
+ if (idToDataSource.get(InternalDataSource.INTERNAL_DS_ID) != null) {
+ // No need to persist the internal data source
+ size -= 1;
+ }
+ out.writeInt(size);
+ for (DataSourceIf ds : idToDataSource.values()) {
+ if (ds.getId() == InternalDataSource.INTERNAL_DS_ID) {
+ continue;
+ }
+ ExternalDataSource extDs = (ExternalDataSource) ds;
+ extDs.write(out);
+ }
+ dsMgrProperty.write(out);
+ }
+
+ /**
+ * read from image.
+ *
+ * @param in
+ * @return
+ * @throws IOException
+ */
+ public static DataSourceMgr read(DataInput in) throws IOException {
+ if (Config.disable_cluster_feature) {
+ return null;
+ }
+ DataSourceMgr mgr = new DataSourceMgr();
+ mgr.readFields(in);
+ return mgr;
+ }
+
+ private void readFields(DataInput in) throws IOException {
+ int size = in.readInt();
+ for (int i = 0; i < size; ++i) {
+ ExternalDataSource extDs = ExternalDataSource.read(in);
+ idToDataSource.put(extDs.getId(), extDs);
+ nameToDataSource.put(extDs.getName(), extDs);
+ }
+ dsMgrProperty = DataSourceMgrProperty.read(in);
+ }
}
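write() and read() give DataSourceMgr the standard image round trip, with the internal data source skipped on the write side and rebuilt by the constructor on the read side. A round-trip sketch over in-memory streams (illustrative only; note the Preconditions guard makes write() throw until the feature is ready, and both sides short-circuit when Config.disable_cluster_feature is set):
import org.apache.doris.datasource.DataSourceMgr;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class DataSourceMgrImageSketch {
    public static DataSourceMgr roundTrip(DataSourceMgr mgr) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Persists only the external data sources plus the mgr property.
        mgr.write(new DataOutputStream(bos));
        return DataSourceMgr.read(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    }
}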
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgrProperty.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgrProperty.java
new file mode 100644
index 0000000000..bcba2d8a35
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgrProperty.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Save the properties of a data source.
+ */
+public class DataSourceMgrProperty implements Writable {
+ @SerializedName(value = "properties")
+ private Map<String, String> properties = Maps.newHashMap();
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, GsonUtils.GSON.toJson(this));
+ }
+
+ public static DataSourceMgrProperty read(DataInput in) throws IOException {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, DataSourceMgrProperty.class);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java
new file mode 100644
index 0000000000..43bc87fb7d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+public class DataSourceProperty implements Writable {
+ @SerializedName(value = "properties")
+ private Map<String, String> properties = Maps.newHashMap();
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, GsonUtils.GSON.toJson(this));
+ }
+
+ public static DataSourceProperty read(DataInput in) throws IOException {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, DataSourceProperty.class);
+ }
+}
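Both new property classes persist themselves as a GSON JSON string wrapped in Text, so fields added later can be tolerated on upgrade. A minimal round-trip sketch for DataSourceProperty (in-memory streams, illustrative only):
import org.apache.doris.datasource.DataSourceProperty;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class PropertyRoundTripSketch {
    public static DataSourceProperty roundTrip(DataSourceProperty prop) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        prop.write(new DataOutputStream(bos));  // JSON via GsonUtils, written as a Text string
        return DataSourceProperty.read(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    }
}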
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
index 983e4dd6fd..37e16708e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
@@ -27,16 +27,6 @@ import java.util.List;
*/
public class EsExternalDataSource extends ExternalDataSource {
- @Override
- public String getType() {
- return "es";
- }
-
- @Override
- public String getName() {
- return "es";
- }
-
@Override
public List<String> listDatabaseNames(SessionContext ctx) {
return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java
index f75409f30b..a4c6c6390f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java
@@ -18,54 +18,39 @@
package org.apache.doris.datasource;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
import org.apache.doris.external.ExternalScanRange;
+import org.apache.doris.persist.gson.GsonUtils;
-import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
import org.jetbrains.annotations.Nullable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
/**
* The abstract class for all types of external data sources.
*/
-public abstract class ExternalDataSource implements DataSourceIf {
-
+public abstract class ExternalDataSource implements DataSourceIf, Writable {
// Unique id of this data source, will be assigned after data source is loaded.
+ @SerializedName(value = "id")
private long id;
-
+ @SerializedName(value = "name")
+ private String name;
+ @SerializedName(value = "type")
+ private String type;
// save properties of this data source, such as hive meta store url.
- private Map<String, String> properties = Maps.newHashMap();
-
- private Map<Long, ExternalDatabase> idToDbs = Maps.newConcurrentMap();
- private Map<String, ExternalDatabase> nameToDbs = Maps.newConcurrentMap();
-
- public void setId(long id) {
- this.id = id;
- }
-
- public long getId() {
- return id;
- }
-
- public String getProperty(String key) {
- return properties.get(key);
- }
-
- public String getPropertyOrException(String key) throws DataSourceException {
- if (properties.containsKey(key)) {
- return properties.get(key);
- }
- throw new DataSourceException("Not found property " + key + " in data source " + getName());
- }
-
+ @SerializedName(value = "dsProperty")
+ private DataSourceProperty dsProperty = new DataSourceProperty();
/**
* @return names of database in this data source.
@@ -101,75 +86,96 @@ public abstract class ExternalDataSource implements DataSourceIf {
*/
public abstract List<ExternalScanRange> getExternalScanRanges(SessionContext ctx);
+
@Override
- public String getType() {
- return null;
+ public long getId() {
+ return id;
}
@Override
public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public List<String> getDbNames() {
return null;
}
@Nullable
@Override
- public Database getDbNullable(String dbName) {
+ public DatabaseIf getDbNullable(String dbName) {
return null;
}
@Nullable
@Override
- public Database getDbNullable(long dbId) {
+ public DatabaseIf getDbNullable(long dbId) {
return null;
}
@Override
- public Optional<Database> getDb(String dbName) {
+ public Optional<DatabaseIf> getDb(String dbName) {
return Optional.empty();
}
@Override
- public Optional<Database> getDb(long dbId) {
+ public Optional<DatabaseIf> getDb(long dbId) {
return Optional.empty();
}
@Override
- public <E extends Exception> Database getDbOrException(String dbName, Function<String, E> e) throws E {
+ public <E extends Exception> DatabaseIf getDbOrException(String dbName, Function<String, E> e) throws E {
return null;
}
@Override
- public <E extends Exception> Database getDbOrException(long dbId, Function<Long, E> e) throws E {
+ public <E extends Exception> DatabaseIf getDbOrException(long dbId, Function<Long, E> e) throws E {
return null;
}
@Override
- public Database getDbOrMetaException(String dbName) throws MetaNotFoundException {
+ public DatabaseIf getDbOrMetaException(String dbName) throws MetaNotFoundException {
return null;
}
@Override
- public Database getDbOrMetaException(long dbId) throws MetaNotFoundException {
+ public DatabaseIf getDbOrMetaException(long dbId) throws MetaNotFoundException {
return null;
}
@Override
- public Database getDbOrDdlException(String dbName) throws DdlException {
+ public DatabaseIf getDbOrDdlException(String dbName) throws DdlException {
return null;
}
@Override
- public Database getDbOrDdlException(long dbId) throws DdlException {
+ public DatabaseIf getDbOrDdlException(long dbId) throws DdlException {
return null;
}
@Override
- public Database getDbOrAnalysisException(String dbName) throws AnalysisException {
+ public DatabaseIf getDbOrAnalysisException(String dbName) throws AnalysisException {
return null;
}
@Override
- public Database getDbOrAnalysisException(long dbId) throws AnalysisException {
+ public DatabaseIf getDbOrAnalysisException(long dbId) throws AnalysisException {
return null;
}
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, GsonUtils.GSON.toJson(this));
+ }
+
+ public static ExternalDataSource read(DataInput in) throws IOException {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, ExternalDataSource.class);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java
index 50aebbacef..8b2ada2946 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java
@@ -30,21 +30,9 @@ import java.util.List;
*/
public class HMSExternalDataSource extends ExternalDataSource {
private static final Logger LOG = LogManager.getLogger(HMSExternalDataSource.class);
-
public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
public HMSExternalDataSource() {
-
- }
-
- @Override
- public String getType() {
- return "hms";
- }
-
- @Override
- public String getName() {
- return "hms";
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
index 17831cacd3..f421c59e9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
@@ -17,22 +17,209 @@
package org.apache.doris.datasource;
+import org.apache.doris.alter.DecommissionType;
+import org.apache.doris.analysis.AddPartitionClause;
+import org.apache.doris.analysis.AddRollupClause;
+import org.apache.doris.analysis.AlterClause;
+import org.apache.doris.analysis.AlterClusterStmt;
+import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
+import org.apache.doris.analysis.AlterDatabaseQuotaStmt.QuotaType;
+import org.apache.doris.analysis.AlterDatabaseRename;
+import org.apache.doris.analysis.AlterSystemStmt;
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.ColumnDef;
+import org.apache.doris.analysis.ColumnDef.DefaultValue;
+import org.apache.doris.analysis.CreateClusterStmt;
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableAsSelectStmt;
+import org.apache.doris.analysis.CreateTableLikeStmt;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.CreateUserStmt;
+import org.apache.doris.analysis.DataSortInfo;
+import org.apache.doris.analysis.DecommissionBackendClause;
+import org.apache.doris.analysis.DistributionDesc;
+import org.apache.doris.analysis.DropClusterStmt;
+import org.apache.doris.analysis.DropDbStmt;
+import org.apache.doris.analysis.DropPartitionClause;
+import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.HashDistributionDesc;
+import org.apache.doris.analysis.KeysDesc;
+import org.apache.doris.analysis.LinkDbStmt;
+import org.apache.doris.analysis.MigrateDbStmt;
+import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.analysis.QueryStmt;
+import org.apache.doris.analysis.RecoverDbStmt;
+import org.apache.doris.analysis.RecoverPartitionStmt;
+import org.apache.doris.analysis.RecoverTableStmt;
+import org.apache.doris.analysis.SinglePartitionDesc;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.TableRef;
+import org.apache.doris.analysis.TruncateTableStmt;
+import org.apache.doris.analysis.TypeDef;
+import org.apache.doris.analysis.UserDesc;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.BrokerTable;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.ColocateGroupSchema;
+import org.apache.doris.catalog.ColocateTableIndex;
+import org.apache.doris.catalog.ColocateTableIndex.GroupId;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Database.DbState;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.DatabaseProperty;
+import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
+import org.apache.doris.catalog.EsTable;
+import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.HiveTable;
+import org.apache.doris.catalog.IcebergTable;
+import org.apache.doris.catalog.Index;
+import org.apache.doris.catalog.InfoSchemaDb;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.MaterializedIndex.IndexState;
+import org.apache.doris.catalog.MaterializedIndexMeta;
+import org.apache.doris.catalog.MysqlTable;
+import org.apache.doris.catalog.OdbcTable;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.OlapTable.OlapTableState;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.RangePartitionItem;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Replica.ReplicaState;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.SinglePartitionInfo;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Table.TableType;
+import org.apache.doris.catalog.TableIndexes;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.View;
+import org.apache.doris.clone.DynamicPartitionScheduler;
+import org.apache.doris.cluster.BaseParam;
+import org.apache.doris.cluster.Cluster;
+import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.io.CountingDataOutputStream;
+import org.apache.doris.common.util.DynamicPartitionUtil;
+import org.apache.doris.common.util.MetaLockUtils;
+import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.common.util.QueryableReentrantLock;
+import org.apache.doris.common.util.SqlParserUtils;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.external.elasticsearch.EsRepository;
+import org.apache.doris.external.hudi.HudiProperty;
+import org.apache.doris.external.hudi.HudiTable;
+import org.apache.doris.external.hudi.HudiUtils;
+import org.apache.doris.external.iceberg.IcebergCatalogMgr;
+import org.apache.doris.external.iceberg.IcebergTableCreationRecordMgr;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.persist.BackendIdsUpdateInfo;
+import org.apache.doris.persist.ClusterInfo;
+import org.apache.doris.persist.ColocatePersistInfo;
+import org.apache.doris.persist.DatabaseInfo;
+import org.apache.doris.persist.DropDbInfo;
+import org.apache.doris.persist.DropInfo;
+import org.apache.doris.persist.DropLinkDbAndUpdateDbInfo;
+import org.apache.doris.persist.DropPartitionInfo;
+import org.apache.doris.persist.PartitionPersistInfo;
+import org.apache.doris.persist.RecoverInfo;
+import org.apache.doris.persist.ReplicaPersistInfo;
+import org.apache.doris.persist.TruncateTableInfo;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.Backend.BackendState;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.CreateReplicaTask;
+import org.apache.doris.thrift.TCompressionType;
+import org.apache.doris.thrift.TStorageFormat;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.thrift.TStorageType;
+import org.apache.doris.thrift.TTabletType;
+import org.apache.doris.thrift.TTaskType;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.Getter;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
* The internal data source manages all self-managed meta objects in a Doris cluster,
* such as databases, tables, etc.
- * There is only one internal data source in a cluster.
+ * There is only one internal data source in a cluster, and its id is always 0.
*/
public class InternalDataSource implements DataSourceIf {
+ public static final String INTERNAL_DS_NAME = "__internal";
+ public static final long INTERNAL_DS_ID = 0L;
+
+ private static final Logger LOG = LogManager.getLogger(InternalDataSource.class);
+
+ private QueryableReentrantLock lock = new QueryableReentrantLock(true);
+ private ConcurrentHashMap<Long, Database> idToDb = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, Database> fullNameToDb = new ConcurrentHashMap<>();
+
+ private ConcurrentHashMap<Long, Cluster> idToCluster = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, Cluster> nameToCluster = new ConcurrentHashMap<>();
+
+ @Getter
+ private EsRepository esRepository = new EsRepository();
+ @Getter
+ private IcebergTableCreationRecordMgr icebergTableCreationRecordMgr = new IcebergTableCreationRecordMgr();
+
+ @Override
+ public long getId() {
+ return INTERNAL_DS_ID;
+ }
+
@Override
public String getType() {
return "internal";
@@ -40,68 +227,2988 @@ public class InternalDataSource implements DataSourceIf {
@Override
public String getName() {
- return "_internal_";
+ return INTERNAL_DS_NAME;
+ }
+
+ @Override
+ public List<String> getDbNames() {
+ return Lists.newArrayList(fullNameToDb.keySet());
}
@Nullable
@Override
- public Database getDbNullable(String dbName) {
+ public DatabaseIf getDbNullable(String dbName) {
+ if (fullNameToDb.containsKey(dbName)) {
+ return fullNameToDb.get(dbName);
+ } else {
+ // This may be an information_schema db request, and the information_schema db name is case insensitive.
+ // So we first extract the db name to check whether it is information_schema.
+ // Then we reassemble the full name from the original cluster name and the lower-cased db name,
+ // and finally fetch the information_schema db from the name map.
+ String fullName = ClusterNamespace.getNameFromFullName(dbName);
+ if (fullName.equalsIgnoreCase(InfoSchemaDb.DATABASE_NAME)) {
+ String clusterName = ClusterNamespace.getClusterNameFromFullName(dbName);
+ fullName = ClusterNamespace.getFullName(clusterName, fullName.toLowerCase());
+ return fullNameToDb.get(fullName);
+ }
+ }
return null;
}
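+ // Worked example for the information_schema branch above, assuming the
+ // default cluster: for input "default_cluster:INFORMATION_SCHEMA",
+ // getNameFromFullName yields "INFORMATION_SCHEMA", which matches
+ // InfoSchemaDb.DATABASE_NAME case-insensitively, so the lookup is retried
+ // as "default_cluster:information_schema".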
@Nullable
@Override
- public Database getDbNullable(long dbId) {
- return null;
+ public DatabaseIf getDbNullable(long dbId) {
+ return idToDb.get(dbId);
}
@Override
- public Optional<Database> getDb(String dbName) {
- return Optional.empty();
+ public Optional<DatabaseIf> getDb(String dbName) {
+ return Optional.ofNullable(getDbNullable(dbName));
}
@Override
- public Optional<Database> getDb(long dbId) {
- return Optional.empty();
+ public Optional<DatabaseIf> getDb(long dbId) {
+ return Optional.ofNullable(getDbNullable(dbId));
}
@Override
- public <E extends Exception> Database getDbOrException(String dbName, Function<String, E> e) throws E {
- return null;
+ public <E extends Exception> DatabaseIf getDbOrException(String dbName, Function<String, E> e) throws E {
+ DatabaseIf db = getDbNullable(dbName);
+ if (db == null) {
+ throw e.apply(dbName);
+ }
+ return db;
}
@Override
- public <E extends Exception> Database getDbOrException(long dbId, Function<Long, E> e) throws E {
- return null;
+ public <E extends Exception> DatabaseIf getDbOrException(long dbId, Function<Long, E> e) throws E {
+ DatabaseIf db = getDbNullable(dbId);
+ if (db == null) {
+ throw e.apply(dbId);
+ }
+ return db;
}
@Override
- public Database getDbOrMetaException(String dbName) throws MetaNotFoundException {
- return null;
+ public DatabaseIf getDbOrMetaException(String dbName) throws MetaNotFoundException {
+ return getDbOrException(dbName,
+ s -> new MetaNotFoundException("unknown databases, dbName=" + s, ErrorCode.ERR_BAD_DB_ERROR));
}
@Override
- public Database getDbOrMetaException(long dbId) throws MetaNotFoundException {
- return null;
+ public DatabaseIf getDbOrMetaException(long dbId) throws MetaNotFoundException {
+ return getDbOrException(dbId,
+ s -> new MetaNotFoundException("unknown databases, dbId=" + s, ErrorCode.ERR_BAD_DB_ERROR));
}
@Override
- public Database getDbOrDdlException(String dbName) throws DdlException {
- return null;
+ public DatabaseIf getDbOrDdlException(String dbName) throws DdlException {
+ return getDbOrException(dbName,
+ s -> new DdlException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
}
@Override
- public Database getDbOrDdlException(long dbId) throws DdlException {
- return null;
+ public DatabaseIf getDbOrDdlException(long dbId) throws DdlException {
+ return getDbOrException(dbId,
+ s -> new DdlException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
}
@Override
- public Database getDbOrAnalysisException(String dbName) throws AnalysisException {
- return null;
+ public DatabaseIf getDbOrAnalysisException(String dbName) throws AnalysisException {
+ return getDbOrException(dbName,
+ s -> new AnalysisException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
}
@Override
- public Database getDbOrAnalysisException(long dbId) throws AnalysisException {
- return null;
+ public DatabaseIf getDbOrAnalysisException(long dbId) throws AnalysisException {
+ return getDbOrException(dbId,
+ s -> new AnalysisException(ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(s), ErrorCode.ERR_BAD_DB_ERROR));
+ }
+
+ // Use tryLock to avoid potential deadlocks
+ private boolean tryLock(boolean mustLock) {
+ while (true) {
+ try {
+ if (!lock.tryLock(Config.catalog_try_lock_timeout_ms, TimeUnit.MILLISECONDS)) {
+ if (LOG.isDebugEnabled()) {
+ // to see which thread has held this lock for a long time.
+ Thread owner = lock.getOwner();
+ if (owner != null) {
+ LOG.debug("catalog lock is held by: {}", Util.dumpThread(owner, 10));
+ }
+ }
+
+ if (mustLock) {
+ continue;
+ } else {
+ return false;
+ }
+ }
+ return true;
+ } catch (InterruptedException e) {
+ LOG.warn("got exception while getting catalog lock", e);
+ if (mustLock) {
+ continue;
+ } else {
+ return lock.isHeldByCurrentThread();
+ }
+ }
+ }
+ }
+
+ public List<Long> getDbIds() {
+ return Lists.newArrayList(idToDb.keySet());
+ }
+
+ private void unlock() {
+ if (lock.isHeldByCurrentThread()) {
+ this.lock.unlock();
+ }
+ }
+
+ /**
+ * create the tablet inverted index from metadata.
+ */
+ public void recreateTabletInvertIndex() {
+ if (Catalog.isCheckpointThread()) {
+ return;
+ }
+
+ // create inverted index
+ TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
+ for (Database db : this.fullNameToDb.values()) {
+ long dbId = db.getId();
+ for (Table table : db.getTables()) {
+ if (table.getType() != TableType.OLAP) {
+ continue;
+ }
+
+ OlapTable olapTable = (OlapTable) table;
+ long tableId = olapTable.getId();
+ Collection<Partition> allPartitions = olapTable.getAllPartitions();
+ for (Partition partition : allPartitions) {
+ long partitionId = partition.getId();
+ TStorageMedium medium =
+ olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
+ for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
+ long indexId = index.getId();
+ int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium);
+ for (Tablet tablet : index.getTablets()) {
+ long tabletId = tablet.getId();
+ invertedIndex.addTablet(tabletId, tabletMeta);
+ for (Replica replica : tablet.getReplicas()) {
+ invertedIndex.addReplica(tabletId, replica);
+ }
+ }
+ } // end for indices
+ } // end for partitions
+ } // end for tables
+ } // end for dbs
+ }
+
+ /**
+ * Entry point for creating a database.
+ *
+ * @param stmt
+ * @throws DdlException
+ */
+ public void createDb(CreateDbStmt stmt) throws DdlException {
+ final String clusterName = stmt.getClusterName();
+ String fullDbName = stmt.getFullDbName();
+ Map<String, String> properties = stmt.getProperties();
+
+ long id = Catalog.getCurrentCatalog().getNextId();
+ Database db = new Database(id, fullDbName);
+ db.setClusterName(clusterName);
+ // check and analyze database properties before create database
+ db.setDbProperties(new DatabaseProperty(properties).checkAndBuildProperties());
+
+ if (!tryLock(false)) {
+ throw new DdlException("Failed to acquire catalog lock. Try again");
+ }
+ try {
+ if (!nameToCluster.containsKey(clusterName)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_SELECT_CLUSTER, clusterName);
+ }
+ if (fullNameToDb.containsKey(fullDbName)) {
+ if (stmt.isSetIfNotExists()) {
+ LOG.info("create database[{}] which already exists", fullDbName);
+ return;
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, fullDbName);
+ }
+ } else {
+ unprotectCreateDb(db);
+ Catalog.getCurrentCatalog().getEditLog().logCreateDb(db);
+ }
+ } finally {
+ unlock();
+ }
+ LOG.info("createDb dbName = " + fullDbName + ", id = " + id);
+
+ // create tables in iceberg database
+ if (db.getDbProperties().getIcebergProperty().isExist()) {
+ icebergTableCreationRecordMgr.registerDb(db);
+ }
+ }
+
+ /**
+ * For replaying creating database.
+ *
+ * @param db
+ */
+ public void unprotectCreateDb(Database db) {
+ idToDb.put(db.getId(), db);
+ fullNameToDb.put(db.getFullName(), db);
+ final Cluster cluster = nameToCluster.get(db.getClusterName());
+ cluster.addDb(db.getFullName(), db.getId());
+ Catalog.getCurrentGlobalTransactionMgr().addDatabaseTransactionMgr(db.getId());
+ }
+
+ // for test
+ public void addCluster(Cluster cluster) {
+ nameToCluster.put(cluster.getName(), cluster);
+ idToCluster.put(cluster.getId(), cluster);
+ }
+
+ /**
+ * replayCreateDb.
+ *
+ * @param db
+ */
+ public void replayCreateDb(Database db) {
+ tryLock(true);
+ try {
+ unprotectCreateDb(db);
+ } finally {
+ unlock();
+ }
+ }
+
+ public void dropDb(DropDbStmt stmt) throws DdlException {
+ String dbName = stmt.getDbName();
+
+ // 1. check if database exists
+ if (!tryLock(false)) {
+ throw new DdlException("Failed to acquire catalog lock. Try again");
+ }
+ try {
+ if (!fullNameToDb.containsKey(dbName)) {
+ if (stmt.isSetIfExists()) {
+ LOG.info("drop database[{}] which does not exist", dbName);
+ return;
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS, dbName);
+ }
+ }
+
+ // 2. drop tables in db
+ Database db = this.fullNameToDb.get(dbName);
+ db.writeLock();
+ try {
+ if (!stmt.isForceDrop()) {
+ if (Catalog.getCurrentCatalog().getGlobalTransactionMgr()
+ .existCommittedTxns(db.getId(), null, null)) {
+ throw new DdlException(
+ "There are still some transactions in the COMMITTED state waiting to be completed. "
+ + "The database [" + dbName
+ + "] cannot be dropped. If you want to forcibly drop(cannot be recovered),"
+ + " please use \"DROP database FORCE\".");
+ }
+ }
+ if (db.getDbState() == DbState.LINK && dbName.equals(db.getAttachDb())) {
+ // We try to drop a hard link.
+ final DropLinkDbAndUpdateDbInfo info = new DropLinkDbAndUpdateDbInfo();
+ fullNameToDb.remove(db.getAttachDb());
+ db.setDbState(DbState.NORMAL);
+ info.setUpdateDbState(DbState.NORMAL);
+ final Cluster cluster =
+ nameToCluster.get(ClusterNamespace.getClusterNameFromFullName(db.getAttachDb()));
+ final BaseParam param = new BaseParam();
+ param.addStringParam(db.getAttachDb());
+ param.addLongParam(db.getId());
+ cluster.removeLinkDb(param);
+ info.setDropDbCluster(cluster.getName());
+ info.setDropDbId(db.getId());
+ info.setDropDbName(db.getAttachDb());
+ Catalog.getCurrentCatalog().getEditLog().logDropLinkDb(info);
+ return;
+ }
+
+ if (db.getDbState() == DbState.LINK && dbName.equals(db.getFullName())) {
+ // We try to drop a db which other dbs attach to it,
+ // which is not allowed.
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DB_STATE_LINK_OR_MIGRATE,
+ ClusterNamespace.getNameFromFullName(dbName));
+ return;
+ }
+
+ if (dbName.equals(db.getAttachDb()) && db.getDbState() == DbState.MOVE) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DB_STATE_LINK_OR_MIGRATE,
+ ClusterNamespace.getNameFromFullName(dbName));
+ return;
+ }
+
+ // save table names for recycling
+ Set<String> tableNames = db.getTableNamesWithLock();
+ List<Table> tableList = db.getTablesOnIdOrder();
+ MetaLockUtils.writeLockTables(tableList);
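+ // tables are locked in id order so that concurrent multi-table operations acquire
+ // locks in a consistent order (deadlock avoidance)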
+ try {
+ if (!stmt.isForceDrop()) {
+ for (Table table : tableList) {
+ if (table.getType() == TableType.OLAP) {
+ OlapTable olapTable = (OlapTable) table;
+ if (olapTable.getState() != OlapTableState.NORMAL) {
+ throw new DdlException("The table [" + olapTable.getState() + "]'s state is "
+ + olapTable.getState() + ", cannot be dropped."
+ + " please cancel the operation on olap table firstly. If you want to forcibly drop(cannot be recovered),"
+ + " please use \"DROP table FORCE\".");
+ }
+ }
+ }
+ }
+ unprotectDropDb(db, stmt.isForceDrop(), false);
+ } finally {
+ MetaLockUtils.writeUnlockTables(tableList);
+ }
+
+ if (!stmt.isForceDrop()) {
+ Catalog.getCurrentRecycleBin().recycleDatabase(db, tableNames);
+ } else {
+ Catalog.getCurrentCatalog().eraseDatabase(db.getId(), false);
+ }
+ } finally {
+ db.writeUnlock();
+ }
+
+ // 3. remove db from catalog
+ idToDb.remove(db.getId());
+ fullNameToDb.remove(db.getFullName());
+ final Cluster cluster = nameToCluster.get(db.getClusterName());
+ cluster.removeDb(dbName, db.getId());
+ DropDbInfo info = new DropDbInfo(dbName, stmt.isForceDrop());
+ Catalog.getCurrentCatalog().getEditLog().logDropDb(info);
+ } finally {
+ unlock();
+ }
+
+ LOG.info("finish drop database[{}], is force : {}", dbName, stmt.isForceDrop());
+ }
+
+ public void unprotectDropDb(Database db, boolean isForceDrop, boolean isReplay) {
+ // drop Iceberg database table creation records
+ if (db.getDbProperties().getIcebergProperty().isExist()) {
+ icebergTableCreationRecordMgr.deregisterDb(db);
+ }
+ for (Table table : db.getTables()) {
+ unprotectDropTable(db, table, isForceDrop, isReplay);
+ }
+ db.markDropped();
+ }
+
+ public void replayDropLinkDb(DropLinkDbAndUpdateDbInfo info) {
+ tryLock(true);
+ try {
+ final Database db = this.fullNameToDb.remove(info.getDropDbName());
+ db.setDbState(info.getUpdateDbState());
+ final Cluster cluster = nameToCluster.get(info.getDropDbCluster());
+ final BaseParam param = new BaseParam();
+ param.addStringParam(db.getAttachDb());
+ param.addLongParam(db.getId());
+ cluster.removeLinkDb(param);
+ } finally {
+ unlock();
+ }
+ }
+
+ public void replayDropDb(String dbName, boolean isForceDrop) throws DdlException {
+ tryLock(true);
+ try {
+ Database db = fullNameToDb.get(dbName);
+ db.writeLock();
+ try {
+ Set<String> tableNames = db.getTableNamesWithLock();
+ List<Table> tableList = db.getTablesOnIdOrder();
+ MetaLockUtils.writeLockTables(tableList);
+ try {
+ unprotectDropDb(db, isForceDrop, true);
+ } finally {
+ MetaLockUtils.writeUnlockTables(tableList);
+ }
+ if (!isForceDrop) {
+ Catalog.getCurrentRecycleBin().recycleDatabase(db, tableNames);
+ } else {
+ Catalog.getCurrentCatalog().eraseDatabase(db.getId(), false);
+ }
+ } finally {
+ db.writeUnlock();
+ }
+
+ fullNameToDb.remove(dbName);
+ idToDb.remove(db.getId());
+ final Cluster cluster = nameToCluster.get(db.getClusterName());
+ cluster.removeDb(dbName, db.getId());
+ } finally {
+ unlock();
+ }
+ }
+
+ public void recoverDatabase(RecoverDbStmt recoverStmt) throws DdlException {
+ // check if a new db with the same name already exists
+ if (getDb(recoverStmt.getDbName()).isPresent()) {
+ throw new DdlException("Database[" + recoverStmt.getDbName() + "] already exist.");
+ }
+
+ Database db = Catalog.getCurrentRecycleBin().recoverDatabase(recoverStmt.getDbName());
+
+ // add db to catalog
+ if (!tryLock(false)) {
+ throw new DdlException("Failed to acquire catalog lock. Try again");
+ }
+ db.writeLock();
+ List<Table> tableList = db.getTablesOnIdOrder();
+ MetaLockUtils.writeLockTables(tableList);
+ try {
+ if (fullNameToDb.containsKey(db.getFullName())) {
+ throw new DdlException("Database[" + db.getFullName() + "] already exist.");
+ // it's ok that we do not put db back to CatalogRecycleBin
+ // cause this db cannot recover any more
+ }
+
+ fullNameToDb.put(db.getFullName(), db);
+ idToDb.put(db.getId(), db);
+ final Cluster cluster = nameToCluster.get(db.getClusterName());
+ cluster.addDb(db.getFullName(), db.getId());
+
+ // log
+ RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L);
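+ // the -1L arguments are the table id and partition id, which do not apply to a
+ // database-level recover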
+ Catalog.getCurrentCatalog().getEditLog().logRecoverDb(recoverInfo);
+ db.unmarkDropped();
+ } finally {
+ MetaLockUtils.writeUnlockTables(tableList);
+ db.writeUnlock();
+ unlock();
+ }
+
+ LOG.info("recover database[{}]", db.getId());
+ }
+
+ public void recoverTable(RecoverTableStmt recoverStmt) throws DdlException {
+ String dbName = recoverStmt.getDbName();
+ String tableName = recoverStmt.getTableName();
+
+ Database db = (Database) getDbOrDdlException(dbName);
+ db.writeLockOrDdlException();
+ try {
+ if (db.getTable(tableName).isPresent()) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+ }
+ if (!Catalog.getCurrentRecycleBin().recoverTable(db, tableName)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dbName);
+ }
+ } finally {
+ db.writeUnlock();
+ }
+ }
+
+ public void recoverPartition(RecoverPartitionStmt recoverStmt) throws DdlException {
+ String dbName = recoverStmt.getDbName();
+ String tableName = recoverStmt.getTableName();
+
+ Database db = (Database) getDbOrDdlException(dbName);
+ OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
+ olapTable.writeLockOrDdlException();
+ try {
+ String partitionName = recoverStmt.getPartitionName();
+ if (olapTable.getPartition(partitionName) != null) {
+ throw new DdlException("partition[" + partitionName + "] already exist in table[" + tableName + "]");
+ }
+
+ Catalog.getCurrentRecycleBin().recoverPartition(db.getId(), olapTable, partitionName);
+ } finally {
+ olapTable.writeUnlock();
+ }
+ }
+
+ public void replayEraseDatabase(long dbId) throws DdlException {
+ Catalog.getCurrentRecycleBin().replayEraseDatabase(dbId);
+ }
+
+ public void replayRecoverDatabase(RecoverInfo info) {
+ long dbId = info.getDbId();
+ Database db = Catalog.getCurrentRecycleBin().replayRecoverDatabase(dbId);
+
+ // add db to catalog
+ replayCreateDb(db);
+
+ LOG.info("replay recover db[{}]", dbId);
+ }
+
+ public void alterDatabaseQuota(AlterDatabaseQuotaStmt stmt) throws DdlException {
+ String dbName = stmt.getDbName();
+ Database db = (Database) getDbOrDdlException(dbName);
+ QuotaType quotaType = stmt.getQuotaType();
+ db.writeLockOrDdlException();
+ try {
+ if (quotaType == QuotaType.DATA) {
+ db.setDataQuota(stmt.getQuota());
+ } else if (quotaType == QuotaType.REPLICA) {
+ db.setReplicaQuota(stmt.getQuota());
+ }
+ long quota = stmt.getQuota();
+ DatabaseInfo dbInfo = new DatabaseInfo(dbName, "", quota, quotaType);
+ Catalog.getCurrentCatalog().getEditLog().logAlterDb(dbInfo);
+ } finally {
+ db.writeUnlock();
+ }
+ }
+
+ public void replayAlterDatabaseQuota(String dbName, long quota, QuotaType quotaType) throws MetaNotFoundException {
+ Database db = (Database) getDbOrMetaException(dbName);
+ db.writeLock();
+ try {
+ if (quotaType == QuotaType.DATA) {
+ db.setDataQuota(quota);
+ } else if (quotaType == QuotaType.REPLICA) {
+ db.setReplicaQuota(quota);
+ }
+ } finally {
+ db.writeUnlock();
+ }
+ }
+
+ public void renameDatabase(AlterDatabaseRename stmt) throws DdlException {
+ String fullDbName = stmt.getDbName();
+ String newFullDbName = stmt.getNewDbName();
+ String clusterName = stmt.getClusterName();
+
+ if (fullDbName.equals(newFullDbName)) {
+ throw new DdlException("Same database name");
+ }
+
+ Database db = null;
+ Cluster cluster = null;
+ if (!tryLock(false)) {
+ throw new DdlException("Failed to acquire catalog lock. Try again");
+ }
+ try {
+ cluster = nameToCluster.get(clusterName);
+ if (cluster == null) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
+ }
+ // check if db exists
+ db = fullNameToDb.get(fullDbName);
+ if (db == null) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, fullDbName);
+ }
+
+ if (db.getDbState() == DbState.LINK || db.getDbState() == DbState.MOVE) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_RENAME_DB_ERR, fullDbName);
+ }
+ // check if name is already used
+ if (fullNameToDb.get(newFullDbName) != null) {
+ throw new DdlException("Database name[" + newFullDbName + "] is already used");
+ }
+
+ cluster.removeDb(db.getFullName(), db.getId());
+ cluster.addDb(newFullDbName, db.getId());
+ // 1. rename db
+ db.setNameWithLock(newFullDbName);
+
+ // 2. add to meta. check again
+ fullNameToDb.remove(fullDbName);
+ fullNameToDb.put(newFullDbName, db);
+
+ DatabaseInfo dbInfo = new DatabaseInfo(fullDbName, newFullDbName, -1L, QuotaType.NONE);
+ Catalog.getCurrentCatalog().getEditLog().logDatabaseRename(dbInfo);
+ } finally {
+ unlock();
+ }
+
+ LOG.info("rename database[{}] to [{}]", fullDbName, newFullDbName);
+ }
+
+ public void replayRenameDatabase(String dbName, String newDbName) {
+ tryLock(true);
+ try {
+ Database db = fullNameToDb.get(dbName);
+ Cluster cluster = nameToCluster.get(db.getClusterName());
+ cluster.removeDb(db.getFullName(), db.getId());
+ db.setName(newDbName);
+ cluster.addDb(newDbName, db.getId());
+ fullNameToDb.remove(dbName);
+ fullNameToDb.put(newDbName, db);
+ } finally {
+ unlock();
+ }
+
+ LOG.info("replay rename database {} to {}", dbName, newDbName);
+ }
+
+ // Drop table
+ public void dropTable(DropTableStmt stmt) throws DdlException {
+ String dbName = stmt.getDbName();
+ String tableName = stmt.getTableName();
+
+ // check database
+ Database db = (Database) getDbOrDdlException(dbName);
+ db.writeLockOrDdlException();
+ try {
+ Table table = db.getTableNullable(tableName);
+ if (table == null) {
+ if (stmt.isSetIfExists()) {
+ LOG.info("drop table[{}] which does not exist", tableName);
+ return;
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dbName);
+ }
+ }
+ // Check if a view
+ if (stmt.isView()) {
+ if (!(table instanceof View)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_OBJECT, dbName, tableName, "VIEW");
+ }
+ } else {
+ if (table instanceof View) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_OBJECT, dbName, tableName, "TABLE");
+ }
+ }
+
+ if (!stmt.isForceDrop()) {
+ if (Catalog.getCurrentCatalog().getGlobalTransactionMgr()
+ .existCommittedTxns(db.getId(), table.getId(), null)) {
+ throw new DdlException(
+ "There are still some transactions in the COMMITTED state waiting to be completed. "
+ + "The table [" + tableName
+ + "] cannot be dropped. If you want to forcibly drop it (this cannot be recovered),"
+ + " please use \"DROP TABLE FORCE\".");
+ }
+ }
+ DropInfo info = new DropInfo(db.getId(), table.getId(), -1L, stmt.isForceDrop());
+ table.writeLock();
+ try {
+ if (table instanceof OlapTable && !stmt.isForceDrop()) {
+ OlapTable olapTable = (OlapTable) table;
+ if ((olapTable.getState() != OlapTableState.NORMAL)) {
+ throw new DdlException("The table [" + tableName + "]'s state is " + olapTable.getState()
+ + ", cannot be dropped."
+ + " please cancel the operation on olap table firstly. If you want to forcibly drop(cannot be recovered),"
+ + " please use \"DROP table FORCE\".");
+ }
+ }
+ unprotectDropTable(db, table, stmt.isForceDrop(), false);
+ } finally {
+ table.writeUnlock();
+ }
+ Catalog.getCurrentCatalog().getEditLog().logDropTable(info);
+ } finally {
+ db.writeUnlock();
+ }
+ LOG.info("finished dropping table: {} from db: {}, is force: {}", tableName, dbName, stmt.isForceDrop());
+ }
+
+ public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay) {
+ if (table.getType() == TableType.ELASTICSEARCH) {
+ esRepository.deRegisterTable(table.getId());
+ } else if (table.getType() == TableType.OLAP) {
+ // drop all temp partitions of this table, so that there are no temp partitions in the recycle bin,
+ // which makes things easier.
+ ((OlapTable) table).dropAllTempPartitions();
+ } else if (table.getType() == TableType.ICEBERG) {
+ // drop Iceberg database table creation record
+ icebergTableCreationRecordMgr.deregisterTable(db, (IcebergTable) table);
+ }
+
+ db.dropTable(table.getName());
+ if (!isForceDrop) {
+ Catalog.getCurrentRecycleBin().recycleTable(db.getId(), table);
+ } else {
+ if (table.getType() == TableType.OLAP) {
+ Catalog.getCurrentCatalog().onEraseOlapTable((OlapTable) table, isReplay);
+ }
+ }
+
+ LOG.info("finished dropping table[{}] in db[{}]", table.getName(), db.getFullName());
+ return true;
+ }
+
+ public void replayDropTable(Database db, long tableId, boolean isForceDrop) throws MetaNotFoundException {
+ Table table = db.getTableOrMetaException(tableId);
+ db.writeLock();
+ table.writeLock();
+ try {
+ unprotectDropTable(db, table, isForceDrop, true);
+ } finally {
+ table.writeUnlock();
+ db.writeUnlock();
+ }
+ }
+
+ public void replayEraseTable(long tableId) {
+ Catalog.getCurrentRecycleBin().replayEraseTable(tableId);
+ }
+
+ public void replayRecoverTable(RecoverInfo info) throws MetaNotFoundException {
+ Database db = (Database) getDbOrMetaException(info.getDbId());
+ db.writeLock();
+ try {
+ Catalog.getCurrentRecycleBin().replayRecoverTable(db, info.getTableId());
+ } finally {
+ db.writeUnlock();
+ }
+ }
+
+ private void unprotectAddReplica(OlapTable olapTable, ReplicaPersistInfo info) {
+ LOG.debug("replay add a replica {}", info);
+ Partition partition = olapTable.getPartition(info.getPartitionId());
+ MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId());
+ Tablet tablet = materializedIndex.getTablet(info.getTabletId());
+
+ // for compatibility
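+ // (older persisted ReplicaPersistInfo entries may not carry a schema hash; -1 means
+ // "unset", so we fall back to the hash recorded in the table's index meta)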
+ int schemaHash = info.getSchemaHash();
+ if (schemaHash == -1) {
+ schemaHash = olapTable.getSchemaHashByIndexId(info.getIndexId());
+ }
+
+ Replica replica =
+ new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(), schemaHash, info.getDataSize(),
+ info.getRowCount(), ReplicaState.NORMAL, info.getLastFailedVersion(),
+ info.getLastSuccessVersion());
+ tablet.addReplica(replica);
+ }
+
+ private void unprotectUpdateReplica(OlapTable olapTable, ReplicaPersistInfo info) {
+ LOG.debug("replay update a replica {}", info);
+ Partition partition = olapTable.getPartition(info.getPartitionId());
+ MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId());
+ Tablet tablet = materializedIndex.getTablet(info.getTabletId());
+ Replica replica = tablet.getReplicaByBackendId(info.getBackendId());
+ Preconditions.checkNotNull(replica, info);
+ replica.updateVersionInfo(info.getVersion(), info.getDataSize(), info.getRowCount());
+ replica.setBad(false);
+ }
+
+ public void replayAddReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
+ Database db = (Database) getDbOrMetaException(info.getDbId());
+ OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
+ olapTable.writeLock();
+ try {
+ unprotectAddReplica(olapTable, info);
+ } finally {
+ olapTable.writeUnlock();
+ }
+ }
+
+ public void replayUpdateReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
+ Database db = (Database) getDbOrMetaException(info.getDbId());
+ OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
+ olapTable.writeLock();
+ try {
+ unprotectUpdateReplica(olapTable, info);
+ } finally {
+ olapTable.writeUnlock();
+ }
+ }
+
+ public void unprotectDeleteReplica(OlapTable olapTable, ReplicaPersistInfo info) {
+ Partition partition = olapTable.getPartition(info.getPartitionId());
+ MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId());
+ Tablet tablet = materializedIndex.getTablet(info.getTabletId());
+ tablet.deleteReplicaByBackendId(info.getBackendId());
+ }
+
+ public void replayDeleteReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
+ Database db = (Database) getDbOrMetaException(info.getDbId());
+ OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
+ olapTable.writeLock();
+ try {
+ unprotectDeleteReplica(olapTable, info);
+ } finally {
+ olapTable.writeUnlock();
+ }
+ }
+
+ /**
+ * Following are the steps to create an olap table:
+ * 1. create columns
+ * 2. create partition info
+ * 3. create distribution info
+ * 4. set table id and base index id
+ * 5. set bloom filter columns
+ * 6. set and build TableProperty includes:
+ * 6.1. dynamicProperty
+ * 6.2. replicationNum
+ * 6.3. inMemory
+ * 6.4. storageFormat
+ * 6.5. compressionType
+ * 7. set index meta
+ * 8. check colocation properties
+ * 9. create tablet in BE
+ * 10. add this table to FE's meta
+ * 11. add this table to ColocateGroup if necessary
+ */
+ public void createTable(CreateTableStmt stmt) throws UserException {
+ String engineName = stmt.getEngineName();
+ String dbName = stmt.getDbName();
+ String tableName = stmt.getTableName();
+
+ // check if db exists
+ Database db = (Database) getDbOrDdlException(dbName);
+
+ // only internal table should check quota and cluster capacity
+ if (!stmt.isExternal()) {
+ // check cluster capacity
+ Catalog.getCurrentSystemInfo().checkClusterCapacity(stmt.getClusterName());
+ // check db quota
+ db.checkQuota();
+ }
+
+ // check if table exists in db
+ if (db.getTable(tableName).isPresent()) {
+ if (stmt.isSetIfNotExists()) {
+ LOG.info("create table[{}] which already exists", tableName);
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+ }
+ }
+
+ if (engineName.equals("olap")) {
+ createOlapTable(db, stmt);
+ return;
+ } else if (engineName.equals("odbc")) {
+ createOdbcTable(db, stmt);
+ return;
+ } else if (engineName.equals("mysql")) {
+ createMysqlTable(db, stmt);
+ } else if (engineName.equals("broker")) {
+ createBrokerTable(db, stmt);
+ return;
+ } else if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) {
+ createEsTable(db, stmt);
+ return;
+ } else if (engineName.equalsIgnoreCase("hive")) {
+ createHiveTable(db, stmt);
+ return;
+ } else if (engineName.equalsIgnoreCase("iceberg")) {
+ IcebergCatalogMgr.createIcebergTable(db, stmt);
+ return;
+ } else if (engineName.equalsIgnoreCase("hudi")) {
+ createHudiTable(db, stmt);
+ return;
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, engineName);
+ }
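+ // unreachable: reportDdlException above always throws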
+ Preconditions.checkState(false);
+ }
+
+ public void createTableLike(CreateTableLikeStmt stmt) throws DdlException {
+ try {
+ Database db = Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getExistedDbName());
+ Table table = db.getTableOrDdlException(stmt.getExistedTableName());
+
+ if (table.getType() == TableType.VIEW) {
+ throw new DdlException("Not support create table from a View");
+ }
+
+ List<String> createTableStmt = Lists.newArrayList();
+ table.readLock();
+ try {
+ if (table.getType() == TableType.OLAP) {
+ if (!CollectionUtils.isEmpty(stmt.getRollupNames())) {
+ OlapTable olapTable = (OlapTable) table;
+ for (String rollupIndexName : stmt.getRollupNames()) {
+ if (!olapTable.hasMaterializedIndex(rollupIndexName)) {
+ throw new DdlException("Rollup index[" + rollupIndexName + "] not exists in Table["
+ + olapTable.getName() + "]");
+ }
+ }
+ }
+ } else if (!CollectionUtils.isEmpty(stmt.getRollupNames()) || stmt.isWithAllRollup()) {
+ throw new DdlException("Table[" + table.getName() + "] is external, not support rollup copy");
+ }
+
+ Catalog.getDdlStmt(stmt, stmt.getDbName(), table, createTableStmt, null, null, false, false, true);
+ if (createTableStmt.isEmpty()) {
+ ErrorReport.reportDdlException(ErrorCode.ERROR_CREATE_TABLE_LIKE_EMPTY, "CREATE");
+ }
+ } finally {
+ table.readUnlock();
+ }
+ CreateTableStmt parsedCreateTableStmt =
+ (CreateTableStmt) SqlParserUtils.parseAndAnalyzeStmt(createTableStmt.get(0), ConnectContext.get());
+ parsedCreateTableStmt.setTableName(stmt.getTableName());
+ parsedCreateTableStmt.setIfNotExists(stmt.isIfNotExists());
+ createTable(parsedCreateTableStmt);
+ } catch (UserException e) {
+ throw new DdlException("Failed to execute CREATE TABLE LIKE " + stmt.getExistedTableName() + ". Reason: "
+ + e.getMessage());
+ }
+ }
+
+ public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlException {
+ try {
+ List<String> columnNames = stmt.getColumnNames();
+ CreateTableStmt createTableStmt = stmt.getCreateTableStmt();
+ QueryStmt queryStmt = stmt.getQueryStmt();
+ ArrayList<Expr> resultExprs = queryStmt.getResultExprs();
+ ArrayList<String> colLabels = queryStmt.getColLabels();
+ int size = resultExprs.size();
+ // Check columnNames
+ int colNameIndex = 0;
+ for (int i = 0; i < size; ++i) {
+ String name;
+ if (columnNames != null) {
+ // use custom column names
+ name = columnNames.get(i);
+ } else {
+ name = colLabels.get(i);
+ }
+ try {
+ FeNameFormat.checkColumnName(name);
+ } catch (AnalysisException exception) {
+ name = "_col" + (colNameIndex++);
+ }
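+ // labels that are not valid column names (e.g. an expression label like "a + 1")
+ // are replaced with generated names _col0, _col1, ...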
+ TypeDef typeDef;
+ Expr resultExpr = resultExprs.get(i);
+ if (resultExpr.getType().isStringType() && resultExpr.getType().getLength() < 0) {
+ typeDef = new TypeDef(Type.STRING);
+ } else {
+ typeDef = new TypeDef(resultExpr.getType());
+ }
+ ColumnDef columnDef;
+ if (resultExpr.getSrcSlotRef() == null) {
+ columnDef = new ColumnDef(name, typeDef, false, null, true, new DefaultValue(false, null), "");
+ } else {
+ Column column = resultExpr.getSrcSlotRef().getDesc().getColumn();
+ boolean setDefault = StringUtils.isNotBlank(column.getDefaultValue());
+ columnDef = new ColumnDef(name, typeDef, column.isKey(), column.getAggregationType(),
+ column.isAllowNull(), new DefaultValue(setDefault, column.getDefaultValue()),
+ column.getComment());
+ }
+ createTableStmt.addColumnDef(columnDef);
+ // set first column as default distribution
+ if (createTableStmt.getDistributionDesc() == null && i == 0) {
+ createTableStmt.setDistributionDesc(new HashDistributionDesc(10, Lists.newArrayList(name)));
+ }
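+ // note: without an explicit DISTRIBUTED BY, CTAS falls back to hash distribution
+ // on the first result column with 10 buckets (set above)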
+ }
+ Analyzer dummyRootAnalyzer = new Analyzer(Catalog.getCurrentCatalog(), ConnectContext.get());
+ createTableStmt.analyze(dummyRootAnalyzer);
+ createTable(createTableStmt);
+ } catch (UserException e) {
+ throw new DdlException("Failed to execute CTAS Reason: " + e.getMessage());
+ }
+ }
+
+ public void replayCreateTable(String dbName, Table table) throws MetaNotFoundException {
+ Database db = this.fullNameToDb.get(dbName);
+ try {
+ db.createTableWithLock(table, true, false);
+ } catch (DdlException e) {
+ throw new MetaNotFoundException(e.getMessage());
+ }
+ if (!Catalog.isCheckpointThread()) {
+ // add to inverted index
+ if (table.getType() == TableType.OLAP) {
+ TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
+ OlapTable olapTable = (OlapTable) table;
+ long dbId = db.getId();
+ long tableId = table.getId();
+ for (Partition partition : olapTable.getAllPartitions()) {
+ long partitionId = partition.getId();
+ TStorageMedium medium =
+ olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
+ for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
+ long indexId = mIndex.getId();
+ int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium);
+ for (Tablet tablet : mIndex.getTablets()) {
+ long tabletId = tablet.getId();
+ invertedIndex.addTablet(tabletId, tabletMeta);
+ for (Replica replica : tablet.getReplicas()) {
+ invertedIndex.addReplica(tabletId, replica);
+ }
+ }
+ }
+ } // end for partitions
+ DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(dbId, olapTable, true);
+ }
+ }
+ }
+
+ public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException {
+ SinglePartitionDesc singlePartitionDesc = addPartitionClause.getSingeRangePartitionDesc();
+ DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc();
+ boolean isTempPartition = addPartitionClause.isTempPartition();
+
+ DistributionInfo distributionInfo;
+ Map<Long, MaterializedIndexMeta> indexIdToMeta;
+ Set<String> bfColumns;
+ String partitionName = singlePartitionDesc.getPartitionName();
+
+ // check
+ Table table = db.getOlapTableOrDdlException(tableName);
+ // check state
+ OlapTable olapTable = (OlapTable) table;
+
+ olapTable.readLock();
+ try {
+ if (olapTable.getState() != OlapTableState.NORMAL) {
+ throw new DdlException("Table[" + tableName + "]'s state is not NORMAL");
+ }
+
+ // check partition type
+ PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+ if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) {
+ throw new DdlException("Only support adding partition to range and list partitioned table");
+ }
+
+ // check partition name
+ if (olapTable.checkPartitionNameExist(partitionName)) {
+ if (singlePartitionDesc.isSetIfNotExists()) {
+ LOG.info("add partition[{}] which already exists", partitionName);
+ return;
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
+ }
+ }
+
+ Map<String, String> properties = singlePartitionDesc.getProperties();
+ // partition properties should inherit table properties
+ ReplicaAllocation replicaAlloc = olapTable.getDefaultReplicaAllocation();
+ if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM) && !properties.containsKey(
+ PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
+ properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, replicaAlloc.toCreateStmt());
+ }
+ if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)) {
+ properties.put(PropertyAnalyzer.PROPERTIES_INMEMORY, olapTable.isInMemory().toString());
+ }
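+ // (a new partition inherits the table's replica allocation and in-memory setting
+ // unless the partition's own properties override them)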
+
+ singlePartitionDesc.analyze(partitionInfo.getPartitionColumns().size(), properties);
+ partitionInfo.createAndCheckPartitionItem(singlePartitionDesc, isTempPartition);
+
+ // get distributionInfo
+ List<Column> baseSchema = olapTable.getBaseSchema();
+ DistributionInfo defaultDistributionInfo = olapTable.getDefaultDistributionInfo();
+ if (distributionDesc != null) {
+ distributionInfo = distributionDesc.toDistributionInfo(baseSchema);
+ // for now, we only support modifying the distribution's bucket num
+ if (distributionInfo.getType() != defaultDistributionInfo.getType()) {
+ throw new DdlException("Cannot assign different distribution type. default is: "
+ + defaultDistributionInfo.getType());
+ }
+
+ if (distributionInfo.getType() == DistributionInfoType.HASH) {
+ HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
+ List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns();
+ List<Column> defaultDistriCols =
+ ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns();
+ if (!newDistriCols.equals(defaultDistriCols)) {
+ throw new DdlException(
+ "Cannot assign hash distribution with different distribution cols. " + "default is: "
+ + defaultDistriCols);
+ }
+ if (hashDistributionInfo.getBucketNum() <= 0) {
+ throw new DdlException("Cannot assign hash distribution buckets less than 1");
+ }
+ }
+ } else {
+ // make sure the partition distribution info is deep-copied from the default distribution info
+ distributionInfo = defaultDistributionInfo.toDistributionDesc().toDistributionInfo(baseSchema);
+ }
+
+ // check colocation
+ if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) {
+ String fullGroupName = db.getId() + "_" + olapTable.getColocateGroup();
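+ // colocate group names are qualified by database id, e.g. "10005_group1"
+ // (illustrative id and group name)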
+ ColocateGroupSchema groupSchema = Catalog.getCurrentColocateIndex().getGroupSchema(fullGroupName);
+ Preconditions.checkNotNull(groupSchema);
+ groupSchema.checkDistribution(distributionInfo);
+ groupSchema.checkReplicaAllocation(singlePartitionDesc.getReplicaAlloc());
+ }
+
+ indexIdToMeta = olapTable.getCopiedIndexIdToMeta();
+ bfColumns = olapTable.getCopiedBfColumns();
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ } finally {
+ olapTable.readUnlock();
+ }
+
+ Preconditions.checkNotNull(distributionInfo);
+ Preconditions.checkNotNull(olapTable);
+ Preconditions.checkNotNull(indexIdToMeta);
+
+ // create partition outside db lock
+ DataProperty dataProperty = singlePartitionDesc.getPartitionDataProperty();
+ Preconditions.checkNotNull(dataProperty);
+ // check whether the replica quota would be exceeded once this operation is done
+ long indexNum = indexIdToMeta.size();
+ long bucketNum = distributionInfo.getBucketNum();
+ long replicaNum = singlePartitionDesc.getReplicaAlloc().getTotalReplicaNum();
+ long totalReplicaNum = indexNum * bucketNum * replicaNum;
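+ // e.g. 2 materialized indexes * 16 buckets * 3 replicas = 96 new replicas
+ // (illustrative numbers) to be charged against the database's replica quota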
+ if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
+ throw new DdlException("Database " + db.getFullName() + " table " + tableName + " add partition increasing "
+ + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
+ }
+ Set<Long> tabletIdSet = new HashSet<Long>();
+ try {
+ long partitionId = Catalog.getCurrentCatalog().getNextId();
+ Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
+ olapTable.getBaseIndexId(), partitionId, partitionName, indexIdToMeta, distributionInfo,
+ dataProperty.getStorageMedium(), singlePartitionDesc.getReplicaAlloc(),
+ singlePartitionDesc.getVersionInfo(), bfColumns, olapTable.getBfFpp(), tabletIdSet,
+ olapTable.getCopiedIndexes(), singlePartitionDesc.isInMemory(), olapTable.getStorageFormat(),
+ singlePartitionDesc.getTabletType(), olapTable.getCompressionType(), olapTable.getDataSortInfo());
+
+ // check again
+ table = db.getOlapTableOrDdlException(tableName);
+ table.writeLockOrDdlException();
+ try {
+ olapTable = (OlapTable) table;
+ if (olapTable.getState() != OlapTableState.NORMAL) {
+ throw new DdlException("Table[" + tableName + "]'s state is not NORMAL");
+ }
+
+ // check partition name
+ if (olapTable.checkPartitionNameExist(partitionName)) {
+ if (singlePartitionDesc.isSetIfNotExists()) {
+ LOG.info("add partition[{}] which already exists", partitionName);
+ return;
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
+ }
+ }
+
+ // check if meta changed
+ // rollup index may be added or dropped during add partition operation.
+ // schema may be changed during add partition operation.
+ boolean metaChanged = false;
+ if (olapTable.getIndexNameToId().size() != indexIdToMeta.size()) {
+ metaChanged = true;
+ } else {
+ // compare schemaHash
+ for (Map.Entry<Long, MaterializedIndexMeta> entry : olapTable.getIndexIdToMeta().entrySet()) {
+ long indexId = entry.getKey();
+ if (!indexIdToMeta.containsKey(indexId)) {
+ metaChanged = true;
+ break;
+ }
+ if (indexIdToMeta.get(indexId).getSchemaHash() != entry.getValue().getSchemaHash()) {
+ metaChanged = true;
+ break;
+ }
+ }
+ }
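+ // (a mismatch in index count or schema hash means a rollup or schema change
+ // committed while the partition was being built, so abort instead of persisting
+ // a stale layout)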
+
+ if (metaChanged) {
+ throw new DdlException("Table[" + tableName + "]'s meta has been changed. try again.");
+ }
+
+ // check partition type
+ PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+ if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) {
+ throw new DdlException("Only support adding partition to range and list partitioned table");
+ }
+
+ // update partition info
+ partitionInfo.handleNewSinglePartitionDesc(singlePartitionDesc, partitionId, isTempPartition);
+
+ if (isTempPartition) {
+ olapTable.addTempPartition(partition);
+ } else {
+ olapTable.addPartition(partition);
+ }
+
+ // log
+ PartitionPersistInfo info = null;
+ if (partitionInfo.getType() == PartitionType.RANGE) {
+ info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition,
+ partitionInfo.getItem(partitionId).getItems(), ListPartitionItem.DUMMY_ITEM, dataProperty,
+ partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId),
+ isTempPartition);
+ } else if (partitionInfo.getType() == PartitionType.LIST) {
+ info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition,
+ RangePartitionItem.DUMMY_ITEM, partitionInfo.getItem(partitionId), dataProperty,
+ partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId),
+ isTempPartition);
+ }
+ Catalog.getCurrentCatalog().getEditLog().logAddPartition(info);
+
+ LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition);
+ } finally {
+ table.writeUnlock();
+ }
+ } catch (DdlException e) {
+ for (Long tabletId : tabletIdSet) {
+ Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
+ }
+ throw e;
+ }
+ }
+
+ public void replayAddPartition(PartitionPersistInfo info) throws MetaNotFoundException {
+ Database db = (Database) getDbOrMetaException(info.getDbId());
+ OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
+ olapTable.writeLock();
+ try {
+ Partition partition = info.getPartition();
+ PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+ if (info.isTempPartition()) {
+ olapTable.addTempPartition(partition);
+ } else {
+ olapTable.addPartition(partition);
+ }
+
+ PartitionItem partitionItem = null;
+ if (partitionInfo.getType() == PartitionType.RANGE) {
+ partitionItem = new RangePartitionItem(info.getRange());
+ } else if (partitionInfo.getType() == PartitionType.LIST) {
+ partitionItem = info.getListPartitionItem();
+ }
+
+ partitionInfo.unprotectHandleNewSinglePartitionDesc(partition.getId(), info.isTempPartition(),
+ partitionItem, info.getDataProperty(), info.getReplicaAlloc(), info.isInMemory());
+
+ if (!Catalog.isCheckpointThread()) {
+ // add to inverted index
+ TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
+ for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
+ long indexId = index.getId();
+ int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
+ TabletMeta tabletMeta =
+ new TabletMeta(info.getDbId(), info.getTableId(), partition.getId(), index.getId(),
+ schemaHash, info.getDataProperty().getStorageMedium());
+ for (Tablet tablet : index.getTablets()) {
+ long tabletId = tablet.getId();
+ invertedIndex.addTablet(tabletId, tabletMeta);
+ for (Replica replica : tablet.getReplicas()) {
+ invertedIndex.addReplica(tabletId, replica);
+ }
+ }
+ }
+ }
+ } finally {
+ olapTable.writeUnlock();
+ }
+ }
+
+ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause clause) throws DdlException {
+ Preconditions.checkArgument(olapTable.isWriteLockHeldByCurrentThread());
+
+ String partitionName = clause.getPartitionName();
+ boolean isTempPartition = clause.isTempPartition();
+
+ if (olapTable.getState() != OlapTableState.NORMAL) {
+ throw new DdlException("Table[" + olapTable.getName() + "]'s state is not NORMAL");
+ }
+
+ if (!olapTable.checkPartitionNameExist(partitionName, isTempPartition)) {
+ if (clause.isSetIfExists()) {
+ LOG.info("drop partition[{}] which does not exist", partitionName);
+ return;
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_DROP_PARTITION_NON_EXISTENT, partitionName);
+ }
+ }
+
+ PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+ if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) {
+ throw new DdlException("Alter table [" + olapTable.getName() + "] failed. Not a partitioned table");
+ }
+
+ // drop
+ if (isTempPartition) {
+ olapTable.dropTempPartition(partitionName, true);
+ } else {
+ if (!clause.isForceDrop()) {
+ Partition partition = olapTable.getPartition(partitionName);
+ if (partition != null) {
+ if (Catalog.getCurrentCatalog().getGlobalTransactionMgr()
+ .existCommittedTxns(db.getId(), olapTable.getId(), partition.getId())) {
+ throw new DdlException(
+ "There are still some transactions in the COMMITTED state waiting to be completed."
+ + " The partition [" + partitionName
+ + "] cannot be dropped. If you want to forcibly drop it (this cannot be recovered),"
+ + " please use \"DROP PARTITION FORCE\".");
+ }
+ }
+ }
+ olapTable.dropPartition(db.getId(), partitionName, clause.isForceDrop());
+ }
+
+ // log
+ DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionName, isTempPartition,
+ clause.isForceDrop());
+ Catalog.getCurrentCatalog().getEditLog().logDropPartition(info);
+
+ LOG.info("succeed in dropping partition[{}], is temp : {}, is force : {}", partitionName, isTempPartition,
+ clause.isForceDrop());
+ }
+
+ public void replayDropPartition(DropPartitionInfo info) throws MetaNotFoundException {
+ Database db = (Database) getDbOrMetaException(info.getDbId());
+ OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
+ olapTable.writeLock();
+ try {
+ if (info.isTempPartition()) {
+ olapTable.dropTempPartition(info.getPartitionName(), true);
+ } else {
+ olapTable.dropPartition(info.getDbId(), info.getPartitionName(), info.isForceDrop());
+ }
+ } finally {
+ olapTable.writeUnlock();
+ }
+ }
+
+ public void replayErasePartition(long partitionId) {
+ Catalog.getCurrentRecycleBin().replayErasePartition(partitionId);
+ }
+
+ public void replayRecoverPartition(RecoverInfo info) throws MetaNotFoundException {
+ Database db = (Database) getDbOrMetaException(info.getDbId());
+ OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
+ olapTable.writeLock();
+ try {
+ Catalog.getCurrentRecycleBin().replayRecoverPartition(olapTable, info.getPartitionId());
+ } finally {
+ olapTable.writeUnlock();
+ }
+ }
+
+ private Partition createPartitionWithIndices(String clusterName, long dbId, long tableId, long baseIndexId,
+ long partitionId, String partitionName, Map<Long, MaterializedIndexMeta> indexIdToMeta,
+ DistributionInfo distributionInfo, TStorageMedium storageMedium, ReplicaAllocation replicaAlloc,
+ Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long> tabletIdSet, List<Index> indexes,
+ boolean isInMemory, TStorageFormat storageFormat, TTabletType tabletType, TCompressionType compressionType,
+ DataSortInfo dataSortInfo) throws DdlException {
+ // create base index first.
+ Preconditions.checkArgument(baseIndexId != -1);
+ MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
+
+ // create partition with base index
+ Partition partition = new Partition(partitionId, partitionName, baseIndex, distributionInfo);
+
+ // add to index map
+ Map<Long, MaterializedIndex> indexMap = new HashMap<>();
+ indexMap.put(baseIndexId, baseIndex);
+
+ // create rollup indexes, if any
+ for (long indexId : indexIdToMeta.keySet()) {
+ if (indexId == baseIndexId) {
+ continue;
+ }
+
+ MaterializedIndex rollup = new MaterializedIndex(indexId, IndexState.NORMAL);
+ indexMap.put(indexId, rollup);
+ }
+
+ // init the partition's visible version if specified
+ if (versionInfo != null) {
+ partition.updateVisibleVersion(versionInfo);
+ }
+ long version = partition.getVisibleVersion();
+
+ short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
+ for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
+ long indexId = entry.getKey();
+ MaterializedIndex index = entry.getValue();
+ MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
+
+ // create tablets
+ int schemaHash = indexMeta.getSchemaHash();
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, storageMedium);
+ createTablets(clusterName, index, ReplicaState.NORMAL, distributionInfo, version, replicaAlloc, tabletMeta,
+ tabletIdSet);
+
+ boolean ok = false;
+ String errMsg = null;
+
+ // add create replica task for olap
+ short shortKeyColumnCount = indexMeta.getShortKeyColumnCount();
+ TStorageType storageType = indexMeta.getStorageType();
+ List<Column> schema = indexMeta.getSchema();
+ KeysType keysType = indexMeta.getKeysType();
+ int totalTaskNum = index.getTablets().size() * totalReplicaNum;
+ MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(totalTaskNum);
+ AgentBatchTask batchTask = new AgentBatchTask();
+ for (Tablet tablet : index.getTablets()) {
+ long tabletId = tablet.getId();
+ for (Replica replica : tablet.getReplicas()) {
+ long backendId = replica.getBackendId();
+ countDownLatch.addMark(backendId, tabletId);
+ CreateReplicaTask task =
+ new CreateReplicaTask(backendId, dbId, tableId, partitionId, indexId, tabletId,
+ shortKeyColumnCount, schemaHash, version, keysType, storageType, storageMedium,
+ schema, bfColumns, bfFpp, countDownLatch, indexes, isInMemory, tabletType,
+ dataSortInfo, compressionType);
+ task.setStorageFormat(storageFormat);
+ batchTask.addTask(task);
+ // add to AgentTaskQueue for handling finish report.
+ // not for resending task
+ AgentTaskQueue.addTask(task);
+ }
+ }
+ AgentTaskExecutor.submit(batchTask);
+
+ // estimate timeout
+ long timeout = Config.tablet_create_timeout_second * 1000L * totalTaskNum;
+ timeout = Math.min(timeout, Config.max_create_table_timeout_second * 1000);
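+ // e.g. with tablet_create_timeout_second = 2 and 100 replica-creation tasks, this
+ // waits up to 200s, capped by max_create_table_timeout_second (illustrative values)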
+ try {
+ ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("InterruptedException: ", e);
+ ok = false;
+ }
+
+ if (!ok || !countDownLatch.getStatus().ok()) {
+ errMsg = "Failed to create partition[" + partitionName + "]. Timeout.";
+ // clear tasks
+ AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CREATE);
+
+ if (!countDownLatch.getStatus().ok()) {
+ errMsg += " Error: " + countDownLatch.getStatus().getErrorMsg();
+ } else {
+ List<Entry<Long, Long>> unfinishedMarks = countDownLatch.getLeftMarks();
+ // only show at most 3 results
+ List<Entry<Long, Long>> subList = unfinishedMarks.subList(0, Math.min(unfinishedMarks.size(), 3));
+ if (!subList.isEmpty()) {
+ errMsg += " Unfinished mark: " + Joiner.on(", ").join(subList);
+ }
+ }
+ LOG.warn(errMsg);
+ throw new DdlException(errMsg);
+ }
+
+ if (index.getId() != baseIndexId) {
+ // add rollup index to partition
+ partition.createRollupIndex(index);
+ }
+ } // end for indexMap
+ return partition;
+ }
+
+ // Create olap table and related base index synchronously.
+ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserException {
+ String tableName = stmt.getTableName();
+ LOG.debug("begin create olap table: {}", tableName);
+
+ // create columns
+ List<Column> baseSchema = stmt.getColumns();
+ validateColumns(baseSchema);
+
+ // create partition info
+ PartitionDesc partitionDesc = stmt.getPartitionDesc();
+ PartitionInfo partitionInfo = null;
+ Map<String, Long> partitionNameToId = Maps.newHashMap();
+ if (partitionDesc != null) {
+ // gen partition id first
+ for (SinglePartitionDesc desc : partitionDesc.getSinglePartitionDescs()) {
+ long partitionId = Catalog.getCurrentCatalog().getNextId();
+ partitionNameToId.put(desc.getPartitionName(), partitionId);
+ }
+ partitionInfo = partitionDesc.toPartitionInfo(baseSchema, partitionNameToId, false);
+ } else {
+ if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(stmt.getProperties())) {
+ throw new DdlException("Only support dynamic partition properties on range partition table");
+ }
+ long partitionId = Catalog.getCurrentCatalog().getNextId();
+ // use table name as single partition name
+ partitionNameToId.put(tableName, partitionId);
+ partitionInfo = new SinglePartitionInfo();
+ }
+
+ // get keys type
+ KeysDesc keysDesc = stmt.getKeysDesc();
+ Preconditions.checkNotNull(keysDesc);
+ KeysType keysType = keysDesc.getKeysType();
+
+ // create distribution info
+ DistributionDesc distributionDesc = stmt.getDistributionDesc();
+ Preconditions.checkNotNull(distributionDesc);
+ DistributionInfo defaultDistributionInfo = distributionDesc.toDistributionInfo(baseSchema);
+
+ // calc short key column count
+ short shortKeyColumnCount = Catalog.calcShortKeyColumnCount(baseSchema, stmt.getProperties());
+ LOG.debug("create table[{}] short key column count: {}", tableName, shortKeyColumnCount);
+
+ // indexes
+ TableIndexes indexes = new TableIndexes(stmt.getIndexes());
+
+ // create table
+ long tableId = Catalog.getCurrentCatalog().getNextId();
+ OlapTable olapTable =
+ new OlapTable(tableId, tableName, baseSchema, keysType, partitionInfo, defaultDistributionInfo,
+ indexes);
+ olapTable.setComment(stmt.getComment());
+
+ // set base index id
+ long baseIndexId = Catalog.getCurrentCatalog().getNextId();
+ olapTable.setBaseIndexId(baseIndexId);
+
+ // set base index info to table
+ // this should be done before creating partitions.
+ Map<String, String> properties = stmt.getProperties();
+
+ // get storage format
+ TStorageFormat storageFormat = TStorageFormat.V2; // default is segment v2
+ try {
+ storageFormat = PropertyAnalyzer.analyzeStorageFormat(properties);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+ olapTable.setStorageFormat(storageFormat);
+
+ // get compression type
+ TCompressionType compressionType = TCompressionType.LZ4;
+ try {
+ compressionType = PropertyAnalyzer.analyzeCompressionType(properties);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+ olapTable.setCompressionType(compressionType);
+
+ // check data sort properties
+ DataSortInfo dataSortInfo =
+ PropertyAnalyzer.analyzeDataSortInfo(properties, keysType, keysDesc.keysColumnSize(), storageFormat);
+ olapTable.setDataSortInfo(dataSortInfo);
+
+ // analyze bloom filter columns
+ Set<String> bfColumns = null;
+ double bfFpp = 0;
+ try {
+ bfColumns = PropertyAnalyzer.analyzeBloomFilterColumns(properties, baseSchema, keysType);
+ if (bfColumns != null && bfColumns.isEmpty()) {
+ bfColumns = null;
+ }
+
+ bfFpp = PropertyAnalyzer.analyzeBloomFilterFpp(properties);
+ if (bfColumns != null && bfFpp == 0) {
+ bfFpp = FeConstants.default_bloom_filter_fpp;
+ } else if (bfColumns == null) {
+ bfFpp = 0;
+ }
+
+ olapTable.setBloomFilterInfo(bfColumns, bfFpp);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+
+ // analyze replica allocation
+ ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
+ if (replicaAlloc.isNotSet()) {
+ replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
+ }
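+ // (when neither "replication_num" nor "replication_allocation" is specified,
+ // ReplicaAllocation.DEFAULT_ALLOCATION is used)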
+ olapTable.setReplicationAllocation(replicaAlloc);
+
+ // set in memory
+ boolean isInMemory =
+ PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_INMEMORY, false);
+ olapTable.setIsInMemory(isInMemory);
+
+ // set remote storage
+ String resourceName = PropertyAnalyzer.analyzeRemoteStorageResource(properties);
+ olapTable.setRemoteStorageResource(resourceName);
+
+ TTabletType tabletType;
+ try {
+ tabletType = PropertyAnalyzer.analyzeTabletType(properties);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+
+ if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
+ // if this is an unpartitioned table, we should analyze data property and replication num here.
+ // if this is a partitioned table, these properties are already analyzed in the RangePartitionDesc analyze phase.
+
+ // use table name as this single partition name
+ long partitionId = partitionNameToId.get(tableName);
+ DataProperty dataProperty = null;
+ try {
+ dataProperty =
+ PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(), DataProperty.DEFAULT_DATA_PROPERTY);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+ Preconditions.checkNotNull(dataProperty);
+ partitionInfo.setDataProperty(partitionId, dataProperty);
+ partitionInfo.setReplicaAllocation(partitionId, replicaAlloc);
+ partitionInfo.setIsInMemory(partitionId, isInMemory);
+ partitionInfo.setTabletType(partitionId, tabletType);
+ }
+
+ // check colocation properties
+ try {
+ String colocateGroup = PropertyAnalyzer.analyzeColocate(properties);
+ if (colocateGroup != null) {
+ if (defaultDistributionInfo.getType() == DistributionInfoType.RANDOM) {
+ throw new AnalysisException("Random distribution for colocate table is unsupported");
+ }
+ String fullGroupName = db.getId() + "_" + colocateGroup;
+ ColocateGroupSchema groupSchema = Catalog.getCurrentColocateIndex().getGroupSchema(fullGroupName);
+ if (groupSchema != null) {
+ // group already exists, check if this table can be added to this group
+ groupSchema.checkColocateSchema(olapTable);
+ }
+ // add table to this group, if group does not exist, create a new one
+ Catalog.getCurrentColocateIndex()
+ .addTableToGroup(db.getId(), olapTable, colocateGroup, null /* generate group id inside */);
+ olapTable.setColocateGroup(colocateGroup);
+ }
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+
+ // get base index storage type. default is COLUMN
+ TStorageType baseIndexStorageType = null;
+ try {
+ baseIndexStorageType = PropertyAnalyzer.analyzeStorageType(properties);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+ Preconditions.checkNotNull(baseIndexStorageType);
+ // set base index meta
+ int schemaVersion = 0;
+ try {
+ schemaVersion = PropertyAnalyzer.analyzeSchemaVersion(properties);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+ int schemaHash = Util.generateSchemaHash();
+ olapTable.setIndexMeta(baseIndexId, tableName, baseSchema, schemaVersion, schemaHash, shortKeyColumnCount,
+ baseIndexStorageType, keysType);
+
+ for (AlterClause alterClause : stmt.getRollupAlterClauseList()) {
+ AddRollupClause addRollupClause = (AddRollupClause) alterClause;
+
+ Long baseRollupIndex = olapTable.getIndexIdByName(tableName);
+
+ // get storage type for rollup index
+ TStorageType rollupIndexStorageType = null;
+ try {
+ rollupIndexStorageType = PropertyAnalyzer.analyzeStorageType(addRollupClause.getProperties());
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+ Preconditions.checkNotNull(rollupIndexStorageType);
+ // set rollup index meta to olap table
+ List<Column> rollupColumns = Catalog.getCurrentCatalog().getMaterializedViewHandler()
+ .checkAndPrepareMaterializedView(addRollupClause, olapTable, baseRollupIndex, false);
+ short rollupShortKeyColumnCount =
+ Catalog.calcShortKeyColumnCount(rollupColumns, alterClause.getProperties());
+ int rollupSchemaHash = Util.generateSchemaHash();
+ long rollupIndexId = Catalog.getCurrentCatalog().getNextId();
+ olapTable.setIndexMeta(rollupIndexId, addRollupClause.getRollupName(), rollupColumns, schemaVersion,
+ rollupSchemaHash, rollupShortKeyColumnCount, rollupIndexStorageType, keysType);
+ }
+
+ // analyse sequence column
+ Type sequenceColType = null;
+ try {
+ sequenceColType = PropertyAnalyzer.analyzeSequenceType(properties, olapTable.getKeysType());
+ if (sequenceColType != null) {
+ olapTable.setSequenceInfo(sequenceColType);
+ }
+ } catch (Exception e) {
+ throw new DdlException(e.getMessage());
+ }
+
+ // analyze version info
+ Long versionInfo = null;
+ try {
+ versionInfo = PropertyAnalyzer.analyzeVersionInfo(properties);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+ Preconditions.checkNotNull(versionInfo);
+
+ // a set to record every new tablet created during table creation;
+ // if any step fails, use this set to clean them up
+ Set<Long> tabletIdSet = new HashSet<>();
+ // create partition
+ try {
+ if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
+ // this is a 1-level partitioned table
+ // use table name as partition name
+ DistributionInfo partitionDistributionInfo = distributionDesc.toDistributionInfo(baseSchema);
+ String partitionName = tableName;
+ long partitionId = partitionNameToId.get(partitionName);
+
+ // check whether the replica quota would be exceeded once this operation is done
+ long indexNum = olapTable.getIndexIdToMeta().size();
+ long bucketNum = partitionDistributionInfo.getBucketNum();
+ long replicaNum = partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum();
+ long totalReplicaNum = indexNum * bucketNum * replicaNum;
+ if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
+ throw new DdlException(
+ "Database " + db.getFullName() + " create unpartitioned table " + tableName + " increases "
+ + "the replica count by " + totalReplicaNum + ", which exceeds quota[" + db.getReplicaQuota() + "]");
+ }
+ // create partition
+ Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
+ olapTable.getBaseIndexId(), partitionId, partitionName, olapTable.getIndexIdToMeta(),
+ partitionDistributionInfo, partitionInfo.getDataProperty(partitionId).getStorageMedium(),
+ partitionInfo.getReplicaAllocation(partitionId), versionInfo, bfColumns, bfFpp, tabletIdSet,
+ olapTable.getCopiedIndexes(), isInMemory, storageFormat, tabletType, compressionType,
+ olapTable.getDataSortInfo());
+ olapTable.addPartition(partition);
+ } else if (partitionInfo.getType() == PartitionType.RANGE
+ || partitionInfo.getType() == PartitionType.LIST) {
+ try {
+ // just to remove entries from stmt.getProperties(),
+ // and then check whether any unknown properties remain
+ PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(), DataProperty.DEFAULT_DATA_PROPERTY);
+ if (partitionInfo.getType() == PartitionType.RANGE) {
+ DynamicPartitionUtil.checkAndSetDynamicPartitionProperty(olapTable, properties);
+
+ } else if (partitionInfo.getType() == PartitionType.LIST) {
+ if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) {
+ throw new DdlException(
+ "Only support dynamic partition properties on range partition table");
+
+ }
+ }
+
+ if (properties != null && !properties.isEmpty()) {
+ // here, all properties should be checked
+ throw new DdlException("Unknown properties: " + properties);
+ }
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+
+ // check whether the replica quota would be exceeded once this operation is done
+ long totalReplicaNum = 0;
+ for (Map.Entry<String, Long> entry : partitionNameToId.entrySet()) {
+ long indexNum = olapTable.getIndexIdToMeta().size();
+ long bucketNum = defaultDistributionInfo.getBucketNum();
+ long replicaNum = partitionInfo.getReplicaAllocation(entry.getValue()).getTotalReplicaNum();
+ totalReplicaNum += indexNum * bucketNum * replicaNum;
+ }
+ if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
+ throw new DdlException(
+ "Database " + db.getFullName() + " create table " + tableName + " increases "
+ + "the replica count by " + totalReplicaNum + ", which exceeds quota[" + db.getReplicaQuota() + "]");
+ }
+
+ // this is a 2-level partitioned table
+ for (Map.Entry<String, Long> entry : partitionNameToId.entrySet()) {
+ DataProperty dataProperty = partitionInfo.getDataProperty(entry.getValue());
+ DistributionInfo partitionDistributionInfo = distributionDesc.toDistributionInfo(baseSchema);
+ Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
+ olapTable.getBaseIndexId(), entry.getValue(), entry.getKey(), olapTable.getIndexIdToMeta(),
+ partitionDistributionInfo, dataProperty.getStorageMedium(),
+ partitionInfo.getReplicaAllocation(entry.getValue()), versionInfo, bfColumns, bfFpp,
+ tabletIdSet, olapTable.getCopiedIndexes(), isInMemory, storageFormat,
+ partitionInfo.getTabletType(entry.getValue()), compressionType,
+ olapTable.getDataSortInfo());
+ olapTable.addPartition(partition);
+ }
+ } else {
+ throw new DdlException("Unsupported partition method: " + partitionInfo.getType().name());
+ }
+
+ Pair<Boolean, Boolean> result = db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists());
+ if (!result.first) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+ }
+
+ if (result.second) {
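+ // result.second is true when the table already existed and IF NOT EXISTS was set,
+ // so the in-memory side effects created above must be rolled back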
+ if (Catalog.getCurrentColocateIndex().isColocateTable(tableId)) {
+ // if this is a colocate join table, its table id is already added to colocate group
+ // so we should remove the tableId here
+ Catalog.getCurrentColocateIndex().removeTable(tableId);
+ }
+ for (Long tabletId : tabletIdSet) {
+ Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
+ }
+ LOG.info("duplicate create table[{};{}], skip next steps", tableName, tableId);
+ } else {
+ // we have added these indexes to memory; only need to persist here
+ if (Catalog.getCurrentColocateIndex().isColocateTable(tableId)) {
+ GroupId groupId = Catalog.getCurrentColocateIndex().getGroup(tableId);
+ Map<Tag, List<List<Long>>> backendsPerBucketSeq =
+ Catalog.getCurrentColocateIndex().getBackendsPerBucketSeq(groupId);
+ ColocatePersistInfo info =
+ ColocatePersistInfo.createForAddTable(groupId, tableId, backendsPerBucketSeq);
+ Catalog.getCurrentCatalog().getEditLog().logColocateAddTable(info);
+ }
+ LOG.info("successfully create table[{};{}]", tableName, tableId);
+ // register or remove table from DynamicPartition after table created
+ DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
+ Catalog.getCurrentCatalog().getDynamicPartitionScheduler()
+ .createOrUpdateRuntimeInfo(tableId, DynamicPartitionScheduler.LAST_UPDATE_TIME,
+ TimeUtils.getCurrentFormatTime());
+ }
+ } catch (DdlException e) {
+ for (Long tabletId : tabletIdSet) {
+ Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
+ }
+ // only remove from memory, because we have not persisted it
+ if (Catalog.getCurrentColocateIndex().isColocateTable(tableId)) {
+ Catalog.getCurrentColocateIndex().removeTable(tableId);
+ }
+
+ throw e;
+ }
+ }
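
For reference, the replica quota check above reduces to a product over partitions: each partition materializes every index, each index is split into buckets, and each bucket carries the partition's replica count. A minimal sketch of that arithmetic, with hypothetical stand-in names rather than the Doris classes:

    // Illustrative sketch of the replica-quota arithmetic; all names are stand-ins.
    public class ReplicaQuotaMath {
        static long replicasToCreate(long indexNum, long bucketNum, long replicaNum, int partitionCount) {
            // every partition materializes indexNum indexes, each with bucketNum
            // tablets, each tablet with replicaNum replicas
            return indexNum * bucketNum * replicaNum * partitionCount;
        }

        public static void main(String[] args) {
            // e.g. 2 indexes (base + one rollup), 32 buckets, 3 replicas, 10 partitions
            System.out.println(replicasToCreate(2, 32, 3, 10)); // 1920, checked against the db quota
        }
    }
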
+
+ private void createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException {
+ String tableName = stmt.getTableName();
+
+ List<Column> columns = stmt.getColumns();
+
+ long tableId = Catalog.getCurrentCatalog().getNextId();
+ MysqlTable mysqlTable = new MysqlTable(tableId, tableName, columns, stmt.getProperties());
+ mysqlTable.setComment(stmt.getComment());
+ if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists()).first) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+ }
+ LOG.info("successfully create table[{}-{}]", tableName, tableId);
+ }
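
createMysqlTable above, and createOdbcTable and createBrokerTable below, all follow the same shape: allocate an id from the catalog, build the table object, and hand it to createTableWithLock, which tolerates an existing name only under IF NOT EXISTS. A reduced sketch of that pattern, with a hypothetical TableStore standing in for the Doris Database:

    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical stand-in for Database.createTableWithLock(); not the Doris API.
    public class TableStore {
        private final ConcurrentHashMap<String, Long> nameToId = new ConcurrentHashMap<>();

        /** Returns true if created, false if the name already existed. */
        boolean createTable(String name, long id, boolean ifNotExists) {
            Long prev = nameToId.putIfAbsent(name, id);
            if (prev == null) {
                return true;
            }
            if (ifNotExists) {
                return false; // silently skip, mirroring IF NOT EXISTS
            }
            throw new IllegalStateException("Table " + name + " already exists");
        }

        public static void main(String[] args) {
            TableStore store = new TableStore();
            System.out.println(store.createTable("t1", 1L, false)); // true
            System.out.println(store.createTable("t1", 2L, true)); // false, no error
        }
    }
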
+
+ private void createOdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
+ String tableName = stmt.getTableName();
+ List<Column> columns = stmt.getColumns();
+
+ long tableId = Catalog.getCurrentCatalog().getNextId();
+ OdbcTable odbcTable = new OdbcTable(tableId, tableName, columns, stmt.getProperties());
+ odbcTable.setComment(stmt.getComment());
+ if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists()).first) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+ }
+ LOG.info("successfully create table[{}-{}]", tableName, tableId);
+ }
+
+ private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlException {
+ String tableName = stmt.getTableName();
+
+ // create columns
+ List<Column> baseSchema = stmt.getColumns();
+ validateColumns(baseSchema);
+
+ // create partition info
+ PartitionDesc partitionDesc = stmt.getPartitionDesc();
+ PartitionInfo partitionInfo = null;
+ Map<String, Long> partitionNameToId = Maps.newHashMap();
+ if (partitionDesc != null) {
+ partitionInfo = partitionDesc.toPartitionInfo(baseSchema, partitionNameToId, false);
+ } else {
+ long partitionId = Catalog.getCurrentCatalog().getNextId();
+ // use table name as single partition name
+ partitionNameToId.put(tableName, partitionId);
+ partitionInfo = new SinglePartitionInfo();
+ }
+
+ long tableId = Catalog.getCurrentCatalog().getNextId();
+ EsTable esTable = new EsTable(tableId, tableName, baseSchema, stmt.getProperties(), partitionInfo);
+ esTable.setComment(stmt.getComment());
+
+ if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists()).first) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+ }
+ LOG.info("successfully create table{} with id {}", tableName, tableId);
+ return esTable;
+ }
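
When no PARTITION BY clause is supplied, createEsTable wraps the table in a single partition named after the table itself. A small sketch of that fallback, with an AtomicLong standing in for getNextId():

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of the single-partition fallback; ID_GEN stands in for getNextId().
    public class SinglePartitionFallback {
        private static final AtomicLong ID_GEN = new AtomicLong(10000);

        public static void main(String[] args) {
            String tableName = "es_events";
            Map<String, Long> partitionNameToId = new HashMap<>();
            // no partition desc: use the table name as the single partition name
            partitionNameToId.put(tableName, ID_GEN.incrementAndGet());
            System.out.println(partitionNameToId); // {es_events=10001}
        }
    }
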
+
+ private void createBrokerTable(Database db, CreateTableStmt stmt) throws DdlException {
+ String tableName = stmt.getTableName();
+
+ List<Column> columns = stmt.getColumns();
+
+ long tableId = Catalog.getCurrentCatalog().getNextId();
+ BrokerTable brokerTable = new BrokerTable(tableId, tableName, columns, stmt.getProperties());
+ brokerTable.setComment(stmt.getComment());
+ brokerTable.setBrokerProperties(stmt.getExtProperties());
+
+ if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists()).first) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+ }
+ LOG.info("successfully create table[{}-{}]", tableName, tableId);
+ }
+
+ private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlException {
+ String tableName = stmt.getTableName();
+ List<Column> columns = stmt.getColumns();
+ long tableId = Catalog.getCurrentCatalog().getNextId();
+ HiveTable hiveTable = new HiveTable(tableId, tableName, columns, stmt.getProperties());
+ hiveTable.setComment(stmt.getComment());
+ // check whether the hive table exists in the hive database
+ HiveMetaStoreClient hiveMetaStoreClient =
+ HiveMetaStoreClientHelper.getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
+ if (!HiveMetaStoreClientHelper.tableExists(hiveMetaStoreClient, hiveTable.getHiveDb(),
+ hiveTable.getHiveTable())) {
+ throw new DdlException(String.format("Table [%s] dose not exist in Hive.", hiveTable.getHiveDbTable()));
+ }
+ // check whether the table name already exists in the doris database
+ if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists()).first) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+ }
+ LOG.info("successfully create table[{}-{}]", tableName, tableId);
+ }
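
The existence check above is delegated to HiveMetaStoreClientHelper, which ultimately asks the Hive Metastore over thrift. A hedged sketch of the same check against a raw HiveMetaStoreClient, with simplified error handling and a placeholder URI:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

    // Simplified existence check against the Hive Metastore; the URI is a placeholder.
    public class HiveTableExistence {
        public static boolean exists(String metastoreUris, String db, String tbl) throws Exception {
            HiveConf conf = new HiveConf();
            conf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUris);
            HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
            try {
                return client.tableExists(db, tbl);
            } finally {
                client.close();
            }
        }

        public static void main(String[] args) throws Exception {
            System.out.println(exists("thrift://127.0.0.1:9083", "default", "orders"));
        }
    }
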
+
+ private void createHudiTable(Database db, CreateTableStmt stmt) throws DdlException {
+ String tableName = stmt.getTableName();
+ List<Column> columns = stmt.getColumns();
+ long tableId = Catalog.getCurrentCatalog().getNextId();
+ HudiTable hudiTable = new HudiTable(tableId, tableName, columns, stmt.getProperties());
+ hudiTable.setComment(stmt.getComment());
+ // check hudi properties in create stmt.
+ HudiUtils.validateCreateTable(hudiTable);
+ // check whether the hudi table exists in the hive metastore
+ String metastoreUris = hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS);
+ HiveMetaStoreClient hiveMetaStoreClient = HiveMetaStoreClientHelper.getClient(metastoreUris);
+ if (!HiveMetaStoreClientHelper.tableExists(hiveMetaStoreClient, hudiTable.getHmsDatabaseName(),
+ hudiTable.getHmsTableName())) {
+ throw new DdlException(
+ String.format("Table [%s] dose not exist in Hive Metastore.", hudiTable.getHmsTableIdentifer()));
+ }
+ org.apache.hadoop.hive.metastore.api.Table hiveTable =
+ HiveMetaStoreClientHelper.getTable(hudiTable.getHmsDatabaseName(), hudiTable.getHmsTableName(),
+ metastoreUris);
+ if (!HudiUtils.isHudiTable(hiveTable)) {
+ throw new DdlException(String.format("Table [%s] is not a hudi table.", hudiTable.getHmsTableIdentifer()));
+ }
+ // after snapshot query for MOR tables is supported, this check should be removed.
+ if (HudiUtils.isHudiRealtimeTable(hiveTable)) {
+ throw new DdlException(String.format("Can not support hudi realtime table.", hudiTable.getHmsTableName()));
+ }
+ // check table's schema when user specify the schema
+ if (!hudiTable.getFullSchema().isEmpty()) {
+ HudiUtils.validateColumns(hudiTable, hiveTable);
+ }
+ switch (hiveTable.getTableType()) {
+ case "EXTERNAL_TABLE":
+ case "MANAGED_TABLE":
+ break;
+ case "VIRTUAL_VIEW":
+ default:
+ throw new DdlException("unsupported hudi table type [" + hiveTable.getTableType() + "].");
+ }
+ // check whether the table name already exists in the doris database
+ if (!db.createTableWithLock(hudiTable, false, stmt.isSetIfNotExists()).first) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+ }
+ LOG.info("successfully create table[{}-{}]", tableName, tableId);
+ }
+
+ private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState,
+ DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta,
+ Set<Long> tabletIdSet) throws DdlException {
+ ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
+ Map<Tag, List<List<Long>>> backendsPerBucketSeq = null;
+ GroupId groupId = null;
+ if (colocateIndex.isColocateTable(tabletMeta.getTableId())) {
+ if (distributionInfo.getType() == DistributionInfoType.RANDOM) {
+ throw new DdlException("Random distribution for colocate table is unsupported");
+ }
+ // if this is a colocate table, try to get backend seqs from colocation index.
+ groupId = colocateIndex.getGroup(tabletMeta.getTableId());
+ backendsPerBucketSeq = colocateIndex.getBackendsPerBucketSeq(groupId);
+ }
+
+ // If chooseBackendsArbitrary is true, this may be the first table of a colocation group,
+ // or just a normal table, and we can choose backends arbitrarily;
+ // otherwise, backends must be chosen from backendsPerBucketSeq.
+ boolean chooseBackendsArbitrary = backendsPerBucketSeq == null || backendsPerBucketSeq.isEmpty();
+ if (chooseBackendsArbitrary) {
+ backendsPerBucketSeq = Maps.newHashMap();
+ }
+ for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
+ // create a new tablet with random chosen backends
+ Tablet tablet = new Tablet(Catalog.getCurrentCatalog().getNextId());
+
+ // add tablet to inverted index first
+ index.addTablet(tablet, tabletMeta);
+ tabletIdSet.add(tablet.getId());
+
+ // get BackendIds
+ Map<Tag, List<Long>> chosenBackendIds;
+ if (chooseBackendsArbitrary) {
+ // This is the first colocate table in the group, or just a normal table,
+ // randomly choose backends
+ if (!Config.disable_storage_medium_check) {
+ chosenBackendIds = Catalog.getCurrentSystemInfo()
+ .selectBackendIdsForReplicaCreation(replicaAlloc, clusterName,
+ tabletMeta.getStorageMedium());
+ } else {
+ chosenBackendIds = Catalog.getCurrentSystemInfo()
+ .selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
+ }
+
+ for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) {
+ backendsPerBucketSeq.putIfAbsent(entry.getKey(), Lists.newArrayList());
+ backendsPerBucketSeq.get(entry.getKey()).add(entry.getValue());
+ }
+ } else {
+ // get backends from existing backend sequence
+ chosenBackendIds = Maps.newHashMap();
+ for (Map.Entry<Tag, List<List<Long>>> entry : backendsPerBucketSeq.entrySet()) {
+ chosenBackendIds.put(entry.getKey(), entry.getValue().get(i));
+ }
+ }
+ // create replicas
+ short totalReplicaNum = (short) 0;
+ for (List<Long> backendIds : chosenBackendIds.values()) {
+ for (long backendId : backendIds) {
+ long replicaId = Catalog.getCurrentCatalog().getNextId();
+ Replica replica =
+ new Replica(replicaId, backendId, replicaState, version, tabletMeta.getOldSchemaHash());
+ tablet.addReplica(replica);
+ totalReplicaNum++;
+ }
+ }
+ Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum(),
+ totalReplicaNum + " vs. " + replicaAlloc.getTotalReplicaNum());
+ }
+
+ if (groupId != null && chooseBackendsArbitrary) {
+ colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
+ ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
+ Catalog.getCurrentCatalog().getEditLog().logColocateBackendsPerBucketSeq(info);
+ }
+ }
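
The invariant createTablets enforces for colocate tables is that bucket i of every table in a group lands on the same backend sequence; only the group's first table chooses backends arbitrarily, and its choice is persisted for the rest. A reduced sketch of that lookup, with plain lists in place of the Tag/GroupId bookkeeping:

    import java.util.List;

    // Reduced sketch of the colocate bucket-to-backend mapping; plain lists
    // replace the Tag/GroupId bookkeeping of ColocateTableIndex.
    public class ColocateBucketSeq {
        public static void main(String[] args) {
            // backend ids per bucket, fixed when the first table of the group was created
            List<List<Long>> backendsPerBucketSeq = List.of(
                    List.of(1001L, 1002L, 1003L), // bucket 0
                    List.of(1002L, 1003L, 1004L)); // bucket 1

            // a later table in the same group must reuse the same sequence per bucket
            for (int i = 0; i < backendsPerBucketSeq.size(); i++) {
                System.out.println("bucket " + i + " -> backends " + backendsPerBucketSeq.get(i));
            }
        }
    }
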
+
+ /*
+ * generate and check columns' order and key's existence
+ */
+ private void validateColumns(List<Column> columns) throws DdlException {
+ if (columns.isEmpty()) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_MUST_HAVE_COLUMNS);
+ }
+
+ boolean encounterValue = false;
+ boolean hasKey = false;
+ for (Column column : columns) {
+ if (column.isKey()) {
+ if (encounterValue) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_OLAP_KEY_MUST_BEFORE_VALUE);
+ }
+ hasKey = true;
+ } else {
+ encounterValue = true;
+ }
+ }
+
+ if (!hasKey) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_MUST_HAVE_KEYS);
+ }
+ }
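
Concretely, the rule is that all key columns must precede all value columns, and at least one key must exist. A minimal illustration, with a tiny Col class standing in for org.apache.doris.catalog.Column:

    import java.util.Arrays;
    import java.util.List;

    // Minimal illustration of the key-before-value rule; Col stands in for Column.
    public class KeyOrderCheck {
        static final class Col {
            final String name;
            final boolean isKey;
            Col(String name, boolean isKey) { this.name = name; this.isKey = isKey; }
        }

        static boolean validOrder(List<Col> columns) {
            boolean encounterValue = false;
            boolean hasKey = false;
            for (Col c : columns) {
                if (c.isKey) {
                    if (encounterValue) {
                        return false; // a key column after a value column is rejected
                    }
                    hasKey = true;
                } else {
                    encounterValue = true;
                }
            }
            return hasKey; // at least one key column is required
        }

        public static void main(String[] args) {
            System.out.println(validOrder(Arrays.asList(new Col("k1", true), new Col("v1", false)))); // true
            System.out.println(validOrder(Arrays.asList(new Col("v1", false), new Col("k1", true)))); // false
        }
    }
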
+
+ /*
+ * Truncate the specified table or partitions.
+ * The main idea is:
+ *
+ * 1. use the same schema to create new partitions.
+ * 2. use the newly created partitions to replace the old ones.
+ *
+ * If no partition is specified, it truncates all partitions of this table, including all temp partitions;
+ * otherwise, it only truncates the specified partitions.
+ *
+ */
+ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlException {
+ TableRef tblRef = truncateTableStmt.getTblRef();
+ TableName dbTbl = tblRef.getName();
+
+ // check, and save some info which needs to be checked again later
+ Map<String, Long> origPartitions = Maps.newHashMap();
+ Map<Long, DistributionInfo> partitionsDistributionInfo = Maps.newHashMap();
+ OlapTable copiedTbl;
+
+ boolean truncateEntireTable = tblRef.getPartitionNames() == null;
+
+ Database db = (Database) getDbOrDdlException(dbTbl.getDb());
+ OlapTable olapTable = db.getOlapTableOrDdlException(dbTbl.getTbl());
+
+ olapTable.readLock();
+ try {
+ if (olapTable.getState() != OlapTableState.NORMAL) {
+ throw new DdlException("Table' state is not NORMAL: " + olapTable.getState());
+ }
+
+ if (!truncateEntireTable) {
+ for (String partName : tblRef.getPartitionNames().getPartitionNames()) {
+ Partition partition = olapTable.getPartition(partName);
+ if (partition == null) {
+ throw new DdlException("Partition " + partName + " does not exist");
+ }
+ origPartitions.put(partName, partition.getId());
+ partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo());
+ }
+ } else {
+ for (Partition partition : olapTable.getPartitions()) {
+ origPartitions.put(partition.getName(), partition.getId());
+ partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo());
+ }
+ }
+ copiedTbl = olapTable.selectiveCopy(origPartitions.keySet(), IndexExtState.VISIBLE, false);
+ } finally {
+ olapTable.readUnlock();
+ }
+
+ // 2. use the copied table to create partitions
+ List<Partition> newPartitions = Lists.newArrayList();
+ // tabletIdSet to save all newly created tablet ids.
+ Set<Long> tabletIdSet = Sets.newHashSet();
+ try {
+ for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
+ // the new partition must use a new id.
+ // If we still used the old partition id, the behavior of current load jobs on this partition
+ // would be undefined.
+ // By using a new id, load jobs will be aborted (just as if the partition were dropped),
+ // which is the right behavior.
+ long oldPartitionId = entry.getValue();
+ long newPartitionId = Catalog.getCurrentCatalog().getNextId();
+ Partition newPartition = createPartitionWithIndices(db.getClusterName(), db.getId(), copiedTbl.getId(),
+ copiedTbl.getBaseIndexId(), newPartitionId, entry.getKey(), copiedTbl.getIndexIdToMeta(),
+ partitionsDistributionInfo.get(oldPartitionId),
+ copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).getStorageMedium(),
+ copiedTbl.getPartitionInfo().getReplicaAllocation(oldPartitionId), null /* version info */,
+ copiedTbl.getCopiedBfColumns(), copiedTbl.getBfFpp(), tabletIdSet, copiedTbl.getCopiedIndexes(),
+ copiedTbl.isInMemory(), copiedTbl.getStorageFormat(),
+ copiedTbl.getPartitionInfo().getTabletType(oldPartitionId), copiedTbl.getCompressionType(),
+ copiedTbl.getDataSortInfo());
+ newPartitions.add(newPartition);
+ }
+ } catch (DdlException e) {
+ // create partition failed, remove all newly created tablets
+ for (Long tabletId : tabletIdSet) {
+ Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
+ }
+ throw e;
+ }
+ Preconditions.checkState(origPartitions.size() == newPartitions.size());
+
+ // all partitions are created successfully, try to replace the old partitions.
+ // before replacing, we need to check again.
+ // Things may be changed outside the table lock.
+ olapTable = (OlapTable) db.getTableOrDdlException(copiedTbl.getId());
+ olapTable.writeLockOrDdlException();
+ try {
+ if (olapTable.getState() != OlapTableState.NORMAL) {
+ throw new DdlException("Table' state is not NORMAL: " + olapTable.getState());
+ }
+
+ // check partitions
+ for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
+ Partition partition = copiedTbl.getPartition(entry.getValue());
+ if (partition == null || !partition.getName().equalsIgnoreCase(entry.getKey())) {
+ throw new DdlException("Partition [" + entry.getKey() + "] is changed");
+ }
+ }
+
+ // check if meta changed
+ // a rollup index may have been added or dropped, and the schema may have changed while creating partitions.
+ boolean metaChanged = false;
+ if (olapTable.getIndexNameToId().size() != copiedTbl.getIndexNameToId().size()) {
+ metaChanged = true;
+ } else {
+ // compare schemaHash
+ Map<Long, Integer> copiedIndexIdToSchemaHash = copiedTbl.getIndexIdToSchemaHash();
+ for (Map.Entry<Long, Integer> entry : olapTable.getIndexIdToSchemaHash().entrySet()) {
+ long indexId = entry.getKey();
+ if (!copiedIndexIdToSchemaHash.containsKey(indexId)) {
+ metaChanged = true;
+ break;
+ }
+ if (!copiedIndexIdToSchemaHash.get(indexId).equals(entry.getValue())) {
+ metaChanged = true;
+ break;
+ }
+ }
+ }
+
+ if (metaChanged) {
+ throw new DdlException("Table[" + copiedTbl.getName() + "]'s meta has been changed. try again.");
+ }
+
+ // replace
+ truncateTableInternal(olapTable, newPartitions, truncateEntireTable);
+
+ // write edit log
+ TruncateTableInfo info =
+ new TruncateTableInfo(db.getId(), olapTable.getId(), newPartitions, truncateEntireTable);
+ Catalog.getCurrentCatalog().getEditLog().logTruncateTable(info);
+ } finally {
+ olapTable.writeUnlock();
+ }
+
+ LOG.info("finished to truncate table {}, partitions: {}", tblRef.getName().toSql(), tblRef.getPartitionNames());
+ }
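
Note the two-phase structure above: the replacement partitions are built without holding the table write lock, and the lock is only taken for the final check-and-swap, where a schema-hash comparison detects any rollup or schema change that happened in between. A hedged sketch of that comparison, with maps of indexId to schemaHash standing in for the OlapTable accessors:

    import java.util.Map;

    // Sketch of the "did meta change while partitions were being created" check;
    // indexId -> schemaHash maps stand in for the OlapTable accessors.
    public class MetaChangeCheck {
        static boolean metaChanged(Map<Long, Integer> current, Map<Long, Integer> copied) {
            if (current.size() != copied.size()) {
                return true; // a rollup index was added or dropped
            }
            for (Map.Entry<Long, Integer> e : current.entrySet()) {
                Integer copiedHash = copied.get(e.getKey());
                if (copiedHash == null || !copiedHash.equals(e.getValue())) {
                    return true; // the schema of this index changed
                }
            }
            return false;
        }

        public static void main(String[] args) {
            Map<Long, Integer> before = Map.of(10L, 111, 11L, 222);
            Map<Long, Integer> after = Map.of(10L, 111, 11L, 333);
            System.out.println(metaChanged(after, before)); // true: abort and retry
        }
    }
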
+
+ private void truncateTableInternal(OlapTable olapTable, List<Partition> newPartitions, boolean isEntireTable) {
+ // use new partitions to replace the old ones.
+ Set<Long> oldTabletIds = Sets.newHashSet();
+ for (Partition newPartition : newPartitions) {
+ Partition oldPartition = olapTable.replacePartition(newPartition);
+ // save old tablets to be removed
+ for (MaterializedIndex index : oldPartition.getMaterializedIndices(IndexExtState.ALL)) {
+ index.getTablets().forEach(t -> oldTabletIds.add(t.getId()));
+ }
+ }
+
+ if (isEntireTable) {
+ // drop all temp partitions
+ olapTable.dropAllTempPartitions();
+ }
+
+ // remove the tablets in old partitions
+ for (Long tabletId : oldTabletIds) {
+ Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
+ }
+ }
+
+ public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundException {
+ Database db = (Database) getDbOrMetaException(info.getDbId());
+ OlapTable olapTable = db.getTableOrMetaException(info.getTblId(), TableType.OLAP);
+ olapTable.writeLock();
+ try {
+ truncateTableInternal(olapTable, info.getPartitions(), info.isEntireTable());
+
+ if (!Catalog.isCheckpointThread()) {
+ // add tablet to inverted index
+ TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
+ for (Partition partition : info.getPartitions()) {
+ long partitionId = partition.getId();
+ TStorageMedium medium =
+ olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
+ for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
+ long indexId = mIndex.getId();
+ int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
+ TabletMeta tabletMeta =
+ new TabletMeta(db.getId(), olapTable.getId(), partitionId, indexId, schemaHash, medium);
+ for (Tablet tablet : mIndex.getTablets()) {
+ long tabletId = tablet.getId();
+ invertedIndex.addTablet(tabletId, tabletMeta);
+ for (Replica replica : tablet.getReplicas()) {
+ invertedIndex.addReplica(tabletId, replica);
+ }
+ }
+ }
+ }
+ }
+ } finally {
+ olapTable.writeUnlock();
+ }
+ }
+
+ public void replayAlterExternalTableSchema(String dbName, String tableName, List<Column> newSchema)
+ throws MetaNotFoundException {
+ Database db = (Database) getDbOrMetaException(dbName);
+ Table table = db.getTableOrMetaException(tableName);
+ table.writeLock();
+ try {
+ table.setNewFullSchema(newSchema);
+ } finally {
+ table.writeUnlock();
+ }
+ }
+
+ // for test only
+ public void clearDbs() {
+ if (idToDb != null) {
+ idToDb.clear();
+ }
+ if (fullNameToDb != null) {
+ fullNameToDb.clear();
+ }
+ }
+
+ /**
+ * create cluster
+ *
+ * @param stmt
+ * @throws DdlException
+ */
+ public void createCluster(CreateClusterStmt stmt) throws DdlException {
+ final String clusterName = stmt.getClusterName();
+ if (!tryLock(false)) {
+ throw new DdlException("Failed to acquire catalog lock. Try again");
+ }
+ try {
+ if (nameToCluster.containsKey(clusterName)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_HAS_EXIST, clusterName);
+ } else {
+ List<Long> backendList =
+ Catalog.getCurrentSystemInfo().createCluster(clusterName, stmt.getInstanceNum());
+ // 1: createCluster returns null when fewer BEs are available than requested, which is reported as an error.
+ // 2: otherwise (including a request of 0 instances), the creation succeeds.
+ if (backendList != null || stmt.getInstanceNum() == 0) {
+ final long id = Catalog.getCurrentCatalog().getNextId();
+ final Cluster cluster = new Cluster(clusterName, id);
+ cluster.setBackendIdList(backendList);
+ unprotectCreateCluster(cluster);
+ if (clusterName.equals(SystemInfoService.DEFAULT_CLUSTER)) {
+ for (Database db : idToDb.values()) {
+ if (db.getClusterName().equals(SystemInfoService.DEFAULT_CLUSTER)) {
+ cluster.addDb(db.getFullName(), db.getId());
+ }
+ }
+ }
+ Catalog.getCurrentCatalog().getEditLog().logCreateCluster(cluster);
+ LOG.info("finish to create cluster: {}", clusterName);
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_BE_NOT_ENOUGH);
+ }
+ }
+ } finally {
+ unlock();
+ }
+
+ // create super user for this cluster
+ UserIdentity adminUser = new UserIdentity(PaloAuth.ADMIN_USER, "%");
+ try {
+ adminUser.analyze(stmt.getClusterName());
+ } catch (AnalysisException e) {
+ LOG.error("should not happen", e);
+ }
+ Catalog.getCurrentCatalog().getAuth().createUser(new CreateUserStmt(new UserDesc(adminUser, "", true)));
+ }
+
+ private void unprotectCreateCluster(Cluster cluster) {
+ for (Long id : cluster.getBackendIdList()) {
+ final Backend backend = Catalog.getCurrentSystemInfo().getBackend(id);
+ backend.setOwnerClusterName(cluster.getName());
+ backend.setBackendState(BackendState.using);
+ }
+
+ idToCluster.put(cluster.getId(), cluster);
+ nameToCluster.put(cluster.getName(), cluster);
+
+ // create info schema db
+ final InfoSchemaDb infoDb = new InfoSchemaDb(cluster.getName());
+ infoDb.setClusterName(cluster.getName());
+ unprotectCreateDb(infoDb);
+
+ // only need to create default cluster once.
+ if (cluster.getName().equalsIgnoreCase(SystemInfoService.DEFAULT_CLUSTER)) {
+ Catalog.getCurrentCatalog().setDefaultClusterCreated(true);
+ }
+ }
+
+ /**
+ * replay create cluster
+ *
+ * @param cluster
+ */
+ public void replayCreateCluster(Cluster cluster) {
+ tryLock(true);
+ try {
+ unprotectCreateCluster(cluster);
+ } finally {
+ unlock();
+ }
+ }
+
+ /**
+ * drop a cluster; all of the cluster's databases must have been dropped first
+ *
+ * @param stmt
+ * @throws DdlException
+ */
+ public void dropCluster(DropClusterStmt stmt) throws DdlException {
+ if (!tryLock(false)) {
+ throw new DdlException("Failed to acquire catalog lock. Try again");
+ }
+ try {
+ final String clusterName = stmt.getClusterName();
+ final Cluster cluster = nameToCluster.get(clusterName);
+ if (cluster == null) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
+ }
+ final List<Backend> backends = Catalog.getCurrentSystemInfo().getClusterBackends(clusterName);
+ for (Backend backend : backends) {
+ if (backend.isDecommissioned()) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_ALTER_BE_IN_DECOMMISSION, clusterName);
+ }
+ }
+
+ // check whether there are still undropped databases, other than the information_schema db
+ if (cluster.getDbNames().size() > 1) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DELETE_DB_EXIST, clusterName);
+ }
+
+ Catalog.getCurrentSystemInfo().releaseBackends(clusterName, false /* is not replay */);
+ final ClusterInfo info = new ClusterInfo(clusterName, cluster.getId());
+ unprotectDropCluster(info, false /* is not replay */);
+ Catalog.getCurrentCatalog().getEditLog().logDropCluster(info);
+ } finally {
+ unlock();
+ }
+
+ // drop user of this cluster
+ // set is replay to true, not write log
+ Catalog.getCurrentCatalog().getAuth().dropUserOfCluster(stmt.getClusterName(), true /* is replay */);
+ }
+
+ private void unprotectDropCluster(ClusterInfo info, boolean isReplay) {
+ Catalog.getCurrentSystemInfo().releaseBackends(info.getClusterName(), isReplay);
+ idToCluster.remove(info.getClusterId());
+ nameToCluster.remove(info.getClusterName());
+ final Database infoSchemaDb = fullNameToDb.get(InfoSchemaDb.getFullInfoSchemaDbName(info.getClusterName()));
+ fullNameToDb.remove(infoSchemaDb.getFullName());
+ idToDb.remove(infoSchemaDb.getId());
+ }
+
+ public void replayDropCluster(ClusterInfo info) throws DdlException {
+ tryLock(true);
+ try {
+ unprotectDropCluster(info, true/* is replay */);
+ } finally {
+ unlock();
+ }
+
+ Catalog.getCurrentCatalog().getAuth().dropUserOfCluster(info.getClusterName(), true /* is replay */);
+ }
+
+ public void replayExpandCluster(ClusterInfo info) {
+ tryLock(true);
+ try {
+ final Cluster cluster = nameToCluster.get(info.getClusterName());
+ cluster.addBackends(info.getBackendIdList());
+
+ for (Long beId : info.getBackendIdList()) {
+ Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
+ if (be == null) {
+ continue;
+ }
+ be.setOwnerClusterName(info.getClusterName());
+ be.setBackendState(BackendState.using);
+ }
+ } finally {
+ unlock();
+ }
+ }
+
+ /**
+ * modify cluster: expand or shrink
+ *
+ * @param stmt
+ * @throws DdlException
+ */
+ public void processModifyCluster(AlterClusterStmt stmt) throws UserException {
+ final String clusterName = stmt.getAlterClusterName();
+ final int newInstanceNum = stmt.getInstanceNum();
+ if (!tryLock(false)) {
+ throw new DdlException("Failed to acquire catalog lock. Try again");
+ }
+ try {
+ Cluster cluster = nameToCluster.get(clusterName);
+ if (cluster == null) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
+ }
+
+ // check if this cluster has backend in decommission
+ final List<Long> backendIdsInCluster = cluster.getBackendIdList();
+ for (Long beId : backendIdsInCluster) {
+ Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
+ if (be.isDecommissioned()) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_ALTER_BE_IN_DECOMMISSION, clusterName);
+ }
+ }
+
+ final int oldInstanceNum = backendIdsInCluster.size();
+ if (newInstanceNum > oldInstanceNum) {
+ // expansion
+ final List<Long> expandBackendIds = Catalog.getCurrentSystemInfo()
+ .calculateExpansionBackends(clusterName, newInstanceNum - oldInstanceNum);
+ if (expandBackendIds == null) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_BE_NOT_ENOUGH);
+ }
+ cluster.addBackends(expandBackendIds);
+ final ClusterInfo info = new ClusterInfo(clusterName, cluster.getId(), expandBackendIds);
+ Catalog.getCurrentCatalog().getEditLog().logExpandCluster(info);
+ } else if (newInstanceNum < oldInstanceNum) {
+ // shrink
+ final List<Long> decomBackendIds = Catalog.getCurrentSystemInfo()
+ .calculateDecommissionBackends(clusterName, oldInstanceNum - newInstanceNum);
+ if (decomBackendIds == null || decomBackendIds.size() == 0) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_BACKEND_ERROR);
+ }
+
+ List<String> hostPortList = Lists.newArrayList();
+ for (Long id : decomBackendIds) {
+ final Backend backend = Catalog.getCurrentSystemInfo().getBackend(id);
+ hostPortList.add(
+ new StringBuilder().append(backend.getHost()).append(":").append(backend.getHeartbeatPort())
+ .toString());
+ }
+
+ // here we reuse the process of decommissioning backends, but set the backend's decommission
+ // type to ClusterDecommission, which means the backend will not be removed from the system
+ // after decommission is done.
+ final DecommissionBackendClause clause = new DecommissionBackendClause(hostPortList);
+ try {
+ clause.analyze(null);
+ clause.setType(DecommissionType.ClusterDecommission);
+ AlterSystemStmt alterStmt = new AlterSystemStmt(clause);
+ alterStmt.setClusterName(clusterName);
+ Catalog.getCurrentCatalog().getAlterInstance().processAlterCluster(alterStmt);
+ } catch (AnalysisException e) {
+ Preconditions.checkState(false, "should not happened: " + e.getMessage());
+ }
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_ALTER_BE_NO_CHANGE, newInstanceNum);
+ }
+
+ } finally {
+ unlock();
+ }
+ }
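
The branching above reduces to comparing the requested instance count with the current backend count: expand, shrink via decommission, or reject a no-op. A compact sketch of that decision:

    // Compact sketch of the expand/shrink decision in processModifyCluster().
    public class ClusterResize {
        enum Action { EXPAND, SHRINK, NO_CHANGE }

        static Action decide(int oldInstanceNum, int newInstanceNum) {
            if (newInstanceNum > oldInstanceNum) {
                return Action.EXPAND; // add calculated expansion backends
            } else if (newInstanceNum < oldInstanceNum) {
                return Action.SHRINK; // decommission, keeping BEs in the system
            }
            return Action.NO_CHANGE; // rejected with ERR_CLUSTER_ALTER_BE_NO_CHANGE
        }

        public static void main(String[] args) {
            System.out.println(decide(3, 5)); // EXPAND
            System.out.println(decide(5, 3)); // SHRINK
            System.out.println(decide(3, 3)); // NO_CHANGE
        }
    }
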
+
+ /**
+ * switch the current connection to the given cluster
+ *
+ * @param ctx
+ * @param clusterName
+ * @throws DdlException
+ */
+ public void changeCluster(ConnectContext ctx, String clusterName) throws DdlException {
+ if (!Catalog.getCurrentCatalog().getAuth().checkCanEnterCluster(ConnectContext.get(), clusterName)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_AUTHORITY, ConnectContext.get().getQualifiedUser(),
+ "enter");
+ }
+
+ if (!nameToCluster.containsKey(clusterName)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName);
+ }
+
+ ctx.setCluster(clusterName);
+ }
+
+ /**
+ * migrate a linked db to the dest cluster
+ *
+ * @param stmt
+ * @throws DdlException
+ */
+ public void migrateDb(MigrateDbStmt stmt) throws DdlException {
+ final String srcClusterName = stmt.getSrcCluster();
+ final String destClusterName = stmt.getDestCluster();
+ final String srcDbName = stmt.getSrcDb();
+ final String destDbName = stmt.getDestDb();
+
+ if (!tryLock(false)) {
+ throw new DdlException("Failed to acquire catalog lock. Try again");
+ }
+ try {
+ if (!nameToCluster.containsKey(srcClusterName)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_CLUSTER_NOT_EXIST, srcClusterName);
+ }
+ if (!nameToCluster.containsKey(destClusterName)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DEST_CLUSTER_NOT_EXIST, destClusterName);
+ }
+
+ if (srcClusterName.equals(destClusterName)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATE_SAME_CLUSTER);
+ }
+
+ final Cluster srcCluster = this.nameToCluster.get(srcClusterName);
+ if (!srcCluster.containDb(srcDbName)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_DB_NOT_EXIST, srcDbName);
+ }
+ final Cluster destCluster = this.nameToCluster.get(destClusterName);
+ if (!destCluster.containLink(destDbName, srcDbName)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATION_NO_LINK, srcDbName, destDbName);
+ }
+
+ final Database db = fullNameToDb.get(srcDbName);
+
+ // if the max replication num of the src db is larger than the number of backends in the dest cluster,
+ // the migration will not be processed.
+ final int maxReplicationNum = db.getMaxReplicationNum();
+ if (maxReplicationNum > destCluster.getBackendIdList().size()) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATE_BE_NOT_ENOUGH, destClusterName);
+ }
+
+ if (db.getDbState() == DbState.LINK) {
+ final BaseParam param = new BaseParam();
+ param.addStringParam(destDbName);
+ param.addLongParam(db.getId());
+ param.addStringParam(srcDbName);
+ param.addStringParam(destClusterName);
+ param.addStringParam(srcClusterName);
+ fullNameToDb.remove(db.getFullName());
+ srcCluster.removeDb(db.getFullName(), db.getId());
+ destCluster.removeLinkDb(param);
+ destCluster.addDb(destDbName, db.getId());
+ db.writeLock();
+ try {
+ db.setDbState(DbState.MOVE);
+ // set cluster to the dest cluster.
+ // and Clone process will do the migration things.
+ db.setClusterName(destClusterName);
+ db.setName(destDbName);
+ db.setAttachDb(srcDbName);
+ } finally {
+ db.writeUnlock();
+ }
+ Catalog.getCurrentCatalog().getEditLog().logMigrateCluster(param);
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATION_NO_LINK, srcDbName, destDbName);
+ }
+ } finally {
+ unlock();
+ }
+ }
+
+ public void replayMigrateDb(BaseParam param) {
+ final String desDbName = param.getStringParam();
+ final String srcDbName = param.getStringParam(1);
+ final String desClusterName = param.getStringParam(2);
+ final String srcClusterName = param.getStringParam(3);
+ tryLock(true);
+ try {
+ final Cluster desCluster = this.nameToCluster.get(desClusterName);
+ final Cluster srcCluster = this.nameToCluster.get(srcClusterName);
+ final Database db = fullNameToDb.get(srcDbName);
+ if (db.getDbState() == DbState.LINK) {
+ fullNameToDb.remove(db.getFullName());
+ srcCluster.removeDb(db.getFullName(), db.getId());
+ desCluster.removeLinkDb(param);
+ desCluster.addDb(param.getStringParam(), db.getId());
+
+ db.writeLock();
+ db.setName(desDbName);
+ db.setAttachDb(srcDbName);
+ db.setDbState(DbState.MOVE);
+ db.setClusterName(desClusterName);
+ db.writeUnlock();
+ }
+ } finally {
+ unlock();
+ }
+ }
+
+ public void replayLinkDb(BaseParam param) {
+ final String desClusterName = param.getStringParam(2);
+ final String srcDbName = param.getStringParam(1);
+ final String desDbName = param.getStringParam();
+
+ tryLock(true);
+ try {
+ final Cluster desCluster = this.nameToCluster.get(desClusterName);
+ final Database srcDb = fullNameToDb.get(srcDbName);
+ srcDb.writeLock();
+ srcDb.setDbState(DbState.LINK);
+ srcDb.setAttachDb(desDbName);
+ srcDb.writeUnlock();
+ desCluster.addLinkDb(param);
+ fullNameToDb.put(desDbName, srcDb);
+ } finally {
+ unlock();
+ }
+ }
+
+ /**
+ * link the src db to the dest db. We use Java object references to realize db hard links.
+ *
+ * @param stmt
+ * @throws DdlException
+ */
+ public void linkDb(LinkDbStmt stmt) throws DdlException {
+ final String srcClusterName = stmt.getSrcCluster();
+ final String destClusterName = stmt.getDestCluster();
+ final String srcDbName = stmt.getSrcDb();
+ final String destDbName = stmt.getDestDb();
+
+ if (!tryLock(false)) {
+ throw new DdlException("Failed to acquire catalog lock. Try again");
+ }
+ try {
+ if (!nameToCluster.containsKey(srcClusterName)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_CLUSTER_NOT_EXIST, srcClusterName);
+ }
+
+ if (!nameToCluster.containsKey(destClusterName)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DEST_CLUSTER_NOT_EXIST, destClusterName);
+ }
+
+ if (srcClusterName.equals(destClusterName)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_MIGRATE_SAME_CLUSTER);
+ }
+
+ if (fullNameToDb.containsKey(destDbName)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, destDbName);
+ }
+
+ final Cluster srcCluster = this.nameToCluster.get(srcClusterName);
+ final Cluster destCluster = this.nameToCluster.get(destClusterName);
+
+ if (!srcCluster.containDb(srcDbName)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_DB_NOT_EXIST, srcDbName);
+ }
+ final Database srcDb = fullNameToDb.get(srcDbName);
+
+ if (srcDb.getDbState() != DbState.NORMAL) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_DB_STATE_LINK_OR_MIGRATE,
+ ClusterNamespace.getNameFromFullName(srcDbName));
+ }
+
+ srcDb.writeLock();
+ try {
+ srcDb.setDbState(DbState.LINK);
+ srcDb.setAttachDb(destDbName);
+ } finally {
+ srcDb.writeUnlock();
+ }
+
+ final long id = Catalog.getCurrentCatalog().getNextId();
+ final BaseParam param = new BaseParam();
+ param.addStringParam(destDbName);
+ param.addStringParam(srcDbName);
+ param.addLongParam(id);
+ param.addLongParam(srcDb.getId());
+ param.addStringParam(destClusterName);
+ param.addStringParam(srcClusterName);
+ destCluster.addLinkDb(param);
+ fullNameToDb.put(destDbName, srcDb);
+ Catalog.getCurrentCatalog().getEditLog().logLinkCluster(param);
+ } finally {
+ unlock();
+ }
+ }
+
+ public Cluster getCluster(String clusterName) {
+ return nameToCluster.get(clusterName);
+ }
+
+ public List<String> getClusterNames() {
+ return new ArrayList<String>(nameToCluster.keySet());
+ }
+
+ /**
+ * get migration progress; when migration finishes, the next cloneCheck will reset dbState
+ *
+ * @return
+ */
+ public Set<BaseParam> getMigrations() {
+ final Set<BaseParam> infos = Sets.newHashSet();
+ for (Database db : fullNameToDb.values()) {
+ db.readLock();
+ try {
+ if (db.getDbState() == DbState.MOVE) {
+ int tabletTotal = 0;
+ int tabletQuorum = 0;
+ final Set<Long> beIds =
+ Sets.newHashSet(Catalog.getCurrentSystemInfo().getClusterBackendIds(db.getClusterName()));
+ final Set<String> tableNames = db.getTableNamesWithLock();
+ for (String tableName : tableNames) {
+
+ Table table = db.getTableNullable(tableName);
+ if (table == null || table.getType() != TableType.OLAP) {
+ continue;
+ }
+
+ OlapTable olapTable = (OlapTable) table;
+ olapTable.readLock();
+ try {
+ for (Partition partition : olapTable.getPartitions()) {
+ ReplicaAllocation replicaAlloc =
+ olapTable.getPartitionInfo().getReplicaAllocation(partition.getId());
+ short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
+ for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(
+ IndexExtState.ALL)) {
+ if (materializedIndex.getState() != IndexState.NORMAL) {
+ continue;
+ }
+ for (Tablet tablet : materializedIndex.getTablets()) {
+ int replicaNum = 0;
+ int quorum = totalReplicaNum / 2 + 1;
+ for (Replica replica : tablet.getReplicas()) {
+ if (replica.getState() != ReplicaState.CLONE && beIds.contains(
+ replica.getBackendId())) {
+ replicaNum++;
+ }
+ }
+ if (replicaNum > quorum) {
+ replicaNum = quorum;
+ }
+
+ tabletQuorum = tabletQuorum + replicaNum;
+ tabletTotal = tabletTotal + quorum;
+ }
+ }
+ }
+ } finally {
+ olapTable.readUnlock();
+ }
+ }
+ final BaseParam info = new BaseParam();
+ info.addStringParam(db.getClusterName());
+ info.addStringParam(db.getAttachDb());
+ info.addStringParam(db.getFullName());
+ final float percentage = tabletTotal > 0 ? (float) tabletQuorum / (float) tabletTotal : 0f;
+ info.addFloatParam(percentage);
+ infos.add(info);
+ }
+ } finally {
+ db.readUnlock();
+ }
+ }
+
+ return infos;
+ }
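
The progress number reported above is quorum-based: a tablet contributes at most its write quorum (totalReplicaNum / 2 + 1) to the numerator, so progress reaches 100% as soon as a quorum of every tablet's replicas lives on the destination backends. A small sketch of that arithmetic:

    // Sketch of the quorum-based migration progress computed in getMigrations().
    public class MigrationProgress {
        public static void main(String[] args) {
            short totalReplicaNum = 3;
            int quorum = totalReplicaNum / 2 + 1; // 2 of 3

            // suppose 2 replicas of this tablet already sit on destination backends
            int replicaNumOnDest = Math.min(2, quorum);

            int tabletQuorum = replicaNumOnDest; // summed over all tablets in the real code
            int tabletTotal = quorum; // likewise summed per tablet
            float percentage = tabletTotal > 0 ? (float) tabletQuorum / tabletTotal : 0f;
            System.out.println(percentage); // 1.0 for this tablet
        }
    }
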
+
+ public long loadCluster(DataInputStream dis, long checksum) throws IOException, DdlException {
+ int clusterCount = dis.readInt();
+ checksum ^= clusterCount;
+ for (long i = 0; i < clusterCount; ++i) {
+ final Cluster cluster = Cluster.read(dis);
+ checksum ^= cluster.getId();
+
+ List<Long> latestBackendIds = Catalog.getCurrentSystemInfo().getClusterBackendIds(cluster.getName());
+ if (latestBackendIds.size() != cluster.getBackendIdList().size()) {
+ LOG.warn(
+ "Cluster:" + cluster.getName() + ", backends in Cluster is " + cluster.getBackendIdList().size()
+ + ", backends in SystemInfoService is " + cluster.getBackendIdList().size());
+ }
+ // The number of BEs in the cluster may differ from SystemInfoService after 'ALTER
+ // SYSTEM ADD BACKEND TO ...' or 'ALTER SYSTEM ADD BACKEND ...', because both statements
+ // add BEs to some cluster, but loadCluster runs after loadBackend.
+ cluster.setBackendIdList(latestBackendIds);
+
+ String dbName = InfoSchemaDb.getFullInfoSchemaDbName(cluster.getName());
+ // Use the real Catalog instance to avoid the InfoSchemaDb id continuously incrementing
+ // when the checkpoint thread loads the image.
+ InfoSchemaDb db = (InfoSchemaDb) Catalog.getServingCatalog().getDbNullable(dbName);
+ if (db == null) {
+ db = new InfoSchemaDb(cluster.getName());
+ db.setClusterName(cluster.getName());
+ }
+ String errMsg = "InfoSchemaDb id shouldn't larger than 10000, please restart your FE server";
+ // Every time we construct the InfoSchemaDb, which id will increment.
+ // When InfoSchemaDb id larger than 10000 and put it to idToDb,
+ // which may be overwrite the normal db meta in idToDb,
+ // so we ensure InfoSchemaDb id less than 10000.
+ Preconditions.checkState(db.getId() < Catalog.NEXT_ID_INIT_VALUE, errMsg);
+ idToDb.put(db.getId(), db);
+ fullNameToDb.put(db.getFullName(), db);
+ cluster.addDb(dbName, db.getId());
+ idToCluster.put(cluster.getId(), cluster);
+ nameToCluster.put(cluster.getName(), cluster);
+ }
+ LOG.info("finished replay cluster from image");
+ return checksum;
+ }
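
The checksum threaded through loadCluster and saveCluster is a plain XOR fold: the writer and the reader XOR the same counts and ids into it, so both ends arrive at the same value if the image is intact. A toy round-trip:

    // Toy round-trip of the XOR checksum scheme used by saveCluster/loadCluster.
    public class XorChecksum {
        static long fold(long checksum, long[] clusterIds) {
            checksum ^= clusterIds.length; // count first, like writeInt(clusterCount)
            for (long id : clusterIds) {
                checksum ^= id; // then each cluster id
            }
            return checksum;
        }

        public static void main(String[] args) {
            long[] clusterIds = {10001L, 10002L};
            long writeChecksum = fold(0, clusterIds); // computed while saving the image
            long readChecksum = fold(0, clusterIds); // recomputed while loading it
            System.out.println(writeChecksum == readChecksum); // true
        }
    }
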
+
+ public void initDefaultCluster() {
+ final List<Long> backendList = Lists.newArrayList();
+ final List<Backend> defaultClusterBackends =
+ Catalog.getCurrentSystemInfo().getClusterBackends(SystemInfoService.DEFAULT_CLUSTER);
+ for (Backend backend : defaultClusterBackends) {
+ backendList.add(backend.getId());
+ }
+
+ final long id = Catalog.getCurrentCatalog().getNextId();
+ final Cluster cluster = new Cluster(SystemInfoService.DEFAULT_CLUSTER, id);
+
+ // make sure one host holds only one backend.
+ Set<String> beHost = Sets.newHashSet();
+ for (Backend be : defaultClusterBackends) {
+ if (beHost.contains(be.getHost())) {
+ // we can not handle this situation automatically.
+ LOG.error("found more than one backends in same host: {}", be.getHost());
+ System.exit(-1);
+ } else {
+ beHost.add(be.getHost());
+ }
+ }
+
+ // we create default_cluster for ease of use, because
+ // most users have no multi-tenant needs.
+ cluster.setBackendIdList(backendList);
+ unprotectCreateCluster(cluster);
+ for (Database db : idToDb.values()) {
+ db.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
+ cluster.addDb(db.getFullName(), db.getId());
+ }
+
+ // no matter default_cluster is created or not,
+ // mark isDefaultClusterCreated as true
+ Catalog.getCurrentCatalog().setDefaultClusterCreated(true);
+ Catalog.getCurrentCatalog().getEditLog().logCreateCluster(cluster);
+ }
+
+ public void replayUpdateDb(DatabaseInfo info) {
+ final Database db = fullNameToDb.get(info.getDbName());
+ db.setClusterName(info.getClusterName());
+ db.setDbState(info.getDbState());
+ }
+
+ public long saveCluster(CountingDataOutputStream dos, long checksum) throws IOException {
+ final int clusterCount = idToCluster.size();
+ checksum ^= clusterCount;
+ dos.writeInt(clusterCount);
+ for (Map.Entry<Long, Cluster> entry : idToCluster.entrySet()) {
+ long clusterId = entry.getKey();
+ if (clusterId >= Catalog.NEXT_ID_INIT_VALUE) {
+ checksum ^= clusterId;
+ final Cluster cluster = entry.getValue();
+ cluster.write(dos);
+ }
+ }
+ return checksum;
+ }
+
+ public void replayUpdateClusterAndBackends(BackendIdsUpdateInfo info) {
+ for (long id : info.getBackendList()) {
+ final Backend backend = Catalog.getCurrentSystemInfo().getBackend(id);
+ final Cluster cluster = nameToCluster.get(backend.getOwnerClusterName());
+ cluster.removeBackend(id);
+ backend.setDecommissioned(false);
+ backend.clearClusterName();
+ backend.setBackendState(BackendState.free);
+ }
+ }
+
+ public List<String> getClusterDbNames(String clusterName) throws AnalysisException {
+ final Cluster cluster = nameToCluster.get(clusterName);
+ if (cluster == null) {
+ throw new AnalysisException("No cluster selected");
+ }
+ return Lists.newArrayList(cluster.getDbNames());
+ }
+
+ public long saveDb(CountingDataOutputStream dos, long checksum) throws IOException {
+ int dbCount = idToDb.size() - nameToCluster.keySet().size();
+ checksum ^= dbCount;
+ dos.writeInt(dbCount);
+ for (Map.Entry<Long, Database> entry : idToDb.entrySet()) {
+ Database db = entry.getValue();
+ String dbName = db.getFullName();
+ // Don't write information_schema db meta
+ if (!InfoSchemaDb.isInfoSchemaDb(dbName)) {
+ checksum ^= entry.getKey();
+ db.write(dos);
+ }
+ }
+ return checksum;
+ }
+
+ public long loadDb(DataInputStream dis, long checksum) throws IOException, DdlException {
+ int dbCount = dis.readInt();
+ long newChecksum = checksum ^ dbCount;
+ for (long i = 0; i < dbCount; ++i) {
+ Database db = new Database();
+ db.readFields(dis);
+ newChecksum ^= db.getId();
+ idToDb.put(db.getId(), db);
+ fullNameToDb.put(db.getFullName(), db);
+ if (db.getDbState() == DbState.LINK) {
+ fullNameToDb.put(db.getAttachDb(), db);
+ }
+ Catalog.getCurrentGlobalTransactionMgr().addDatabaseTransactionMgr(db.getId());
+ }
+ LOG.info("finished replay databases from image");
+ return newChecksum;
}
}
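
saveDb and loadDb above use the same count-prefixed layout: a record count, then that many serialized Database entries (information_schema databases are skipped on the write side, with the count adjusted accordingly). A compact round-trip of that layout, with UTF strings standing in for the Database wire format:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    // Round-trip of the count-prefixed record layout used by saveDb/loadDb;
    // UTF strings stand in for the real Database serialization.
    public class CountPrefixedImage {
        public static void main(String[] args) throws IOException {
            List<String> dbs = Arrays.asList("db1", "db2");

            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            dos.writeInt(dbs.size()); // count first
            for (String db : dbs) {
                dos.writeUTF(db); // then each record
            }

            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
            int count = dis.readInt();
            for (int i = 0; i < count; i++) {
                System.out.println(dis.readUTF()); // db1, db2
            }
        }
    }
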
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java
index 09d3893cd3..6b9b7f6818 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java
@@ -50,7 +50,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -168,19 +167,19 @@ public class ShowAction extends RestBaseController {
Map<String, Long> oneEntry = Maps.newHashMap();
String dbName = request.getParameter(DB_KEY);
- ConcurrentHashMap<String, Database> fullNameToDb = Catalog.getCurrentCatalog().getFullNameToDb();
long totalSize = 0;
if (dbName != null) {
String fullDbName = getFullDbName(dbName);
- Database db = fullNameToDb.get(fullDbName);
+ Database db = Catalog.getCurrentCatalog().getDbNullable(fullDbName);
if (db == null) {
return ResponseEntityBuilder.okWithCommonError("database " + fullDbName + " not found.");
}
totalSize = getDataSizeOfDatabase(db);
oneEntry.put(fullDbName, totalSize);
} else {
- for (Database db : fullNameToDb.values()) {
- if (db.isInfoSchemaDb()) {
+ for (long dbId : Catalog.getCurrentCatalog().getDbIds()) {
+ Database db = Catalog.getCurrentCatalog().getDbNullable(dbId);
+ if (db == null && db.isInfoSchemaDb()) {
continue;
}
totalSize += getDataSizeOfDatabase(db);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
index 6fb0abbf4f..bffcc9c4f5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
@@ -41,7 +41,6 @@ public class FakeEditLog extends MockUp<EditLog> {
@Mock
public void $init(String nodeName) { // CHECKSTYLE IGNORE THIS LINE
// do nothing
- System.out.println("abc");
}
@Mock