You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/04/10 07:38:54 UTC
[kylin] branch master updated: KYLIN-3892 Set cubing job priority
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 7ac5a84 KYLIN-3892 Set cubing job priority
7ac5a84 is described below
commit 7ac5a8449621c191b9e1bcdeaa61a31e144e24a3
Author: Temple Zhou <db...@gmail.com>
AuthorDate: Tue Apr 9 17:31:44 2019 +0800
KYLIN-3892 Set cubing job priority
---
.../org/apache/kylin/engine/EngineFactory.java | 6 ++++-
.../apache/kylin/engine/IBatchCubingEngine.java | 2 +-
.../org/apache/kylin/job/dao/ExecutablePO.java | 11 +++++++++
.../kylin/job/execution/AbstractExecutable.java | 16 +++++++++++++
.../kylin/job/execution/ExecutableManager.java | 2 ++
.../job/impl/threadpool/DefaultFetcherRunner.java | 25 +++++++------------
.../kylin/job/impl/threadpool/FetcherRunner.java | 24 +++++++++++++++++++
.../job/impl/threadpool/PriorityFetcherRunner.java | 28 +++++++---------------
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 8 +++++--
.../apache/kylin/engine/mr/JobBuilderSupport.java | 6 +++++
.../kylin/engine/mr/MRBatchCubingEngine2.java | 4 ++--
.../engine/spark/SparkBatchCubingEngine2.java | 4 ++--
.../engine/spark/SparkBatchCubingJobBuilder2.java | 10 +++++++-
.../kylin/rest/controller/CubeController.java | 8 +++----
.../apache/kylin/rest/request/JobBuildRequest.java | 10 ++++++++
.../kylin/rest/request/JobBuildRequest2.java | 9 +++++++
.../org/apache/kylin/rest/service/JobService.java | 10 ++++----
.../org/apache/kylin/tool/job/CubeBuildingCLI.java | 4 ++--
18 files changed, 131 insertions(+), 56 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
index 734c470..63eb09a 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -54,7 +54,11 @@ public class EngineFactory {
/** Build a new cube segment, typically its time range appends to the end of current cube. */
public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
- return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter);
+ return createBatchCubingJob(newSegment, submitter, 0);
+ }
+
+ public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) {
+ return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter, priorityOffset);
}
/** Merge multiple small segments into a big one. */
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
index a618eac..d259a0e 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
@@ -32,7 +32,7 @@ public interface IBatchCubingEngine {
public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment);
/** Build a new cube segment, typically its time range appends to the end of current cube. */
- public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter);
+ public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset);
/** Merge multiple small segments into a big one. */
public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
index f48c876..1238b66 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
@@ -48,6 +48,9 @@ public class ExecutablePO extends RootPersistentEntity {
@JsonProperty("params")
private Map<String, String> params = Maps.newHashMap();
+ @JsonProperty("priority")
+ private Integer priority;
+
public String getName() {
return name;
}
@@ -88,4 +91,12 @@ public class ExecutablePO extends RootPersistentEntity {
this.params = params;
}
+ public Integer getPriority() {
+ return priority;
+ }
+
+ public void setPriority(Integer priority) {
+ this.priority = priority;
+ }
+
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 8e1b262..51d9bf7 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -70,6 +70,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
private String id;
private AbstractExecutable parentExecutable = null;
private Map<String, String> params = Maps.newHashMap();
+ protected Integer priority;
public AbstractExecutable() {
setId(RandomUtil.randomUUID().toString());
@@ -491,6 +492,21 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
return DEFAULT_PRIORITY;
}
+ public Integer getPriority() {
+ return priority == null ? getDefaultPriority() : priority;
+ }
+
+ public void setPriority(Integer priority) {
+ this.priority = priority;
+ }
+
+ /**
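+ * Sets the priority to this job's default priority plus the given offset (a null offset counts as 0); different job types have different default priorities.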
+ * */
+ public void setPriorityBasedOnPriorityOffset(Integer priorityOffset) {
+ this.priority = getDefaultPriority() + (priorityOffset == null ? 0 : priorityOffset);
+ }
+
/*
* discarded is triggered by JobService, the Scheduler is not aware of that
*
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index b1244b5..5837bd5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -77,6 +77,7 @@ public class ExecutableManager {
result.setUuid(executable.getId());
result.setType(executable.getClass().getName());
result.setParams(executable.getParams());
+ result.setPriority(executable.getPriority());
if (executable instanceof ChainedExecutable) {
List<ExecutablePO> tasks = Lists.newArrayList();
for (AbstractExecutable task : ((ChainedExecutable) executable).getTasks()) {
@@ -569,6 +570,7 @@ public class ExecutableManager {
result.setId(executablePO.getUuid());
result.setName(executablePO.getName());
result.setParams(executablePO.getParams());
+ result.setPriority(executablePO.getPriority());
if (!(result instanceof BrokenExecutable)) {
List<ExecutablePO> tasks = executablePO.getTasks();
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
index 21cd8e9..04e40a5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
@@ -47,7 +47,13 @@ public class DefaultFetcherRunner extends FetcherRunner {
return;
}
- int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
+ nRunning = 0;
+ nReady = 0;
+ nStopped = 0;
+ nOthers = 0;
+ nError = 0;
+ nDiscarded = 0;
+ nSUCCEED = 0;
for (final String id : getExecutableManger().getAllJobIdsInCache()) {
if (isJobPoolFull()) {
return;
@@ -61,22 +67,7 @@ public class DefaultFetcherRunner extends FetcherRunner {
final Output outputDigest = getExecutableManger().getOutputDigest(id);
if ((outputDigest.getState() != ExecutableState.READY)) {
// logger.debug("Job id:" + id + " not runnable");
- if (outputDigest.getState() == ExecutableState.SUCCEED) {
- nSUCCEED++;
- } else if (outputDigest.getState() == ExecutableState.ERROR) {
- nError++;
- } else if (outputDigest.getState() == ExecutableState.DISCARDED) {
- nDiscarded++;
- } else if (outputDigest.getState() == ExecutableState.STOPPED) {
- nStopped++;
- } else {
- if (fetchFailed) {
- getExecutableManger().forceKillJob(id);
- nError++;
- } else {
- nOthers++;
- }
- }
+ jobStateCount(id);
continue;
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
index 9d8f20e..36d6250 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
@@ -24,6 +24,8 @@ import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.Output;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +39,7 @@ public abstract class FetcherRunner implements Runnable {
protected DefaultContext context;
protected JobExecutor jobExecutor;
protected volatile boolean fetchFailed = false;
+ protected static int nRunning, nReady, nStopped, nOthers, nError, nDiscarded, nSUCCEED;
public FetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, JobExecutor jobExecutor) {
this.jobEngineConfig = jobEngineConfig;
@@ -66,6 +69,27 @@ public abstract class FetcherRunner implements Runnable {
logger.warn(jobDesc + " fail to schedule", ex);
}
}
+
+ protected void jobStateCount(String id) {
+ final Output outputDigest = getExecutableManger().getOutputDigest(id);
+ // logger.debug("Job id:" + id + " not runnable");
+ if (outputDigest.getState() == ExecutableState.SUCCEED) {
+ nSUCCEED++;
+ } else if (outputDigest.getState() == ExecutableState.ERROR) {
+ nError++;
+ } else if (outputDigest.getState() == ExecutableState.DISCARDED) {
+ nDiscarded++;
+ } else if (outputDigest.getState() == ExecutableState.STOPPED) {
+ nStopped++;
+ } else {
+ if (fetchFailed) {
+ getExecutableManger().forceKillJob(id);
+ nError++;
+ } else {
+ nOthers++;
+ }
+ }
+ }
@VisibleForTesting
void setFetchFailed(boolean fetchFailed) {
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
index 0792ed0..22732ea 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
@@ -83,7 +83,13 @@ public class PriorityFetcherRunner extends FetcherRunner {
executableWithPriority.getSecond() + 1);
}
- int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
+ nRunning = 0;
+ nReady = 0;
+ nStopped = 0;
+ nOthers = 0;
+ nError = 0;
+ nDiscarded = 0;
+ nSUCCEED = 0;
for (final String id : getExecutableManger().getAllJobIdsInCache()) {
if (runningJobs.containsKey(id)) {
// logger.debug("Job id:" + id + " is already running");
@@ -93,23 +99,7 @@ public class PriorityFetcherRunner extends FetcherRunner {
final Output outputDigest = getExecutableManger().getOutputDigest(id);
if ((outputDigest.getState() != ExecutableState.READY)) {
- // logger.debug("Job id:" + id + " not runnable");
- if (outputDigest.getState() == ExecutableState.SUCCEED) {
- nSUCCEED++;
- } else if (outputDigest.getState() == ExecutableState.ERROR) {
- nError++;
- } else if (outputDigest.getState() == ExecutableState.DISCARDED) {
- nDiscarded++;
- } else if (outputDigest.getState() == ExecutableState.STOPPED) {
- nStopped++;
- } else {
- if (fetchFailed) {
- getExecutableManger().forceKillJob(id);
- nError++;
- } else {
- nOthers++;
- }
- }
+ jobStateCount(id);
continue;
}
@@ -122,7 +112,7 @@ public class PriorityFetcherRunner extends FetcherRunner {
nReady++;
Integer priority = leftJobPriorities.get(id);
if (priority == null) {
- priority = executable.getDefaultPriority();
+ priority = executable.getPriority();
}
jobPriorityQueue.add(new Pair<>(executable, priority));
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 1695a22..0e78e9c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -42,8 +42,8 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
private final IMRBatchCubingInputSide inputSide;
private final IMRBatchCubingOutputSide2 outputSide;
- public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
- super(newSegment, submitter);
+ public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter, Integer priorityOffset) {
+ super(newSegment, submitter, priorityOffset);
this.inputSide = MRUtil.getBatchCubingInputSide(seg);
this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
}
@@ -86,6 +86,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
inputSide.addStepPhase4_Cleanup(result);
outputSide.addStepPhase4_Cleanup(result);
+
+ // Set the task priority if specified
+ result.setPriorityBasedOnPriorityOffset(priorityOffset);
+ result.getTasks().forEach(task -> task.setPriorityBasedOnPriorityOffset(priorityOffset));
return result;
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 11c7d36..4a83dea 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -59,6 +59,7 @@ public class JobBuilderSupport {
final protected JobEngineConfig config;
final protected CubeSegment seg;
final protected String submitter;
+ final protected Integer priorityOffset;
final public static String LayeredCuboidFolderPrefix = "level_";
@@ -68,10 +69,15 @@ public class JobBuilderSupport {
final public static Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
public JobBuilderSupport(CubeSegment seg, String submitter) {
+ this(seg, submitter, 0);
+ }
+
+ public JobBuilderSupport(CubeSegment seg, String submitter, Integer priorityOffset) {
Preconditions.checkNotNull(seg, "segment cannot be null");
this.config = new JobEngineConfig(seg.getConfig());
this.seg = seg;
this.submitter = submitter;
+ this.priorityOffset = priorityOffset;
}
public MapReduceExecutable createFactDistinctColumnsStep(String jobId) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
index 665e791..4aceae0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
@@ -38,8 +38,8 @@ public class MRBatchCubingEngine2 implements IBatchCubingEngine {
}
@Override
- public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
- return new BatchCubingJobBuilder2(newSegment, submitter).build();
+ public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) {
+ return new BatchCubingJobBuilder2(newSegment, submitter, priorityOffset).build();
}
@Override
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
index 47316b4..d3afb03 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
@@ -40,8 +40,8 @@ public class SparkBatchCubingEngine2 implements IBatchCubingEngine {
}
@Override
- public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
- return new SparkBatchCubingJobBuilder2(newSegment, submitter).build();
+ public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) {
+ return new SparkBatchCubingJobBuilder2(newSegment, submitter, priorityOffset).build();
}
@Override
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 089ca24..1d2e78e 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -45,7 +45,11 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
private final ISparkOutput.ISparkBatchCubingOutputSide outputSide;
public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
- super(newSegment, submitter);
+ this(newSegment, submitter, 0);
+ }
+
+ public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter, Integer priorityOffset) {
+ super(newSegment, submitter, priorityOffset);
this.inputSide = SparkUtil.getBatchCubingInputSide(seg);
this.outputSide = SparkUtil.getBatchCubingOutputSide(seg);
}
@@ -89,6 +93,10 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
inputSide.addStepPhase4_Cleanup(result);
outputSide.addStepPhase4_Cleanup(result);
+ // Set the task priority if specified
+ result.setPriorityBasedOnPriorityOffset(priorityOffset);
+ result.getTasks().forEach(task -> task.setPriorityBasedOnPriorityOffset(priorityOffset));
+
return result;
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 08b03a4..c3f45a6 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -352,7 +352,7 @@ public class CubeController extends BasicController {
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) {
return buildInternal(cubeName, new TSRange(req.getStartTime(), req.getEndTime()), null, null, null,
- req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment());
+ req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment(), req.getPriorityOffset());
}
/**
@@ -380,19 +380,19 @@ public class CubeController extends BasicController {
public JobInstance rebuild2(@PathVariable String cubeName, @RequestBody JobBuildRequest2 req) {
return buildInternal(cubeName, null, new SegmentRange(req.getSourceOffsetStart(), req.getSourceOffsetEnd()),
req.getSourcePartitionOffsetStart(), req.getSourcePartitionOffsetEnd(), req.getBuildType(),
- req.isForce());
+ req.isForce(), req.getPriorityOffset());
}
private JobInstance buildInternal(String cubeName, TSRange tsRange, SegmentRange segRange, //
Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd,
- String buildType, boolean force) {
+ String buildType, boolean force, Integer priorityOffset) {
try {
String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
checkBuildingSegment(cube);
return jobService.submitJob(cube, tsRange, segRange, sourcePartitionOffsetStart, sourcePartitionOffsetEnd,
- CubeBuildTypeEnum.valueOf(buildType), force, submitter);
+ CubeBuildTypeEnum.valueOf(buildType), force, submitter, priorityOffset);
} catch (Throwable e) {
logger.error(e.getLocalizedMessage(), e);
throw new InternalErrorException(e.getLocalizedMessage(), e);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest.java
index 270d2d5..ff747a1 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest.java
@@ -30,6 +30,8 @@ public class JobBuildRequest {
private boolean forceMergeEmptySegment = false;
+ private Integer priorityOffset = 0;
+
public long getStartTime() {
return startTime;
}
@@ -69,4 +71,12 @@ public class JobBuildRequest {
public void setForceMergeEmptySegment(boolean forceMergeEmptySegment) {
this.forceMergeEmptySegment = forceMergeEmptySegment;
}
+
+ public Integer getPriorityOffset() {
+ return priorityOffset;
+ }
+
+ public void setPriorityOffset(Integer priorityOffset) {
+ this.priorityOffset = priorityOffset;
+ }
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java
index a5986ad..ab20996 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java
@@ -35,6 +35,8 @@ public class JobBuildRequest2 {
private String buildType;
private boolean force;
+
+ private Integer priorityOffset = 0;
public long getSourceOffsetStart() {
return sourceOffsetStart;
@@ -84,4 +86,11 @@ public class JobBuildRequest2 {
this.force = force;
}
+ public Integer getPriorityOffset() {
+ return priorityOffset;
+ }
+
+ public void setPriorityOffset(Integer priorityOffset) {
+ this.priorityOffset = priorityOffset;
+ }
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 31a4119..8182f3d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -204,17 +204,17 @@ public class JobService extends BasicService implements InitializingBean {
public JobInstance submitJob(CubeInstance cube, TSRange tsRange, SegmentRange segRange, //
Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd,
- CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException {
+ CubeBuildTypeEnum buildType, boolean force, String submitter, Integer priorityOffset) throws IOException {
aclEvaluate.checkProjectOperationPermission(cube);
JobInstance jobInstance = submitJobInternal(cube, tsRange, segRange, sourcePartitionOffsetStart,
- sourcePartitionOffsetEnd, buildType, force, submitter);
+ sourcePartitionOffsetEnd, buildType, force, submitter, priorityOffset);
return jobInstance;
}
public JobInstance submitJobInternal(CubeInstance cube, TSRange tsRange, SegmentRange segRange, //
Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, //
- CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException {
+ CubeBuildTypeEnum buildType, boolean force, String submitter, Integer priorityOffset) throws IOException {
Message msg = MsgPicker.getMsg();
if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
@@ -238,13 +238,13 @@ public class JobService extends BasicService implements InitializingBean {
sourcePartitionOffsetEnd);
src = source.enrichSourcePartitionBeforeBuild(cube, src);
newSeg = getCubeManager().appendSegment(cube, src);
- job = EngineFactory.createBatchCubingJob(newSeg, submitter);
+ job = EngineFactory.createBatchCubingJob(newSeg, submitter, priorityOffset);
} else if (buildType == CubeBuildTypeEnum.MERGE) {
newSeg = getCubeManager().mergeSegments(cube, tsRange, segRange, force);
job = EngineFactory.createBatchMergeJob(newSeg, submitter);
} else if (buildType == CubeBuildTypeEnum.REFRESH) {
newSeg = getCubeManager().refreshSegment(cube, tsRange, segRange);
- job = EngineFactory.createBatchCubingJob(newSeg, submitter);
+ job = EngineFactory.createBatchCubingJob(newSeg, submitter, priorityOffset);
} else {
throw new BadRequestException(String.format(Locale.ROOT, msg.getINVALID_BUILD_TYPE(), buildType));
}
diff --git a/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java
index b3b1126..eb4f7f7 100644
--- a/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java
@@ -107,13 +107,13 @@ public class CubeBuildingCLI extends AbstractApplication {
if (buildType == CubeBuildTypeEnum.BUILD) {
CubeSegment newSeg = cubeManager.appendSegment(cube, tsRange);
- job = EngineFactory.createBatchCubingJob(newSeg, submitter);
+ job = EngineFactory.createBatchCubingJob(newSeg, submitter, null);
} else if (buildType == CubeBuildTypeEnum.MERGE) {
CubeSegment newSeg = cubeManager.mergeSegments(cube, tsRange, null, forceMergeEmptySeg);
job = EngineFactory.createBatchMergeJob(newSeg, submitter);
} else if (buildType == CubeBuildTypeEnum.REFRESH) {
CubeSegment refreshSeg = cubeManager.refreshSegment(cube, tsRange, null);
- job = EngineFactory.createBatchCubingJob(refreshSeg, submitter);
+ job = EngineFactory.createBatchCubingJob(refreshSeg, submitter, null);
} else {
throw new JobException("invalid build type:" + buildType);
}