You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/04/10 07:38:54 UTC

[kylin] branch master updated: KYLIN-3892 Set cubing job priority

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ac5a84  KYLIN-3892 Set cubing job priority
7ac5a84 is described below

commit 7ac5a8449621c191b9e1bcdeaa61a31e144e24a3
Author: Temple Zhou <db...@gmail.com>
AuthorDate: Tue Apr 9 17:31:44 2019 +0800

    KYLIN-3892 Set cubing job priority
---
 .../org/apache/kylin/engine/EngineFactory.java     |  6 ++++-
 .../apache/kylin/engine/IBatchCubingEngine.java    |  2 +-
 .../org/apache/kylin/job/dao/ExecutablePO.java     | 11 +++++++++
 .../kylin/job/execution/AbstractExecutable.java    | 16 +++++++++++++
 .../kylin/job/execution/ExecutableManager.java     |  2 ++
 .../job/impl/threadpool/DefaultFetcherRunner.java  | 25 +++++++------------
 .../kylin/job/impl/threadpool/FetcherRunner.java   | 24 +++++++++++++++++++
 .../job/impl/threadpool/PriorityFetcherRunner.java | 28 +++++++---------------
 .../kylin/engine/mr/BatchCubingJobBuilder2.java    |  8 +++++--
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  6 +++++
 .../kylin/engine/mr/MRBatchCubingEngine2.java      |  4 ++--
 .../engine/spark/SparkBatchCubingEngine2.java      |  4 ++--
 .../engine/spark/SparkBatchCubingJobBuilder2.java  | 10 +++++++-
 .../kylin/rest/controller/CubeController.java      |  8 +++----
 .../apache/kylin/rest/request/JobBuildRequest.java | 10 ++++++++
 .../kylin/rest/request/JobBuildRequest2.java       |  9 +++++++
 .../org/apache/kylin/rest/service/JobService.java  | 10 ++++----
 .../org/apache/kylin/tool/job/CubeBuildingCLI.java |  4 ++--
 18 files changed, 131 insertions(+), 56 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
index 734c470..63eb09a 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -54,7 +54,11 @@ public class EngineFactory {
 
     /** Build a new cube segment, typically its time range appends to the end of current cube. */
     public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
-        return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter);
+        return createBatchCubingJob(newSegment, submitter, 0);
+    }
+
+    public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) {
+        return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter, priorityOffset);
     }
 
     /** Merge multiple small segments into a big one. */
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
index a618eac..d259a0e 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
@@ -32,7 +32,7 @@ public interface IBatchCubingEngine {
     public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment);
 
     /** Build a new cube segment, typically its time range appends to the end of current cube. */
-    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter);
+    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset);
 
     /** Merge multiple small segments into a big one. */
     public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
index f48c876..1238b66 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
@@ -48,6 +48,9 @@ public class ExecutablePO extends RootPersistentEntity {
     @JsonProperty("params")
     private Map<String, String> params = Maps.newHashMap();
 
+    @JsonProperty("priority")
+    private Integer priority;
+
     public String getName() {
         return name;
     }
@@ -88,4 +91,12 @@ public class ExecutablePO extends RootPersistentEntity {
         this.params = params;
     }
 
+    public Integer getPriority() {
+        return priority;
+    }
+
+    public void setPriority(Integer priority) {
+        this.priority = priority;
+    }
+    
 }
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 8e1b262..51d9bf7 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -70,6 +70,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     private String id;
     private AbstractExecutable parentExecutable = null;
     private Map<String, String> params = Maps.newHashMap();
+    protected Integer priority;
 
     public AbstractExecutable() {
         setId(RandomUtil.randomUUID().toString());
@@ -491,6 +492,21 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         return DEFAULT_PRIORITY;
     }
 
+    public Integer getPriority() {
+        return priority == null ? getDefaultPriority() : priority;
+    }
+
+    public void setPriority(Integer priority) {
+        this.priority = priority;
+    }
+
+    /**
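+     * Sets the priority to this job's default priority plus the given offset (a null offset counts as 0); the different jobs have different default priorities.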
+     * */
+    public void setPriorityBasedOnPriorityOffset(Integer priorityOffset) {
+        this.priority = getDefaultPriority() + (priorityOffset == null ? 0 : priorityOffset);
+    }
+
     /*
     * discarded is triggered by JobService, the Scheduler is not awake of that
     *
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index b1244b5..5837bd5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -77,6 +77,7 @@ public class ExecutableManager {
         result.setUuid(executable.getId());
         result.setType(executable.getClass().getName());
         result.setParams(executable.getParams());
+        result.setPriority(executable.getPriority());
         if (executable instanceof ChainedExecutable) {
             List<ExecutablePO> tasks = Lists.newArrayList();
             for (AbstractExecutable task : ((ChainedExecutable) executable).getTasks()) {
@@ -569,6 +570,7 @@ public class ExecutableManager {
         result.setId(executablePO.getUuid());
         result.setName(executablePO.getName());
         result.setParams(executablePO.getParams());
+        result.setPriority(executablePO.getPriority());
 
         if (!(result instanceof BrokenExecutable)) {
             List<ExecutablePO> tasks = executablePO.getTasks();
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
index 21cd8e9..04e40a5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
@@ -47,7 +47,13 @@ public class DefaultFetcherRunner extends FetcherRunner {
                 return;
             }
 
-            int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
+            nRunning = 0;
+            nReady = 0;
+            nStopped = 0;
+            nOthers = 0;
+            nError = 0;
+            nDiscarded = 0;
+            nSUCCEED = 0;
             for (final String id : getExecutableManger().getAllJobIdsInCache()) {
                 if (isJobPoolFull()) {
                     return;
@@ -61,22 +67,7 @@ public class DefaultFetcherRunner extends FetcherRunner {
                 final Output outputDigest = getExecutableManger().getOutputDigest(id);
                 if ((outputDigest.getState() != ExecutableState.READY)) {
                     // logger.debug("Job id:" + id + " not runnable");
-                    if (outputDigest.getState() == ExecutableState.SUCCEED) {
-                        nSUCCEED++;
-                    } else if (outputDigest.getState() == ExecutableState.ERROR) {
-                        nError++;
-                    } else if (outputDigest.getState() == ExecutableState.DISCARDED) {
-                        nDiscarded++;
-                    } else if (outputDigest.getState() == ExecutableState.STOPPED) {
-                        nStopped++;
-                    } else {
-                        if (fetchFailed) {
-                            getExecutableManger().forceKillJob(id);
-                            nError++;
-                        } else {
-                            nOthers++;
-                        }
-                    }
+                    jobStateCount(id);
                     continue;
                 }
 
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
index 9d8f20e..36d6250 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
@@ -24,6 +24,8 @@ import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.Executable;
 import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.Output;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +39,7 @@ public abstract class FetcherRunner implements Runnable {
     protected DefaultContext context;
     protected JobExecutor jobExecutor;
     protected volatile boolean fetchFailed = false;
+    protected static int nRunning, nReady, nStopped, nOthers, nError, nDiscarded, nSUCCEED;
 
     public FetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, JobExecutor jobExecutor) {
         this.jobEngineConfig = jobEngineConfig;
@@ -66,6 +69,27 @@ public abstract class FetcherRunner implements Runnable {
             logger.warn(jobDesc + " fail to schedule", ex);
         }
     }
+    
+    protected void jobStateCount(String id) {
+        final Output outputDigest = getExecutableManger().getOutputDigest(id);
+        // logger.debug("Job id:" + id + " not runnable");
+        if (outputDigest.getState() == ExecutableState.SUCCEED) {
+            nSUCCEED++;
+        } else if (outputDigest.getState() == ExecutableState.ERROR) {
+            nError++;
+        } else if (outputDigest.getState() == ExecutableState.DISCARDED) {
+            nDiscarded++;
+        } else if (outputDigest.getState() == ExecutableState.STOPPED) {
+            nStopped++;
+        } else {
+            if (fetchFailed) {
+                getExecutableManger().forceKillJob(id);
+                nError++;
+            } else {
+                nOthers++;
+            }
+        }
+    }
 
     @VisibleForTesting
     void setFetchFailed(boolean fetchFailed) {
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
index 0792ed0..22732ea 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
@@ -83,7 +83,13 @@ public class PriorityFetcherRunner extends FetcherRunner {
                         executableWithPriority.getSecond() + 1);
             }
 
-            int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
+            nRunning = 0;
+            nReady = 0;
+            nStopped = 0;
+            nOthers = 0;
+            nError = 0;
+            nDiscarded = 0;
+            nSUCCEED = 0;
             for (final String id : getExecutableManger().getAllJobIdsInCache()) {
                 if (runningJobs.containsKey(id)) {
                     // logger.debug("Job id:" + id + " is already running");
@@ -93,23 +99,7 @@ public class PriorityFetcherRunner extends FetcherRunner {
 
                 final Output outputDigest = getExecutableManger().getOutputDigest(id);
                 if ((outputDigest.getState() != ExecutableState.READY)) {
-                    // logger.debug("Job id:" + id + " not runnable");
-                    if (outputDigest.getState() == ExecutableState.SUCCEED) {
-                        nSUCCEED++;
-                    } else if (outputDigest.getState() == ExecutableState.ERROR) {
-                        nError++;
-                    } else if (outputDigest.getState() == ExecutableState.DISCARDED) {
-                        nDiscarded++;
-                    } else if (outputDigest.getState() == ExecutableState.STOPPED) {
-                        nStopped++;
-                    } else {
-                        if (fetchFailed) {
-                            getExecutableManger().forceKillJob(id);
-                            nError++;
-                        } else {
-                            nOthers++;
-                        }
-                    }
+                    jobStateCount(id);
                     continue;
                 }
 
@@ -122,7 +112,7 @@ public class PriorityFetcherRunner extends FetcherRunner {
                 nReady++;
                 Integer priority = leftJobPriorities.get(id);
                 if (priority == null) {
-                    priority = executable.getDefaultPriority();
+                    priority = executable.getPriority();
                 }
                 jobPriorityQueue.add(new Pair<>(executable, priority));
             }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 1695a22..0e78e9c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -42,8 +42,8 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
     private final IMRBatchCubingInputSide inputSide;
     private final IMRBatchCubingOutputSide2 outputSide;
 
-    public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
-        super(newSegment, submitter);
+    public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter, Integer priorityOffset) {
+        super(newSegment, submitter, priorityOffset);
         this.inputSide = MRUtil.getBatchCubingInputSide(seg);
         this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
     }
@@ -86,6 +86,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
         inputSide.addStepPhase4_Cleanup(result);
         outputSide.addStepPhase4_Cleanup(result);
+        
+        // Apply the priority offset to the job itself and to each of its tasks
+        result.setPriorityBasedOnPriorityOffset(priorityOffset);
+        result.getTasks().forEach(task -> task.setPriorityBasedOnPriorityOffset(priorityOffset));
 
         return result;
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 11c7d36..4a83dea 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -59,6 +59,7 @@ public class JobBuilderSupport {
     final protected JobEngineConfig config;
     final protected CubeSegment seg;
     final protected String submitter;
+    final protected Integer priorityOffset;
 
     final public static String LayeredCuboidFolderPrefix = "level_";
 
@@ -68,10 +69,15 @@ public class JobBuilderSupport {
     final public static Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
 
     public JobBuilderSupport(CubeSegment seg, String submitter) {
+        this(seg, submitter, 0);
+    }
+
+    public JobBuilderSupport(CubeSegment seg, String submitter, Integer priorityOffset) {
         Preconditions.checkNotNull(seg, "segment cannot be null");
         this.config = new JobEngineConfig(seg.getConfig());
         this.seg = seg;
         this.submitter = submitter;
+        this.priorityOffset = priorityOffset;
     }
 
     public MapReduceExecutable createFactDistinctColumnsStep(String jobId) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
index 665e791..4aceae0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
@@ -38,8 +38,8 @@ public class MRBatchCubingEngine2 implements IBatchCubingEngine {
     }
 
     @Override
-    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
-        return new BatchCubingJobBuilder2(newSegment, submitter).build();
+    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) {
+        return new BatchCubingJobBuilder2(newSegment, submitter, priorityOffset).build();
     }
 
     @Override
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
index 47316b4..d3afb03 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
@@ -40,8 +40,8 @@ public class SparkBatchCubingEngine2 implements IBatchCubingEngine {
     }
 
     @Override
-    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
-        return new SparkBatchCubingJobBuilder2(newSegment, submitter).build();
+    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) {
+        return new SparkBatchCubingJobBuilder2(newSegment, submitter, priorityOffset).build();
     }
 
     @Override
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 089ca24..1d2e78e 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -45,7 +45,11 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
     private final ISparkOutput.ISparkBatchCubingOutputSide outputSide;
 
     public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
-        super(newSegment, submitter);
+        this(newSegment, submitter, 0);
+    }
+    
+    public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter, Integer priorityOffset) {
+        super(newSegment, submitter, priorityOffset);
         this.inputSide = SparkUtil.getBatchCubingInputSide(seg);
         this.outputSide = SparkUtil.getBatchCubingOutputSide(seg);
     }
@@ -89,6 +93,10 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
         inputSide.addStepPhase4_Cleanup(result);
         outputSide.addStepPhase4_Cleanup(result);
 
+        // Apply the priority offset to the job itself and to each of its tasks
+        result.setPriorityBasedOnPriorityOffset(priorityOffset);
+        result.getTasks().forEach(task -> task.setPriorityBasedOnPriorityOffset(priorityOffset));
+
         return result;
     }
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 08b03a4..c3f45a6 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -352,7 +352,7 @@ public class CubeController extends BasicController {
     @ResponseBody
     public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) {
         return buildInternal(cubeName, new TSRange(req.getStartTime(), req.getEndTime()), null, null, null,
-                req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment());
+                req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment(), req.getPriorityOffset());
     }
 
     /**
@@ -380,19 +380,19 @@ public class CubeController extends BasicController {
     public JobInstance rebuild2(@PathVariable String cubeName, @RequestBody JobBuildRequest2 req) {
         return buildInternal(cubeName, null, new SegmentRange(req.getSourceOffsetStart(), req.getSourceOffsetEnd()),
                 req.getSourcePartitionOffsetStart(), req.getSourcePartitionOffsetEnd(), req.getBuildType(),
-                req.isForce());
+                req.isForce(), req.getPriorityOffset());
     }
 
     private JobInstance buildInternal(String cubeName, TSRange tsRange, SegmentRange segRange, //
             Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd,
-            String buildType, boolean force) {
+            String buildType, boolean force, Integer priorityOffset) {
         try {
             String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
             CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
 
             checkBuildingSegment(cube);
             return jobService.submitJob(cube, tsRange, segRange, sourcePartitionOffsetStart, sourcePartitionOffsetEnd,
-                    CubeBuildTypeEnum.valueOf(buildType), force, submitter);
+                    CubeBuildTypeEnum.valueOf(buildType), force, submitter, priorityOffset);
         } catch (Throwable e) {
             logger.error(e.getLocalizedMessage(), e);
             throw new InternalErrorException(e.getLocalizedMessage(), e);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest.java
index 270d2d5..ff747a1 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest.java
@@ -30,6 +30,8 @@ public class JobBuildRequest {
 
     private boolean forceMergeEmptySegment = false;
     
+    private Integer priorityOffset = 0;
+    
     public long getStartTime() {
         return startTime;
     }
@@ -69,4 +71,12 @@ public class JobBuildRequest {
     public void setForceMergeEmptySegment(boolean forceMergeEmptySegment) {
         this.forceMergeEmptySegment = forceMergeEmptySegment;
     }
+
+    public Integer getPriorityOffset() {
+        return priorityOffset;
+    }
+
+    public void setPriorityOffset(Integer priorityOffset) {
+        this.priorityOffset = priorityOffset;
+    }
 }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java
index a5986ad..ab20996 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java
@@ -35,6 +35,8 @@ public class JobBuildRequest2 {
     private String buildType;
 
     private boolean force;
+    
+    private Integer priorityOffset = 0;
 
     public long getSourceOffsetStart() {
         return sourceOffsetStart;
@@ -84,4 +86,11 @@ public class JobBuildRequest2 {
         this.force = force;
     }
 
+    public Integer getPriorityOffset() {
+        return priorityOffset;
+    }
+
+    public void setPriorityOffset(Integer priorityOffset) {
+        this.priorityOffset = priorityOffset;
+    }
 }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 31a4119..8182f3d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -204,17 +204,17 @@ public class JobService extends BasicService implements InitializingBean {
 
     public JobInstance submitJob(CubeInstance cube, TSRange tsRange, SegmentRange segRange, //
             Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd,
-            CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException {
+            CubeBuildTypeEnum buildType, boolean force, String submitter, Integer priorityOffset) throws IOException {
         aclEvaluate.checkProjectOperationPermission(cube);
         JobInstance jobInstance = submitJobInternal(cube, tsRange, segRange, sourcePartitionOffsetStart,
-                sourcePartitionOffsetEnd, buildType, force, submitter);
+                sourcePartitionOffsetEnd, buildType, force, submitter, priorityOffset);
 
         return jobInstance;
     }
 
     public JobInstance submitJobInternal(CubeInstance cube, TSRange tsRange, SegmentRange segRange, //
             Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, //
-            CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException {
+            CubeBuildTypeEnum buildType, boolean force, String submitter, Integer priorityOffset) throws IOException {
         Message msg = MsgPicker.getMsg();
 
         if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
@@ -238,13 +238,13 @@ public class JobService extends BasicService implements InitializingBean {
                         sourcePartitionOffsetEnd);
                 src = source.enrichSourcePartitionBeforeBuild(cube, src);
                 newSeg = getCubeManager().appendSegment(cube, src);
-                job = EngineFactory.createBatchCubingJob(newSeg, submitter);
+                job = EngineFactory.createBatchCubingJob(newSeg, submitter, priorityOffset);
             } else if (buildType == CubeBuildTypeEnum.MERGE) {
                 newSeg = getCubeManager().mergeSegments(cube, tsRange, segRange, force);
                 job = EngineFactory.createBatchMergeJob(newSeg, submitter);
             } else if (buildType == CubeBuildTypeEnum.REFRESH) {
                 newSeg = getCubeManager().refreshSegment(cube, tsRange, segRange);
-                job = EngineFactory.createBatchCubingJob(newSeg, submitter);
+                job = EngineFactory.createBatchCubingJob(newSeg, submitter, priorityOffset);
             } else {
                 throw new BadRequestException(String.format(Locale.ROOT, msg.getINVALID_BUILD_TYPE(), buildType));
             }
diff --git a/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java
index b3b1126..eb4f7f7 100644
--- a/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java
@@ -107,13 +107,13 @@ public class CubeBuildingCLI extends AbstractApplication {
 
         if (buildType == CubeBuildTypeEnum.BUILD) {
             CubeSegment newSeg = cubeManager.appendSegment(cube, tsRange);
-            job = EngineFactory.createBatchCubingJob(newSeg, submitter);
+            job = EngineFactory.createBatchCubingJob(newSeg, submitter, null);
         } else if (buildType == CubeBuildTypeEnum.MERGE) {
             CubeSegment newSeg = cubeManager.mergeSegments(cube, tsRange, null, forceMergeEmptySeg);
             job = EngineFactory.createBatchMergeJob(newSeg, submitter);
         } else if (buildType == CubeBuildTypeEnum.REFRESH) {
             CubeSegment refreshSeg = cubeManager.refreshSegment(cube, tsRange, null);
-            job = EngineFactory.createBatchCubingJob(refreshSeg, submitter);
+            job = EngineFactory.createBatchCubingJob(refreshSeg, submitter, null);
         } else {
             throw new JobException("invalid build type:" + buildType);
         }