You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/19 06:16:55 UTC

[doris] branch master updated: [Feature](broker-load) Support priority for Broker Load job. (#20628)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 63b9684696 [Feature](broker-load) Support priority for Broker Load job. (#20628)
63b9684696 is described below

commit 63b968469615b0e7b87d1f85de20449eab1429ab
Author: Xiangyu Wang <du...@gmail.com>
AuthorDate: Mon Jun 19 14:16:48 2023 +0800

    [Feature](broker-load) Support priority for Broker Load job. (#20628)
    
    Support priority for Broker Load job.
---
 .../Load/BROKER-LOAD.md                            |   9 +-
 .../Load/BROKER-LOAD.md                            |   8 +-
 .../java/org/apache/doris/analysis/LoadStmt.java   |  12 ++
 .../main/java/org/apache/doris/catalog/Env.java    |   8 +-
 .../org/apache/doris/common/ThreadPoolManager.java |  96 ++++++++++++
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |   5 +-
 .../doris/load/loadv2/BrokerLoadPendingTask.java   |   4 +-
 .../java/org/apache/doris/load/loadv2/LoadJob.java |   5 +
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |   4 +-
 .../org/apache/doris/load/loadv2/LoadTask.java     |  30 +++-
 .../org/apache/doris/load/loadv2/SparkLoadJob.java |   2 +-
 .../doris/load/loadv2/SparkLoadPendingTask.java    |   4 +-
 .../org/apache/doris/task/MasterTaskExecutor.java  |  12 +-
 .../doris/task/PriorityMasterTaskExecutor.java     |  38 +++++
 .../doris/load/loadv2/BrokerLoadJobTest.java       |   8 +-
 .../load/loadv2/BrokerLoadPendingTaskTest.java     |   2 +-
 .../load/loadv2/SparkLoadPendingTaskTest.java      |   4 +-
 .../doris/task/PriorityMasterTaskExecutorTest.java | 162 +++++++++++++++++++++
 18 files changed, 391 insertions(+), 22 deletions(-)

diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
index 9c6f41f8e4..50b859113a 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
@@ -186,8 +186,15 @@ WITH BROKER broker_name
   - `load_to_single_tablet`
   
     Boolean type, True means that one task can only load data to one tablet in the corresponding partition at a time. The default value is false. The number of tasks for the job depends on the overall concurrency. This parameter can only be set when loading data into the OLAP table with random partition.
+    
+  - <version since="dev" type="inline"> priority </version>
+    
+    Set the priority of the load job, there are three options: `HIGH/NORMAL/LOW`, use `NORMAL` priority as default. The pending broker load jobs which have higher priority will be chosen to execute earlier.
+
 -  <version since="1.2.3" type="inline"> comment </version>
-    Specify the comment for the import job. The comment can be viewed in the `show load` statement.
+    
+   Specify the comment for the import job. The comment can be viewed in the `show load` statement.
+
 ### Example
 
 1. Import a batch of data from HDFS
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
index 988a8a3201..73387d12da 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
@@ -186,8 +186,14 @@ WITH BROKER broker_name
       
       布尔类型,为true表示支持一个任务只导入数据到对应分区的一个tablet,默认值为false,作业的任务数取决于整体并发度。该参数只允许在对带有random分区的olap表导数的时候设置。
 
+    - <version since="dev" type="inline"> priority </version>
+
+      设置导入任务的优先级,可选 `HIGH/NORMAL/LOW` 三种优先级,默认为 `NORMAL`,对于处在 `PENDING` 状态的导入任务,更高优先级的任务将优先被执行进入 `LOADING` 状态。
+
 -  <version since="1.2.3" type="inline"> comment </version>
-  - 指定导入任务的备注信息。可选参数。
+
+   指定导入任务的备注信息。可选参数。
+
 ### Example
 
 1. 从 HDFS 导入一批数据
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index 34d2f2f064..d4401626cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -81,6 +81,7 @@ public class LoadStmt extends DdlStmt {
     public static final String TIMEZONE = "timezone";
     public static final String LOAD_PARALLELISM = "load_parallelism";
     public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
+    public static final String PRIORITY = "priority";
     public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
     // temp property, just make regression test happy.
     // should remove when Config.enable_new_load_scan_node is set to true by default.
@@ -209,6 +210,7 @@ public class LoadStmt extends DdlStmt {
                     return Boolean.valueOf(s);
                 }
             })
+            .put(PRIORITY, (Function<String, LoadTask.Priority>) s -> LoadTask.Priority.valueOf(s))
             .build();
 
     public LoadStmt(DataDescription dataDescription, Map<String, String> properties, String comment) {
@@ -363,6 +365,16 @@ public class LoadStmt extends DdlStmt {
                 throw new DdlException(SEND_BATCH_PARALLELISM + " is not a number.");
             }
         }
+
+        // priority
+        final String priority = properties.get(PRIORITY);
+        if (priority != null) {
+            try {
+                LoadTask.Priority.valueOf(priority);
+            } catch (IllegalArgumentException | NullPointerException e) {
+                throw new DdlException(PRIORITY + " must be in [LOW/NORMAL/HIGH].");
+            }
+        }
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index dc822b1470..ae8a07e79b 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -155,6 +155,7 @@ import org.apache.doris.load.loadv2.LoadJobScheduler;
 import org.apache.doris.load.loadv2.LoadLoadingChecker;
 import org.apache.doris.load.loadv2.LoadManager;
 import org.apache.doris.load.loadv2.LoadManagerAdapter;
+import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.load.loadv2.ProgressManager;
 import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.load.routineload.RoutineLoadScheduler;
@@ -225,6 +226,7 @@ import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.CompactionTask;
 import org.apache.doris.task.DropReplicaTask;
 import org.apache.doris.task.MasterTaskExecutor;
+import org.apache.doris.task.PriorityMasterTaskExecutor;
 import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TCompressionType;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -409,7 +411,7 @@ public class Env {
 
     // Thread pools for pending and loading task, separately
     private MasterTaskExecutor pendingLoadTaskScheduler;
-    private MasterTaskExecutor loadingLoadTaskScheduler;
+    private PriorityMasterTaskExecutor<LoadTask> loadingLoadTaskScheduler;
 
     private LoadJobScheduler loadJobScheduler;
 
@@ -632,8 +634,8 @@ public class Env {
         // The loadingLoadTaskScheduler's queue size is unlimited, so that it can receive all loading tasks
         // created after pending tasks finish. And don't worry about the high concurrency, because the
         // concurrency is limited by Config.desired_max_waiting_jobs and Config.async_loading_load_task_pool_size.
-        this.loadingLoadTaskScheduler = new MasterTaskExecutor("loading-load-task-scheduler",
-                Config.async_loading_load_task_pool_size, Integer.MAX_VALUE, !isCheckpointCatalog);
+        this.loadingLoadTaskScheduler = new PriorityMasterTaskExecutor<>("loading-load-task-scheduler",
+                Config.async_loading_load_task_pool_size, LoadTask.COMPARATOR, LoadTask.class, !isCheckpointCatalog);
 
         this.loadJobScheduler = new LoadJobScheduler();
         this.loadManager = new LoadManager(loadJobScheduler);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index 8d1b1e7da7..28c18618d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -26,12 +26,18 @@ import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
 
+import java.util.Comparator;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
@@ -120,6 +126,14 @@ public class ThreadPoolManager {
                 poolName, needRegisterMetric);
     }
 
+    public static <T> ThreadPoolExecutor newDaemonFixedPriorityThreadPool(int numThread, int initQueueSize,
+                                                                          Comparator<T> comparator, Class<T> tClass,
+                                                                          String poolName, boolean needRegisterMetric) {
+        return newDaemonPriorityThreadPool(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
+                    new PriorityBlockingQueue<>(initQueueSize), new BlockedPolicy(poolName, 60),
+                    comparator, tClass, poolName, needRegisterMetric);
+    }
+
     public static ThreadPoolExecutor newDaemonProfileThreadPool(int numThread, int queueSize, String poolName,
                                                                 boolean needRegisterMetric) {
         return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
@@ -144,6 +158,25 @@ public class ThreadPoolManager {
         return threadPool;
     }
 
+    public static <T> ThreadPoolExecutor newDaemonPriorityThreadPool(int corePoolSize,
+                                                                 int maximumPoolSize,
+                                                                 long keepAliveTime,
+                                                                 TimeUnit unit,
+                                                                 PriorityBlockingQueue<Runnable> workQueue,
+                                                                 RejectedExecutionHandler handler,
+                                                                 Comparator<T> comparator,
+                                                                 Class<T> tClass,
+                                                                 String poolName,
+                                                                 boolean needRegisterMetric) {
+        ThreadFactory threadFactory = namedThreadFactory(poolName);
+        ThreadPoolExecutor threadPool = new PriorityThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
+                    unit, workQueue, threadFactory, handler, comparator, tClass);
+        if (needRegisterMetric) {
+            nameToThreadPoolMap.put(poolName, threadPool);
+        }
+        return threadPool;
+    }
+
     // Now, we have no delay task num limit and thread num limit in ScheduledThreadPoolExecutor,
     // so it may cause oom when there are too many delay tasks or threads in ScheduledThreadPoolExecutor
     // Please use this api only for scheduling short task at fix rate.
@@ -165,6 +198,69 @@ public class ThreadPoolManager {
         return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(poolName + "-%d").build();
     }
 
+    private static class PriorityThreadPoolExecutor<T> extends ThreadPoolExecutor {
+
+        private final Comparator<T> comparator;
+        private final Class<T> tClass;
+
+        private PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+                                          BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
+                                          RejectedExecutionHandler handler, Comparator<T> comparator, Class<T> tClass) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
+            this.comparator = comparator;
+            this.tClass = tClass;
+        }
+
+        private static class ComparableFutureTask<V, T> extends FutureTask<V>
+                    implements Comparable<ComparableFutureTask<V, T>> {
+
+            private final @NotNull T t;
+            private final Comparator<T> comparator;
+
+            public ComparableFutureTask(@NotNull Callable task, Comparator<T> comparator) {
+                super(task);
+                this.t = (T) task;
+                this.comparator = comparator;
+            }
+
+            public ComparableFutureTask(@NotNull Runnable task, V result, Comparator<T> comparator) {
+                super(task, result);
+                this.t = (T) task;
+                this.comparator = comparator;
+            }
+
+            @Override
+            public int compareTo(@NotNull ComparableFutureTask<V, T> other) {
+                return comparator.compare(t, other.t);
+            }
+
+        }
+
+        @Override
+        protected <V> RunnableFuture<V> newTaskFor(Runnable task, V value) {
+            if (!tClass.isInstance(task)) {
+                throw new RejectedExecutionException("Task must be an instance of [" + tClass.getName() + "]");
+            }
+            return new ComparableFutureTask<>(task, value, comparator);
+        }
+
+        @Override
+        protected <V> RunnableFuture<V> newTaskFor(Callable<V> task) {
+            if (!tClass.isInstance(task)) {
+                throw new RejectedExecutionException("Task must be an instance of [" + tClass.getName() + "]");
+            }
+            return new ComparableFutureTask<>(task, comparator);
+        }
+
+        @Override
+        public void execute(Runnable task) {
+            if (!(task instanceof ComparableFutureTask) && !tClass.isInstance(task)) {
+                throw new RejectedExecutionException("Task must be an instance of [" + tClass.getName() + "]");
+            }
+            super.execute(task);
+        }
+    }
+
     /**
      * A handler for rejected task that discards and log it, used for cached thread pool
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index ef41736dd0..c0bc098e46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -109,7 +109,8 @@ public class BrokerLoadJob extends BulkLoadJob {
 
     @Override
     protected void unprotectedExecuteJob() {
-        LoadTask task = new BrokerLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(), brokerDesc);
+        LoadTask task = new BrokerLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(),
+                brokerDesc, getPriority());
         idToTasks.put(task.getSignature(), task);
         Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(task);
     }
@@ -209,7 +210,7 @@ public class BrokerLoadJob extends BulkLoadJob {
                         isStrictMode(), transactionId, this, getTimeZone(), getTimeout(),
                         getLoadParallelism(), getSendBatchParallelism(),
                         getMaxFilterRatio() <= 0, enableProfile ? jobProfile : null, isSingleTabletLoadPerSink(),
-                        useNewLoadScanNode());
+                        useNewLoadScanNode(), getPriority());
 
                 UUID uuid = UUID.randomUUID();
                 TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
index b295a8f31e..7599c7d472 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
@@ -45,8 +45,8 @@ public class BrokerLoadPendingTask extends LoadTask {
 
     public BrokerLoadPendingTask(BrokerLoadJob loadTaskCallback,
                                  Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToBrokerFileGroups,
-                                 BrokerDesc brokerDesc) {
-        super(loadTaskCallback, TaskType.PENDING);
+                                 BrokerDesc brokerDesc, Priority priority) {
+        super(loadTaskCallback, TaskType.PENDING, priority);
         this.retryTime = 3;
         this.attachment = new BrokerPendingTaskAttachment(signature);
         this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 010f6d5765..18c881c558 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -434,6 +434,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         jobProperties.put(LoadStmt.LOAD_PARALLELISM, Config.default_load_parallelism);
         jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, 1);
         jobProperties.put(LoadStmt.LOAD_TO_SINGLE_TABLET, false);
+        jobProperties.put(LoadStmt.PRIORITY, LoadTask.Priority.NORMAL);
     }
 
     public void isJobTypeRead(boolean jobTypeRead) {
@@ -1226,6 +1227,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         return (int) jobProperties.get(LoadStmt.SEND_BATCH_PARALLELISM);
     }
 
+    public LoadTask.Priority getPriority() {
+        return (LoadTask.Priority) jobProperties.get(LoadStmt.PRIORITY);
+    }
+
     public boolean isSingleTabletLoadPerSink() {
         return (boolean) jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 13e8a5beaa..450972900f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -82,8 +82,8 @@ public class LoadLoadingTask extends LoadTask {
             long txnId, LoadTaskCallback callback, String timezone,
             long timeoutS, int loadParallelism, int sendBatchParallelism,
             boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink,
-            boolean useNewLoadScanNode) {
-        super(callback, TaskType.LOADING);
+            boolean useNewLoadScanNode, Priority priority) {
+        super(callback, TaskType.LOADING, priority);
         this.db = db;
         this.table = table;
         this.brokerDesc = brokerDesc;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
index 4536cac276..dbedb9facc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
@@ -28,6 +28,9 @@ import org.apache.doris.task.MasterTask;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Comparator;
+
+
 public abstract class LoadTask extends MasterTask {
 
     public enum MergeType {
@@ -41,18 +44,38 @@ public abstract class LoadTask extends MasterTask {
         LOADING
     }
 
+    public enum Priority {
+        HIGH(0),
+        NORMAL(1),
+        LOW(2);
+
+        Priority(int value) {
+            this.value = value;
+        }
+
+        private final int value;
+
+        public int getValue() {
+            return value;
+        }
+    }
+
     private static final Logger LOG = LogManager.getLogger(LoadTask.class);
+    public static final Comparator<LoadTask> COMPARATOR = Comparator.comparing(LoadTask::getPriorityValue)
+                .thenComparingLong(LoadTask::getSignature);
 
     protected TaskType taskType;
     protected LoadTaskCallback callback;
     protected TaskAttachment attachment;
     protected FailMsg failMsg = new FailMsg();
     protected int retryTime = 1;
+    protected final Priority priority;
 
-    public LoadTask(LoadTaskCallback callback, TaskType taskType) {
+    public LoadTask(LoadTaskCallback callback, TaskType taskType, Priority priority) {
         this.taskType = taskType;
         this.signature = Env.getCurrentEnv().getNextId();
         this.callback = callback;
+        this.priority = priority;
     }
 
     @Override
@@ -107,4 +130,9 @@ public abstract class LoadTask extends MasterTask {
     public TaskType getTaskType() {
         return taskType;
     }
+
+    public int getPriorityValue() {
+        return this.priority.value;
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 71e01c6af4..bbf06b21da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -212,7 +212,7 @@ public class SparkLoadJob extends BulkLoadJob {
 
         // create pending task
         LoadTask task = new SparkLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(), sparkResource,
-                brokerDesc);
+                brokerDesc, getPriority());
         task.init();
         idToTasks.put(task.getSignature(), task);
         Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(task);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
index d3cfe466de..67825c0327 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
@@ -91,8 +91,8 @@ public class SparkLoadPendingTask extends LoadTask {
 
     public SparkLoadPendingTask(SparkLoadJob loadTaskCallback,
                                 Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToBrokerFileGroups,
-                                SparkResource resource, BrokerDesc brokerDesc) {
-        super(loadTaskCallback, TaskType.PENDING);
+                                SparkResource resource, BrokerDesc brokerDesc, Priority priority) {
+        super(loadTaskCallback, TaskType.PENDING, priority);
         this.retryTime = 3;
         this.attachment = new SparkPendingTaskAttachment(signature);
         this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java
index 440c45bb98..84558774f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java
@@ -35,9 +35,15 @@ import java.util.concurrent.TimeUnit;
 public class MasterTaskExecutor {
     private static final Logger LOG = LogManager.getLogger(MasterTaskExecutor.class);
 
-    private ThreadPoolExecutor executor;
-    private Map<Long, Future<?>> runningTasks;
-    public ScheduledThreadPoolExecutor scheduledThreadPool;
+    protected ThreadPoolExecutor executor;
+    protected Map<Long, Future<?>> runningTasks;
+    protected ScheduledThreadPoolExecutor scheduledThreadPool;
+
+    protected MasterTaskExecutor(String name, boolean needRegisterMetric) {
+        runningTasks = Maps.newHashMap();
+        scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1,
+                name + "_scheduler_thread_pool", needRegisterMetric);
+    }
 
     public MasterTaskExecutor(String name, int threadNum, boolean needRegisterMetric) {
         executor = ThreadPoolManager.newDaemonFixedThreadPool(threadNum, threadNum * 2, name + "-pool",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PriorityMasterTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/task/PriorityMasterTaskExecutor.java
new file mode 100644
index 0000000000..d82c5df2dd
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PriorityMasterTaskExecutor.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.common.ThreadPoolManager;
+
+import java.util.Comparator;
+
+public class PriorityMasterTaskExecutor<T> extends MasterTaskExecutor {
+
+    public PriorityMasterTaskExecutor(String name, int threadNum, Comparator<T> comparator,
+                                      Class<T> tClass, boolean needRegisterMetric) {
+        this(name, threadNum, threadNum * 2, comparator, tClass, needRegisterMetric);
+    }
+
+    public PriorityMasterTaskExecutor(String name, int threadNum, int initQueueSize,
+                                      Comparator<T> comparator, Class<T> tClass, boolean needRegisterMetric) {
+        super(name, needRegisterMetric);
+        executor = ThreadPoolManager.newDaemonFixedPriorityThreadPool(threadNum, initQueueSize, comparator, tClass,
+                name + "_pool", needRegisterMetric);
+    }
+
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index c64a616aaf..be377f094e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -132,6 +132,8 @@ public class BrokerLoadJobTest {
         List<DataDescription> dataDescriptionList = Lists.newArrayList();
         dataDescriptionList.add(dataDescription);
         BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap());
+        Map<String, String> properties = new HashMap<>();
+        properties.put(LoadStmt.PRIORITY, "HIGH");
 
         new Expectations() {
             {
@@ -171,6 +173,9 @@ public class BrokerLoadJobTest {
                 loadStmt.getEtlJobType();
                 minTimes = 0;
                 result = EtlJobType.BROKER;
+                loadStmt.getProperties();
+                minTimes = 0;
+                result = properties;
             }
         };
 
@@ -188,6 +193,7 @@ public class BrokerLoadJobTest {
             Assert.assertEquals(label, Deencapsulation.getField(brokerLoadJob, "label"));
             Assert.assertEquals(JobState.PENDING, Deencapsulation.getField(brokerLoadJob, "state"));
             Assert.assertEquals(EtlJobType.BROKER, Deencapsulation.getField(brokerLoadJob, "jobType"));
+            Assert.assertEquals(brokerLoadJob.getPriority(), LoadTask.Priority.HIGH);
         } catch (DdlException e) {
             Assert.fail(e.getMessage());
         }
@@ -360,7 +366,7 @@ public class BrokerLoadJobTest {
         TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
         Profile jobProfile = new Profile("test", false);
         LoadLoadingTask task = new LoadLoadingTask(database, olapTable, brokerDesc, fileGroups, 100, 100, false, 100,
-                callback, "", 100, 1, 1, true, jobProfile, false, false);
+                callback, "", 100, 1, 1, true, jobProfile, false, false, LoadTask.Priority.NORMAL);
         try {
             UserIdentity userInfo = new UserIdentity("root", "localhost");
             userInfo.setIsAnalyzed();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java
index cfc15dd65a..ee7a60ce0d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java
@@ -75,7 +75,7 @@ public class BrokerLoadPendingTaskTest {
             }
         };
 
-        BrokerLoadPendingTask brokerLoadPendingTask = new BrokerLoadPendingTask(brokerLoadJob, aggKeyToFileGroups, brokerDesc);
+        BrokerLoadPendingTask brokerLoadPendingTask = new BrokerLoadPendingTask(brokerLoadJob, aggKeyToFileGroups, brokerDesc, LoadTask.Priority.NORMAL);
         brokerLoadPendingTask.executeTask();
         BrokerPendingTaskAttachment brokerPendingTaskAttachment = Deencapsulation.getField(brokerLoadPendingTask, "attachment");
         Assert.assertEquals(1, brokerPendingTaskAttachment.getFileNumByTable(aggKey));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java
index 709d23bcf2..f02e29271d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java
@@ -134,7 +134,7 @@ public class SparkLoadPendingTaskTest {
             }
         };
 
-        SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc);
+        SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc, LoadTask.Priority.NORMAL);
         task.init();
         SparkPendingTaskAttachment attachment = Deencapsulation.getField(task, "attachment");
         Assert.assertEquals(null, attachment.getAppId());
@@ -222,7 +222,7 @@ public class SparkLoadPendingTaskTest {
             }
         };
 
-        SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc);
+        SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc, LoadTask.Priority.NORMAL);
         EtlJobConfig etlJobConfig = Deencapsulation.getField(task, "etlJobConfig");
         Assert.assertEquals(null, etlJobConfig);
         task.init();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/PriorityMasterTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/PriorityMasterTaskExecutorTest.java
new file mode 100644
index 0000000000..29d97e7cf8
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/PriorityMasterTaskExecutorTest.java
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+
+public class PriorityMasterTaskExecutorTest {
+    private static final Logger LOG = LoggerFactory.getLogger(PriorityMasterTaskExecutorTest.class);
+
+    // save the running order
+    public static List<MasterTask> runningOrderList = Lists.newCopyOnWriteArrayList();
+
+    private static final int THREAD_NUM = 1;
+
+    private PriorityMasterTaskExecutor<TestMasterTask> executor;
+
+    @Before
+    public void setUp() {
+        Comparator<TestMasterTask> comparator = Comparator.comparing(TestMasterTask::getPriority)
+                .thenComparingLong(TestMasterTask::getSignature);
+        executor = new PriorityMasterTaskExecutor("priority_master_task_executor_test", THREAD_NUM, comparator,
+                TestMasterTask.class, false);
+        executor.start();
+    }
+
+    @After
+    public void tearDown() {
+        if (executor != null) {
+            executor.close();
+        }
+    }
+
+    @Test
+    public void testSubmit() {
+
+        MasterTask errorTask = new ErrorMasterTask();
+        try {
+            executor.submit(errorTask);
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof RejectedExecutionException);
+            Assert.assertTrue(("Task must be an instance of [" + TestMasterTask.class.getName() + "]").equals(e.getMessage()));
+        }
+
+
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch finishLatch = new CountDownLatch(5);
+        // submit task
+        MasterTask task1 = new TestMasterTask(1L, 0, startLatch, finishLatch);
+        Assert.assertTrue(executor.submit(task1));
+        Assert.assertEquals(1, executor.getTaskNum());
+
+        // submit same running task error
+        Assert.assertFalse(executor.submit(task1));
+        Assert.assertEquals(1, executor.getTaskNum());
+
+        // submit some task with priority
+        MasterTask task5 = new TestMasterTask(5L, 1, startLatch, finishLatch);
+        executor.submit(task5);
+
+        MasterTask task2 = new TestMasterTask(2L, 1, startLatch, finishLatch);
+        executor.submit(task2);
+
+        MasterTask task4 = new TestMasterTask(4L, 0, startLatch, finishLatch);
+        executor.submit(task4);
+
+        MasterTask task3 = new TestMasterTask(3L, 0, startLatch, finishLatch);
+        executor.submit(task3);
+
+        // start running tasks
+        startLatch.countDown();
+
+        // wait for all tasks finish
+        try {
+            finishLatch.await();
+        } catch (InterruptedException interruptedException) {
+            interruptedException.printStackTrace();
+            Assert.fail();
+        }
+
+        // compare priority value first, the lower the higher priority
+        // then compare signature value, the lower the higher priority
+        Assert.assertTrue(runningOrderList.size() == 5);
+        Assert.assertTrue(runningOrderList.get(0) == task1);
+        Assert.assertTrue(runningOrderList.get(1) == task3);
+        Assert.assertTrue(runningOrderList.get(2) == task4);
+        Assert.assertTrue(runningOrderList.get(3) == task2);
+        Assert.assertTrue(runningOrderList.get(4) == task5);
+    }
+
+    private class ErrorMasterTask extends MasterTask {
+
+        @Override
+        protected void exec() {
+            // do nothing
+        }
+    }
+
+    private class TestMasterTask extends MasterTask {
+
+        private final long priority;
+        private final CountDownLatch startLatch;
+        private final CountDownLatch finishLatch;
+
+        public TestMasterTask(long signature, long priority, CountDownLatch startLatch, CountDownLatch finishLatch) {
+            this.signature = signature;
+            this.priority = priority;
+            this.startLatch = startLatch;
+            this.finishLatch = finishLatch;
+        }
+
+        public long getPriority() {
+            return priority;
+        }
+
+        public long getSignature() {
+            return signature;
+        }
+
+        @Override
+        protected void exec() {
+            runningOrderList.add(this);
+            LOG.info("run exec. signature: {}", signature);
+            try {
+                startLatch.await();
+                Thread.sleep(1000);
+            } catch (InterruptedException interruptedException) {
+                throw new RuntimeException(interruptedException);
+            } finally {
+                finishLatch.countDown();
+            }
+            LOG.info("run finish. signature: {}", signature);
+        }
+
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org