You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ca...@apache.org on 2021/09/24 11:25:50 UTC
[incubator-doris] 02/05: fix
This is an automated email from the ASF dual-hosted git repository.
caiconghui pushed a commit to branch writeLock
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit e1c4b9ef919edf57d802b3356790048f2ca8bb3b
Author: caiconghui <ca...@163.com>
AuthorDate: Mon Sep 13 20:07:06 2021 +0800
fix
---
.../main/java/org/apache/doris/alter/Alter.java | 75 ++++-----
.../java/org/apache/doris/alter/AlterHandler.java | 2 +-
.../java/org/apache/doris/alter/AlterJobV2.java | 2 +-
.../doris/alter/MaterializedViewHandler.java | 10 +-
.../java/org/apache/doris/alter/RollupJob.java | 5 +-
.../java/org/apache/doris/alter/RollupJobV2.java | 4 +-
.../apache/doris/alter/SchemaChangeHandler.java | 8 +-
.../org/apache/doris/alter/SchemaChangeJob.java | 15 +-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 4 +-
.../java/org/apache/doris/backup/RestoreJob.java | 22 ++-
.../java/org/apache/doris/catalog/Catalog.java | 171 +++++++++++++--------
.../apache/doris/catalog/CatalogRecycleBin.java | 1 +
.../java/org/apache/doris/catalog/Database.java | 52 ++++---
.../main/java/org/apache/doris/catalog/Table.java | 33 ++--
.../org/apache/doris/catalog/TabletStatMgr.java | 5 +-
.../doris/clone/DynamicPartitionScheduler.java | 4 +-
.../org/apache/doris/clone/TabletSchedCtx.java | 10 +-
.../org/apache/doris/clone/TabletScheduler.java | 9 +-
.../apache/doris/common/util/MetaLockUtils.java | 18 ++-
.../doris/consistency/CheckConsistencyJob.java | 8 +-
.../org/apache/doris/http/rest/RowCountAction.java | 2 +-
.../doris/http/rest/TableRowCountAction.java | 4 +-
.../apache/doris/journal/bdbje/BDBJEJournal.java | 7 +-
.../java/org/apache/doris/load/LoadChecker.java | 7 +-
.../apache/doris/load/loadv2/BrokerLoadJob.java | 2 +-
.../org/apache/doris/load/loadv2/SparkLoadJob.java | 2 +-
.../java/org/apache/doris/master/MasterImpl.java | 6 +-
.../org/apache/doris/master/ReportHandler.java | 11 +-
.../doris/transaction/DatabaseTransactionMgr.java | 2 +-
.../org/apache/doris/catalog/InfoSchemaDbTest.java | 3 +-
.../doris/common/util/MetaLockUtilsTest.java | 3 +-
31 files changed, 304 insertions(+), 203 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index bf1a0c8..542acdd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -148,7 +148,7 @@ public class Alter {
} else if (currentAlterOps.hasPartitionOp()) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
- olapTable.writeLock();
+ olapTable.writeLockOrDdlException();
try {
if (alterClause instanceof DropPartitionClause) {
if (!((DropPartitionClause) alterClause).isTempPartition()) {
@@ -207,7 +207,7 @@ public class Alter {
private void processModifyTableComment(Database db, OlapTable tbl, AlterClause alterClause)
throws DdlException {
- tbl.writeLock();
+ tbl.writeLockOrDdlException();
try {
ModifyTableCommentClause clause = (ModifyTableCommentClause) alterClause;
tbl.setComment(clause.getComment());
@@ -221,7 +221,7 @@ public class Alter {
private void processModifyColumnComment(Database db, OlapTable tbl, List<AlterClause> alterClauses)
throws DdlException {
- tbl.writeLock();
+ tbl.writeLockOrDdlException();
try {
// check first
Map<String, String> colToComment = Maps.newHashMap();
@@ -334,7 +334,7 @@ public class Alter {
((SchemaChangeHandler) schemaChangeHandler).updatePartitionsInMemoryMeta(
db, tableName, partitionNames, properties);
OlapTable olapTable = (OlapTable) table;
- olapTable.writeLock();
+ olapTable.writeLockOrDdlException();
try {
modifyPartitionsProperty(db, olapTable, partitionNames, properties);
} finally {
@@ -356,27 +356,29 @@ public class Alter {
ReplaceTableClause clause = (ReplaceTableClause) alterClauses.get(0);
String newTblName = clause.getTblName();
boolean swapTable = clause.isSwapTable();
- Table newTbl = db.getTableOrMetaException(newTblName, TableType.OLAP);
- OlapTable olapNewTbl = (OlapTable) newTbl;
- db.writeLock();
- origTable.writeLock();
+ db.writeLockOrDdlException();
try {
- String oldTblName = origTable.getName();
- // First, we need to check whether the table to be operated on can be renamed
- olapNewTbl.checkAndSetName(oldTblName, true);
- if (swapTable) {
- origTable.checkAndSetName(newTblName, true);
+ Table newTbl = db.getTableOrMetaException(newTblName, TableType.OLAP);
+ OlapTable olapNewTbl = (OlapTable) newTbl;
+ origTable.writeLock();
+ try {
+ String oldTblName = origTable.getName();
+ // First, we need to check whether the table to be operated on can be renamed
+ olapNewTbl.checkAndSetName(oldTblName, true);
+ if (swapTable) {
+ origTable.checkAndSetName(newTblName, true);
+ }
+ replaceTableInternal(db, origTable, olapNewTbl, swapTable, false);
+ // write edit log
+ ReplaceTableOperationLog log = new ReplaceTableOperationLog(db.getId(), origTable.getId(), olapNewTbl.getId(), swapTable);
+ Catalog.getCurrentCatalog().getEditLog().logReplaceTable(log);
+ LOG.info("finish replacing table {} with table {}, is swap: {}", oldTblName, newTblName, swapTable);
+ } finally {
+ origTable.writeUnlock();
}
- replaceTableInternal(db, origTable, olapNewTbl, swapTable, false);
- // write edit log
- ReplaceTableOperationLog log = new ReplaceTableOperationLog(db.getId(), origTable.getId(), olapNewTbl.getId(), swapTable);
- Catalog.getCurrentCatalog().getEditLog().logReplaceTable(log);
- LOG.info("finish replacing table {} with table {}, is swap: {}", oldTblName, newTblName, swapTable);
} finally {
- origTable.writeUnlock();
db.writeUnlock();
}
-
}
public void replayReplaceTable(ReplaceTableOperationLog log) throws MetaNotFoundException {
@@ -446,25 +448,28 @@ public class Alter {
}
private void modifyViewDef(Database db, View view, String inlineViewDef, long sqlMode, List<Column> newFullSchema) throws DdlException {
- db.writeLock();
- view.writeLock();
+ db.writeLockOrDdlException();
try {
- view.setInlineViewDefWithSqlMode(inlineViewDef, sqlMode);
+ view.writeLockOrDdlException();
try {
- view.init();
- } catch (UserException e) {
- throw new DdlException("failed to init view stmt", e);
+ view.setInlineViewDefWithSqlMode(inlineViewDef, sqlMode);
+ try {
+ view.init();
+ } catch (UserException e) {
+ throw new DdlException("failed to init view stmt", e);
+ }
+ view.setNewFullSchema(newFullSchema);
+ String viewName = view.getName();
+ db.dropTable(viewName);
+ db.createTable(view);
+
+ AlterViewInfo alterViewInfo = new AlterViewInfo(db.getId(), view.getId(), inlineViewDef, newFullSchema, sqlMode);
+ Catalog.getCurrentCatalog().getEditLog().logModifyViewDef(alterViewInfo);
+ LOG.info("modify view[{}] definition to {}", viewName, inlineViewDef);
+ } finally {
+ view.writeUnlock();
}
- view.setNewFullSchema(newFullSchema);
- String viewName = view.getName();
- db.dropTable(viewName);
- db.createTable(view);
-
- AlterViewInfo alterViewInfo = new AlterViewInfo(db.getId(), view.getId(), inlineViewDef, newFullSchema, sqlMode);
- Catalog.getCurrentCatalog().getEditLog().logModifyViewDef(alterViewInfo);
- LOG.info("modify view[{}] definition to {}", viewName, inlineViewDef);
} finally {
- view.writeUnlock();
db.writeUnlock();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
index 2f825b2..9aefdbb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
@@ -420,7 +420,7 @@ public abstract class AlterHandler extends MasterDaemon {
Database db = Catalog.getCurrentCatalog().getDbOrMetaException(task.getDbId());
OlapTable tbl = db.getTableOrMetaException(task.getTableId(), Table.TableType.OLAP);
- tbl.writeLock();
+ tbl.writeLockOrMetaException();
try {
Partition partition = tbl.getPartition(task.getPartitionId());
if (partition == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
index 6a31be1..c8c8815 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -194,7 +194,7 @@ public abstract class AlterJobV2 implements Writable {
throw new AlterCancelException(e.getMessage());
}
- tbl.writeLock();
+ tbl.writeLockOrAlterCancelException();
try {
boolean isStable = tbl.isStable(Catalog.getCurrentSystemInfo(),
Catalog.getCurrentCatalog().getTabletScheduler(), db.getClusterName());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index 66c1c53..ccd5219 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -174,7 +174,7 @@ public class MaterializedViewHandler extends AlterHandler {
*/
public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause, Database db, OlapTable olapTable)
throws DdlException, AnalysisException {
- olapTable.writeLock();
+ olapTable.writeLockOrDdlException();
try {
olapTable.checkStableAndNormal(db.getClusterName());
if (olapTable.existTempPartitions()) {
@@ -229,7 +229,7 @@ public class MaterializedViewHandler extends AlterHandler {
Map<String, RollupJobV2> rollupNameJobMap = new LinkedHashMap<>();
// save job id for log
Set<Long> logJobIdSet = new HashSet<>();
- olapTable.writeLock();
+ olapTable.writeLockOrDdlException();
try {
if (olapTable.existTempPartitions()) {
throw new DdlException("Can not alter table when there are temp partitions in table");
@@ -708,7 +708,7 @@ public class MaterializedViewHandler extends AlterHandler {
public void processBatchDropRollup(List<AlterClause> dropRollupClauses, Database db, OlapTable olapTable)
throws DdlException, MetaNotFoundException {
- olapTable.writeLock();
+ olapTable.writeLockOrDdlException();
try {
if (olapTable.existTempPartitions()) {
throw new DdlException("Can not alter table when there are temp partitions in table");
@@ -744,7 +744,7 @@ public class MaterializedViewHandler extends AlterHandler {
public void processDropMaterializedView(DropMaterializedViewStmt dropMaterializedViewStmt, Database db,
OlapTable olapTable) throws DdlException, MetaNotFoundException {
- olapTable.writeLock();
+ olapTable.writeLockOrDdlException();
try {
// check table state
if (olapTable.getState() != OlapTableState.NORMAL) {
@@ -881,7 +881,7 @@ public class MaterializedViewHandler extends AlterHandler {
try {
Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = db.getTableOrMetaException(tableId, Table.TableType.OLAP);
- olapTable.writeLock();
+ olapTable.writeLockOrMetaException();
try {
if (olapTable.getState() == olapTableState) {
return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJob.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJob.java
index 9b83815..1ab6cd2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJob.java
@@ -643,7 +643,10 @@ public class RollupJob extends AlterJob {
return -1;
}
- olapTable.writeLock();
+ if (!olapTable.writeLockIfExist()) {
+ LOG.warn("unknown table, tableName=" + olapTable.getName());
+ return -1;
+ }
try {
// if all previous transaction has finished, then check base and rollup replica num
synchronized (this) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 28fbd79..e074b3a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -281,7 +281,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
// create all rollup replicas success.
// add rollup index to catalog
- tbl.writeLock();
+ tbl.writeLockOrAlterCancelException();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
addRollupIndexToCatalog(tbl);
@@ -427,7 +427,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
* all tasks are finished. check the integrity.
* we just check whether all rollup replicas are healthy.
*/
- tbl.writeLock();
+ tbl.writeLockOrAlterCancelException();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 27bfa69..465c333 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -1611,7 +1611,7 @@ public class SchemaChangeHandler extends AlterHandler {
@Override
public void process(List<AlterClause> alterClauses, String clusterName, Database db, OlapTable olapTable)
throws UserException {
- olapTable.writeLock();
+ olapTable.writeLockOrDdlException();
try {
// index id -> index schema
Map<Long, LinkedList<Column>> indexSchemaMap = new HashMap<>();
@@ -1709,7 +1709,7 @@ public class SchemaChangeHandler extends AlterHandler {
@Override
public void processExternalTable(List<AlterClause> alterClauses, Database db, Table externalTable)
throws UserException {
- externalTable.writeLock();
+ externalTable.writeLockOrDdlException();
try {
// copy the external table schema columns
List<Column> newSchema = Lists.newArrayList();
@@ -1790,7 +1790,7 @@ public class SchemaChangeHandler extends AlterHandler {
updatePartitionInMemoryMeta(db, olapTable.getName(), partition.getName(), isInMemory);
}
- olapTable.writeLock();
+ olapTable.writeLockOrDdlException();
try {
Catalog.getCurrentCatalog().modifyTableInMemoryMeta(db, olapTable, properties);
} finally {
@@ -1918,7 +1918,7 @@ public class SchemaChangeHandler extends AlterHandler {
AlterJobV2 schemaChangeJobV2 = null;
OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
- olapTable.writeLock();
+ olapTable.writeLockOrDdlException();
try {
if (olapTable.getState() != OlapTableState.SCHEMA_CHANGE &&
olapTable.getState() != OlapTableState.WAITING_STABLE) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJob.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJob.java
index f781d65..f2df913 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJob.java
@@ -584,15 +584,9 @@ public class SchemaChangeJob extends AlterJob {
long replicaId = schemaChangeTask.getReplicaId();
// update replica's info
- OlapTable olapTable;
- try {
- Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbId);
- olapTable = db.getTableOrMetaException(tableId, Table.TableType.OLAP);
- } catch (MetaNotFoundException e) {
- LOG.warn(e.getMessage());
- return;
- }
- olapTable.writeLock();
+ Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbId);
+ OlapTable olapTable = db.getTableOrMetaException(tableId, Table.TableType.OLAP);
+ olapTable.writeLockOrMetaException();
try {
Preconditions.checkState(olapTable.getState() == OlapTableState.SCHEMA_CHANGE);
@@ -668,12 +662,11 @@ public class SchemaChangeJob extends AlterJob {
OlapTable olapTable;
try {
olapTable = db.getTableOrMetaException(tableId, Table.TableType.OLAP);
+ olapTable.writeLockOrMetaException();
} catch (MetaNotFoundException e) {
LOG.warn(e.getMessage());
return -1;
}
-
- olapTable.writeLock();
try {
synchronized (this) {
boolean hasUnfinishedPartition = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 1ad1493..2d77da2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -307,7 +307,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
// create all replicas success.
// add all shadow indexes to catalog
- tbl.writeLock();
+ tbl.writeLockOrAlterCancelException();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
addShadowIndexToCatalog(tbl);
@@ -465,7 +465,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
* all tasks are finished. check the integrity.
* we just check whether all new replicas are healthy.
*/
- tbl.writeLock();
+ tbl.writeLockOrAlterCancelException();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 8e92395..7614200 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -445,7 +445,9 @@ public class RestoreJob extends AbstractJob {
}
OlapTable olapTbl = (OlapTable) tbl;
- olapTbl.writeLock();
+ if (!olapTbl.writeLockIfExist()) {
+ continue;
+ }
try {
if (olapTbl.getState() != OlapTableState.NORMAL) {
status = new Status(ErrCode.COMMON_ERROR,
@@ -783,7 +785,11 @@ public class RestoreJob extends AbstractJob {
// add restored tables
for (Table tbl : restoredTbls) {
- db.writeLock();
+ if (!db.writeLockIfExist()) {
+ status = new Status(ErrCode.COMMON_ERROR, "Database " + db.getFullName()
+ + " has been dropped");
+ return;
+ }
try {
if (!db.createTable(tbl)) {
status = new Status(ErrCode.COMMON_ERROR, "Table " + tbl.getName()
@@ -1387,7 +1393,9 @@ public class RestoreJob extends AbstractJob {
continue;
}
OlapTable olapTbl = (OlapTable) tbl;
- tbl.writeLock();
+ if (!tbl.writeLockIfExist()) {
+ continue;
+ }
try {
Map<Long, Pair<Long, Long>> map = restoredVersionInfo.rowMap().get(tblId);
for (Map.Entry<Long, Pair<Long, Long>> entry : map.entrySet()) {
@@ -1576,7 +1584,9 @@ public class RestoreJob extends AbstractJob {
}
LOG.info("remove restored partition in table {} when cancelled: {}",
restoreTbl.getName(), entry.second.getName());
- restoreTbl.writeLock();
+ if (!restoreTbl.writeLockIfExist()) {
+ continue;
+ }
try {
restoreTbl.dropPartition(dbId, entry.second.getName(), true /* force drop */);
} finally {
@@ -1625,7 +1635,9 @@ public class RestoreJob extends AbstractJob {
}
OlapTable olapTbl = (OlapTable) tbl;
- tbl.writeLock();
+ if (!tbl.writeLockIfExist()) {
+ continue;
+ }
try {
if (olapTbl.getState() == OlapTableState.RESTORE
|| olapTbl.getState() == OlapTableState.RESTORE_WITH_LOAD) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index e12b8a2..ca8fbfd 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -2734,7 +2734,26 @@ public class Catalog {
// save table names for recycling
Set<String> tableNames = db.getTableNamesWithLock();
- unprotectDropDb(db, stmt.isForceDrop(), false);
+ List<Table> tableList = db.getTables();
+ MetaLockUtils.writeLockTables(tableList);
+ try {
+ if (!stmt.isForceDrop()) {
+ for (Table table : tableList) {
+ if (table.getType() == TableType.OLAP) {
+ OlapTable olapTable = (OlapTable) table;
+ if (olapTable.getState() != OlapTableState.NORMAL) {
+ throw new DdlException("The table [" + olapTable.getState() + "]'s state is " + olapTable.getState() + ", cannot be dropped." +
+ " please cancel the operation on olap table firstly. If you want to forcibly drop(cannot be recovered)," +
+ " please use \"DROP table FORCE\".");
+ }
+ }
+ }
+ }
+ unprotectDropDb(db, tableList, stmt.isForceDrop(), false);
+ } finally {
+ MetaLockUtils.writeUnlockTables(tableList);
+ }
+
if (!stmt.isForceDrop()) {
Catalog.getCurrentRecycleBin().recycleDatabase(db, tableNames);
} else {
@@ -2758,15 +2777,11 @@ public class Catalog {
LOG.info("finish drop database[{}], is force : {}", dbName, stmt.isForceDrop());
}
- public void unprotectDropDb(Database db, boolean isForeDrop, boolean isReplay) {
- for (Table table : db.getTables()) {
- table.writeLock();
- try {
- unprotectDropTable(db, table, isForeDrop, isReplay);
- } finally {
- table.writeUnlock();
- }
+ public void unprotectDropDb(Database db, List<Table> tableList, boolean isForeDrop, boolean isReplay) {
+ for (Table table : tableList) {
+ unprotectDropTable(db, table, isForeDrop, isReplay);
}
+ db.markDropped();
}
public void replayDropLinkDb(DropLinkDbAndUpdateDbInfo info) {
@@ -2792,7 +2807,13 @@ public class Catalog {
db.writeLock();
try {
Set<String> tableNames = db.getTableNamesWithLock();
- unprotectDropDb(db, isForceDrop, true);
+ List<Table> tableList = db.getTables();
+ MetaLockUtils.writeLockTables(tableList);
+ try {
+ unprotectDropDb(db, tableList, isForceDrop, true);
+ } finally {
+ MetaLockUtils.writeUnlockTables(tableList);
+ }
if (!isForceDrop) {
Catalog.getCurrentRecycleBin().recycleDatabase(db, tableNames);
} else {
@@ -2838,6 +2859,10 @@ public class Catalog {
// log
RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L);
editLog.logRecoverDb(recoverInfo);
+ for (Table table : db.getTables()) {
+ table.unmarkDropped();
+ }
+ db.unmarkDropped();
} finally {
unlock();
}
@@ -2850,7 +2875,7 @@ public class Catalog {
String tableName = recoverStmt.getTableName();
Database db = this.getDbOrDdlException(dbName);
- db.writeLock();
+ db.writeLockOrDdlException();
try {
if (db.getTable(tableName).isPresent()) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
@@ -2869,7 +2894,7 @@ public class Catalog {
Database db = this.getDbOrDdlException(dbName);
OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
- olapTable.writeLock();
+ olapTable.writeLockOrDdlException();
try {
String partitionName = recoverStmt.getPartitionName();
if (olapTable.getPartition(partitionName) != null) {
@@ -2899,24 +2924,33 @@ public class Catalog {
public void alterDatabaseQuota(AlterDatabaseQuotaStmt stmt) throws DdlException {
String dbName = stmt.getDbName();
Database db = this.getDbOrDdlException(dbName);
-
QuotaType quotaType = stmt.getQuotaType();
- if (quotaType == QuotaType.DATA) {
- db.setDataQuotaWithLock(stmt.getQuota());
- } else if (quotaType == QuotaType.REPLICA) {
- db.setReplicaQuotaWithLock(stmt.getQuota());
+ db.writeLockOrDdlException();
+ try {
+ if (quotaType == QuotaType.DATA) {
+ db.setDataQuota(stmt.getQuota());
+ } else if (quotaType == QuotaType.REPLICA) {
+ db.setReplicaQuota(stmt.getQuota());
+ }
+ long quota = stmt.getQuota();
+ DatabaseInfo dbInfo = new DatabaseInfo(dbName, "", quota, quotaType);
+ editLog.logAlterDb(dbInfo);
+ } finally {
+ db.writeUnlock();
}
- long quota = stmt.getQuota();
- DatabaseInfo dbInfo = new DatabaseInfo(dbName, "", quota, quotaType);
- editLog.logAlterDb(dbInfo);
}
public void replayAlterDatabaseQuota(String dbName, long quota, QuotaType quotaType) throws MetaNotFoundException {
Database db = this.getDbOrMetaException(dbName);
- if (quotaType == QuotaType.DATA) {
- db.setDataQuotaWithLock(quota);
- } else if (quotaType == QuotaType.REPLICA) {
- db.setReplicaQuotaWithLock(quota);
+ db.writeLock();
+ try {
+ if (quotaType == QuotaType.DATA) {
+ db.setDataQuota(quota);
+ } else if (quotaType == QuotaType.REPLICA) {
+ db.setReplicaQuota(quota);
+ }
+ } finally {
+ db.writeUnlock();
}
}
@@ -3268,8 +3302,7 @@ public class Catalog {
// check again
table = db.getOlapTableOrDdlException(tableName);
-
- table.writeLock();
+ table.writeLockOrDdlException();
try {
olapTable = (OlapTable) table;
if (olapTable.getState() != OlapTableState.NORMAL) {
@@ -4378,10 +4411,13 @@ public class Catalog {
}
}
- public void replayCreateTable(String dbName, Table table) {
+ public void replayCreateTable(String dbName, Table table) throws MetaNotFoundException {
Database db = this.fullNameToDb.get(dbName);
- db.createTableWithLock(table, true, false);
-
+ try {
+ db.createTableWithLock(table, true, false);
+ } catch (DdlException e) {
+ throw new MetaNotFoundException(e.getMessage());
+ }
if (!isCheckpointThread()) {
// add to inverted index
if (table.getType() == TableType.OLAP) {
@@ -4508,7 +4544,7 @@ public class Catalog {
// check database
Database db = this.getDbOrDdlException(dbName);
- db.writeLock();
+ db.writeLockOrDdlException();
try {
Table table = db.getTableNullable(tableName);
if (table == null) {
@@ -4569,6 +4605,7 @@ public class Catalog {
}
db.dropTable(table.getName());
+ table.markDropped();
if (!isForceDrop) {
Catalog.getCurrentRecycleBin().recycleTable(db.getId(), table);
} else {
@@ -4906,7 +4943,7 @@ public class Catalog {
OlapTable olapTable = (OlapTable) table;
// use try lock to avoid blocking a long time.
// if block too long, backend report rpc will timeout.
- if (!olapTable.tryWriteLock(Table.TRY_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+ if (!olapTable.tryWriteLockIfExist(Table.TRY_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
LOG.warn("try get table {} writelock but failed when checking backend storage medium", table.getName());
continue;
}
@@ -5254,42 +5291,45 @@ public class Catalog {
// entry of rename table operation
public void renameTable(Database db, Table table, TableRenameClause tableRenameClause) throws DdlException {
- db.writeLock();
- table.writeLock();
+ db.writeLockOrDdlException();
try {
- if (table instanceof OlapTable) {
- OlapTable olapTable = (OlapTable) table;
- if (olapTable.getState() != OlapTableState.NORMAL) {
- throw new DdlException("Table[" + olapTable.getName() + "] is under " + olapTable.getState());
+ table.writeLockOrDdlException();
+ try {
+ if (table instanceof OlapTable) {
+ OlapTable olapTable = (OlapTable) table;
+ if (olapTable.getState() != OlapTableState.NORMAL) {
+ throw new DdlException("Table[" + olapTable.getName() + "] is under " + olapTable.getState());
+ }
}
- }
- String oldTableName = table.getName();
- String newTableName = tableRenameClause.getNewTableName();
- if (oldTableName.equals(newTableName)) {
- throw new DdlException("Same table name");
- }
+ String oldTableName = table.getName();
+ String newTableName = tableRenameClause.getNewTableName();
+ if (oldTableName.equals(newTableName)) {
+ throw new DdlException("Same table name");
+ }
- // check if name is already used
- if (db.getTable(newTableName).isPresent()) {
- throw new DdlException("Table name[" + newTableName + "] is already used");
- }
+ // check if name is already used
+ if (db.getTable(newTableName).isPresent()) {
+ throw new DdlException("Table name[" + newTableName + "] is already used");
+ }
- if (table.getType() == TableType.OLAP) {
- // olap table should also check if any rollup has same name as "newTableName"
- ((OlapTable) table).checkAndSetName(newTableName, false);
- } else {
- table.setName(newTableName);
- }
+ if (table.getType() == TableType.OLAP) {
+ // olap table should also check if any rollup has same name as "newTableName"
+ ((OlapTable) table).checkAndSetName(newTableName, false);
+ } else {
+ table.setName(newTableName);
+ }
- db.dropTable(oldTableName);
- db.createTable(table);
+ db.dropTable(oldTableName);
+ db.createTable(table);
- TableInfo tableInfo = TableInfo.createForTableRename(db.getId(), table.getId(), newTableName);
- editLog.logTableRename(tableInfo);
- LOG.info("rename table[{}] to {}", oldTableName, newTableName);
+ TableInfo tableInfo = TableInfo.createForTableRename(db.getId(), table.getId(), newTableName);
+ editLog.logTableRename(tableInfo);
+ LOG.info("rename table[{}] to {}", oldTableName, newTableName);
+ } finally {
+ table.writeUnlock();
+ }
} finally {
- table.writeUnlock();
db.writeUnlock();
}
}
@@ -5424,7 +5464,7 @@ public class Catalog {
}
public void renameRollup(Database db, OlapTable table, RollupRenameClause renameClause) throws DdlException {
- table.writeLock();
+ table.writeLockOrDdlException();
try {
if (table.getState() != OlapTableState.NORMAL) {
throw new DdlException("Table[" + table.getName() + "] is under " + table.getState());
@@ -5485,7 +5525,7 @@ public class Catalog {
}
public void renamePartition(Database db, OlapTable table, PartitionRenameClause renameClause) throws DdlException {
- table.writeLock();
+ table.writeLockOrDdlException();
try {
if (table.getState() != OlapTableState.NORMAL) {
throw new DdlException("Table[" + table.getName() + "] is under " + table.getState());
@@ -5696,8 +5736,7 @@ public class Catalog {
}
public void modifyDefaultDistributionBucketNum(Database db, OlapTable olapTable, ModifyDistributionClause modifyDistributionClause) throws DdlException {
- olapTable.writeLock();
-
+ olapTable.writeLockOrDdlException();
try {
if (olapTable.isColocateTable()) {
throw new DdlException("Cannot change default bucket number of colocate table.");
@@ -6723,7 +6762,7 @@ public class Catalog {
// before replacing, we need to check again.
// Things may be changed outside the table lock.
olapTable = (OlapTable) db.getTableOrDdlException(copiedTbl.getId());
- olapTable.writeLock();
+ olapTable.writeLockOrDdlException();
try {
if (olapTable.getState() != OlapTableState.NORMAL) {
throw new DdlException("Table' state is not NORMAL: " + olapTable.getState());
@@ -6925,7 +6964,7 @@ public class Catalog {
// Convert table's distribution type from random to hash.
// random distribution is no longer supported.
public void convertDistributionType(Database db, OlapTable tbl) throws DdlException {
- tbl.writeLock();
+ tbl.writeLockOrDdlException();
try {
if (!tbl.convertRandomDistributionToHashDistribution()) {
throw new DdlException("Table " + tbl.getName() + " is not random distributed");
@@ -7071,7 +7110,7 @@ public class Catalog {
}
Database db = this.getDbOrMetaException(meta.getDbId());
Table table = db.getTableOrMetaException(meta.getTableId());
- table.writeLock();
+ table.writeLockOrMetaException();
try {
Replica replica = tabletInvertedIndex.getReplica(tabletId, backendId);
if (replica == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
index 5d5e903..af414ff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
@@ -376,6 +376,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), -1L);
Catalog.getCurrentCatalog().getEditLog().logRecoverTable(recoverInfo);
LOG.info("recover db[{}] with table[{}]: {}", dbId, table.getId(), table.getName());
+ table.unmarkDropped();
return true;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index 621703f..494ca89 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -100,6 +100,8 @@ public class Database extends MetaObject implements Writable {
private volatile long replicaQuotaSize;
+ private volatile boolean isDropped;
+
public enum DbState {
NORMAL, LINK, MOVE
}
@@ -131,6 +133,14 @@ public class Database extends MetaObject implements Writable {
this.dbEncryptKey = new DatabaseEncryptKey();
}
+ public void markDropped() {
+ isDropped = true;
+ }
+
+ public void unmarkDropped() {
+ isDropped = false;
+ }
+
public void readLock() {
this.rwLock.readLock().lock();
}
@@ -156,10 +166,26 @@ public class Database extends MetaObject implements Writable {
}
}
- public boolean isWriteLockHeldByCurrentThread() {
- return this.rwLock.writeLock().isHeldByCurrentThread();
+ public boolean writeLockIfExist() {
+ if (!isDropped) {
+ this.rwLock.writeLock().lock();
+ return true;
+ }
+ return false;
+ }
+
+ public <E extends Exception> void writeLockOrException(E e) throws E {
+ writeLock();
+ if (isDropped) {
+ writeUnlock();
+ throw e;
+ }
}
+ public void writeLockOrDdlException() throws DdlException {
+ writeLockOrException(new DdlException("unknown db, dbName=" + fullQualifiedName));
+ }
+
public long getId() {
return id;
}
@@ -177,26 +203,16 @@ public class Database extends MetaObject implements Writable {
}
}
- public void setDataQuotaWithLock(long newQuota) {
+ public void setDataQuota(long newQuota) {
Preconditions.checkArgument(newQuota >= 0L);
LOG.info("database[{}] set quota from {} to {}", fullQualifiedName, dataQuotaBytes, newQuota);
- writeLock();
- try {
- this.dataQuotaBytes = newQuota;
- } finally {
- writeUnlock();
- }
+ this.dataQuotaBytes = newQuota;
}
- public void setReplicaQuotaWithLock(long newQuota) {
+ public void setReplicaQuota(long newQuota) {
Preconditions.checkArgument(newQuota >= 0L);
LOG.info("database[{}] set replica quota from {} to {}", fullQualifiedName, replicaQuotaSize, newQuota);
- writeLock();
- try {
- this.replicaQuotaSize = newQuota;
- } finally {
- writeUnlock();
- }
+ this.replicaQuotaSize = newQuota;
}
public long getDataQuota() {
@@ -303,12 +319,12 @@ public class Database extends MetaObject implements Writable {
}
// return pair <success?, table exist?>
- public Pair<Boolean, Boolean> createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
+ public Pair<Boolean, Boolean> createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) throws DdlException {
boolean result = true;
// if a table is already exists, then edit log won't be executed
// some caller of this method may need to know this message
boolean isTableExist = false;
- writeLock();
+ writeLockOrDdlException();
try {
String tableName = table.getName();
if (Catalog.isStoredTableNamesLowerCase()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index c2b349f..cf5d577 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -17,8 +17,8 @@
package org.apache.doris.catalog;
+import org.apache.doris.alter.AlterCancelException;
import org.apache.doris.analysis.CreateTableStmt;
-import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MetaNotFoundException;
@@ -139,6 +139,14 @@ public class Table extends MetaObject implements Writable {
this.createTime = Instant.now().getEpochSecond();
}
+ public void markDropped() {
+ isDropped = true;
+ }
+
+ public void unmarkDropped() {
+ isDropped = false;
+ }
+
public void readLock() {
this.rwLock.readLock().lock();
}
@@ -201,22 +209,14 @@ public class Table extends MetaObject implements Writable {
writeLockOrException(new MetaNotFoundException("unknown table, tableName=" + name));
}
- public void writeLockOrAnalysisException() throws AnalysisException {
- writeLockOrException(new AnalysisException("unknown table, tableName=" + name));
- }
-
- public boolean tryWriteLockOrDdlException(long timeout, TimeUnit unit) throws DdlException {
- return tryWriteLockOrException(timeout, unit, new DdlException("unknown table, tableName=" + name));
+ public void writeLockOrAlterCancelException() throws AlterCancelException {
+ writeLockOrException(new AlterCancelException("unknown table, tableName=" + name));
}
public boolean tryWriteLockOrMetaException(long timeout, TimeUnit unit) throws MetaNotFoundException {
return tryWriteLockOrException(timeout, unit, new MetaNotFoundException("unknown table, tableName=" + name));
}
- public boolean tryWriteLockOrAnalysisException(long timeout, TimeUnit unit) throws AnalysisException {
- return tryWriteLockOrException(timeout, unit, new AnalysisException("unknown table, tableName=" + name));
- }
-
public <E extends Exception> boolean tryWriteLockOrException(long timeout, TimeUnit unit, E e) throws E {
if (tryWriteLock(timeout, unit)) {
if (isDropped) {
@@ -228,6 +228,17 @@ public class Table extends MetaObject implements Writable {
return false;
}
+ public boolean tryWriteLockIfExist(long timeout, TimeUnit unit) {
+ if (tryWriteLock(timeout, unit)) {
+ if (isDropped) {
+ writeUnlock();
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
public boolean isTypeRead() {
return isTypeRead;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index ed02c39..9755822 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -91,9 +91,10 @@ public class TabletStatMgr extends MasterDaemon {
if (table.getType() != TableType.OLAP) {
continue;
}
-
OlapTable olapTable = (OlapTable) table;
- table.writeLock();
+ if (!table.writeLockIfExist()) {
+ continue;
+ }
try {
for (Partition partition : olapTable.getAllPartitions()) {
long version = partition.getVisibleVersion();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index 62905a7..5b43573 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -363,7 +363,9 @@ public class DynamicPartitionScheduler extends MasterDaemon {
}
for (DropPartitionClause dropPartitionClause : dropPartitionClauses) {
- olapTable.writeLock();
+ if (!olapTable.writeLockIfExist()) {
+ continue;
+ }
try {
Catalog.getCurrentCatalog().dropPartition(db, olapTable, dropPartitionClause);
clearDropPartitionFailedMsg(olapTable.getId());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 18e66b1..391b929 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -685,8 +685,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
Database db = Catalog.getCurrentCatalog().getDbNullable(dbId);
if (db != null) {
Table table = db.getTableNullable(tblId);
- if (table != null) {
- table.writeLock();
+ if (table != null && table.writeLockIfExist()) {
try {
List<Replica> cloneReplicas = Lists.newArrayList();
tablet.getReplicas().stream().filter(r -> r.getState() == ReplicaState.CLONE).forEach(r -> {
@@ -840,10 +839,9 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
}
// 1. check the tablet status first
- Database db = Catalog.getCurrentCatalog().getDbOrException(dbId, s -> new SchedException(Status.UNRECOVERABLE, "db does not exist"));
- OlapTable olapTable = (OlapTable) db.getTableOrException(tblId, s -> new SchedException(Status.UNRECOVERABLE, "tbl does not exist"));
-
- olapTable.writeLock();
+ Database db = Catalog.getCurrentCatalog().getDbOrException(dbId, s -> new SchedException(Status.UNRECOVERABLE, "db " + dbId + " does not exist"));
+ OlapTable olapTable = (OlapTable) db.getTableOrException(tblId, s -> new SchedException(Status.UNRECOVERABLE, "tbl " + tabletId + " does not exist"));
+ olapTable.writeLockOrException(new SchedException(Status.UNRECOVERABLE, "table " + olapTable.getName() + " does not exist"));
try {
Partition partition = olapTable.getPartition(partitionId);
if (partition == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 907c917..f591112 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -460,10 +460,11 @@ public class TabletScheduler extends MasterDaemon {
stat.counterTabletScheduled.incrementAndGet();
Pair<TabletStatus, TabletSchedCtx.Priority> statusPair;
- // check this tablet again
- Database db = catalog.getDbOrException(tabletCtx.getDbId(), s -> new SchedException(Status.UNRECOVERABLE, "db does not exist"));
- OlapTable tbl = (OlapTable) db.getTableOrException(tabletCtx.getTblId(), s -> new SchedException(Status.UNRECOVERABLE, "tbl does not exist"));
- tbl.writeLock();
+ Database db = Catalog.getCurrentCatalog().getDbOrException(tabletCtx.getDbId(),
+ s -> new SchedException(Status.UNRECOVERABLE, "db " + tabletCtx.getDbId() + " does not exist"));
+ OlapTable tbl = (OlapTable) db.getTableOrException(tabletCtx.getTblId(),
+ s -> new SchedException(Status.UNRECOVERABLE, "tbl " + tabletCtx.getTblId() + " does not exist"));
+ tbl.writeLockOrException(new SchedException(Status.UNRECOVERABLE, "table " + tbl.getName() + " does not exist"));
try {
boolean isColocateTable = colocateTableIndex.isColocateTable(tbl.getId());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
index ed0e06a..de5a3c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
@@ -19,6 +19,7 @@ package org.apache.doris.common.util;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
+import org.apache.doris.common.MetaNotFoundException;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -60,9 +61,22 @@ public class MetaLockUtils {
}
}
- public static boolean tryWriteLockTables(List<Table> tableList, long timeout, TimeUnit unit) {
+ public static void writeLockTablesOrMetaException(List<Table> tableList) throws MetaNotFoundException {
for (int i = 0; i < tableList.size(); i++) {
- if (!tableList.get(i).tryWriteLock(timeout, unit)) {
+ try {
+ tableList.get(i).writeLockOrMetaException();
+ } catch (MetaNotFoundException e) {
+ for (int j = i - 1; j >= 0; j--) {
+ tableList.get(j).writeUnlock();
+ }
+ throw e;
+ }
+ }
+ }
+
+ public static boolean tryWriteLockTables(List<Table> tableList, long timeout, TimeUnit unit) throws MetaNotFoundException {
+ for (int i = 0; i < tableList.size(); i++) {
+ if (!tableList.get(i).tryWriteLockOrMetaException(timeout, unit)) {
for (int j = i - 1; j >= 0; j--) {
tableList.get(j).writeUnlock();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java b/fe/fe-core/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java
index d0cbb49..4d32786 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java
@@ -215,7 +215,10 @@ public class CheckConsistencyJob {
if (state != JobState.RUNNING) {
// failed to send task. set tablet's checked version and version hash to avoid choosing it again
- table.writeLock();
+ if (!table.writeLockIfExist()) {
+ LOG.debug("table[{}] does not exist", tabletMeta.getTableId());
+ return false;
+ }
try {
tablet.setCheckedVersion(checkedVersion, checkedVersionHash);
} finally {
@@ -261,11 +264,10 @@ public class CheckConsistencyJob {
boolean isConsistent = true;
Table table = db.getTableNullable(tabletMeta.getTableId());
- if (table == null) {
+ if (table == null || !table.writeLockIfExist()) {
LOG.warn("table[{}] does not exist", tabletMeta.getTableId());
return -1;
}
- table.writeLock();
try {
OlapTable olapTable = (OlapTable) table;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/http/rest/RowCountAction.java b/fe/fe-core/src/main/java/org/apache/doris/http/rest/RowCountAction.java
index e134fc3..4648065 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/http/rest/RowCountAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/http/rest/RowCountAction.java
@@ -74,7 +74,7 @@ public class RowCountAction extends RestBaseAction {
Catalog catalog = Catalog.getCurrentCatalog();
Database db = catalog.getDbOrDdlException(dbName);
OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
- olapTable.writeLock();
+ olapTable.writeLockOrDdlException();
try {
for (Partition partition : olapTable.getAllPartitions()) {
long version = partition.getVisibleVersion();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/http/rest/TableRowCountAction.java b/fe/fe-core/src/main/java/org/apache/doris/http/rest/TableRowCountAction.java
index 3b75547..1b4a8ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/http/rest/TableRowCountAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/http/rest/TableRowCountAction.java
@@ -84,13 +84,13 @@ public class TableRowCountAction extends RestBaseAction {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST, e.getMessage());
}
- table.writeLock();
+ table.readLock();
try {
OlapTable olapTable = (OlapTable) table;
resultMap.put("status", 200);
resultMap.put("size", olapTable.proximateRowCount());
} finally {
- table.writeUnlock();
+ table.readUnlock();
}
} catch (DorisHttpException e) {
// status code should conforms to HTTP semantic
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 967caad..10e1b8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -155,8 +155,10 @@ public class BDBJEJournal implements Journal {
// Parameter null means auto commit
if (currentJournalDB.put(null, theKey, theData) == OperationStatus.SUCCESS) {
writeSucceed = true;
- LOG.debug("master write journal {} finished. db name {}, current time {}",
- id, currentJournalDB.getDatabaseName(), System.currentTimeMillis());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("master write journal {} finished. db name {}, current time {}",
+ id, currentJournalDB.getDatabaseName(), System.currentTimeMillis());
+ }
break;
}
} catch (DatabaseException e) {
@@ -166,7 +168,6 @@ public class BDBJEJournal implements Journal {
} catch (InterruptedException e1) {
e1.printStackTrace();
}
- continue;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
index 32f36f6..2e3588a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
@@ -330,7 +330,12 @@ public class LoadChecker extends MasterDaemon {
// concurrent problems
// table in tables are ordered.
- MetaLockUtils.writeLockTables(tables);
+ try {
+ MetaLockUtils.writeLockTablesOrMetaException(tables);
+ } catch (UserException e) {
+ load.cancelLoadJob(job, CancelType.LOAD_RUN_FAIL, "table does not exist. dbId: " + job.getDbId() + ", err: " + e.getMessage());
+ return;
+ }
try {
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
for (Replica replica : job.getFinishedReplicas()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index afd153d..529293a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -275,6 +275,7 @@ public class BrokerLoadJob extends BulkLoadJob {
try {
db = getDb();
tableList = db.getTablesOnIdOrderOrThrowException(Lists.newArrayList(fileGroupAggInfo.getAllTableIds()));
+ MetaLockUtils.writeLockTablesOrMetaException(tableList);
} catch (MetaNotFoundException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("database_id", dbId)
@@ -283,7 +284,6 @@ public class BrokerLoadJob extends BulkLoadJob {
cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
return;
}
- MetaLockUtils.writeLockTables(tableList);
try {
LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
.add("txn_id", transactionId)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index cb38cbf..0667c1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -625,7 +625,7 @@ public class SparkLoadJob extends BulkLoadJob {
.build());
Database db = getDb();
List<Table> tableList = db.getTablesOnIdOrderOrThrowException(Lists.newArrayList(tableToLoadPartitions.keySet()));
- MetaLockUtils.writeLockTables(tableList);
+ MetaLockUtils.writeLockTablesOrMetaException(tableList);
try {
Catalog.getCurrentGlobalTransactionMgr().commitTransaction(
dbId, tableList, transactionId, commitInfos,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 55de348..fdf971e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -338,12 +338,11 @@ public class MasterImpl {
LOG.debug("push report state: {}", pushState.name());
OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
- if (olapTable == null) {
+ if (olapTable == null || !olapTable.writeLockIfExist()) {
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
LOG.warn("finish push replica error, cannot find table[" + tableId + "] when push finished");
return;
}
- olapTable.writeLock();
try {
Partition partition = olapTable.getPartition(partitionId);
if (partition == null) {
@@ -552,12 +551,11 @@ public class MasterImpl {
LOG.debug("push report state: {}", pushState.name());
OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
- if (olapTable == null) {
+ if (olapTable == null || !olapTable.writeLockIfExist()) {
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
LOG.warn("finish push replica error, cannot find table[" + tableId + "] when push finished");
return;
}
- olapTable.writeLock();
try {
Partition partition = olapTable.getPartition(partitionId);
if (partition == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index df0e599..d676b88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -416,10 +416,9 @@ public class ReportHandler extends Daemon {
long tabletId = tabletIds.get(i);
long tableId = tabletMeta.getTableId();
OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
- if (olapTable == null) {
+ if (olapTable == null || !olapTable.writeLockIfExist()) {
continue;
}
- olapTable.writeLock();
try {
long partitionId = tabletMeta.getPartitionId();
Partition partition = olapTable.getPartition(partitionId);
@@ -545,10 +544,9 @@ public class ReportHandler extends Daemon {
long tabletId = tabletIds.get(i);
long tableId = tabletMeta.getTableId();
OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
- if (olapTable == null) {
+ if (olapTable == null || !olapTable.writeLockIfExist()) {
continue;
}
- olapTable.writeLock();
try {
long partitionId = tabletMeta.getPartitionId();
Partition partition = olapTable.getPartition(partitionId);
@@ -817,10 +815,9 @@ public class ReportHandler extends Daemon {
long tabletId = tabletIds.get(i);
long tableId = tabletMeta.getTableId();
OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
- if (olapTable == null) {
+ if (olapTable == null || !olapTable.writeLockIfExist()) {
continue;
}
- olapTable.writeLock();
try {
long partitionId = tabletMeta.getPartitionId();
Partition partition = olapTable.getPartition(partitionId);
@@ -939,7 +936,7 @@ public class ReportHandler extends Daemon {
Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = db.getTableOrMetaException(tableId, Table.TableType.OLAP);
- olapTable.writeLock();
+ olapTable.writeLockOrMetaException();
try {
Partition partition = olapTable.getPartition(partitionId);
if (partition == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index cdb73f4..a3a8ca6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -718,7 +718,7 @@ public class DatabaseTransactionMgr {
}
List<Table> tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
- MetaLockUtils.writeLockTables(tableList);
+ MetaLockUtils.writeLockTablesOrMetaException(tableList);
try {
boolean hasError = false;
Iterator<TableCommitInfo> tableCommitInfoIterator = transactionState.getIdToTableCommitInfos().values().iterator();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java
index 0a808c9..422a7ac 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java
@@ -17,6 +17,7 @@
package org.apache.doris.catalog;
+import org.apache.doris.common.DdlException;
import org.junit.Assert;
import org.junit.Test;
@@ -24,7 +25,7 @@ import java.io.IOException;
public class InfoSchemaDbTest {
@Test
- public void testNormal() throws IOException {
+ public void testNormal() throws IOException, DdlException {
Database db = new InfoSchemaDb();
Assert.assertFalse(db.createTable(null));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/MetaLockUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/MetaLockUtilsTest.java
index 47fb1ff..7ce2d0a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/MetaLockUtilsTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/MetaLockUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.common.util;
import com.google.common.collect.Lists;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
+import org.apache.doris.common.MetaNotFoundException;
import org.junit.Assert;
import org.junit.Test;
@@ -55,7 +56,7 @@ public class MetaLockUtilsTest {
}
@Test
- public void testWriteLockTables() {
+ public void testWriteLockTables() throws MetaNotFoundException {
List<Table> tableList = Lists.newArrayList(new Table(Table.TableType.OLAP), new Table(Table.TableType.OLAP));
MetaLockUtils.writeLockTables(tableList);
Assert.assertTrue(tableList.get(0).isWriteLockHeldByCurrentThread());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org