You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/16 07:01:45 UTC

[kylin] 14/15: KYLIN-5363 Fix the problem of job metadata change when the number of segments in parallel build is too large

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 6cb5d889e7d3961460ba8374f0fac82a97153012
Author: sibingzhang <74...@users.noreply.github.com>
AuthorDate: Fri Oct 28 15:33:09 2022 +0800

    KYLIN-5363 Fix the problem of job metadata change when the number of segments in parallel build is too large
    
    from 7124
    
    Co-authored-by: sibing.zhang <19...@qq.com>
---
 .../org/apache/kylin/job/dao/NExecutableDao.java   | 25 ++++++++++++
 .../kylin/job/execution/NExecutableManager.java    | 17 ++++++--
 .../kylin/job/execution/DagExecutableTest.java     |  4 ++
 .../org/apache/kylin/rest/service/JobService.java  |  1 +
 .../kylin/rest/service/DagJobServiceTest.java      |  4 ++
 .../apache/kylin/rest/service/JobErrorTest.java    |  4 +-
 .../apache/kylin/rest/service/JobServiceTest.java  | 27 ++++++++++++-
 .../org/apache/kylin/rest/service/StageTest.java   | 47 ++++++++++++++++++++++
 .../kylin/engine/spark/job/NSparkExecutable.java   |  5 ++-
 9 files changed, 127 insertions(+), 7 deletions(-)

diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
index 5c353d989c..f95779e96f 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
@@ -19,7 +19,9 @@
 package org.apache.kylin.job.dao;
 
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -61,6 +63,8 @@ public class NExecutableDao {
 
     private CachedCrudAssist<ExecutablePO> crud;
 
+    private Map<String, ExecutablePO> updating = new HashMap<>();
+
     private NExecutableDao(KylinConfig config, String project) {
         logger.trace("Using metadata url: {}", config);
         this.project = project;
@@ -126,6 +130,27 @@ public class NExecutableDao {
         }
     }
 
+    public void updateJobWithoutSave(String uuid, Predicate<ExecutablePO> updater) {
+        ExecutablePO executablePO;
+        if (updating.containsKey(uuid)) {
+            executablePO = updating.get(uuid);
+        } else {
+            ExecutablePO executablePOFromCache = getJobByUuid(uuid);
+            Preconditions.checkNotNull(executablePOFromCache);
+            val copyForWrite = JsonUtil.copyBySerialization(executablePOFromCache, JOB_SERIALIZER, null);
+            updating.put(uuid, copyForWrite);
+            executablePO = copyForWrite;
+        }
+        updater.test(executablePO);
+    }
+
+    public void saveUpdatedJob() {
+        for (ExecutablePO executablePO : updating.values()) {
+            crud.save(executablePO);
+        }
+        updating = new HashMap<>();
+    }
+
     private ResourceStore getStore() {
         return ResourceStore.getKylinMetaStore(config);
     }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
index 00dfcd6a1c..a161751468 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
@@ -281,6 +281,10 @@ public class NExecutableManager {
         }
     }
 
+    public void saveUpdatedJob() {
+        executableDao.saveUpdatedJob();
+    }
+
     public Set<String> getYarnApplicationJobs(String id) {
         ExecutablePO executablePO = executableDao.getJobByUuid(id);
         String appIds = executablePO.getOutput().getInfo().getOrDefault(YARN_APP_IDS, "");
@@ -785,6 +789,7 @@ public class NExecutableManager {
                                     .forEach(stage -> //
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.READY, null, null));
                         }
+                        saveUpdatedJob();
                     }
                 }
             });
@@ -866,6 +871,7 @@ public class NExecutableManager {
                                     .forEach(stage -> // when restart, reset stage
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.READY, null, null, true));
                         }
+                        saveUpdatedJob();
                     }
                 }
 
@@ -908,6 +914,7 @@ public class NExecutableManager {
                                     .forEach(stage -> //
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.SUICIDAL, null, null));
                         }
+                        saveUpdatedJob();
                     }
                 }
             });
@@ -933,6 +940,7 @@ public class NExecutableManager {
                                     .forEach(stage -> //
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.DISCARDED, null, null));
                         }
+                        saveUpdatedJob();
                     }
                 }
             });
@@ -963,6 +971,7 @@ public class NExecutableManager {
                                     .forEach(stage -> //
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.ERROR, null, null));
                         }
+                        saveUpdatedJob();
                     }
                 }
             });
@@ -1002,6 +1011,7 @@ public class NExecutableManager {
                                     .forEach(stage -> //
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.PAUSED, null, null));
                         }
+                        saveUpdatedJob();
                     }
                 }
             });
@@ -1158,7 +1168,7 @@ public class NExecutableManager {
     public void updateStageStatus(String taskOrJobId, String segmentId, ExecutableState newStatus,
             Map<String, String> updateInfo, String failedMsg, Boolean isRestart) {
         val jobId = extractJobId(taskOrJobId);
-        executableDao.updateJob(jobId, job -> {
+        executableDao.updateJobWithoutSave(jobId, job -> {
             final List<Map<String, List<ExecutablePO>>> collect = job.getTasks().stream()//
                     .map(ExecutablePO::getStagesMap)//
                     .filter(MapUtils::isNotEmpty)//
@@ -1253,6 +1263,7 @@ public class NExecutableManager {
                                     .forEach(stage -> //
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.SUCCEED, null, null));
                         }
+                        saveUpdatedJob();
                     }
                 }
             });
@@ -1278,6 +1289,7 @@ public class NExecutableManager {
                                     .forEach(stage -> //
                             updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.ERROR, null, null));
                         }
+                        saveUpdatedJob();
                     }
                 }
             });
@@ -1371,8 +1383,7 @@ public class NExecutableManager {
         }
         val thread = scheduler.getContext().getRunningJobThread(executable);
         if (thread != null) {
-            logger.info("Interrupt Job [{}] thread and remove in ExecutableContext",
-                    executable.getDisplayName());
+            logger.info("Interrupt Job [{}] thread and remove in ExecutableContext", executable.getDisplayName());
             thread.interrupt();
             scheduler.getContext().removeRunningJob(executable);
         }
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
index f39969755d..9357a55159 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
@@ -644,6 +644,7 @@ class DagExecutableTest {
         manager.updateStageStatus(stage1.getId(), task.getId(), ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.RUNNING, null, null);
+        manager.saveUpdatedJob();
         await().pollDelay(Duration.ONE_SECOND).until(() -> true);
         manager.updateJobOutput(job.getId(), ExecutableState.SUCCEED);
         manager.updateJobOutput(task.getId(), ExecutableState.SUCCEED);
@@ -652,6 +653,7 @@ class DagExecutableTest {
         manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.SUCCEED, null, null);
         await().pollDelay(Duration.ONE_SECOND).until(() -> true);
         manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.SUCCEED, null, null);
+        manager.saveUpdatedJob();
 
         val taskDuration = task.getTaskDurationToTest(task);
         val expected = AbstractExecutable.getDuration(stage1.getOutput(task.getId()))
@@ -687,6 +689,7 @@ class DagExecutableTest {
         manager.updateStageStatus(stage1.getId(), task.getId(), ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.RUNNING, null, null);
+        manager.saveUpdatedJob();
         await().pollDelay(Duration.ONE_SECOND).until(() -> true);
         manager.updateJobOutput(job.getId(), ExecutableState.SUCCEED);
         manager.updateJobOutput(task.getId(), ExecutableState.SUCCEED);
@@ -695,6 +698,7 @@ class DagExecutableTest {
         manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.SUCCEED, null, null);
         await().pollDelay(Duration.ONE_SECOND).until(() -> true);
         manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.SUCCEED, null, null);
+        manager.saveUpdatedJob();
 
         val taskDuration = task.getTaskDurationToTest(task);
         val expected = task.getDuration();
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
index 22ea2b774a..08ef663424 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -987,6 +987,7 @@ public class JobService extends BasicService implements JobSupporter {
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
             val executableManager = getManager(NExecutableManager.class, project);
             executableManager.updateStageStatus(taskId, segmentId, newStatus, updateInfo, errMsg);
+            executableManager.saveUpdatedJob();
             return null;
         }, project, UnitOfWork.DEFAULT_MAX_RETRY, UnitOfWork.DEFAULT_EPOCH_ID, jobId);
     }
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java
index 2acffd1281..bbbed42b0b 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java
@@ -165,6 +165,7 @@ class DagJobServiceTest {
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
             manager.updateJobOutput(task1.getId(), ExecutableState.ERROR);
             manager.updateStageStatus(stage11.getId(), task1.getId(), ExecutableState.ERROR, null, null);
+            manager.saveUpdatedJob();
             return null;
         }, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID);
 
@@ -175,6 +176,7 @@ class DagJobServiceTest {
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
             manager.updateJobOutput(task3.getId(), ExecutableState.ERROR);
             manager.updateStageStatus(stage31.getId(), task3.getId(), ExecutableState.ERROR, null, null);
+            manager.saveUpdatedJob();
             return null;
         }, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID);
 
@@ -239,6 +241,7 @@ class DagJobServiceTest {
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
             manager.updateJobOutput(task1.getId(), ExecutableState.ERROR);
             manager.updateStageStatus(stage11.getId(), task1.getId(), ExecutableState.ERROR, null, null);
+            manager.saveUpdatedJob();
             return null;
         }, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID);
 
@@ -249,6 +252,7 @@ class DagJobServiceTest {
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
             manager.updateJobOutput(task3.getId(), ExecutableState.ERROR);
             manager.updateStageStatus(stage31.getId(), task3.getId(), ExecutableState.ERROR, null, null);
+            manager.saveUpdatedJob();
             return null;
         }, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID);
 
diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java
index d08373b613..c9c52fc563 100644
--- a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java
+++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java
@@ -387,7 +387,7 @@ public class JobErrorTest extends NLocalFileMetadataTestCase {
         manager.addJob(executable);
 
         var output = manager.getOutput(executable.getId());
-        final long[] duration = {AbstractExecutable.getDuration(output)};
+        final long[] duration = { AbstractExecutable.getDuration(output) };
         Assert.assertEquals(0, duration[0]);
 
         ((DefaultOutput) output).setStartTime(System.currentTimeMillis());
@@ -442,6 +442,7 @@ public class JobErrorTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), null, ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(logicStep2.getId(), null, ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(logicStep3.getId(), null, ExecutableState.RUNNING, null, null);
+        manager.saveUpdatedJob();
 
         val durationWithoutWaiteTime = executable.getDurationFromStepOrStageDurationSum();
 
@@ -487,6 +488,7 @@ public class JobErrorTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), null, ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(logicStep2.getId(), null, ExecutableState.RUNNING, null, null);
         manager.updateStageStatus(logicStep3.getId(), null, ExecutableState.RUNNING, null, null);
+        manager.saveUpdatedJob();
 
         val durationWithoutWaiteTime = executable.getDurationFromStepOrStageDurationSum();
 
diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index e29d23a875..ee6db5a9be 100644
--- a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@ -578,6 +578,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SKIP, null, "test output");
         manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+        manager.saveUpdatedJob();
 
         var buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
@@ -587,6 +588,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
         manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
+        manager.saveUpdatedJob();
 
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
@@ -596,6 +598,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         Map<String, String> info = Maps.newHashMap();
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "1");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -603,6 +606,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "8");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -610,6 +614,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -617,18 +622,21 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "12");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
         assertTrue(1 == successLogicStep);
 
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
         assertTrue(1 == successLogicStep);
 
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -665,6 +673,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SKIP, null, "test output");
         manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+        manager.saveUpdatedJob();
 
         var buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
@@ -674,6 +683,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
         manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
+        manager.saveUpdatedJob();
 
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
@@ -683,6 +693,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         Map<String, String> info = Maps.newHashMap();
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "1");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -690,6 +701,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -697,6 +709,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -704,18 +717,21 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "12");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
         assertTrue(0.5 == successLogicStep);
 
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
         assertTrue(0.5 == successLogicStep);
 
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap();
         successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps);
@@ -751,6 +767,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SKIP, null, "test output");
         manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+        manager.saveUpdatedJob();
 
         var buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
@@ -760,6 +777,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
         manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.ERROR, null, "test output", true);
+        manager.saveUpdatedJob();
 
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
@@ -769,6 +787,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         Map<String, String> info = Maps.newHashMap();
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "1");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
         successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -778,6 +797,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "8");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
         successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -787,6 +807,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
         successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -796,6 +817,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
 
         info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "12");
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
         successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -804,6 +826,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         assertTrue(1 == successLogicStep);
 
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
         successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -812,6 +835,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         assertTrue(1 == successLogicStep);
 
         manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SUCCEED, null, "test output");
+        manager.saveUpdatedJob();
         buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks()
                 .get(0)).getStagesMap().get(segmentId);
         successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true);
@@ -863,6 +887,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         assertEquals(logicStep.getId(), logicStepBase.getId());
 
         manager.updateStageStatus(logicStep.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
+        manager.saveUpdatedJob();
 
         List<ExecutableStepResponse> jobDetail = jobService.getJobDetail(project, executable.getId());
         assertEquals(1, jobDetail.size());
@@ -883,8 +908,8 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         assertTrue(logicStepResponse2.getExecStartTime() < System.currentTimeMillis());
 
         manager.updateStageStatus(logicStep.getId(), segmentId2, ExecutableState.RUNNING, null, "test output");
-
         manager.updateStageStatus(logicStep.getId(), null, ExecutableState.SUCCEED, null, "test output");
+        manager.saveUpdatedJob();
 
         jobDetail = jobService.getJobDetail(project, executable.getId());
         assertEquals(1, jobDetail.size());
diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
index c030785854..bf63baa1e2 100644
--- a/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
+++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.rest.service;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.kylin.engine.spark.job.step.NStageForBuild;
@@ -60,7 +61,9 @@ import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.test.util.ReflectionTestUtils;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -204,6 +207,7 @@ public class StageTest extends NLocalFileMetadataTestCase {
         manager.addJob(executable);
 
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, null, null, false);
+        manager.saveUpdatedJob();
 
         manager.updateStagePaused(executable);
 
@@ -256,6 +260,7 @@ public class StageTest extends NLocalFileMetadataTestCase {
 
         manager.updateJobOutput(executable.getId(), ExecutableState.RUNNING, null, null, null);
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, null, null, false);
+        manager.saveUpdatedJob();
         manager.updateJobOutput(executable.getId(), ExecutableState.SUCCEED, null, null, null);
         manager.makeStageSuccess(sparkExecutable.getId());
 
@@ -295,12 +300,14 @@ public class StageTest extends NLocalFileMetadataTestCase {
         manager.addJob(executable);
 
         manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, errMsg);
+        manager.saveUpdatedJob();
         var output1 = manager.getOutput(logicStep1.getId(), segmentId);
         Assert.assertEquals(ExecutableState.SUCCEED, output1.getState());
         Assert.assertEquals(output1.getShortErrMsg(), errMsg);
         Assert.assertTrue(MapUtils.isEmpty(output1.getExtra()));
 
         manager.updateStageStatus(logicStep1.getId(), null, ExecutableState.ERROR, null, errMsg);
+        manager.saveUpdatedJob();
         output1 = manager.getOutput(logicStep1.getId(), segmentId);
         Assert.assertEquals(ExecutableState.SUCCEED, output1.getState());
         Assert.assertEquals(output1.getShortErrMsg(), errMsg);
@@ -316,6 +323,45 @@ public class StageTest extends NLocalFileMetadataTestCase {
         Assert.assertTrue(MapUtils.isEmpty(outputLogicStep2.getExtra()));
     }
 
+    @Test
+    public void testUpdateStageStatusNoSaveCache() {
+        // Two segments; the stage map entries are later looked up by these segment ids.
+        val firstSegment = RandomUtil.randomUUIDStr();
+        val secondSegment = RandomUtil.randomUUIDStr();
+        val manager = NExecutableManager.getInstance(jobService.getConfig(), getProject());
+
+        val chainedJob = new SucceedChainedTestExecutable();
+        chainedJob.setId(RandomUtil.randomUUIDStr());
+
+        val sparkStep = new NSparkExecutable();
+        sparkStep.setParam(NBatchConstants.P_SEGMENT_IDS, firstSegment + "," + secondSegment);
+        sparkStep.setId(RandomUtil.randomUUIDStr());
+        chainedJob.addTask(sparkStep);
+
+        // Register three build stages, then build the per-segment stage map.
+        for (int stageIndex = 0; stageIndex < 3; stageIndex++) {
+            sparkStep.addStage(new NStageForBuild());
+        }
+        sparkStep.setStageMap();
+
+        manager.addJob(chainedJob);
+
+        // Mark every stage of every segment DISCARDED without saving in between;
+        // the job is persisted only once per task, via saveUpdatedJob().
+        for (AbstractExecutable task : chainedJob.getTasks()) {
+            Map<String, List<StageBase>> stagesBySegment = ((ChainedStageExecutable) task).getStagesMap();
+            stagesBySegment.forEach((segment, stages) -> Optional.ofNullable(stages)
+                    .orElseGet(Lists::newArrayList)
+                    .forEach(stage -> manager.updateStageStatus(stage.getId(), segment,
+                            ExecutableState.DISCARDED, null, null)));
+            manager.saveUpdatedJob();
+        }
+
+        // One saveUpdatedJob() for the single task, so the job's mvcc is expected to be 1.
+        // NOTE(review): get(0) assumes this is the only job in the project; confirm.
+        Assert.assertEquals(1, manager.getAllJobs().get(0).getMvcc());
+    }
+
     @Test
     public void testSetStageOutput() {
         NExecutableManager manager = NExecutableManager.getInstance(jobService.getConfig(), getProject());
@@ -445,6 +491,7 @@ public class StageTest extends NLocalFileMetadataTestCase {
         manager.updateStageStatus(stage1.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
         manager.updateStageStatus(stage2.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
         manager.updateStageStatus(stage3.getId(), segmentId, ExecutableState.RUNNING, null, "test output");
+        manager.saveUpdatedJob();
 
         manager.makeStageError(executable.getId());
         var job = manager.getJob(executable.getId());
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 5517f3b2c4..a842c926ca 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -206,8 +206,9 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
     public void waiteForResourceStart(ExecutableContext context) {
         // mark waiteForResource stage start
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
-            getExecutableManager(getProject()) //
-                    .updateStageStatus(getId() + "_00", null, ExecutableState.RUNNING, null, null);
+            final NExecutableManager executableManager = getExecutableManager(getProject());
+            executableManager.updateStageStatus(getId() + "_00", null, ExecutableState.RUNNING, null, null);
+            executableManager.saveUpdatedJob(); // persist the stage update inside this transaction
             return 0;
         }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), getTempLockName());
     }