You are viewing a plain text version of this content. The canonical (HTML) version is available via the mailing-list archive link for this message.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/16 07:01:45 UTC
[kylin] 14/15: KYLIN-5363 Fix the problem of job metadata change when the number of segments in parallel build is too large
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 6cb5d889e7d3961460ba8374f0fac82a97153012
Author: sibingzhang <74...@users.noreply.github.com>
AuthorDate: Fri Oct 28 15:33:09 2022 +0800
KYLIN-5363 Fix the problem of job metadata change when the number of segments in parallel build is too large
from 7124
Co-authored-by: sibing.zhang <19...@qq.com>
---
.../org/apache/kylin/job/dao/NExecutableDao.java | 25 ++++++++++++
.../kylin/job/execution/NExecutableManager.java | 17 ++++++--
.../kylin/job/execution/DagExecutableTest.java | 4 ++
.../org/apache/kylin/rest/service/JobService.java | 1 +
.../kylin/rest/service/DagJobServiceTest.java | 4 ++
.../apache/kylin/rest/service/JobErrorTest.java | 4 +-
.../apache/kylin/rest/service/JobServiceTest.java | 27 ++++++++++++-
.../org/apache/kylin/rest/service/StageTest.java | 47 ++++++++++++++++++++++
.../kylin/engine/spark/job/NSparkExecutable.java | 5 ++-
9 files changed, 127 insertions(+), 7 deletions(-)
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
index 5c353d989c..f95779e96f 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
@@ -19,7 +19,9 @@
package org.apache.kylin.job.dao;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -61,6 +63,8 @@ public class NExecutableDao {
private CachedCrudAssist<ExecutablePO> crud;
+ private Map<String, ExecutablePO> updating = new HashMap<>();
+
private NExecutableDao(KylinConfig config, String project) {
logger.trace("Using metadata url: {}", config);
this.project = project;
@@ -126,6 +130,27 @@ public class NExecutableDao {
}
}
+ public void updateJobWithoutSave(String uuid, Predicate<ExecutablePO> updater) {
+ ExecutablePO executablePO;
+ if (updating.containsKey(uuid)) {
+ executablePO = updating.get(uuid);
+ } else {
+ ExecutablePO executablePOFromCache = getJobByUuid(uuid);
+ Preconditions.checkNotNull(executablePOFromCache);
+ val copyForWrite = JsonUtil.copyBySerialization(executablePOFromCache, JOB_SERIALIZER, null);
+ updating.put(uuid, copyForWrite);
+ executablePO = copyForWrite;
+ }
+ updater.test(executablePO);
+ }
+
+ public void saveUpdatedJob() {
+ for (ExecutablePO executablePO : updating.values()) {
+ crud.save(executablePO);
+ }
+ updating = new HashMap<>();
+ }
+
private ResourceStore getStore() {
return ResourceStore.getKylinMetaStore(config);
}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
index 00dfcd6a1c..a161751468 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
@@ -281,6 +281,10 @@ public class NExecutableManager {
}
}
+ public void saveUpdatedJob() {
+ executableDao.saveUpdatedJob();
+ }
+
public Set<String> getYarnApplicationJobs(String id) {
ExecutablePO executablePO = executableDao.getJobByUuid(id);
String appIds = executablePO.getOutput().getInfo().getOrDefault(YARN_APP_IDS, "");
@@ -785,6 +789,7 @@ public class NExecutableManager {
.forEach(stage -> //
updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.READY, null, null));
}
+ saveUpdatedJob();
}
}
});
@@ -866,6 +871,7 @@ public class NExecutableManager {
.forEach(stage -> // when restart, reset stage
updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.READY, null, null, true));
}
+ saveUpdatedJob();
}
}
@@ -908,6 +914,7 @@ public class NExecutableManager {
.forEach(stage -> //
updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.SUICIDAL, null, null));
}
+ saveUpdatedJob();
}
}
});
@@ -933,6 +940,7 @@ public class NExecutableManager {
.forEach(stage -> //
updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.DISCARDED, null, null));
}
+ saveUpdatedJob();
}
}
});
@@ -963,6 +971,7 @@ public class NExecutableManager {
.forEach(stage -> //
updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.ERROR, null, null));
}
+ saveUpdatedJob();
}
}
});
@@ -1002,6 +1011,7 @@ public class NExecutableManager {
.forEach(stage -> //
updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.PAUSED, null, null));
}
+ saveUpdatedJob();
}
}
});
@@ -1158,7 +1168,7 @@ public class NExecutableManager {
public void updateStageStatus(String taskOrJobId, String segmentId, ExecutableState newStatus,
Map<String, String> updateInfo, String failedMsg, Boolean isRestart) {
val jobId = extractJobId(taskOrJobId);
- executableDao.updateJob(jobId, job -> {
+ executableDao.updateJobWithoutSave(jobId, job -> {
final List<Map<String, List<ExecutablePO>>> collect = job.getTasks().stream()//
.map(ExecutablePO::getStagesMap)//
.filter(MapUtils::isNotEmpty)//
@@ -1253,6 +1263,7 @@ public class NExecutableManager {
.forEach(stage -> //
updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.SUCCEED, null, null));
}
+ saveUpdatedJob();
}
}
});
@@ -1278,6 +1289,7 @@ public class NExecutableManager {
.forEach(stage -> //
updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.ERROR, null, null));
}
+ saveUpdatedJob();
}
}
});
@@ -1371,8 +1383,7 @@ public class NExecutableManager {
}
val thread = scheduler.getContext().getRunningJobThread(executable);
if (thread != null) {
- logger.info("Interrupt Job [{}] thread and remove in ExecutableContext",
- executable.getDisplayName());
+ logger.info("Interrupt Job [{}] thread and remove in ExecutableContext", executable.getDisplayName());
thread.interrupt();
scheduler.getContext().removeRunningJob(executable);
}
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
index f39969755d..9357a55159 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
@@ -644,6 +644,7 @@ class DagExecutableTest {
manager.updateStageStatus(stage1.getId(), task.getId(), ExecutableState.RUNNING, null, null);
manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.RUNNING, null, null);
manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.RUNNING, null, null);
+ manager.saveUpdatedJob();
await().pollDelay(Duration.ONE_SECOND).until(() -> true);
manager.updateJobOutput(job.getId(), ExecutableState.SUCCEED);
manager.updateJobOutput(task.getId(), ExecutableState.SUCCEED);
@@ -652,6 +653,7 @@ class DagExecutableTest {
manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.SUCCEED, null, null);
await().pollDelay(Duration.ONE_SECOND).until(() -> true);
manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.SUCCEED, null, null);
+ manager.saveUpdatedJob();
val taskDuration = task.getTaskDurationToTest(task);
val expected = AbstractExecutable.getDuration(stage1.getOutput(task.getId()))
@@ -687,6 +689,7 @@ class DagExecutableTest {
manager.updateStageStatus(stage1.getId(), task.getId(), ExecutableState.RUNNING, null, null);
manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.RUNNING, null, null);
manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.RUNNING, null, null);
+ manager.saveUpdatedJob();
await().pollDelay(Duration.ONE_SECOND).until(() -> true);
manager.updateJobOutput(job.getId(), ExecutableState.SUCCEED);
manager.updateJobOutput(task.getId(), ExecutableState.SUCCEED);
@@ -695,6 +698,7 @@ class DagExecutableTest {
manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.SUCCEED, null, null);
await().pollDelay(Duration.ONE_SECOND).until(() -> true);
manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.SUCCEED, null, null);
+ manager.saveUpdatedJob();
val taskDuration = task.getTaskDurationToTest(task);
val expected = task.getDuration();
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
index 22ea2b774a..08ef663424 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -987,6 +987,7 @@ public class JobService extends BasicService implements JobSupporter {
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
val executableManager = getManager(NExecutableManager.class, project);
executableManager.updateStageStatus(taskId, segmentId, newStatus, updateInfo, errMsg);
+ executableManager.saveUpdatedJob();
return null;
}, project, UnitOfWork.DEFAULT_MAX_RETRY, UnitOfWork.DEFAULT_EPOCH_ID, jobId);
}
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java
index 2acffd1281..bbbed42b0b 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java
@@ -165,6 +165,7 @@ class DagJobServiceTest {
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
manager.updateJobOutput(task1.getId(), ExecutableState.ERROR);
manager.updateStageStatus(stage11.getId(), task1.getId(), ExecutableState.ERROR, null, null);
+ manager.saveUpdatedJob();
return null;
}, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID);
@@ -175,6 +176,7 @@ class DagJobServiceTest {
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
manager.updateJobOutput(task3.getId(), ExecutableState.ERROR);
manager.updateStageStatus(stage31.getId(), task3.getId(), ExecutableState.ERROR, null, null);
+ manager.saveUpdatedJob();
return null;
}, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID);
@@ -239,6 +241,7 @@ class DagJobServiceTest {
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
manager.updateJobOutput(task1.getId(), ExecutableState.ERROR);
manager.updateStageStatus(stage11.getId(), task1.getId(), ExecutableState.ERROR, null, null);
+ manager.saveUpdatedJob();
return null;
}, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID);
@@ -249,6 +252,7 @@ class DagJobServiceTest {
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
manager.updateJobOutput(task3.getId(), ExecutableState.ERROR);
manager.updateStageStatus(stage31.getId(), task3.getId(), ExecutableState.ERROR, null, null);
+ manager.saveUpdatedJob();
return null;
}, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID);
diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java
index d08373b613..c9c52fc563 100644
--- a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java
+++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java
@@ -387,7 +387,7 @@ public class JobErrorTest extends NLocalFileMetadataTestCase {
manager.addJob(executable);
var output = manager.getOutput(executable.getId());
- final long[] duration = {AbstractExecutable.getDuration(output)};
+ final long[] duration = { AbstractExecutable.getDuration(output) };
Assert.assertEquals(0, duration[0]);
((DefaultOutput) output).setStartTime(System.currentTimeMillis());
@@ -442,6 +442,7 @@ public class JobErrorTest extends NLocalFileMetadataTestCase {
manager.updateStageStatus(logicStep1.getId(), null, ExecutableState.RUNNING, null, null);
manager.updateStageStatus(logicStep2.getId(), null, ExecutableState.RUNNING, null, null);
manager.updateStageStatus(logicStep3.getId(), null, ExecutableState.RUNNING, null, null);
+ manager.saveUpdatedJob();
val durationWithoutWaiteTime = executable.getDurationFromStepOrStageDurationSum();
@@ -487,6 +488,7 @@ public class JobErrorTest extends NLocalFileMetadataTestCase {
manager.updateStageStatus(logicStep1.getId(), null, ExecutableState.RUNNING, null, null);
manager.updateStageStatus(logicStep2.getId(), null, ExecutableState.RUNNING, null, null);
manager.updateStageStatus(logicStep3.getId(), null, ExecutableState.RUNNING, null, null);
+ manager.saveUpdatedJob();
val durationWithoutWaiteTime = executable.getDurationFromStepOrStageDurationSum();
diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index e29d23a875..ee6db5a9be 100644
--- a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@ -578,6 +578,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SKIP, null, "test output");
manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+ manager.saveUpdatedJob();
var buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
@@ -587,6 +588,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
@@ -596,6 +598,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
Map<String, String> info = Maps.newHashMap();
info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "1");
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -603,6 +606,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "8");
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -610,6 +614,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10");
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -617,18 +622,21 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "12");
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
assertTrue(1 == successLogicStep);
manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
assertTrue(1 == successLogicStep);
manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -665,6 +673,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SKIP, null, "test output");
manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+ manager.saveUpdatedJob();
var buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
@@ -674,6 +683,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
@@ -683,6 +693,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
Map<String, String> info = Maps.newHashMap();
info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "1");
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -690,6 +701,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10");
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -697,6 +709,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10");
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -704,18 +717,21 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "12");
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
assertTrue(0.5 == successLogicStep);
manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
assertTrue(0.5 == successLogicStep);
manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap();
successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -751,6 +767,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SKIP, null, "test output");
manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+ manager.saveUpdatedJob();
var buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap().get(segmentId);
@@ -760,6 +777,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap().get(segmentId);
@@ -769,6 +787,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
Map<String, String> info = Maps.newHashMap();
info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "1");
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap().get(segmentId);
successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -778,6 +797,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "8");
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap().get(segmentId);
successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -787,6 +807,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10");
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap().get(segmentId);
successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -796,6 +817,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "12");
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap().get(segmentId);
successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -804,6 +826,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
assertTrue(1 == successLogicStep);
manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap().get(segmentId);
successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -812,6 +835,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
assertTrue(1 == successLogicStep);
manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+ manager.saveUpdatedJob();
buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
.get(0)).getStagesMap().get(segmentId);
successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -863,6 +887,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
assertEquals(logicStep.getId(), logicStepBase.getId());
manager.updateStageStatus(logicStep.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
+ manager.saveUpdatedJob();
List<ExecutableStepResponse> jobDetail = jobService.getJobDetail(project, executable.getId());
assertEquals(1, jobDetail.size());
@@ -883,8 +908,8 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
assertTrue(logicStepResponse2.getExecStartTime() < System.currentTimeMillis());
manager.updateStageStatus(logicStep.getId(), segmentId2, ExecutableState.RUNNING, null, "test output");
-
manager.updateStageStatus(logicStep.getId(), null, ExecutableState.SUCCEED, null, "test output");
+ manager.saveUpdatedJob();
jobDetail = jobService.getJobDetail(project, executable.getId());
assertEquals(1, jobDetail.size());
diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
index c030785854..bf63baa1e2 100644
--- a/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
+++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
@@ -18,6 +18,7 @@
package org.apache.kylin.rest.service;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.kylin.engine.spark.job.step.NStageForBuild;
@@ -60,7 +61,9 @@ import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.test.util.ReflectionTestUtils;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -204,6 +207,7 @@ public class StageTest extends NLocalFileMetadataTestCase {
manager.addJob(executable);
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, null, null, false);
+ manager.saveUpdatedJob();
manager.updateStagePaused(executable);
@@ -256,6 +260,7 @@ public class StageTest extends NLocalFileMetadataTestCase {
manager.updateJobOutput(executable.getId(), ExecutableState.RUNNING, null, null, null);
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, null, null, false);
+ manager.saveUpdatedJob();
manager.updateJobOutput(executable.getId(), ExecutableState.SUCCEED, null, null, null);
manager.makeStageSuccess(sparkExecutable.getId());
@@ -295,12 +300,14 @@ public class StageTest extends NLocalFileMetadataTestCase {
manager.addJob(executable);
manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, errMsg);
+ manager.saveUpdatedJob();
var output1 = manager.getOutput(logicStep1.getId(), segmentId);
Assert.assertEquals(ExecutableState.SUCCEED, output1.getState());
Assert.assertEquals(output1.getShortErrMsg(), errMsg);
Assert.assertTrue(MapUtils.isEmpty(output1.getExtra()));
manager.updateStageStatus(logicStep1.getId(), null, ExecutableState.ERROR, null, errMsg);
+ manager.saveUpdatedJob();
output1 = manager.getOutput(logicStep1.getId(), segmentId);
Assert.assertEquals(ExecutableState.SUCCEED, output1.getState());
Assert.assertEquals(output1.getShortErrMsg(), errMsg);
@@ -316,6 +323,45 @@ public class StageTest extends NLocalFileMetadataTestCase {
Assert.assertTrue(MapUtils.isEmpty(outputLogicStep2.getExtra()));
}
+ @Test
+ public void testUpdateStageStatusNoSaveCache() {
+ val segmentId = RandomUtil.randomUUIDStr();
+ val segmentId2 = RandomUtil.randomUUIDStr();
+
+ val manager = NExecutableManager.getInstance(jobService.getConfig(), getProject());
+ val executable = new SucceedChainedTestExecutable();
+
+ executable.setId(RandomUtil.randomUUIDStr());
+
+ val sparkExecutable = new NSparkExecutable();
+ sparkExecutable.setParam(NBatchConstants.P_SEGMENT_IDS, segmentId + "," + segmentId2);
+ sparkExecutable.setId(RandomUtil.randomUUIDStr());
+ executable.addTask(sparkExecutable);
+
+ val build1 = new NStageForBuild();
+ val build2 = new NStageForBuild();
+ val build3 = new NStageForBuild();
+ sparkExecutable.addStage(build1);
+ sparkExecutable.addStage(build2);
+ sparkExecutable.addStage(build3);
+ sparkExecutable.setStageMap();
+
+ manager.addJob(executable);
+
+ List<AbstractExecutable> tasks = executable.getTasks();
+ tasks.forEach(task -> {
+ final Map<String, List<StageBase>> tasksMap = ((ChainedStageExecutable) task).getStagesMap();
+ for (Map.Entry<String, List<StageBase>> entry : tasksMap.entrySet()) {
+ Optional.ofNullable(entry.getValue()).orElse(Lists.newArrayList())//
+ .forEach(stage -> //
+ manager.updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.DISCARDED, null, null));
+ }
+ manager.saveUpdatedJob();
+ });
+
+ Assert.assertEquals(1, manager.getAllJobs().get(0).getMvcc());
+ }
+
@Test
public void testSetStageOutput() {
NExecutableManager manager = NExecutableManager.getInstance(jobService.getConfig(), getProject());
@@ -445,6 +491,7 @@ public class StageTest extends NLocalFileMetadataTestCase {
manager.updateStageStatus(stage1.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
manager.updateStageStatus(stage2.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
manager.updateStageStatus(stage3.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
+ manager.saveUpdatedJob();
manager.makeStageError(executable.getId());
var job = manager.getJob(executable.getId());
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 5517f3b2c4..a842c926ca 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -206,8 +206,9 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
public void waiteForResourceStart(ExecutableContext context) {
// mark waiteForResource stage start
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
- getExecutableManager(getProject()) //
- .updateStageStatus(getId() + "_00", null, ExecutableState.RUNNING, null, null);
+ NExecutableManager manager = getExecutableManager(getProject());
+ manager.updateStageStatus(getId() + "_00", null, ExecutableState.RUNNING, null, null);
+ manager.saveUpdatedJob();
return 0;
}, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), getTempLockName());
}