You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/08/15 13:34:33 UTC
[kylin] 09/11: KYLIN-4057 Don't merge the job that has been
discarded manually before.
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit b767f4c08682155a3d6818aa6619a3380bb846aa
Author: rupengwang <wa...@live.cn>
AuthorDate: Wed Jul 17 17:32:21 2019 +0800
KYLIN-4057 Don't merge the job that has been discarded manually before.
---
.../java/org/apache/kylin/engine/mr/CubingJob.java | 5 +++++
.../org/apache/kylin/rest/service/CubeService.java | 18 +++++++++++++++++-
2 files changed, 22 insertions(+), 1 deletion(-)
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index fb1a7f4..13f4183 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -106,6 +106,7 @@ public class CubingJob extends DefaultChainedExecutable {
private static final String DEPLOY_ENV_NAME = "envName";
private static final String PROJECT_INSTANCE_NAME = "projectName";
private static final String JOB_TYPE = "jobType";
+ private static final String SEGMENT_NAME = "segmentName";
public static CubingJob createBuildJob(CubeSegment seg, String submitter, JobEngineConfig config) {
return initCubingJob(seg, CubingJobTypeEnum.BUILD.toString(), submitter, config);
@@ -185,6 +186,10 @@ public class CubingJob extends DefaultChainedExecutable {
return getParam(JOB_TYPE);
}
+ public String getSegmentName() {
+ return getParam(SEGMENT_NAME);
+ }
+
void setJobType(String jobType) {
setParam(JOB_TYPE, jobType);
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 2a5ce26..b975cdc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -684,7 +684,7 @@ public class CubeService extends BasicService implements InitializingBean {
try {
cube = getCubeManager().getCube(cubeName);
SegmentRange offsets = cube.autoMergeCubeSegments();
- if (offsets != null) {
+ if (offsets != null && !isMergingJobBeenDiscarded(cube, cubeName, cube.getProject(), offsets)) {
CubeSegment newSeg = getCubeManager().mergeSegments(cube, null, offsets, true);
logger.debug("Will submit merge job on " + newSeg);
DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(newSeg, "SYSTEM");
@@ -698,6 +698,22 @@ public class CubeService extends BasicService implements InitializingBean {
}
}
+ //Don't merge the job that has been discarded manually before
+ private boolean isMergingJobBeenDiscarded(CubeInstance cubeInstance, String cubeName, String projectName, SegmentRange offsets) {
+ SegmentRange.TSRange tsRange = new SegmentRange.TSRange((Long) offsets.start.v, (Long) offsets.end.v);
+ String segmentName = CubeSegment.makeSegmentName(tsRange, null, cubeInstance.getModel());
+ final List<CubingJob> jobInstanceList = jobService.listJobsByRealizationName(cubeName, projectName, EnumSet.of(ExecutableState.DISCARDED));
+ for (CubingJob cubingJob : jobInstanceList) {
+ if (cubingJob.getSegmentName().equals(segmentName)) {
+ logger.debug("Merge job {} has been discarded before, will not merge.", segmentName);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+
public void validateCubeDesc(CubeDesc desc, boolean isDraft) {
Message msg = MsgPicker.getMsg();