Posted to commits@doris.apache.org by mo...@apache.org on 2021/12/16 02:39:33 UTC

[incubator-doris] branch master updated: [fix][refactor](broker load) refactor the scheduling logic of broker load (#7371)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b90967  [fix][refactor](broker load) refactor the scheduling logic of broker load (#7371)
2b90967 is described below

commit 2b90967c4c8c0659deb981c08d3e6ce784460b02
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu Dec 16 10:39:22 2021 +0800

    [fix][refactor](broker load) refactor the scheduling logic of broker load (#7371)
    
    1. Refactor the scheduling logic of broker load. See #7367 for details.
    2. Fix a bug where loadedBytes in the SHOW LOAD result was wrong.
    3. Remove the LoadTimeoutChecker thread.
       PENDING load jobs now have no timeout; a load job's timeout starts counting
       when its pending load task is scheduled.
    4. Fix a bug where the loading task was never submitted to the pool.
       The logic of BlockedPolicy was wrong: it must either ensure the task is submitted
       to the pool or throw a RejectedExecutionException (see the sketch after this list).
    5. The transaction of a load job now begins in the pending task, instead of when the job is submitted.
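
Below is a minimal, self-contained sketch of the corrected rejection-policy behavior described in item 4. It is not the Doris ThreadPoolManager source; the class name BlockingSubmitPolicy, the pool name "demo_pool", and the timeout values are illustrative assumptions. It only demonstrates the principle that a blocking policy must either get the task into the queue or throw RejectedExecutionException, never fail silently (the actual fix to ThreadPoolManager.BlockedPolicy is in the diff below).

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingSubmitPolicy implements RejectedExecutionHandler {
    private final String poolName;
    private final long timeoutSeconds;

    public BlockingSubmitPolicy(String poolName, long timeoutSeconds) {
        this.poolName = poolName;
        this.timeoutSeconds = timeoutSeconds;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // offer() with a timeout blocks for queue space instead of silently dropping the task
            boolean enqueued = executor.getQueue().offer(r, timeoutSeconds, TimeUnit.SECONDS);
            if (!enqueued) {
                // surface the failure so the caller can retry or cancel the job
                throw new RejectedExecutionException("queue of pool " + poolName + " is still full");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while waiting to enqueue in " + poolName, e);
        }
    }

    // Tiny usage example: a fixed-size pool whose submit() now blocks briefly
    // and then fails loudly instead of losing work.
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(4), new BlockingSubmitPolicy("demo_pool", 5));
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            pool.submit(() -> System.out.println("running task " + taskId));
        }
        pool.shutdown();
    }
}
```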
---
 be/src/runtime/runtime_state.cpp                   |  2 +
 be/src/runtime/runtime_state.h                     |  4 --
 .../load-data/broker-load-manual.md                | 14 +++++
 .../load-data/broker-load-manual.md                | 12 ++++
 .../java/org/apache/doris/catalog/Catalog.java     | 72 +++++++++++-----------
 .../org/apache/doris/common/ThreadPoolManager.java | 35 ++++++-----
 .../doris/load/loadv2/BrokerLoadPendingTask.java   |  5 +-
 .../java/org/apache/doris/load/loadv2/LoadJob.java | 18 ++----
 .../apache/doris/load/loadv2/LoadJobScheduler.java | 51 ++++-----------
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |  1 +
 .../org/apache/doris/load/loadv2/LoadManager.java  | 43 ++++++-------
 .../doris/load/loadv2/LoadTimeoutChecker.java      | 49 ---------------
 .../doris/load/loadv2/SparkLoadPendingTask.java    | 19 +++---
 .../org/apache/doris/task/MasterTaskExecutor.java  |  4 +-
 .../doris/transaction/DatabaseTransactionMgr.java  | 29 +++++----
 .../org/apache/doris/load/loadv2/LoadJobTest.java  | 14 ++---
 .../apache/doris/load/loadv2/SparkLoadJobTest.java | 28 +++------
 .../load/loadv2/SparkLoadPendingTaskTest.java      | 11 ++--
 18 files changed, 176 insertions(+), 235 deletions(-)

diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 8ac3378..6e51936 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -64,6 +64,7 @@ RuntimeState::RuntimeState(const TUniqueId& fragment_instance_id,
           _num_rows_load_filtered(0),
           _num_rows_load_unselected(0),
           _num_print_error_rows(0),
+          _num_bytes_load_total(0),
           _load_job_id(-1),
           _normal_row_number(0),
           _error_row_number(0),
@@ -91,6 +92,7 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
           _num_rows_load_filtered(0),
           _num_rows_load_unselected(0),
           _num_print_error_rows(0),
+          _num_bytes_load_total(0),
           _normal_row_number(0),
           _error_row_number(0),
           _error_log_file_path(""),
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 3e4a014..e0cbd7f 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -311,10 +311,6 @@ public:
         _num_bytes_load_total.fetch_add(bytes_load);
     }
 
-    void set_update_num_bytes_load_total(int64_t bytes_load) {
-        _num_bytes_load_total.store(bytes_load);
-    }
-
     void update_num_rows_load_filtered(int64_t num_rows) {
         _num_rows_load_filtered.fetch_add(num_rows);
     }
diff --git a/docs/en/administrator-guide/load-data/broker-load-manual.md b/docs/en/administrator-guide/load-data/broker-load-manual.md
index 930b5e1..1a27a3b 100644
--- a/docs/en/administrator-guide/load-data/broker-load-manual.md
+++ b/docs/en/administrator-guide/load-data/broker-load-manual.md
@@ -460,6 +460,20 @@ We will only discuss the case of a single BE. If the user cluster has more than
 		Note: The average user's environment may not reach the speed of 10M/s, so it is recommended that more than 500G files be split and imported.
 		
 		```
+		
+### Job Scheduling
+
+The system limits the number of Broker Load jobs running in a cluster to prevent too many Load jobs from running at the same time.
+
+First, the FE configuration parameter `desired_max_waiting_jobs` limits the number of Broker Load jobs that are pending or running (job state is PENDING or LOADING) in a cluster. The default is 100. If this threshold is exceeded, newly submitted jobs will be rejected directly.
+
+A Broker Load job is divided into a pending task phase and a loading task phase: the pending task is responsible for obtaining the information of the files to be imported, and the loading tasks are sent to the BEs to perform the actual import.
+
+The FE configuration parameter `async_pending_load_task_pool_size` limits the number of pending tasks running at the same time, which is also equivalent to controlling the number of import tasks that are actually running. This parameter defaults to 10. In other words, if a user submits 100 Load jobs, only 10 of them will enter the LOADING state and start executing, while the other jobs wait in the PENDING state.
+
+The FE configuration parameter `async_loading_load_task_pool_size` limits the number of loading tasks running at the same time. A Broker Load job has 1 pending task and multiple loading tasks (equal to the number of DATA INFILE clauses in the LOAD statement), so `async_loading_load_task_pool_size` should be greater than or equal to `async_pending_load_task_pool_size`.
+
+Because the work of a pending task is relatively lightweight (for example, just accessing HDFS to obtain file information), `async_pending_load_task_pool_size` does not need to be large; the default of 10 is usually sufficient. `async_loading_load_task_pool_size` is what actually limits the number of import tasks that can run at the same time, and it can be adjusted according to the cluster size.
 
 ### Performance analysis
 
diff --git a/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md b/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
index 8d85457..6a7dc40 100644
--- a/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
+++ b/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
@@ -469,6 +469,18 @@ LoadFinishTime: 2019-07-27 11:50:16
         注意:一般用户的环境可能达不到 10M/s 的速度,所以建议超过 500G 的文件都进行文件切分,再导入。
         
         ```
+        
+### Job Scheduling
+
+The system limits the number of Broker Load jobs running in a cluster to prevent too many Load jobs from running at the same time.
+
+First, the FE configuration parameter `desired_max_waiting_jobs` limits the number of Broker Load jobs that have not started or are running (job state is PENDING or LOADING) in a cluster. The default is 100. If this threshold is exceeded, newly submitted jobs will be rejected directly.
+
+A Broker Load job is divided into a pending task phase and a loading task phase: the pending task is responsible for obtaining the information of the files to be imported, and the loading tasks are sent to the BEs to perform the actual import.
+
+The FE configuration parameter `async_pending_load_task_pool_size` limits the number of pending tasks running at the same time, which is also equivalent to controlling the number of import tasks that are actually running. This parameter defaults to 10. In other words, if a user submits 100 Load jobs, only 10 of them will enter the LOADING state and start executing, while the other jobs wait in the PENDING state.
+
+The FE configuration parameter `async_loading_load_task_pool_size` limits the number of loading tasks running at the same time. A Broker Load job has 1 pending task and multiple loading tasks (equal to the number of DATA INFILE clauses in the LOAD statement), so `async_loading_load_task_pool_size` should be greater than or equal to `async_pending_load_task_pool_size`.
 
 ### 性能分析
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index b2cf59d..00ce591 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -57,6 +57,7 @@ import org.apache.doris.analysis.CreateTableLikeStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.CreateUserStmt;
 import org.apache.doris.analysis.CreateViewStmt;
+import org.apache.doris.analysis.DataSortInfo;
 import org.apache.doris.analysis.DdlStmt;
 import org.apache.doris.analysis.DecommissionBackendClause;
 import org.apache.doris.analysis.DistributionDesc;
@@ -93,7 +94,6 @@ import org.apache.doris.analysis.TypeDef;
 import org.apache.doris.analysis.UninstallPluginStmt;
 import org.apache.doris.analysis.UserDesc;
 import org.apache.doris.analysis.UserIdentity;
-import org.apache.doris.analysis.DataSortInfo;
 import org.apache.doris.backup.BackupHandler;
 import org.apache.doris.blockrule.SqlBlockRuleMgr;
 import org.apache.doris.catalog.ColocateTableIndex.GroupId;
@@ -171,7 +171,6 @@ import org.apache.doris.load.loadv2.LoadEtlChecker;
 import org.apache.doris.load.loadv2.LoadJobScheduler;
 import org.apache.doris.load.loadv2.LoadLoadingChecker;
 import org.apache.doris.load.loadv2.LoadManager;
-import org.apache.doris.load.loadv2.LoadTimeoutChecker;
 import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.load.routineload.RoutineLoadScheduler;
 import org.apache.doris.load.routineload.RoutineLoadTaskScheduler;
@@ -429,7 +428,6 @@ public class Catalog {
 
     private LoadJobScheduler loadJobScheduler;
 
-    private LoadTimeoutChecker loadTimeoutChecker;
     private LoadEtlChecker loadEtlChecker;
     private LoadLoadingChecker loadLoadingChecker;
 
@@ -582,14 +580,19 @@ public class Catalog {
         this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat, Config.tablet_rebalancer_type);
         this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat);
 
+        // The pendingLoadTaskScheduler's queue size should not less than Config.desired_max_waiting_jobs.
+        // So that we can guarantee that all submitted load jobs can be scheduled without being starved.
         this.pendingLoadTaskScheduler = new MasterTaskExecutor("pending_load_task_scheduler", Config.async_pending_load_task_pool_size,
-                Config.async_pending_load_task_pool_size, !isCheckpointCatalog);
+                Config.desired_max_waiting_jobs, !isCheckpointCatalog);
+        // The loadingLoadTaskScheduler's queue size is unlimited, so that it can receive all loading tasks
+        // created after pending tasks finish. And don't worry about the high concurrency, because the
+        // concurrency is limited by Config.desired_max_waiting_jobs and Config.async_loading_load_task_pool_size.
         this.loadingLoadTaskScheduler = new MasterTaskExecutor("loading_load_task_scheduler", Config.async_loading_load_task_pool_size,
-                Config.async_loading_load_task_pool_size / 5, !isCheckpointCatalog);
+                Integer.MAX_VALUE, !isCheckpointCatalog);
+
         this.loadJobScheduler = new LoadJobScheduler();
         this.loadManager = new LoadManager(loadJobScheduler);
         this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager", Config.fetch_stream_load_record_interval_second * 1000);
-        this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager);
         this.loadEtlChecker = new LoadEtlChecker(loadManager);
         this.loadLoadingChecker = new LoadLoadingChecker(loadManager);
         this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadManager);
@@ -1318,7 +1321,6 @@ public class Catalog {
         loadingLoadTaskScheduler.start();
         loadManager.prepareJobs();
         loadJobScheduler.start();
-        loadTimeoutChecker.start();
         loadEtlChecker.start();
         loadLoadingChecker.start();
         // Export checker
@@ -3075,8 +3077,8 @@ public class Catalog {
             List<String> createTableStmt = Lists.newArrayList();
             table.readLock();
             try {
-                if (table.getType() == TableType.OLAP){
-                    if (!CollectionUtils.isEmpty(stmt.getRollupNames())){
+                if (table.getType() == TableType.OLAP) {
+                    if (!CollectionUtils.isEmpty(stmt.getRollupNames())) {
                         OlapTable olapTable = (OlapTable) table;
                         for (String rollupIndexName : stmt.getRollupNames()) {
                             if (!olapTable.hasMaterializedIndex(rollupIndexName)) {
@@ -3103,7 +3105,7 @@ public class Catalog {
             throw new DdlException("Failed to execute CREATE TABLE LIKE " + stmt.getExistedTableName() + ". Reason: " + e.getMessage());
         }
     }
-    
+
     public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlException {
         try {
             List<String> columnNames = stmt.getColumnNames();
@@ -3514,7 +3516,7 @@ public class Catalog {
                                                  boolean isInMemory,
                                                  TStorageFormat storageFormat,
                                                  TTabletType tabletType,
-                                                 DataSortInfo dataSortInfo)throws DdlException {
+                                                 DataSortInfo dataSortInfo) throws DdlException {
         // create base index first.
         Preconditions.checkArgument(baseIndexId != -1);
         MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
@@ -4074,7 +4076,7 @@ public class Catalog {
 
     public static void getDdlStmt(Table table, List<String> createTableStmt, List<String> addPartitionStmt,
                                   List<String> createRollupStmt, boolean separatePartition, boolean hidePassword) {
-         getDdlStmt(null, null, table, createTableStmt, addPartitionStmt, createRollupStmt, separatePartition, hidePassword);
+        getDdlStmt(null, null, table, createTableStmt, addPartitionStmt, createRollupStmt, separatePartition, hidePassword);
     }
 
     public static void getDdlStmt(DdlStmt ddlStmt, String dbName, Table table, List<String> createTableStmt, List<String> addPartitionStmt,
@@ -4159,7 +4161,7 @@ public class Catalog {
             sb.append("\n").append(distributionInfo.toSql());
 
             // rollup index
-            if (ddlStmt instanceof CreateTableLikeStmt){
+            if (ddlStmt instanceof CreateTableLikeStmt) {
 
                 CreateTableLikeStmt stmt = (CreateTableLikeStmt) ddlStmt;
 
@@ -4175,7 +4177,7 @@ public class Catalog {
                     addIndexIdList = olapTable.getIndexIdListExceptBaseIndex();
                 }
 
-                if (!addIndexIdList.isEmpty()){
+                if (!addIndexIdList.isEmpty()) {
                     sb.append("\n").append("rollup (");
                 }
 
@@ -4192,7 +4194,7 @@ public class Catalog {
                             sb.append(", ");
                         }
                     }
-                    if (index != size){
+                    if (index != size) {
                         sb.append("),");
                     } else {
                         sb.append(")");
@@ -4536,7 +4538,7 @@ public class Catalog {
                     }
                 }
                 Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum(),
-						totalReplicaNum + " vs. " + replicaAlloc.getTotalReplicaNum());
+                        totalReplicaNum + " vs. " + replicaAlloc.getTotalReplicaNum());
             }
 
             if (groupId != null && chooseBackendsArbitrary) {
@@ -4841,7 +4843,7 @@ public class Catalog {
 
     public Database getDbOrMetaException(String dbName) throws MetaNotFoundException {
         return getDbOrException(dbName, s -> new MetaNotFoundException("unknown databases, dbName=" + s,
-                        ErrorCode.ERR_BAD_DB_ERROR));
+                ErrorCode.ERR_BAD_DB_ERROR));
     }
 
     public Database getDbOrMetaException(long dbId) throws MetaNotFoundException {
@@ -5382,7 +5384,7 @@ public class Catalog {
 
     // the invoker should keep table's write lock
     public void modifyTableColocate(Database db, OlapTable table, String colocateGroup, boolean isReplay,
-            GroupId assignedGroupId)
+                                    GroupId assignedGroupId)
             throws DdlException {
 
         String oldGroup = table.getColocateGroup();
@@ -5765,44 +5767,44 @@ public class Catalog {
             if (olapTable.isColocateTable()) {
                 throw new DdlException("Cannot change default bucket number of colocate table.");
             }
-    
+
             if (olapTable.getPartitionInfo().getType() != PartitionType.RANGE) {
                 throw new DdlException("Only support change partitioned table's distribution.");
             }
-    
+
             DistributionInfo defaultDistributionInfo = olapTable.getDefaultDistributionInfo();
             if (defaultDistributionInfo.getType() != DistributionInfoType.HASH) {
                 throw new DdlException("Cannot change default bucket number of distribution type " + defaultDistributionInfo.getType());
             }
-    
+
             DistributionDesc distributionDesc = modifyDistributionClause.getDistributionDesc();
-    
+
             DistributionInfo distributionInfo = null;
-    
+
             List<Column> baseSchema = olapTable.getBaseSchema();
-    
+
             if (distributionDesc != null) {
                 distributionInfo = distributionDesc.toDistributionInfo(baseSchema);
-                    // for now. we only support modify distribution's bucket num
+                // for now. we only support modify distribution's bucket num
                 if (distributionInfo.getType() != DistributionInfoType.HASH) {
                     throw new DdlException("Cannot change distribution type to " + distributionInfo.getType());
                 }
-    
+
                 HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
                 List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns();
                 List<Column> defaultDistriCols = ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns();
                 if (!newDistriCols.equals(defaultDistriCols)) {
                     throw new DdlException("Cannot assign hash distribution with different distribution cols. "
-                                + "default is: " + defaultDistriCols);
+                            + "default is: " + defaultDistriCols);
                 }
-    
+
                 int bucketNum = hashDistributionInfo.getBucketNum();
                 if (bucketNum <= 0) {
                     throw new DdlException("Cannot assign hash distribution buckets less than 1");
                 }
-    
+
                 defaultDistributionInfo.setBucketNum(bucketNum);
-    
+
                 ModifyTableDefaultDistributionBucketNumOperationLog info = new ModifyTableDefaultDistributionBucketNumOperationLog(db.getId(), olapTable.getId(), bucketNum);
                 editLog.logModifyDefaultDistributionBucketNum(info);
                 LOG.info("modify table[{}] default bucket num to {}", olapTable.getName(), bucketNum);
@@ -5950,7 +5952,7 @@ public class Catalog {
     }
 
     public Function getTableFunction(Function desc, Function.CompareMode mode) {
-        return  functionSet.getFunction(desc, mode, true);
+        return functionSet.getFunction(desc, mode, true);
     }
 
     public boolean isNullResultWithOneNullParamFunction(String funcName) {
@@ -6699,13 +6701,13 @@ public class Catalog {
     /*
      * Truncate specified table or partitions.
      * The main idea is:
-     * 
+     *
      * 1. using the same schema to create new table(partitions)
      * 2. use the new created table(partitions) to replace the old ones.
-     * 
+     *
      * if no partition specified, it will truncate all partitions of this table, including all temp partitions,
      * otherwise, it will only truncate those specified partitions.
-     * 
+     *
      */
     public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlException {
         TableRef tblRef = truncateTableStmt.getTblRef();
@@ -7227,7 +7229,7 @@ public class Catalog {
 
     public void cleanTrash(AdminCleanTrashStmt stmt) {
         List<Backend> backends = stmt.getBackends();
-        for (Backend backend : backends){
+        for (Backend backend : backends) {
             BackendService.Client client = null;
             TNetworkAddress address = null;
             boolean ok = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index 20ee89a..147e6de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -31,6 +31,7 @@ import org.apache.logging.log4j.Logger;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
@@ -103,8 +104,8 @@ public class ThreadPoolManager {
     }
 
     public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName, boolean needRegisterMetric) {
-       return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME ,TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueSize),
-               new BlockedPolicy(poolName, 60), poolName, needRegisterMetric);
+        return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueSize),
+                new BlockedPolicy(poolName, 60), poolName, needRegisterMetric);
     }
 
     public static ThreadPoolExecutor newDaemonProfileThreadPool(int numThread, int queueSize, String poolName,
@@ -115,13 +116,13 @@ public class ThreadPoolManager {
     }
 
     public static ThreadPoolExecutor newDaemonThreadPool(int corePoolSize,
-                                               int maximumPoolSize,
-                                               long keepAliveTime,
-                                               TimeUnit unit,
-                                               BlockingQueue<Runnable> workQueue,
-                                               RejectedExecutionHandler handler,
-                                               String poolName,
-                                               boolean needRegisterMetric) {
+                                                         int maximumPoolSize,
+                                                         long keepAliveTime,
+                                                         TimeUnit unit,
+                                                         BlockingQueue<Runnable> workQueue,
+                                                         RejectedExecutionHandler handler,
+                                                         String poolName,
+                                                         boolean needRegisterMetric) {
         ThreadFactory threadFactory = namedThreadFactory(poolName);
         ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
         if (needRegisterMetric) {
@@ -164,7 +165,7 @@ public class ThreadPoolManager {
 
         @Override
         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-           LOG.warn("Task " + r.toString() + " rejected from " + threadPoolName + " " + executor.toString());
+            LOG.warn("Task " + r.toString() + " rejected from " + threadPoolName + " " + executor.toString());
         }
     }
 
@@ -173,7 +174,7 @@ public class ThreadPoolManager {
      */
     static class BlockedPolicy implements RejectedExecutionHandler {
 
-        private static final Logger LOG = LogManager.getLogger(LogDiscardPolicy.class);
+        private static final Logger LOG = LogManager.getLogger(BlockedPolicy.class);
 
         private String threadPoolName;
 
@@ -187,14 +188,20 @@ public class ThreadPoolManager {
         @Override
         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
             try {
-                executor.getQueue().offer(r, timeoutSeconds, TimeUnit.SECONDS);
+                boolean ret = executor.getQueue().offer(r, timeoutSeconds, TimeUnit.SECONDS);
+                if (!ret) {
+                    throw new RejectedExecutionException("submit task failed, queue size is full: " + this.threadPoolName);
+                }
             } catch (InterruptedException e) {
-                LOG.warn("Task " + r.toString() + " wait to enqueue in " + threadPoolName + " " + executor.toString() + " failed");
+                String errMsg = String.format("Task %s wait to enqueue in %s %s failed",
+                        r.toString(), threadPoolName, executor.toString());
+                LOG.warn(errMsg);
+                throw new RejectedExecutionException(errMsg);
             }
         }
     }
 
-    static class LogDiscardOldestPolicy implements RejectedExecutionHandler{
+    static class LogDiscardOldestPolicy implements RejectedExecutionHandler {
 
         private static final Logger LOG = LogManager.getLogger(LogDiscardOldestPolicy.class);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
index 622a1e4..a35df4a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
@@ -59,6 +59,7 @@ public class BrokerLoadPendingTask extends LoadTask {
     void executeTask() throws UserException {
         LOG.info("begin to execute broker pending task. job: {}", callback.getCallbackId());
         getAllFileStatus();
+        ((BrokerLoadJob) callback).beginTxn();
     }
 
     private void getAllFileStatus() throws UserException {
@@ -122,8 +123,8 @@ public class BrokerLoadPendingTask extends LoadTask {
                     LOG.info("get {} files in file group {} for table {}. size: {}. job: {}, broker: {} ",
                             filteredFileStatuses.size(), groupNum, entry.getKey(), groupFileSize,
                             callback.getCallbackId(),
-                        brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER ?
-                            BrokerUtil.getAddress(brokerDesc): brokerDesc.getStorageType());
+                            brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER ?
+                                    BrokerUtil.getAddress(brokerDesc) : brokerDesc.getStorageType());
                     groupNum++;
                 }
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 1d86239..d8edfc6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -423,8 +423,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
      * @throws AnalysisException          there are error params in job
      * @throws DuplicatedRequestException
      */
-    public void execute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException,
-            DuplicatedRequestException, LoadException, QuotaExceedException, MetaNotFoundException {
+    public void execute() throws LoadException {
         writeLock();
         try {
             unprotectedExecute();
@@ -433,19 +432,13 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         }
     }
 
-    public void unprotectedExecute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException,
-            DuplicatedRequestException, LoadException, QuotaExceedException, MetaNotFoundException {
+    public void unprotectedExecute() throws LoadException {
         // check if job state is pending
         if (state != JobState.PENDING) {
             return;
         }
-        // the limit of job will be restrict when begin txn
-        beginTxn();
+
         unprotectedExecuteJob();
-        // update spark load job state from PENDING to ETL when pending task is finished
-        if (jobType != EtlJobType.SPARK) {
-            unprotectedUpdateState(JobState.LOADING);
-        }
     }
 
     public void processTimeout() {
@@ -759,7 +752,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
 
             // task info
             jobInfo.add("cluster:" + getResourceName() + "; timeout(s):" + getTimeout()
-                                + "; max_filter_ratio:" + getMaxFilterRatio());
+                    + "; max_filter_ratio:" + getMaxFilterRatio());
             // error msg
             if (failMsg == null) {
                 jobInfo.add(FeConstants.null_string);
@@ -1127,7 +1120,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         loadStartTimestamp = info.getLoadStartTimestamp();
     }
 
-    protected void auditFinishedLoadJob() {}
+    protected void auditFinishedLoadJob() {
+    }
 
     public static class LoadJobStateUpdateInfo implements Writable {
         @SerializedName(value = "jobId")
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
index e8d23b8..30a78c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
@@ -18,18 +18,12 @@
 package org.apache.doris.load.loadv2;
 
 import org.apache.doris.catalog.Catalog;
-import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.DuplicatedRequestException;
-import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.QuotaExceedException;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.load.FailMsg;
-import org.apache.doris.transaction.BeginTransactionException;
 
 import com.google.common.collect.Queues;
 
@@ -66,12 +60,13 @@ public class LoadJobScheduler extends MasterDaemon {
 
     private void process() throws InterruptedException {
         while (true) {
-            if (!needScheduleJobs.isEmpty()) {
-                if (needScheduleJobs.peek() instanceof BrokerLoadJob && Catalog.getCurrentCatalog().getLoadingLoadTaskScheduler().isTaskQueueFull()) {
-                    LOG.warn("Failed to take one broker load job from queue because of task queue in loading_load_task_scheduler is full");
-                    return;
-                }
-            } else {
+            if (needScheduleJobs.isEmpty()) {
+                return;
+            }
+
+            if (needScheduleJobs.peek() instanceof BrokerLoadJob && !Catalog.getCurrentCatalog().getLoadingLoadTaskScheduler().hasIdleThread()) {
+                LOG.info("Failed to take one broker load job from queue because of loading_load_task_scheduler is full." +
+                        " Waiting for next round. You can try to increase the value of Config.async_loading_load_task_pool_size");
                 return;
             }
 
@@ -81,40 +76,18 @@ public class LoadJobScheduler extends MasterDaemon {
             // schedule job
             try {
                 loadJob.execute();
-            } catch (LabelAlreadyUsedException | AnalysisException | MetaNotFoundException | QuotaExceedException e) {
-                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
-                                 .add("error_msg", "There are error properties in job. Job will be cancelled")
-                                 .build(), e);
-                // transaction not begin, so need not abort
-                loadJob.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()),
-                        false, true);
             } catch (LoadException e) {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
-                                 .add("error_msg", "Failed to submit etl job. Job will be cancelled")
-                                 .build(), e);
-                // transaction already begin, so need abort
+                        .add("error_msg", "Failed to submit etl job. Job will be cancelled")
+                        .build(), e);
                 loadJob.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()),
-                                              true, true);
-            } catch (DuplicatedRequestException e) {
-                // should not happen in load job scheduler, there is no request id.
+                        true, true);
+            } catch (RejectedExecutionException e) {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
-                        .add("error_msg", "Failed to begin txn with duplicate request. Job will be rescheduled later")
+                        .add("error_msg", "Failed to submit etl job. Job queue is full. retry later")
                         .build(), e);
                 needScheduleJobs.put(loadJob);
                 return;
-            } catch (BeginTransactionException e) {
-                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
-                                 .add("error_msg", "Failed to begin txn when job is scheduling. "
-                                         + "Job will be rescheduled later")
-                                 .build(), e);
-                needScheduleJobs.put(loadJob);
-                return;
-            } catch (RejectedExecutionException e) {
-                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
-                        .add("error_msg", "Failed to submit etl job. Job queue is full.")
-                        .build(), e);
-                loadJob.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()),
-                        true, true);
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 243e95a..301da84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -111,6 +111,7 @@ public class LoadLoadingTask extends LoadTask {
                 DebugUtil.printId(loadId), callback.getCallbackId(), db.getFullName(), table.getName(), retryTime);
         retryTime--;
         beginTime = System.nanoTime();
+        ((BrokerLoadJob) callback).updateState(JobState.LOADING);
         executeOnce();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 42aea02..ae0f97c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -81,7 +81,7 @@ import java.util.stream.Collectors;
  *   LoadManager.lock
  *     LoadJob.lock
  */
-public class LoadManager implements Writable{
+public class LoadManager implements Writable {
     private static final Logger LOG = LogManager.getLogger(LoadManager.class);
 
     private Map<Long, LoadJob> idToLoadJob = Maps.newConcurrentMap();
@@ -108,19 +108,19 @@ public class LoadManager implements Writable{
             if (stmt.getBrokerDesc() != null && stmt.getBrokerDesc().isMultiLoadBroker()) {
                 if (!Catalog.getCurrentCatalog().getLoadInstance()
                         .isUncommittedLabel(dbId, stmt.getLabel().getLabelName())) {
-                    throw new DdlException("label: " + stmt.getLabel().getLabelName() + " not found!") ;
+                    throw new DdlException("label: " + stmt.getLabel().getLabelName() + " not found!");
                 }
-
             } else {
                 checkLabelUsed(dbId, stmt.getLabel().getLabelName());
                 if (stmt.getBrokerDesc() == null && stmt.getResourceDesc() == null) {
                     throw new DdlException("LoadManager only support the broker and spark load.");
                 }
-                if (loadJobScheduler.isQueueFull()) {
-                    throw new DdlException("There are more than " + Config.desired_max_waiting_jobs + " load jobs in waiting queue, "
-                            + "please retry later.");
+                if (unprotectedGetUnfinishedJobNum() >= Config.desired_max_waiting_jobs) {
+                    throw new DdlException("There are more than " + Config.desired_max_waiting_jobs + " unfinished load jobs, "
+                            + "please retry later. You can use `SHOW LOAD` to view submitted jobs");
                 }
             }
+
             loadJob = BulkLoadJob.fromLoadStmt(stmt);
             createLoadJob(loadJob);
         } finally {
@@ -134,6 +134,11 @@ public class LoadManager implements Writable{
         return loadJob.getId();
     }
 
+    private long unprotectedGetUnfinishedJobNum() {
+        return idToLoadJob.values().parallelStream()
+                .filter(j -> (j.getState() != JobState.FINISHED && j.getState() != JobState.CANCELLED)).count();
+    }
+
     /**
      * This method will be invoked by streaming mini load.
      * It will begin the txn of mini load immediately without any scheduler .
@@ -149,7 +154,7 @@ public class LoadManager implements Writable{
         }
         Database database = checkDb(ClusterNamespace.getFullName(cluster, request.getDb()));
         Table table = database.getTableOrDdlException(request.tbl);
-        LoadJob loadJob = null;
+        MiniLoadJob loadJob = null;
         writeLock();
         try {
             loadJob = new MiniLoadJob(database.getId(), table.getId(), request);
@@ -158,6 +163,7 @@ public class LoadManager implements Writable{
             // for other kind of load job, execute the job after adding job.
             // Mini load job must be executed before release write lock.
             // Otherwise, the duplicated request maybe get the transaction id before transaction of mini load is begun.
+            loadJob.beginTxn();
             loadJob.unprotectedExecute();
             createLoadJob(loadJob);
         } catch (DuplicatedRequestException e) {
@@ -249,8 +255,8 @@ public class LoadManager implements Writable{
     public void replayCreateLoadJob(LoadJob loadJob) {
         createLoadJob(loadJob);
         LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
-                         .add("msg", "replay create load job")
-                         .build());
+                .add("msg", "replay create load job")
+                .build());
     }
 
     // add load job and also add to to callback factory
@@ -281,7 +287,7 @@ public class LoadManager implements Writable{
     }
 
     public void recordFinishedLoadJob(String label, long transactionId, String dbName, long tableId, EtlJobType jobType,
-            long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException {
+                                      long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException {
 
         // get db id
         Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbName);
@@ -392,9 +398,9 @@ public class LoadManager implements Writable{
         }
         job.unprotectReadEndOperation(operation);
         LOG.info(new LogBuilder(LogKey.LOAD_JOB, operation.getId())
-                         .add("operation", operation)
-                         .add("msg", "replay end load job")
-                         .build());
+                .add("operation", operation)
+                .add("msg", "replay end load job")
+                .build());
     }
 
     public void replayUpdateLoadJobStateInfo(LoadJob.LoadJobStateUpdateInfo info) {
@@ -461,11 +467,6 @@ public class LoadManager implements Writable{
         }
     }
 
-    // only for those jobs which transaction is not started
-    public void processTimeoutJobs() {
-        idToLoadJob.values().stream().forEach(entity -> entity.processTimeout());
-    }
-
     // only for those jobs which have etl state, like SparkLoadJob
     public void processEtlStateJobs() {
         idToLoadJob.values().stream().filter(job -> (job.jobType == EtlJobType.SPARK && job.state == JobState.ETL))
@@ -475,7 +476,7 @@ public class LoadManager implements Writable{
                     } catch (DataQualityException e) {
                         LOG.info("update load job etl status failed. job id: {}", job.getId(), e);
                         job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, DataQualityException.QUALITY_FAIL_MSG),
-                                                  true, true);
+                                true, true);
                     } catch (UserException e) {
                         LOG.warn("update load job etl status failed. job id: {}", job.getId(), e);
                         job.cancelJobWithoutCheck(new FailMsg(CancelType.ETL_RUN_FAIL, e.getMessage()), true, true);
@@ -536,7 +537,7 @@ public class LoadManager implements Writable{
             List<LoadJob> loadJobList = Lists.newArrayList();
             if (Strings.isNullOrEmpty(labelValue)) {
                 loadJobList.addAll(labelToLoadJobs.values()
-                                           .stream().flatMap(Collection::stream).collect(Collectors.toList()));
+                        .stream().flatMap(Collection::stream).collect(Collectors.toList()));
             } else {
                 // check label value
                 if (accurateMatch) {
@@ -667,7 +668,7 @@ public class LoadManager implements Writable{
     }
 
     public void initJobProgress(Long jobId, TUniqueId loadId, Set<TUniqueId> fragmentIds,
-            List<Long> relatedBackendIds) {
+                                List<Long> relatedBackendIds) {
         LoadJob job = idToLoadJob.get(jobId);
         if (job != null) {
             job.initLoadProgress(loadId, fragmentIds, relatedBackendIds);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTimeoutChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTimeoutChecker.java
deleted file mode 100644
index 2f75e1f..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTimeoutChecker.java
+++ /dev/null
@@ -1,49 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.load.loadv2;
-
-import org.apache.doris.common.Config;
-import org.apache.doris.common.util.MasterDaemon;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * LoadTimeoutChecker will try to cancel the timeout load jobs.
- * And it will not handle the job which the corresponding transaction is started.
- * For those jobs, global transaction manager cancel the corresponding job while aborting the timeout transaction.
- */
-public class LoadTimeoutChecker extends MasterDaemon {
-    private static final Logger LOG = LogManager.getLogger(LoadTimeoutChecker.class);
-
-    private LoadManager loadManager;
-
-    public LoadTimeoutChecker(LoadManager loadManager) {
-        super("Load job timeout checker", Config.load_checker_interval_second * 1000);
-        this.loadManager = loadManager;
-    }
-
-    @Override
-    protected void runAfterCatalogReady() {
-        try {
-            loadManager.processTimeoutJobs();
-        } catch (Throwable e) {
-            LOG.warn("Failed to process one round of LoadJobScheduler with error message {}", e.getMessage(), e);
-        }
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
index ba63bb0..85e1235 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
@@ -108,8 +108,9 @@ public class SparkLoadPendingTask extends LoadTask {
     }
 
     @Override
-    void executeTask() throws LoadException {
+    void executeTask() throws UserException {
         LOG.info("begin to execute spark pending task. load job id: {}", loadJobId);
+        ((SparkLoadJob) callback).beginTxn();
         submitEtlJob();
     }
 
@@ -160,7 +161,7 @@ public class SparkLoadPendingTask extends LoadTask {
                     List<EtlIndex> etlIndexes = createEtlIndexes(table);
                     // partition info
                     EtlPartitionInfo etlPartitionInfo = createEtlPartitionInfo(table,
-                                                                               tableIdToPartitionIds.get(tableId));
+                            tableIdToPartitionIds.get(tableId));
                     etlTable = new EtlTable(etlIndexes, etlPartitionInfo);
                     tables.put(tableId, etlTable);
 
@@ -317,7 +318,7 @@ public class SparkLoadPendingTask extends LoadTask {
         }
 
         return new EtlColumn(name, columnType, isAllowNull, isKey, aggregationType, defaultValue,
-                             stringLength, precision, scale);
+                stringLength, precision, scale);
     }
 
     private EtlPartitionInfo createEtlPartitionInfo(OlapTable table, Set<Long> partitionIds) throws LoadException {
@@ -448,7 +449,7 @@ public class SparkLoadPendingTask extends LoadTask {
         if (columnToHadoopFunction != null) {
             for (Map.Entry<String, Pair<String, List<String>>> entry : columnToHadoopFunction.entrySet()) {
                 columnMappings.put(entry.getKey(),
-                                   new EtlColumnMapping(entry.getValue().first, entry.getValue().second));
+                        new EtlColumnMapping(entry.getValue().first, entry.getValue().second));
             }
         }
         for (ImportColumnDesc columnDesc : copiedColumnExprList) {
@@ -499,13 +500,13 @@ public class SparkLoadPendingTask extends LoadTask {
         EtlFileGroup etlFileGroup = null;
         if (fileGroup.isLoadFromTable()) {
             etlFileGroup = new EtlFileGroup(SourceType.HIVE, hiveDbTableName, hiveTableProperties,
-                                            fileGroup.isNegative(), columnMappings, where, partitionIds);
+                    fileGroup.isNegative(), columnMappings, where, partitionIds);
         } else {
             etlFileGroup = new EtlFileGroup(SourceType.FILE, fileGroup.getFilePaths(), fileFieldNames,
-                                            fileGroup.getColumnsFromPath(), fileGroup.getValueSeparator(),
-                                            fileGroup.getLineDelimiter(), fileGroup.isNegative(),
-                                            fileGroup.getFileFormat(), columnMappings,
-                                            where, partitionIds);
+                    fileGroup.getColumnsFromPath(), fileGroup.getValueSeparator(),
+                    fileGroup.getLineDelimiter(), fileGroup.isNegative(),
+                    fileGroup.getFileFormat(), columnMappings,
+                    where, partitionIds);
         }
 
         return etlFileGroup;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java
index 86c40b4..81a8597 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java
@@ -52,8 +52,8 @@ public class MasterTaskExecutor {
         scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric);
     }
 
-    public boolean isTaskQueueFull() {
-        return executor.getQueue().remainingCapacity() == 0;
+    public boolean hasIdleThread() {
+        return executor.getActiveCount() < executor.getMaximumPoolSize();
     }
 
     public void start() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index d09fe31..866841c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -325,7 +325,6 @@ public class DatabaseTransactionMgr {
         }
     }
 
-
     private void checkDatabaseDataQuota() throws MetaNotFoundException, QuotaExceedException {
         Database db = catalog.getDbOrMetaException(dbId);
 
@@ -704,17 +703,17 @@ public class DatabaseTransactionMgr {
         List<Long> tableIdList = transactionState.getTableIdList();
         // to be compatiable with old meta version, table List may be empty
         if (tableIdList.isEmpty()) {
-           readLock();
-           try {
-               for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
-                   long tableId = tableCommitInfo.getTableId();
-                   if (!tableIdList.contains(tableId)) {
-                       tableIdList.add(tableId);
-                   }
-               }
-           } finally {
-               readUnlock();
-           }
+            readLock();
+            try {
+                for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
+                    long tableId = tableCommitInfo.getTableId();
+                    if (!tableIdList.contains(tableId)) {
+                        tableIdList.add(tableId);
+                    }
+                }
+            } finally {
+                readUnlock();
+            }
         }
 
         List<Table> tableList = db.getTablesOnIdOrderWithIgnoringWrongTableId(tableIdList);
@@ -827,7 +826,7 @@ public class DatabaseTransactionMgr {
                                 LOG.info("publish version failed for transaction {} on tablet {}, with only {} replicas less than quorum {}",
                                         transactionState, tablet, healthReplicaNum, quorumReplicaNum);
                                 String errMsg = String.format("publish on tablet %d failed. succeed replica num %d less than quorum %d."
-                                        + " table: %d, partition: %d, publish version: %d",
+                                                + " table: %d, partition: %d, publish version: %d",
                                         tablet.getId(), healthReplicaNum, quorumReplicaNum, tableId, partitionId, partition.getVisibleVersion() + 1);
                                 transactionState.setErrorMsg(errMsg);
                                 hasError = true;
@@ -865,8 +864,8 @@ public class DatabaseTransactionMgr {
     }
 
     protected void unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds,
-                                               Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends,
-                                               Database db) {
+                                                Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends,
+                                                Database db) {
         // transaction state is modified during check if the transaction could committed
         if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) {
             return;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
index c4dc3f9..880965c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
@@ -93,7 +93,7 @@ public class LoadJobTest {
     public void testExecute(@Mocked GlobalTransactionMgr globalTransactionMgr,
                             @Mocked MasterTaskExecutor masterTaskExecutor)
             throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException,
-            QuotaExceedException, MetaNotFoundException {
+            QuotaExceedException, MetaNotFoundException, InterruptedException {
         LoadJob loadJob = new BrokerLoadJob();
         new Expectations() {
             {
@@ -110,9 +110,7 @@ public class LoadJobTest {
         } catch (LoadException e) {
             Assert.fail(e.getMessage());
         }
-        Assert.assertEquals(JobState.LOADING, loadJob.getState());
-        Assert.assertEquals(1, loadJob.getTransactionId());
-
+        Assert.assertEquals(JobState.PENDING, loadJob.getState());
     }
 
     @Test
@@ -169,12 +167,12 @@ public class LoadJobTest {
     @Test
     public void testUpdateStateToFinished(@Mocked MetricRepo metricRepo,
                                           @Injectable LoadTask loadTask1,
-            @Mocked LongCounterMetric longCounterMetric) {
-        
+                                          @Mocked LongCounterMetric longCounterMetric) {
+
         MetricRepo.COUNTER_LOAD_FINISHED = longCounterMetric;
         LoadJob loadJob = new BrokerLoadJob();
         loadJob.idToTasks.put(1L, loadTask1);
-        
+
         // TxnStateCallbackFactory factory = Catalog.getCurrentCatalog().getGlobalTransactionMgr().getCallbackFactory();
         Catalog catalog = Catalog.getCurrentCatalog();
         GlobalTransactionMgr mgr = new GlobalTransactionMgr(catalog);
@@ -183,7 +181,7 @@ public class LoadJobTest {
         loadJob.updateState(JobState.FINISHED);
         Assert.assertEquals(JobState.FINISHED, loadJob.getState());
         Assert.assertNotEquals(-1, (long) Deencapsulation.getField(loadJob, "finishTimestamp"));
-        Assert.assertEquals(100, (int)Deencapsulation.getField(loadJob, "progress"));
+        Assert.assertEquals(100, (int) Deencapsulation.getField(loadJob, "progress"));
         Assert.assertEquals(0, loadJob.idToTasks.size());
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index 578d3b8..07463d8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -38,9 +38,7 @@ import org.apache.doris.catalog.ResourceMgr;
 import org.apache.doris.catalog.SparkResource;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Tablet;
-import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DataQualityException;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -60,8 +58,6 @@ import org.apache.doris.task.PushTask;
 import org.apache.doris.thrift.TEtlState;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 import org.apache.doris.transaction.TabletCommitInfo;
-import org.apache.doris.transaction.TransactionState;
-import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -200,12 +196,6 @@ public class SparkLoadJobTest {
                             @Injectable MasterTaskExecutor executor) throws Exception {
         new Expectations() {
             {
-                Catalog.getCurrentGlobalTransactionMgr();
-                result = transactionMgr;
-                transactionMgr.beginTransaction(dbId, Lists.newArrayList(), label, null,
-                                                (TransactionState.TxnCoordinator) any, LoadJobSourceType.FRONTEND,
-                                                anyLong, anyLong);
-                result = transactionId;
                 pendingTask.init();
                 pendingTask.getSignature();
                 result = pendingTaskId;
@@ -220,9 +210,7 @@ public class SparkLoadJobTest {
         SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
         job.execute();
 
-        // check transaction id and id to tasks
-        Assert.assertEquals(transactionId, job.getTransactionId());
-        Assert.assertTrue(job.idToTasks.containsKey(pendingTaskId));
+        Assert.assertEquals(JobState.PENDING, job.getState());
     }
 
     @Test
@@ -272,7 +260,7 @@ public class SparkLoadJobTest {
         new Expectations() {
             {
                 handler.getEtlJobStatus((SparkLoadAppHandle) any, appId, anyLong, etlOutputPath,
-                                        (SparkResource) any, (BrokerDesc) any);
+                        (SparkResource) any, (BrokerDesc) any);
                 result = status;
             }
         };
@@ -295,7 +283,7 @@ public class SparkLoadJobTest {
         new Expectations() {
             {
                 handler.getEtlJobStatus((SparkLoadAppHandle) any, appId, anyLong, etlOutputPath,
-                                        (SparkResource) any, (BrokerDesc) any);
+                        (SparkResource) any, (BrokerDesc) any);
                 result = status;
             }
         };
@@ -315,7 +303,7 @@ public class SparkLoadJobTest {
         new Expectations() {
             {
                 handler.getEtlJobStatus((SparkLoadAppHandle) any, appId, anyLong, etlOutputPath,
-                                        (SparkResource) any, (BrokerDesc) any);
+                        (SparkResource) any, (BrokerDesc) any);
                 result = status;
             }
         };
@@ -337,7 +325,7 @@ public class SparkLoadJobTest {
         status.getCounters().put("dpp.abnorm.ALL", "1");
         Map<String, Long> filePathToSize = Maps.newHashMap();
         String filePath = String.format("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9/V1.label6.%d.%d.%d.0.%d.parquet",
-                                        tableId, partitionId, indexId, schemaHash);
+                tableId, partitionId, indexId, schemaHash);
         long fileSize = 6L;
         filePathToSize.put(filePath, fileSize);
         PartitionInfo partitionInfo = new RangePartitionInfo();
@@ -346,7 +334,7 @@ public class SparkLoadJobTest {
         new Expectations() {
             {
                 handler.getEtlJobStatus((SparkLoadAppHandle) any, appId, anyLong, etlOutputPath,
-                                        (SparkResource) any, (BrokerDesc) any);
+                        (SparkResource) any, (BrokerDesc) any);
                 result = status;
                 handler.getEtlFilePaths(etlOutputPath, (BrokerDesc) any);
                 result = filePathToSize;
@@ -380,7 +368,7 @@ public class SparkLoadJobTest {
                 Catalog.getCurrentGlobalTransactionMgr();
                 result = transactionMgr;
                 transactionMgr.commitTransaction(dbId, (List<Table>) any, transactionId, (List<TabletCommitInfo>) any,
-                                                 (LoadJobFinalOperation) any);
+                        (LoadJobFinalOperation) any);
             }
         };
 
@@ -466,7 +454,7 @@ public class SparkLoadJobTest {
         loadStartTimestamp = 1592388888L;
         String tabletMeta = String.format("%d.%d.%d.0.%d", tableId, partitionId, indexId, schemaHash);
         String filePath = String.format("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9/V1.label6.%d.%d.%d.0.%d.parquet",
-                                        tableId, partitionId, indexId, schemaHash);
+                tableId, partitionId, indexId, schemaHash);
         long fileSize = 6L;
         tabletMetaToFileInfo.put(tabletMeta, Pair.create(filePath, fileSize));
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java
index e2f88b8..216706b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java
@@ -40,6 +40,7 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.LoadException;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.BrokerFileGroupAggInfo;
@@ -73,7 +74,7 @@ public class SparkLoadPendingTaskTest {
                                 @Injectable BrokerDesc brokerDesc,
                                 @Mocked Catalog catalog, @Injectable SparkLoadAppHandle handle,
                                 @Injectable Database database,
-                                @Injectable OlapTable table) throws LoadException {
+                                @Injectable OlapTable table) throws UserException {
         long dbId = 0L;
         long tableId = 1L;
 
@@ -97,7 +98,7 @@ public class SparkLoadPendingTaskTest {
         Map<BrokerFileGroupAggInfo.FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
         List<BrokerFileGroup> brokerFileGroups = Lists.newArrayList();
         DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
-                                                   null, null, null, false, null);
+                null, null, null, false, null);
         BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc);
         brokerFileGroups.add(brokerFileGroup);
         BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null);
@@ -173,9 +174,9 @@ public class SparkLoadPendingTaskTest {
         int distributionColumnIndex = 1;
         DistributionInfo distributionInfo = new HashDistributionInfo(3, Lists.newArrayList(columns.get(distributionColumnIndex)));
         Partition partition1 = new Partition(partition1Id, "p1", null,
-                                             distributionInfo);
+                distributionInfo);
         Partition partition2 = new Partition(partition2Id, "p2", null,
-                                             new HashDistributionInfo(4, Lists.newArrayList(columns.get(distributionColumnIndex))));
+                new HashDistributionInfo(4, Lists.newArrayList(columns.get(distributionColumnIndex))));
         int partitionColumnIndex = 0;
         List<Partition> partitions = Lists.newArrayList(partition1, partition2);
         RangePartitionInfo partitionInfo = new RangePartitionInfo(Lists.newArrayList(columns.get(partitionColumnIndex)));
@@ -192,7 +193,7 @@ public class SparkLoadPendingTaskTest {
         Map<BrokerFileGroupAggInfo.FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
         List<BrokerFileGroup> brokerFileGroups = Lists.newArrayList();
         DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
-                                                   null, null, null, false, null);
+                null, null, null, false, null);
         BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc);
         brokerFileGroups.add(brokerFileGroup);
         BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null);
