You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/03/27 12:16:37 UTC

[incubator-doris] branch master updated: Support determine isPreviousLoadFinished for some alter jobs in table level (#3196)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 32c4fc6  Support determine isPreviousLoadFinished for some alter jobs in table level  (#3196)
32c4fc6 is described below

commit 32c4fc691cd79ccf47023ed02160485b1906579b
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Fri Mar 27 07:16:23 2020 -0500

    Support determine isPreviousLoadFinished for some alter jobs in table level  (#3196)
    
    This PR is to reduce the time cost for waiting transactions to be completed in same db by filter the running transactions in table level.
    
    NOTICE: Update FE meta version to 79
---
 .../main/java/org/apache/doris/alter/AlterJob.java |  3 +-
 .../java/org/apache/doris/alter/RollupJobV2.java   |  2 +-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  2 +-
 .../java/org/apache/doris/analysis/InsertStmt.java |  3 +-
 .../org/apache/doris/clone/TabletScheduler.java    |  2 +-
 .../org/apache/doris/common/FeMetaVersion.java     |  5 ++-
 fe/src/main/java/org/apache/doris/load/Load.java   |  3 +-
 .../main/java/org/apache/doris/load/LoadJob.java   |  6 +++
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |  3 +-
 .../java/org/apache/doris/load/loadv2/LoadJob.java |  1 -
 .../org/apache/doris/load/loadv2/LoadManager.java  |  4 +-
 .../org/apache/doris/load/loadv2/MiniLoadJob.java  |  8 +++-
 .../load/routineload/RoutineLoadTaskInfo.java      |  3 +-
 .../apache/doris/service/FrontendServiceImpl.java  |  3 +-
 .../org/apache/doris/task/LoadPendingTask.java     |  4 +-
 .../doris/transaction/GlobalTransactionMgr.java    | 51 ++++++++++++++--------
 .../apache/doris/transaction/TransactionState.java | 27 +++++++++++-
 .../org/apache/doris/load/loadv2/LoadJobTest.java  |  3 +-
 .../transaction/GlobalTransactionMgrTest.java      | 24 +++++-----
 19 files changed, 108 insertions(+), 49 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/alter/AlterJob.java b/fe/src/main/java/org/apache/doris/alter/AlterJob.java
index d0aaaa4..ab927cc 100644
--- a/fe/src/main/java/org/apache/doris/alter/AlterJob.java
+++ b/fe/src/main/java/org/apache/doris/alter/AlterJob.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.alter;
 
+import com.google.common.collect.Lists;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -291,7 +292,7 @@ public abstract class AlterJob implements Writable {
             return true;
         } else {
             isPreviousLoadFinished = Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
-                    transactionId, dbId);
+                    transactionId, dbId, Lists.newArrayList(tableId));
             return isPreviousLoadFinished;
         }
     }
diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 031483c..877d955 100644
--- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -498,7 +498,7 @@ public class RollupJobV2 extends AlterJobV2 {
 
     // Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished.
     protected boolean isPreviousLoadFinished() {
-        return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId);
+        return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId));
     }
 
     public static RollupJobV2 read(DataInput in) throws IOException {
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 96025fd..e18e6f9 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -617,7 +617,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
     // Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished.
     protected boolean isPreviousLoadFinished() {
-        return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId);
+        return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId));
     }
 
     public static SchemaChangeJobV2 read(DataInput in) throws IOException {
diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 2351cb1..50b94e2 100644
--- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -292,7 +292,8 @@ public class InsertStmt extends DdlStmt {
             if (targetTable instanceof OlapTable) {
                 LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
                 transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
-                        label, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond);
+                        Lists.newArrayList(targetTable.getId()), label, "FE: " + FrontendOptions.getLocalHostAddress(),
+                        sourceType, timeoutSecond);
             }
             isTransactionBegin = true;
         }
diff --git a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 0e3e194..036fe6e 100644
--- a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -880,7 +880,7 @@ public class TabletScheduler extends MasterDaemon {
         } else if (replica.getState() == ReplicaState.DECOMMISSION && replica.getWatermarkTxnId() != -1) {
             long watermarkTxnId = replica.getWatermarkTxnId();
             if (!Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId,
-                    tabletCtx.getDbId())) {
+                    tabletCtx.getDbId(), Lists.newArrayList(tabletCtx.getTblId()))) {
                 throw new SchedException(Status.SCHEDULE_FAILED, "wait txn before " + watermarkTxnId + " to be finished");
             }
         }
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index ed32274..19b2f63 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -167,6 +167,9 @@ public final class FeMetaVersion {
     public static final int VERSION_77 = 77;
     // plugin support
     public static final int VERSION_78 = 78;
+    // for transaction state in table level
+    public static final int VERSION_79 = 79;
+
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_78;
+    public static final int VERSION_CURRENT = VERSION_79;
 }
diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java
index 2990a09..8b5e8b8 100644
--- a/fe/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/src/main/java/org/apache/doris/load/Load.java
@@ -3326,7 +3326,8 @@ public class Load {
             }
             loadDeleteJob.setIdToTabletLoadInfo(idToTabletLoadInfo);
             loadDeleteJob.setState(JobState.LOADING);
-            long transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), jobLabel,
+            long transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
+                    Lists.newArrayList(table.getId()), jobLabel,
                     "FE: " + FrontendOptions.getLocalHostAddress(), LoadJobSourceType.FRONTEND,
                     Config.stream_load_default_timeout_second);
             loadDeleteJob.setTransactionId(transactionId);
diff --git a/fe/src/main/java/org/apache/doris/load/LoadJob.java b/fe/src/main/java/org/apache/doris/load/LoadJob.java
index 450dfdb..ef0e26c 100644
--- a/fe/src/main/java/org/apache/doris/load/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/LoadJob.java
@@ -78,6 +78,7 @@ public class LoadJob implements Writable {
 
     private long id;
     private long dbId;
+    private long tableId;
     private String label;
     // when this job is a real time load job, the job is attach with a transaction
     private long transactionId = -1;
@@ -144,6 +145,7 @@ public class LoadJob implements Writable {
             DeleteInfo deleteInfo) {
         this.id = id;
         this.dbId = dbId;
+        this.tableId = tableId;
         this.label = label; 
         this.transactionId = -1;
         this.timestamp = -1;
@@ -243,6 +245,10 @@ public class LoadJob implements Writable {
         return dbId;
     }
 
+    public long getTableId() {
+        return tableId;
+    }
+
     public void setDbId(long dbId) {
         this.dbId = dbId;
     }
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 62c10b5..eb7938e 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -208,7 +208,8 @@ public class BrokerLoadJob extends LoadJob {
     public void beginTxn()
             throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException {
         transactionId = Catalog.getCurrentGlobalTransactionMgr()
-                .beginTransaction(dbId, label, null, "FE: " + FrontendOptions.getLocalHostAddress(),
+                .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
+                        "FE: " + FrontendOptions.getLocalHostAddress(),
                                   TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id,
                                   timeoutSecond);
     }
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 8f68309..08efaeb 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -222,7 +222,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
     public long getDbId() {
         return dbId;
     }
-
     public String getLabel() {
         return label;
     }
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 4490a1b..5104094 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -23,6 +23,7 @@ import org.apache.doris.analysis.CancelLoadStmt;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -137,11 +138,12 @@ public class LoadManager implements Writable{
             cluster = request.getCluster();
         }
         Database database = checkDb(ClusterNamespace.getFullName(cluster, request.getDb()));
+        Table table = database.getTable(request.tbl);
         checkTable(database, request.getTbl());
         LoadJob loadJob = null;
         writeLock();
         try {
-            loadJob = new MiniLoadJob(database.getId(), request);
+            loadJob = new MiniLoadJob(database.getId(), table.getId(), request);
             // call unprotectedExecute before adding load job. so that if job is not started ok, no need to add.
             // NOTICE(cmy): this order is only for Mini Load, because mini load's unprotectedExecute() only do beginTxn().
             // for other kind of load job, execute the job after adding job.
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
index 94d4707..68def55 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.loadv2;
 
+import com.google.common.collect.Lists;
 import org.apache.doris.catalog.AuthorizationInfo;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
@@ -47,14 +48,17 @@ public class MiniLoadJob extends LoadJob {
 
     private String tableName;
 
+    private long tableId;
+
     // only for log replay
     public MiniLoadJob() {
         super();
         this.jobType = EtlJobType.MINI;
     }
 
-    public MiniLoadJob(long dbId, TMiniLoadBeginRequest request) throws MetaNotFoundException {
+    public MiniLoadJob(long dbId, long tableId, TMiniLoadBeginRequest request) throws MetaNotFoundException {
         super(dbId, request.getLabel());
+        this.tableId = tableId;
         this.jobType = EtlJobType.MINI;
         this.tableName = request.getTbl();
         if (request.isSetTimeout_second()) {
@@ -93,7 +97,7 @@ public class MiniLoadJob extends LoadJob {
     public void beginTxn()
             throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException {
         transactionId = Catalog.getCurrentGlobalTransactionMgr()
-                .beginTransaction(dbId, label, requestId, "FE: " + FrontendOptions.getLocalHostAddress(),
+                .beginTransaction(dbId, Lists.newArrayList(tableId), label, requestId, "FE: " + FrontendOptions.getLocalHostAddress(),
                                   TransactionState.LoadJobSourceType.BACKEND_STREAMING, id,
                                   timeoutSecond);
     }
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 3af9fb4..3a7af6d 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -164,7 +164,8 @@ public abstract class RoutineLoadTaskInfo {
         RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
         try {
             txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
-                    routineLoadJob.getDbId(), DebugUtil.printId(id), null, "FE: " + FrontendOptions.getLocalHostAddress(),
+                    routineLoadJob.getDbId(), Lists.newArrayList(routineLoadJob.getTableId()), DebugUtil.printId(id), null,
+                    "FE: " + FrontendOptions.getLocalHostAddress(),
                     TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId(),
                     timeoutMs / 1000);
         } catch (DuplicatedRequestException e) {
diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 9a6075f..78b2138 100644
--- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -670,6 +670,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         Catalog catalog = Catalog.getInstance();
         String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb());
         Database db = catalog.getDb(fullDbName);
+        Table table = db.getTable(request.tbl);
         if (db == null) {
             String dbName = fullDbName;
             if (Strings.isNullOrEmpty(request.getCluster())) {
@@ -681,7 +682,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         // begin
         long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
         return Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
-                db.getId(), request.getLabel(), request.getRequest_id(), "BE: " + clientIp,
+                db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequest_id(), "BE: " + clientIp,
                 TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
     }
 
diff --git a/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java b/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java
index ef13da8..b9fcc87 100644
--- a/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java
+++ b/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.task;
 
+import com.google.common.collect.Lists;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.common.util.DebugUtil;
@@ -67,6 +68,7 @@ public abstract class LoadPendingTask extends MasterTask {
         
         // get db
         long dbId = job.getDbId();
+        long tableId = job.getTableId();
         db = Catalog.getInstance().getDb(dbId);
         if (db == null) {
             load.cancelLoadJob(job, CancelType.ETL_SUBMIT_FAIL, "db does not exist. id: " + dbId);
@@ -78,7 +80,7 @@ public abstract class LoadPendingTask extends MasterTask {
             // create etl request and make some guarantee for schema change and rollup
             if (job.getTransactionId() < 0) {
                 long transactionId = Catalog.getCurrentGlobalTransactionMgr()
-                        .beginTransaction(dbId, DebugUtil.printId(UUID.randomUUID()),
+                        .beginTransaction(dbId, Lists.newArrayList(tableId), DebugUtil.printId(UUID.randomUUID()),
                                           "FE: " + FrontendOptions.getLocalHostAddress(), LoadJobSourceType.FRONTEND,
                                           job.getTimeoutSecond());
                 job.setTransactionId(transactionId);
diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 2279ffc..e201f17 100644
--- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.transaction;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.MaterializedIndex;
@@ -121,10 +122,10 @@ public class GlobalTransactionMgr implements Writable {
         return callbackFactory;
     }
 
-    public long beginTransaction(long dbId, String label, String coordinator, LoadJobSourceType sourceType,
+    public long beginTransaction(long dbId, List<Long> tableIdList, String label, String coordinator, LoadJobSourceType sourceType,
             long timeoutSecond)
             throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException {
-        return beginTransaction(dbId, label, null, coordinator, sourceType, -1, timeoutSecond);
+        return beginTransaction(dbId, tableIdList, label, null, coordinator, sourceType, -1, timeoutSecond);
     }
     
     /**
@@ -140,7 +141,7 @@ public class GlobalTransactionMgr implements Writable {
      * @throws DuplicatedRequestException
      * @throws IllegalTransactionParameterException
      */
-    public long beginTransaction(long dbId, String label, TUniqueId requestId,
+    public long beginTransaction(long dbId, List<Long> tableIdList, String label, TUniqueId requestId,
             String coordinator, LoadJobSourceType sourceType, long listenerId, long timeoutSecond)
             throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException {
 
@@ -196,7 +197,7 @@ public class GlobalTransactionMgr implements Writable {
           
             long tid = idGenerator.getNextTransactionId();
             LOG.info("begin transaction: txn id {} with label {} from coordinator {}", tid, label, coordinator);
-            TransactionState transactionState = new TransactionState(dbId, tid, label, requestId, sourceType,
+            TransactionState transactionState = new TransactionState(dbId, tableIdList, tid, label, requestId, sourceType,
                     coordinator, listenerId, timeoutSecond * 1000);
             transactionState.setPrepareTime(System.currentTimeMillis());
             unprotectUpsertTransactionState(transactionState);
@@ -802,25 +803,37 @@ public class GlobalTransactionMgr implements Writable {
     }
     
     // check if there exists a load job before the endTransactionId have all finished
-    // load job maybe started but could not know the affected table id, so that we not check by table
-    public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId) {
-        readLock();
-        try {
-            for (Map.Entry<Long, TransactionState> entry : idToTransactionState.entrySet()) {
-                if (entry.getValue().getDbId() != dbId || !entry.getValue().isRunning()) {
-                    continue;
-                }
-                if (entry.getKey() <= endTransactionId) {
-                    LOG.debug("find a running txn with txn_id={} on db: {}, less than watermark txn_id {}",
-                            entry.getKey(), dbId, endTransactionId);
-                    return false;
-                }
+    public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId, List<Long> tableIdList) {
+        for (Map.Entry<Long, TransactionState> entry : idToTransactionState.entrySet()) {
+            if (entry.getValue().getDbId() != dbId || !isIntersectionNotEmpty(entry.getValue().getTableIdList(),
+                    tableIdList) || !entry.getValue().isRunning()) {
+                continue;
+            }
+            if (entry.getKey() <= endTransactionId) {
+                LOG.debug("find a running txn with txn_id={} on db: {}, less than watermark txn_id {}",
+                        entry.getKey(), dbId, endTransactionId);
+                return false;
             }
-        } finally {
-            readUnlock();
         }
         return true;
     }
+
+    // check if there exists a intersection between the source tableId list and target tableId list
+    // if one of them is null or empty, that means that we don't know related tables in tableList,
+    // we think the two lists may have intersection for right ordered txns
+    public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> targetTableIdList) {
+        if (CollectionUtils.isEmpty(sourceTableIdList) || CollectionUtils.isEmpty(targetTableIdList)) {
+            return true;
+        }
+        for (int i = 0; i < sourceTableIdList.size(); i++) {
+            for (int j = 0; j < targetTableIdList.size(); j++) {
+                if (sourceTableIdList.get(i).equals(targetTableIdList.get(j))) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
     
     /*
      * The txn cleaner will run at a fixed interval and try to delete expired and timeout txns:
diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
index 0a967ac..e261f7f 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.transaction;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.Config;
@@ -40,6 +42,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -126,6 +129,7 @@ public class TransactionState implements Writable {
     }
 
     private long dbId;
+    private List<Long> tableIdList;
     private long transactionId;
     private String label;
     // requsetId is used to judge whether a begin request is a internal retry request.
@@ -168,6 +172,7 @@ public class TransactionState implements Writable {
 
     public TransactionState() {
         this.dbId = -1;
+        this.tableIdList = Lists.newArrayList();
         this.transactionId = -1;
         this.label = "";
         this.idToTableCommitInfos = Maps.newHashMap();
@@ -184,9 +189,10 @@ public class TransactionState implements Writable {
         this.latch = new CountDownLatch(1);
     }
     
-    public TransactionState(long dbId, long transactionId, String label, TUniqueId requsetId,
+    public TransactionState(long dbId, List<Long> tableIdList, long transactionId, String label, TUniqueId requsetId,
                             LoadJobSourceType sourceType, String coordinator, long callbackId, long timeoutMs) {
         this.dbId = dbId;
+        this.tableIdList = (tableIdList == null ? Lists.newArrayList() : tableIdList);
         this.transactionId = transactionId;
         this.label = label;
         this.requsetId = requsetId;
@@ -408,7 +414,11 @@ public class TransactionState implements Writable {
     public long getDbId() {
         return dbId;
     }
-    
+
+    public List<Long> getTableIdList() {
+        return tableIdList;
+    }
+
     public Map<Long, TableCommitInfo> getIdToTableCommitInfos() {
         return idToTableCommitInfos;
     }
@@ -467,6 +477,7 @@ public class TransactionState implements Writable {
         sb.append("transaction id: ").append(transactionId);
         sb.append(", label: ").append(label);
         sb.append(", db id: ").append(dbId);
+        sb.append(", table id list: ").append(StringUtils.join(tableIdList, ","));
         sb.append(", callback id: ").append(callbackId);
         sb.append(", coordinator: ").append(coordinator);
         sb.append(", transaction status: ").append(transactionStatus);
@@ -533,6 +544,10 @@ public class TransactionState implements Writable {
         }
         out.writeLong(callbackId);
         out.writeLong(timeoutMs);
+        out.writeInt(tableIdList.size());
+        for (int i = 0; i < tableIdList.size(); i++) {
+            out.writeLong(tableIdList.get(i));
+        }
     }
     
     public void readFields(DataInput in) throws IOException {
@@ -564,5 +579,13 @@ public class TransactionState implements Writable {
             callbackId = in.readLong();
             timeoutMs = in.readLong();
         }
+
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_79) {
+            tableIdList = Lists.newArrayList();
+            int tableListSize = in.readInt();
+            for (int i = 0; i < tableListSize; i++) {
+                tableIdList.add(in.readLong());
+            }
+        }
     }
 }
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
index ec4533f..2feecfc 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.doris.load.loadv2;
 
+import com.google.common.collect.Lists;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
@@ -106,7 +107,7 @@ public class LoadJobTest {
         LoadJob loadJob = new BrokerLoadJob();
         new Expectations() {
             {
-                globalTransactionMgr.beginTransaction(anyLong, anyString, (TUniqueId) any, anyString,
+                globalTransactionMgr.beginTransaction(anyLong, Lists.newArrayList(), anyString, (TUniqueId) any, anyString,
                         (TransactionState.LoadJobSourceType) any, anyLong, anyLong);
                 minTimes = 0;
                 result = 1;
diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index e3a5f3f..860553c 100644
--- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -106,7 +106,7 @@ public class GlobalTransactionMgrTest {
     public void testBeginTransaction() throws LabelAlreadyUsedException, AnalysisException,
             BeginTransactionException, DuplicatedRequestException {
         FakeCatalog.setCatalog(masterCatalog);
-        long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+        long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
                 CatalogTestUtil.testTxnLable1,
                 transactionSource,
                 LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -124,7 +124,7 @@ public class GlobalTransactionMgrTest {
         FakeCatalog.setCatalog(masterCatalog);
         long transactionId = 0;
         try {
-            transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+            transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
                     CatalogTestUtil.testTxnLable1,
                     transactionSource,
                     LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -141,7 +141,7 @@ public class GlobalTransactionMgrTest {
         assertEquals(transactionSource, transactionState.getCoordinator());
 
         try {
-            transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+            transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
                     CatalogTestUtil.testTxnLable1,
                     transactionSource,
                     LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -154,7 +154,7 @@ public class GlobalTransactionMgrTest {
     @Test
     public void testCommitTransaction1() throws UserException {
         FakeCatalog.setCatalog(masterCatalog);
-        long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+        long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
                 CatalogTestUtil.testTxnLable1,
                 transactionSource,
                 LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -195,7 +195,7 @@ public class GlobalTransactionMgrTest {
     public void testCommitTransactionWithOneFailed() throws UserException {
         TransactionState transactionState = null;
         FakeCatalog.setCatalog(masterCatalog);
-        long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+        long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
                 CatalogTestUtil.testTxnLable1,
                 transactionSource,
                 LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -217,7 +217,7 @@ public class GlobalTransactionMgrTest {
 
         FakeCatalog.setCatalog(masterCatalog);
         // commit another transaction with 1,3 success
-        long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+        long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
                 CatalogTestUtil.testTxnLable2,
                 transactionSource,
                 LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -320,7 +320,7 @@ public class GlobalTransactionMgrTest {
                 partitionIdToOffset);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
-        TransactionState transactionState = new TransactionState(1L, 1L, "label", null,
+        TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null,
                 LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(),
                 Config.stream_load_default_timeout_second);
         transactionState.setTransactionStatus(TransactionStatus.PREPARE);
@@ -387,7 +387,7 @@ public class GlobalTransactionMgrTest {
                 partitionIdToOffset);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
-        TransactionState transactionState = new TransactionState(1L, 1L, "label", null,
+        TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null,
                 LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(),
                 Config.stream_load_default_timeout_second);
         transactionState.setTransactionStatus(TransactionStatus.PREPARE);
@@ -431,7 +431,7 @@ public class GlobalTransactionMgrTest {
     }
 
     public void testFinishTransaction() throws UserException {
-        long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+        long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
                 CatalogTestUtil.testTxnLable1,
                 transactionSource,
                 LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -477,7 +477,7 @@ public class GlobalTransactionMgrTest {
                 .getPartition(CatalogTestUtil.testPartition1);
         Tablet tablet = testPartition.getIndex(CatalogTestUtil.testIndexId1).getTablet(CatalogTestUtil.testTabletId1);
         FakeCatalog.setCatalog(masterCatalog);
-        long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+        long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
                 CatalogTestUtil.testTxnLable1,
                 transactionSource,
                 LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -531,7 +531,7 @@ public class GlobalTransactionMgrTest {
 
         FakeCatalog.setCatalog(masterCatalog);
         // commit another transaction with 1,3 success
-        long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+        long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
                 CatalogTestUtil.testTxnLable2,
                 transactionSource,
                 LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -603,7 +603,7 @@ public class GlobalTransactionMgrTest {
     public void testDeleteTransaction() throws LabelAlreadyUsedException,
             AnalysisException, BeginTransactionException, DuplicatedRequestException {
 
-        long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+        long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
                 CatalogTestUtil.testTxnLable1,
                 transactionSource,
                 LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org