Posted to commits@kylin.apache.org by xx...@apache.org on 2023/05/06 06:59:12 UTC

[kylin] 11/38: KYLIN-5527 Data count check for segment layout building

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 59a2e21034f7ccbf859e3aee94668a5ce8a4ec1a
Author: Yinghao Lin <39...@users.noreply.github.com>
AuthorDate: Tue Feb 21 11:35:14 2023 +0800

    KYLIN-5527 Data count check for segment layout building
---
 .../kylin/rest/service/AccessServiceTest.java      |   4 +-
 .../kylin/rest/service/AclTCRServiceTest.java      |   2 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |  12 +
 .../org/apache/kylin/common/util/JsonUtil.java     |   1 +
 .../apache/kylin/job/constant/JobStatusEnum.java   |   6 +
 .../kylin/job/execution/AbstractExecutable.java    |  10 +-
 .../kylin/job/execution/DefaultExecutable.java     |  45 ++-
 .../kylin/job/execution/ExecutableState.java       |  11 +-
 .../apache/kylin/job/execution/ExecuteResult.java  |  23 +-
 .../kylin/job/execution/NExecutableManager.java    |   7 +
 .../job/execution/NExecutableManagerTest.java      |  21 +
 .../kylin/metadata/cube/model/NBatchConstants.java |   1 +
 .../kylin/metadata/cube/model/NDataLayout.java     |  19 +-
 .../kylin/metadata/cube/model/NDataSegDetails.java |  33 +-
 .../kylin/metadata/cube/model/NDataSegment.java    |  63 ++-
 .../metadata/cube/model/NDataflowManager.java      |   4 +-
 .../metadata/cube/model/SegmentPartition.java      |   2 +-
 .../org/apache/kylin/metadata/model/ISegment.java  |   2 +
 .../metadata/project/NProjectManagerTest.java      |   2 +-
 .../kylin/metrics/HdfsCapacityMetricsTest.java     |   4 +-
 .../kylin/rest/response/ExecutableResponse.java    |   6 +-
 .../rest/response/ExecutableStepResponse.java      |   3 +
 .../org/apache/kylin/rest/service/JobService.java  |   8 +-
 .../org/apache/kylin/rest/service/StageTest.java   |   3 +
 .../metadata/_global/project/index_build_test.json |  40 ++
 .../3ec47efc-573a-9304-4405-8e05ae184322.json      |  70 ++++
 .../b2f206e1-7a15-c94a-20f5-f608d550ead6.json      |  69 ++++
 .../3ec47efc-573a-9304-4405-8e05ae184322.json      | 204 ++++++++++
 .../3ec47efc-573a-9304-4405-8e05ae184322.json      | 241 +++++++++++
 .../index_build_test/table/SSB.CUSTOMER.json       |  77 ++++
 .../index_build_test/table/SSB.LINEORDER.json      | 131 ++++++
 .../rest/controller/NIndexPlanController.java      |   7 +-
 .../apache/kylin/rest/response/IndexResponse.java  |   4 +
 .../kylin/rest/response/NDataSegmentResponse.java  |   4 +-
 .../kylin/rest/service/FusionIndexService.java     |  24 +-
 .../kylin/rest/service/IndexPlanService.java       |  50 ++-
 .../apache/kylin/rest/service/ModelService.java    |   4 +-
 .../rest/service/params/IndexPlanParams.java}      |  31 +-
 .../rest/service/params/PaginationParams.java}     |  27 +-
 .../kylin/rest/service/ProjectServiceTest.java     |   6 +-
 .../rest/service/QueryHistoryServiceTest.java      |   2 +-
 .../kylin/query/engine/AsyncQueryJobTest.java      |   2 +-
 src/spark-project/engine-spark/pom.xml             |   5 +
 .../engine/spark/application/SparkApplication.java |  17 +-
 .../builder/PartitionDictionaryBuilderHelper.java  |   2 +-
 .../spark/job/ExecutableAddCuboidHandler.java      |  46 ++-
 .../kylin/engine/spark/job/NSparkCubingStep.java   |  42 +-
 .../spark/merger/AfterBuildResourceMerger.java     |   4 +-
 .../merger/AfterMergeOrRefreshResourceMerger.java  |   4 +-
 .../spark/builder/DictionaryBuilderHelper.java     |   2 +-
 .../kylin/engine/spark/job/BuildJobInfos.scala     |  21 +-
 .../apache/kylin/engine/spark/job/DFMergeJob.java  |   2 +-
 .../kylin/engine/spark/job/PartitionExec.scala     |   4 +
 .../kylin/engine/spark/job/SegmentBuildJob.java    |  14 +-
 .../kylin/engine/spark/job/SegmentExec.scala       |  38 +-
 .../apache/kylin/engine/spark/job/SegmentJob.java  |   4 +
 .../kylin/engine/spark/job/SegmentMergeJob.java    |   3 +-
 .../kylin/engine/spark/job/stage/StageExec.scala   | 113 +++---
 .../engine/spark/job/stage/build/BuildLayer.scala  |  23 +-
 .../engine/spark/job/stage/build/BuildStage.scala  |   6 -
 .../job/stage/build/GatherFlatTableStats.scala     |   2 -
 .../spark/job/stage/build/GenerateFlatTable.scala  | 145 ++++++-
 .../build/partition/PartitionBuildLayer.scala      |  23 +-
 .../build/partition/PartitionBuildStage.scala      |   6 -
 .../engine/spark/job/stage/merge/MergeStage.scala  |   2 +-
 .../merge/partition/PartitionMergeStage.scala      |   2 +-
 .../GenerateFlatTableWithSparkSessionTest.java     | 444 +++++++++++++++++++++
 .../kylin/tool/garbage/DataflowCleanerCLI.java     |   2 +-
 68 files changed, 1976 insertions(+), 285 deletions(-)

diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java
index b0f9142756..7fc2665b6f 100644
--- a/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java
@@ -596,14 +596,14 @@ public class AccessServiceTest extends NLocalFileMetadataTestCase {
     @Test
     public void testGetGrantedProjectsOfUser() throws IOException {
         List<String> result = accessService.getGrantedProjectsOfUser("ADMIN");
-        assertEquals(28, result.size());
+        assertEquals(29, result.size());
     }
 
     @Test
     public void testGetGrantedProjectsOfUserOrGroup() throws IOException {
         // admin user
         List<String> result = accessService.getGrantedProjectsOfUserOrGroup("ADMIN", true);
-        assertEquals(28, result.size());
+        assertEquals(29, result.size());
 
         // normal user
         result = accessService.getGrantedProjectsOfUserOrGroup("ANALYST", true);
diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java
index 9a0ee62d7d..20e40e74e6 100644
--- a/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java
@@ -1316,7 +1316,7 @@ public class AclTCRServiceTest extends NLocalFileMetadataTestCase {
         Mockito.when(userService.isGlobalAdmin("ADMIN")).thenReturn(true);
         List<SidPermissionWithAclResponse> responses = accessService.getUserOrGroupAclPermissions(projects, "ADMIN",
                 true);
-        Assert.assertEquals(28, responses.size());
+        Assert.assertEquals(29, responses.size());
         Assert.assertTrue(responses.stream().allMatch(response -> "ADMIN".equals(response.getProjectPermission())));
 
         // test normal group
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f20dca320b..32af04760a 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -980,6 +980,10 @@ public abstract class KylinConfigBase implements Serializable {
         return !getBuildConf().isEmpty() && !getWritingClusterWorkingDir().isEmpty();
     }
 
+    public String getWriteClusterWorkingDir() {
+        return getOptional("kylin.env.write-hdfs-working-dir", "");
+    }
+
     public String getWritingClusterWorkingDir() {
         return getOptional(WRITING_CLUSTER_WORKING_DIR, "");
     }
@@ -3800,6 +3804,14 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.source.ddl.logical-view-catchup-interval", "60"));
     }
 
+    public boolean isDataCountCheckEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.build.data-count-check-enabled", FALSE));
+    }
+
+    public boolean isNonStrictCountCheckAllowed() {
+        return Boolean.parseBoolean(getOptional("kylin.build.allow-non-strict-count-check", FALSE));
+    }
+
     // ============================================================================
     // Cost based index Planner
     // ============================================================================
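
The two getters above back the new switches kylin.build.data-count-check-enabled and kylin.build.allow-non-strict-count-check, both off by default. A minimal sketch of how build-side code might consult them; only the two getter names and KylinConfig.getInstanceFromEnv() come from Kylin itself, while the guard class and the runDataCountCheck stub are illustrative assumptions, not the actual implementation:

    import org.apache.kylin.common.KylinConfig;

    // Hedged sketch: only isDataCountCheckEnabled()/isNonStrictCountCheckAllowed()
    // come from the diff above; runDataCountCheck(...) is a hypothetical stand-in
    // for the real check. Running this needs a configured Kylin environment.
    public class DataCountCheckGuard {
        public static void main(String[] args) {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            if (config.isDataCountCheckEnabled()) {
                // strict comparison unless non-strict checking is explicitly allowed
                boolean strict = !config.isNonStrictCountCheckAllowed();
                runDataCountCheck(strict);
            }
        }

        private static void runDataCountCheck(boolean strict) {
            System.out.println("would compare flat-table vs. layout row counts, strict=" + strict);
        }
    }
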
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java b/src/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
index d74c7ca1d1..fb0b46826c 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
@@ -60,6 +60,7 @@ public class JsonUtil {
 
     static {
         mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+                .configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
                 .setConfig(mapper.getSerializationConfig().withView(PersistenceView.class));
         mapper.setFilterProvider(simpleFilterProvider);
         indentMapper.configure(SerializationFeature.INDENT_OUTPUT, true)
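
The JsonUtil change enables Jackson's READ_UNKNOWN_ENUM_VALUES_AS_NULL, so metadata written with newer enum constants (such as the WARNING state added below) deserializes to null on a reader that does not know the constant, instead of failing. A self-contained sketch of that Jackson behavior; the Status enum and Job POJO are illustrative, not Kylin classes:

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class EnumTolerantReadDemo {
        // Illustrative enum that lacks the newer WARNING constant.
        enum Status { READY, RUNNING, SUCCEED }

        // Illustrative POJO; not a Kylin class.
        public static class Job {
            public Status status;
        }

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper()
                    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                    .configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);

            // "WARNING" is unknown to this enum; with the feature on it becomes null
            // instead of throwing an InvalidFormatException.
            Job job = mapper.readValue("{\"status\":\"WARNING\"}", Job.class);
            System.out.println(job.status); // prints: null
        }
    }
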
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java b/src/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
index 73f634bc85..59b0d14934 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
@@ -100,6 +100,12 @@ public enum JobStatusEnum {
         public boolean checkAction(JobActionEnum actionEnum) {
             return false;
         }
+    },
+    WARNING(2048) {
+        @Override
+        public boolean checkAction(JobActionEnum actionEnum) {
+            return false;
+        }
     };
 
     public abstract boolean checkAction(JobActionEnum actionEnum);
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index bfe6f1fb6a..0f8de4e055 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -285,7 +285,9 @@ public abstract class AbstractExecutable implements Executable {
         MetricsGroup.hostTagCounterInc(MetricsName.JOB_STEP_ATTEMPTED, MetricsCategory.PROJECT, project, retry);
         if (result.succeed()) {
             wrapWithCheckQuit(() -> {
-                updateJobOutput(project, getId(), ExecutableState.SUCCEED, result.getExtraInfo(), result.output(),
+                ExecutableState state = adjustState(ExecutableState.SUCCEED);
+                logger.info("Job {} adjust future state from {} to {}", getId(), ExecutableState.SUCCEED.name(), state.name());
+                updateJobOutput(project, getId(), state, result.getExtraInfo(), result.output(),
                         null);
             });
         } else if (result.skip()) {
@@ -304,6 +306,10 @@ public abstract class AbstractExecutable implements Executable {
         }
     }
 
+    protected ExecutableState adjustState(ExecutableState originalState) {
+        return originalState;
+    }
+
     protected void onExecuteStopHook() {
         onExecuteErrorHook(getId());
     }
@@ -337,7 +343,7 @@ public abstract class AbstractExecutable implements Executable {
 
             //The output will be stored in HDFS,not in RS
             if (this instanceof ChainedStageExecutable) {
-                if (newStatus == ExecutableState.SUCCEED) {
+                if (newStatus.isNotBad()) {
                     executableManager.makeStageSuccess(jobId);
                 } else if (newStatus == ExecutableState.ERROR) {
                     executableManager.makeStageError(jobId);
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java
index 5a4e92e58c..78cb903ad1 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java
@@ -184,7 +184,7 @@ public class DefaultExecutable extends AbstractExecutable implements ChainedExec
     private void executeStep(Executable executable, ExecutableContext context) throws ExecuteException {
         if (executable.isRunnable()) {
             executable.execute(context);
-        } else if (ExecutableState.SUCCEED == executable.getStatus()) {
+        } else if (executable.getStatus().isNotBad()) {
             logger.info("step {} is already succeed, skip it.", executable.getDisplayName());
         } else {
             throw new IllegalStateException("invalid subtask state, sub task:" + executable.getDisplayName()
@@ -289,27 +289,28 @@ public class DefaultExecutable extends AbstractExecutable implements ChainedExec
             logger.info("Sub-task finished {}, state: {}", task.getDisplayName(), task.getStatus());
             boolean taskSucceed = false;
             switch (task.getStatus()) {
-                case RUNNING:
-                    hasError = true;
-                    break;
-                case ERROR:
-                    hasError = true;
-                    break;
-                case DISCARDED:
-                    hasDiscarded = true;
-                    break;
-                case SUICIDAL:
-                    hasSuicidal = true;
-                    break;
-                case PAUSED:
-                    hasPaused = true;
-                    break;
-                case SKIP:
-                case SUCCEED:
-                    taskSucceed = true;
-                    break;
-                default:
-                    break;
+            case RUNNING:
+                hasError = true;
+                break;
+            case ERROR:
+                hasError = true;
+                break;
+            case DISCARDED:
+                hasDiscarded = true;
+                break;
+            case SUICIDAL:
+                hasSuicidal = true;
+                break;
+            case PAUSED:
+                hasPaused = true;
+                break;
+            case SUCCEED:
+            case SKIP:
+            case WARNING:
+                taskSucceed = true;
+                break;
+            default:
+                break;
             }
             allSucceed &= taskSucceed;
         }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
index 9016ee8a40..40999107ce 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
@@ -33,7 +33,7 @@ import com.google.common.collect.Multimaps;
  */
 public enum ExecutableState {
 
-    READY, RUNNING, ERROR, PAUSED, DISCARDED, SUCCEED, SUICIDAL, SKIP;
+    READY, RUNNING, ERROR, PAUSED, DISCARDED, SUCCEED, SUICIDAL, SKIP, WARNING;
 
     private static Multimap<ExecutableState, ExecutableState> VALID_STATE_TRANSFER;
 
@@ -60,6 +60,7 @@ public enum ExecutableState {
         VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.SUICIDAL);
         VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.PAUSED);
         VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.SKIP);
+        VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.WARNING);
 
         VALID_STATE_TRANSFER.put(ExecutableState.PAUSED, ExecutableState.DISCARDED);
         VALID_STATE_TRANSFER.put(ExecutableState.PAUSED, ExecutableState.SUICIDAL);
@@ -94,6 +95,12 @@ public enum ExecutableState {
                 this == READY;//restart case
     }
 
+    public boolean isNotBad() {
+        return this == SUCCEED
+                || this == SKIP
+                || this == WARNING;
+    }
+
     public static boolean isValidStateTransfer(ExecutableState from, ExecutableState to) {
         return VALID_STATE_TRANSFER.containsEntry(from, to);
     }
@@ -115,6 +122,8 @@ public enum ExecutableState {
         case SUICIDAL:
         case DISCARDED:
             return JobStatusEnum.DISCARDED;
+        case WARNING:
+            return JobStatusEnum.WARNING;
         default:
             throw new RuntimeException("invalid state:" + this);
         }
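
Taken together, the ExecutableState changes let a running step end in WARNING: RUNNING may now transfer to WARNING, isNotBad() groups WARNING with SUCCEED and SKIP, and toJobStatus() surfaces it as JobStatusEnum.WARNING. A small usage sketch based only on the methods shown in this hunk, assuming the kylin core-job module is on the classpath:

    import org.apache.kylin.job.execution.ExecutableState;

    public class WarningStateDemo {
        public static void main(String[] args) {
            // RUNNING may now legally move to WARNING.
            System.out.println(ExecutableState.isValidStateTransfer(
                    ExecutableState.RUNNING, ExecutableState.WARNING)); // expected: true
            // WARNING counts as a "not bad" outcome, alongside SUCCEED and SKIP.
            System.out.println(ExecutableState.WARNING.isNotBad());     // expected: true
            // For the REST layer it maps to JobStatusEnum.WARNING.
            System.out.println(ExecutableState.WARNING.toJobStatus());  // expected: WARNING
        }
    }
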
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
index c9567f1b10..86c60b35dc 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
@@ -47,17 +47,18 @@ public final class ExecuteResult {
     private ExecuteResult(State state, String output, Throwable throwable) {
         Preconditions.checkArgument(state != null, "state cannot be null");
 
-        if (state == State.SUCCEED) {
-            Preconditions.checkNotNull(output);
-            Preconditions.checkState(throwable == null);
-        } else if (state == State.SKIP) {
-            Preconditions.checkNotNull(output);
-            Preconditions.checkState(throwable == null);
-        } else if (state == State.ERROR) {
-            Preconditions.checkNotNull(throwable);
-            Preconditions.checkState(output == null);
-        } else {
-            throw new IllegalStateException();
+        switch (state) {
+            case SUCCEED:
+            case SKIP:
+                Preconditions.checkNotNull(output);
+                Preconditions.checkState(throwable == null);
+                break;
+            case ERROR:
+                Preconditions.checkNotNull(throwable);
+                Preconditions.checkState(output == null);
+                break;
+            default:
+                throw new IllegalStateException();
         }
 
         this.state = state;
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
index b22dee724f..4f1b192f1d 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
@@ -1238,6 +1238,7 @@ public class NExecutableManager {
             // DISCARDED must not be transferred to any others status
             if ((oldStatus == ExecutableState.PAUSED && newStatus == ExecutableState.ERROR)
                     || (oldStatus == ExecutableState.SKIP && newStatus == ExecutableState.SUCCEED)
+                    || (oldStatus == ExecutableState.WARNING && newStatus == ExecutableState.SUCCEED)
                     || oldStatus == ExecutableState.DISCARDED) {
                 return false;
             }
@@ -1253,6 +1254,12 @@ public class NExecutableManager {
             final int indexSuccessCount = Integer
                     .parseInt(map.getOrDefault(NBatchConstants.P_INDEX_SUCCESS_COUNT, "0"));
             info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, String.valueOf(indexSuccessCount));
+
+            // Add warning_code to stage output info if it exists

+            String warningCode;
+            if ((warningCode = map.get(NBatchConstants.P_WARNING_CODE)) != null) {
+                info.put(NBatchConstants.P_WARNING_CODE, warningCode);
+            }
         });
         jobOutput.setInfo(info);
         jobOutput.setLastModified(System.currentTimeMillis());
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
index 4ba31b27c1..3f05c47032 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
@@ -162,6 +162,27 @@ public class NExecutableManagerTest extends NLocalFileMetadataTestCase {
         assertJobEqual(job, anotherJob);
     }
 
+    @Test
+    public void testExecutableStateCorrectness() {
+        assertTrue(ExecutableState.READY.isProgressing());
+        assertTrue(ExecutableState.RUNNING.isProgressing());
+
+        assertTrue(ExecutableState.SUCCEED.isFinalState());
+        assertTrue(ExecutableState.DISCARDED.isFinalState());
+        assertTrue(ExecutableState.SUICIDAL.isFinalState());
+
+        assertTrue(ExecutableState.ERROR.isNotProgressing());
+        assertTrue(ExecutableState.PAUSED.isNotProgressing());
+
+        assertTrue(ExecutableState.DISCARDED.isStoppedNonVoluntarily());
+        assertTrue(ExecutableState.PAUSED.isStoppedNonVoluntarily());
+        assertTrue(ExecutableState.READY.isStoppedNonVoluntarily());
+
+        assertTrue(ExecutableState.SUCCEED.isNotBad());
+        assertTrue(ExecutableState.SKIP.isNotBad());
+        assertTrue(ExecutableState.WARNING.isNotBad());
+    }
+
     @Test
     public void testValidStateTransfer() {
         SucceedTestExecutable job = new SucceedTestExecutable();
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
index 754769280b..bcca549182 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
@@ -55,6 +55,7 @@ public interface NBatchConstants {
     /** use for stage calculate exec ratio */
     String P_INDEX_COUNT = "indexCount";
     String P_INDEX_SUCCESS_COUNT = "indexSuccessCount";
+    String P_WARNING_CODE = "warning_code";
     /** value like : { "segmentId1": 1223, "segmentId2": 1223 } */
     String P_WAITE_TIME = "waiteTime";
 
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayout.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayout.java
index d979ece2f8..08a99459e1 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayout.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayout.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -43,6 +44,10 @@ import lombok.val;
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
 public class NDataLayout implements Serializable {
 
+    public enum AbnormalType {
+        DATA_INCONSISTENT
+    }
+
     public static NDataLayout newDataLayout(NDataflow df, String segId, long layoutId) {
         return newDataLayout(NDataSegDetails.newSegDetails(df, segId), layoutId);
     }
@@ -54,6 +59,13 @@ public class NDataLayout implements Serializable {
         return r;
     }
 
+    public static boolean filterWorkingLayout(NDataLayout layout) {
+        if (layout == null) {
+            return false;
+        }
+        return Objects.isNull(layout.getAbnormalType());
+    }
+
     // ============================================================================
 
     /**
@@ -97,6 +109,11 @@ public class NDataLayout implements Serializable {
     @JsonProperty("multi_partition")
     private List<LayoutPartition> multiPartition = new ArrayList<>();
 
+    @Getter
+    @Setter
+    @JsonProperty("abnormal_type")
+    private AbnormalType abnormalType;
+
     public NDataLayout() {
         this.createTime = System.currentTimeMillis();
     }
@@ -260,7 +277,7 @@ public class NDataLayout implements Serializable {
         if (segDetails == null || !segDetails.isCachedAndShared())
             return false;
 
-        for (NDataLayout cached : segDetails.getLayouts()) {
+        for (NDataLayout cached : segDetails.getWorkingLayouts()) {
             if (cached == this)
                 return true;
         }
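
The new abnormal_type field and the filterWorkingLayout predicate let a segment keep an inconsistent layout in metadata without treating it as servable; presumably the data count check marks a layout DATA_INCONSISTENT when its row count disagrees, and such layouts drop out of getWorkingLayouts(). A small sketch of the predicate, assuming the kylin core-metadata module is on the classpath (class and enum names come from the diff above):

    import org.apache.kylin.metadata.cube.model.NDataLayout;

    public class WorkingLayoutFilterDemo {
        public static void main(String[] args) {
            NDataLayout normal = new NDataLayout();
            NDataLayout inconsistent = new NDataLayout();
            inconsistent.setAbnormalType(NDataLayout.AbnormalType.DATA_INCONSISTENT);

            System.out.println(NDataLayout.filterWorkingLayout(normal));       // true: still a working layout
            System.out.println(NDataLayout.filterWorkingLayout(inconsistent)); // false: excluded from getWorkingLayouts()
            System.out.println(NDataLayout.filterWorkingLayout(null));         // false: null-safe
        }
    }
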
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegDetails.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegDetails.java
index 5607e918c5..e9866f3509 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegDetails.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegDetails.java
@@ -21,6 +21,7 @@ package org.apache.kylin.metadata.cube.model;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
@@ -114,18 +115,40 @@ public class NDataSegDetails extends RootPersistentEntity implements Serializabl
 
     public long getTotalRowCount() {
         long count = 0L;
-        for (NDataLayout cuboid : getLayouts()) {
+        for (NDataLayout cuboid : getWorkingLayouts()) {
             count += cuboid.getRows();
         }
         return count;
     }
 
+    /**
+     * @deprecated Deprecated because non-working layouts were added.
+     * <p>Use {@link NDataSegDetails#getWorkingLayouts} or {@link NDataSegDetails#getAllLayouts} instead.
+     */
+    @Deprecated
     public List<NDataLayout> getLayouts() {
-        return isCachedAndShared() ? ImmutableList.copyOf(layouts) : layouts;
+        return getAllLayouts();
+    }
+
+    public List<NDataLayout> getWorkingLayouts() {
+        List<NDataLayout> workingLayouts = getLayouts0(false);
+        return isCachedAndShared() ? ImmutableList.copyOf(workingLayouts) : workingLayouts;
+    }
+
+    public List<NDataLayout> getAllLayouts() {
+        List<NDataLayout> allLayouts = getLayouts0(true);
+        return isCachedAndShared() ? ImmutableList.copyOf(allLayouts) : allLayouts;
+    }
+
+    private List<NDataLayout> getLayouts0(boolean includingNonWorkingLayouts) {
+        if (includingNonWorkingLayouts) {
+            return layouts;
+        }
+        return layouts.stream().filter(NDataLayout::filterWorkingLayout).collect(Collectors.toList());
     }
 
     public NDataLayout getLayoutById(long layoutId) {
-        for (NDataLayout cuboid : getLayouts()) {
+        for (NDataLayout cuboid : getAllLayouts()) {
             if (cuboid.getLayoutId() == layoutId)
                 return cuboid;
         }
@@ -156,8 +179,8 @@ public class NDataSegDetails extends RootPersistentEntity implements Serializabl
         if (another == this)
             return false;
 
-        List<NDataLayout> currentSortedLayouts = getSortedLayouts(getLayouts());
-        List<NDataLayout> anotherSortedLayouts = getSortedLayouts(another.getLayouts());
+        List<NDataLayout> currentSortedLayouts = getSortedLayouts(getAllLayouts());
+        List<NDataLayout> anotherSortedLayouts = getSortedLayouts(another.getAllLayouts());
         int size = currentSortedLayouts.size();
         if (size != anotherSortedLayouts.size())
             return false;
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java
index ddb81e407a..00c2d105a7 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java
@@ -275,7 +275,20 @@ public class NDataSegment implements ISegment, Serializable {
         return getLayoutInfo().getLayoutSize();
     }
 
+    @Override
+    public int getWorkingLayoutSize() {
+        return (int) getLayoutInfo().getLayoutsMap().values().stream()
+                .filter(NDataLayout::filterWorkingLayout).count();
+    }
+
     public NDataLayout getLayout(long layoutId) {
+        return getLayout(layoutId, false);
+    }
+
+    public NDataLayout getLayout(long layoutId, boolean includingNonWorkingLayout) {
+        if (includingNonWorkingLayout) {
+            return getAllLayoutsMap().get(layoutId);
+        }
         return getLayoutsMap().get(layoutId);
     }
 
@@ -283,6 +296,10 @@ public class NDataSegment implements ISegment, Serializable {
         return getLayoutInfo().getLayoutsMap();
     }
 
+    public Map<Long, NDataLayout> getAllLayoutsMap() {
+        return getLayoutInfo().getAllLayoutsMap();
+    }
+
     public Set<Long> getLayoutIds() {
         return getLayoutInfo().getLayoutIds();
     }
@@ -314,7 +331,8 @@ public class NDataSegment implements ISegment, Serializable {
         // not required by spark cubing
         private NDataSegDetails segDetails;
         // not required by spark cubing
-        private Map<Long, NDataLayout> layoutsMap = Collections.emptyMap();
+        private Map<Long, NDataLayout> allLayoutsMap = Collections.emptyMap();
+        private Map<Long, NDataLayout> workingLayoutsMap = Collections.emptyMap();
         /**
          * for each layout, partition id -> bucket id
          */
@@ -329,7 +347,10 @@ public class NDataSegment implements ISegment, Serializable {
         }
 
         public LayoutInfo(Map<Long, NDataLayout> layoutsMap) {
-            this.layoutsMap = layoutsMap;
+            this.allLayoutsMap = layoutsMap;
+            this.workingLayoutsMap = layoutsMap.entrySet().stream()
+                    .filter(entry -> NDataLayout.filterWorkingLayout(entry.getValue()))
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
         }
 
         private LayoutInfo(boolean loadDetail) {
@@ -344,37 +365,45 @@ public class NDataSegment implements ISegment, Serializable {
 
             IndexPlan indexPlan = dataflow.getIndexPlan();
             if (!indexPlan.isBroken()) {
-                List<NDataLayout> filteredCuboids = segDetails.getLayouts().stream()
+                List<NDataLayout> filteredCuboids = segDetails.getAllLayouts().stream()
                         .filter(dataLayout -> dataLayout.getLayout() != null).collect(Collectors.toList());
                 segDetails.setLayouts(filteredCuboids);
             }
 
             segDetails.setCachedAndShared(dataflow.isCachedAndShared());
-            List<NDataLayout> cuboids = segDetails.getLayouts();
-            layoutsMap = new HashMap<>(cuboids.size());
-            for (NDataLayout cuboid : cuboids) {
-                layoutsMap.put(cuboid.getLayoutId(), cuboid);
-                Map<Long, Long> cuboidBucketMap = Maps.newHashMap();
-                cuboid.getMultiPartition().forEach(dataPartition -> cuboidBucketMap.put(dataPartition.getPartitionId(),
-                        dataPartition.getBucketId()));
-                partitionBucketMap.put(cuboid.getLayoutId(), cuboidBucketMap);
+            List<NDataLayout> allLayouts = segDetails.getAllLayouts();
+            allLayoutsMap = Maps.newHashMap();
+            workingLayoutsMap = Maps.newHashMap();
+            for (NDataLayout layout : allLayouts) {
+                allLayoutsMap.put(layout.getLayoutId(), layout);
+                if (NDataLayout.filterWorkingLayout(layout)) {
+                    workingLayoutsMap.put(layout.getLayoutId(), layout);
+                    Map<Long, Long> cuboidBucketMap = Maps.newHashMap();
+                    layout.getMultiPartition().forEach(dataPartition -> cuboidBucketMap.put(dataPartition.getPartitionId(),
+                            dataPartition.getBucketId()));
+                    partitionBucketMap.put(layout.getLayoutId(), cuboidBucketMap);
+                }
             }
         }
 
         public int getLayoutSize() {
-            return layoutsMap.size();
+            return workingLayoutsMap.size();
         }
 
         public NDataLayout getLayout(long layoutId) {
-            return layoutsMap.get(layoutId);
+            return workingLayoutsMap.get(layoutId);
         }
 
         public Map<Long, NDataLayout> getLayoutsMap() {
-            return layoutsMap;
+            return workingLayoutsMap;
+        }
+
+        public Map<Long, NDataLayout> getAllLayoutsMap() {
+            return allLayoutsMap;
         }
 
         public Set<Long> getLayoutIds() {
-            return layoutsMap.keySet();
+            return workingLayoutsMap.keySet();
         }
 
         public List<Long> getMultiPartitionIds() {
@@ -390,8 +419,8 @@ public class NDataSegment implements ISegment, Serializable {
         }
 
         public boolean isAlreadyBuilt(long layoutId) {
-            if (Objects.nonNull(layoutsMap) && layoutsMap.containsKey(layoutId)) {
-                return layoutsMap.get(layoutId).isReady();
+            if (Objects.nonNull(workingLayoutsMap) && workingLayoutsMap.containsKey(layoutId)) {
+                return workingLayoutsMap.get(layoutId).isReady();
             }
             return false;
         }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java
index f22dd1838d..875201fe65 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java
@@ -730,7 +730,7 @@ public class NDataflowManager implements IRealizationProvider {
 
     private void updateSegmentStatus(NDataSegment seg) {
         NDataSegDetails segDetails = NDataSegDetailsManager.getInstance(seg.getConfig(), project).getForSegment(seg);
-        if (seg.getStatus() == SegmentStatusEnum.WARNING && segDetails != null && segDetails.getLayouts().isEmpty()) {
+        if (seg.getStatus() == SegmentStatusEnum.WARNING && segDetails != null && segDetails.getAllLayouts().isEmpty()) {
             seg.setStatus(SegmentStatusEnum.READY);
         }
     }
@@ -830,7 +830,7 @@ public class NDataflowManager implements IRealizationProvider {
         }
         val affectedLayouts = Lists.newArrayList();
         for (NDataSegment segment : updateSegments) {
-            val layouts = segment.getSegDetails().getLayouts();
+            val layouts = segment.getSegDetails().getAllLayouts();
             layouts.forEach(dataLayout -> {
                 if (dataLayout.removeMultiPartition(toBeDeletedPartIds)) {
                     affectedLayouts.add(dataLayout);
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/SegmentPartition.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/SegmentPartition.java
index dfe2cd16f9..0b4b8ca873 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/SegmentPartition.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/SegmentPartition.java
@@ -142,7 +142,7 @@ public class SegmentPartition implements Serializable {
                 return 0;
             }
             storageSize = dataSegment.getSegDetails() //
-                    .getLayouts().stream() //
+                    .getWorkingLayouts().stream() //
                     .flatMap(layout -> layout.getMultiPartition().stream()) //
                     .filter(partition -> partition.getPartitionId() == partitionId) //
                     .mapToLong(LayoutPartition::getByteSize).sum();
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
index 46f72933ef..146eead0f4 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
@@ -43,6 +43,8 @@ public interface ISegment extends Comparable<ISegment> {
 
     public int getLayoutSize();
 
+    public int getWorkingLayoutSize();
+
     public NDataModel getModel();
 
     public SegmentStatusEnum getStatus();
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java
index 0d2e6cbaf3..f3ed44b83f 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java
@@ -68,7 +68,7 @@ public class NProjectManagerTest extends NLocalFileMetadataTestCase {
         }
 
         val projects = projectManager.listAllProjects();
-        Assert.assertEquals(28, projects.size());
+        Assert.assertEquals(29, projects.size());
         Assert.assertTrue(projects.stream().noneMatch(p -> p.getName().equals("test")));
     }
 
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metrics/HdfsCapacityMetricsTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metrics/HdfsCapacityMetricsTest.java
index ce18241853..28a8c5fada 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metrics/HdfsCapacityMetricsTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metrics/HdfsCapacityMetricsTest.java
@@ -90,7 +90,7 @@ public class HdfsCapacityMetricsTest extends NLocalFileMetadataTestCase {
         }
         Assert.assertTrue(hdfsCapacityMetrics.getWorkingDirCapacity().isEmpty());
         hdfsCapacityMetrics.writeHdfsMetrics();
-        Assert.assertEquals(28, hdfsCapacityMetrics.getWorkingDirCapacity().size());
+        Assert.assertEquals(29, hdfsCapacityMetrics.getWorkingDirCapacity().size());
 
     }
 
@@ -139,4 +139,4 @@ public class HdfsCapacityMetricsTest extends NLocalFileMetadataTestCase {
         HdfsCapacityMetrics hdfsCapacityMetrics = new HdfsCapacityMetrics(getTestConfig());
         Assert.assertEquals(0L, (long) hdfsCapacityMetrics.getHdfsCapacityByProject("kylin"));
     }
-}
\ No newline at end of file
+}
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
index c974b4caeb..36719e6abb 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
@@ -249,7 +249,8 @@ public class ExecutableResponse implements Comparable<ExecutableResponse> {
                     continue;
                 }
             }
-            if (ExecutableState.SUCCEED == task.getStatus() || ExecutableState.SKIP == task.getStatus()) {
+
+            if (task.getStatus().isNotBad()) {
                 successSteps++;
             }
         }
@@ -279,7 +280,8 @@ public class ExecutableResponse implements Comparable<ExecutableResponse> {
         var successStages = 0D;
         for (StageBase stage : stageBases) {
             if (ExecutableState.SUCCEED == stage.getStatus(segmentId)
-                    || stage.getStatus(segmentId) == ExecutableState.SKIP) {
+                    || ExecutableState.SKIP == stage.getStatus(segmentId)
+                    || ExecutableState.WARNING == stage.getStatus(segmentId)) {
                 successStages += 1;
                 continue;
             }
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableStepResponse.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableStepResponse.java
index edd980320f..c0c9f1282b 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableStepResponse.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableStepResponse.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.collect.Maps;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobStepCmdTypeEnum;
 
@@ -152,5 +153,7 @@ public class ExecutableStepResponse {
         private long execEndTime;
         @JsonProperty("stage")
         private List<ExecutableStepResponse> stage;
+        @JsonProperty("info")
+        private Map<String, String> info = Maps.newHashMap();
     }
 }
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
index eb668435fa..dad56e01d3 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -829,6 +829,12 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl
         val stepCount = stageResponses.isEmpty() ? 1 : stageResponses.size();
         val stepRatio = (float) ExecutableResponse.calculateSuccessStage(task, segmentId, stageBases, true) / stepCount;
         segmentSubStages.setStepRatio(stepRatio);
+
+        // Put warning message into segment_sub_stages.info if it exists
+        Optional<ExecutableStepResponse> warningStageRes = stageResponses.stream().filter(stageRes ->
+                stageRes.getStatus() == JobStatusEnum.WARNING).findFirst();
+        warningStageRes.ifPresent(res -> segmentSubStages.getInfo().put(NBatchConstants.P_WARNING_CODE,
+                res.getInfo().getOrDefault(NBatchConstants.P_WARNING_CODE, null)));
     }
 
     private void setStage(List<ExecutableStepResponse> responses, ExecutableStepResponse newResponse) {
@@ -844,7 +850,7 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl
              */
             Set<JobStatusEnum> jobStatusEnums = Sets.newHashSet(JobStatusEnum.ERROR, JobStatusEnum.STOPPED,
                     JobStatusEnum.DISCARDED);
-            Set<JobStatusEnum> jobFinishOrSkip = Sets.newHashSet(JobStatusEnum.FINISHED, JobStatusEnum.SKIP);
+            Set<JobStatusEnum> jobFinishOrSkip = Sets.newHashSet(JobStatusEnum.FINISHED, JobStatusEnum.SKIP, JobStatusEnum.WARNING);
             if (oldResponse.getStatus() != newResponse.getStatus()
                     && !jobStatusEnums.contains(oldResponse.getStatus())) {
                 if (jobStatusEnums.contains(newResponse.getStatus())) {
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
index b69c00eb9b..e565a194fd 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
@@ -549,6 +549,9 @@ public class StageTest extends NLocalFileMetadataTestCase {
 
         result = ExecutableState.DISCARDED.toJobStatus();
         Assert.assertEquals(JobStatusEnum.DISCARDED, result);
+
+        result = ExecutableState.WARNING.toJobStatus();
+        Assert.assertEquals(JobStatusEnum.WARNING, result);
     }
 
     @Test
diff --git a/src/examples/test_case_data/localmeta/metadata/_global/project/index_build_test.json b/src/examples/test_case_data/localmeta/metadata/_global/project/index_build_test.json
new file mode 100644
index 0000000000..7c1aadeca6
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/_global/project/index_build_test.json
@@ -0,0 +1,40 @@
+{
+  "uuid": "63b21fc0-1bbd-3570-b2ac-54fbb0fb8181",
+  "last_modified": 1675944232432,
+  "create_time": 1675944228905,
+  "version": "4.0.0.0",
+  "name": "index_build_test",
+  "owner": "ADMIN",
+  "status": "ENABLED",
+  "create_time_utc": 1675944228905,
+  "default_database": "DEFAULT",
+  "description": null,
+  "principal": null,
+  "keytab": null,
+  "maintain_model_type": "MANUAL_MAINTAIN",
+  "override_kylin_properties": {
+    "kylin.metadata.semi-automatic-mode": "true",
+    "kylin.query.metadata.expose-computed-column": "true",
+    "kylin.source.default": "9"
+  },
+  "segment_config": {
+    "auto_merge_enabled": false,
+    "auto_merge_time_ranges": [
+      "WEEK",
+      "MONTH",
+      "QUARTER",
+      "YEAR"
+    ],
+    "volatile_range": {
+      "volatile_range_number": 0,
+      "volatile_range_enabled": false,
+      "volatile_range_type": "DAY"
+    },
+    "retention_range": {
+      "retention_range_number": 1,
+      "retention_range_enabled": false,
+      "retention_range_type": "MONTH"
+    },
+    "create_empty_segment_enabled": false
+  }
+}
diff --git a/src/examples/test_case_data/localmeta/metadata/index_build_test/dataflow/3ec47efc-573a-9304-4405-8e05ae184322.json b/src/examples/test_case_data/localmeta/metadata/index_build_test/dataflow/3ec47efc-573a-9304-4405-8e05ae184322.json
new file mode 100644
index 0000000000..d020817c44
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/index_build_test/dataflow/3ec47efc-573a-9304-4405-8e05ae184322.json
@@ -0,0 +1,70 @@
+{
+  "uuid": "3ec47efc-573a-9304-4405-8e05ae184322",
+  "last_modified": 1675944717956,
+  "create_time": 1675944415945,
+  "version": "4.0.0.0",
+  "status": "ONLINE",
+  "last_status": null,
+  "cost": 50,
+  "query_hit_count": 0,
+  "last_query_time": 0,
+  "layout_query_hit_count": {},
+  "segments": [
+    {
+      "id": "b2f206e1-7a15-c94a-20f5-f608d550ead6",
+      "name": "FULL_BUILD",
+      "create_time_utc": 1675944504890,
+      "status": "READY",
+      "segRange": {
+        "@class": "org.apache.kylin.metadata.model.SegmentRange$TimePartitionedSegmentRange",
+        "date_range_start": 0,
+        "date_range_end": 9223372036854775807
+      },
+      "timeRange": null,
+      "dimension_range_info_map": {
+        "0": {
+          "min": "1",
+          "max": "6"
+        },
+        "24": {
+          "min": "1",
+          "max": "5"
+        },
+        "13": {
+          "min": "5",
+          "max": "15"
+        },
+        "22": {
+          "min": "Customer#000000001",
+          "max": "Customer#000000005"
+        },
+        "16": {
+          "min": "1",
+          "max": "5"
+        }
+      },
+      "parameters": null,
+      "dictionaries": null,
+      "snapshots": null,
+      "last_build_time": 1675944668815,
+      "source_count": 6,
+      "source_bytes_size": 27732,
+      "column_source_bytes": {
+        "SSB.CUSTOMER.C_NAME": 108,
+        "SSB.LINEORDER.LO_QUANTITY": 9,
+        "SSB.LINEORDER.LO_ORDERKEY": 6,
+        "SSB.LINEORDER.LO_CUSTKEY": 6,
+        "SSB.CUSTOMER.C_CUSTKEY": 6
+      },
+      "ori_snapshot_size": {},
+      "additionalInfo": {},
+      "is_realtime_segment": false,
+      "is_snapshot_ready": false,
+      "is_dict_ready": true,
+      "is_flat_table_ready": true,
+      "is_fact_view_ready": false,
+      "multi_partitions": [],
+      "max_bucket_id": -1
+    }
+  ]
+}
diff --git a/src/examples/test_case_data/localmeta/metadata/index_build_test/dataflow_details/3ec47efc-573a-9304-4405-8e05ae184322/b2f206e1-7a15-c94a-20f5-f608d550ead6.json b/src/examples/test_case_data/localmeta/metadata/index_build_test/dataflow_details/3ec47efc-573a-9304-4405-8e05ae184322/b2f206e1-7a15-c94a-20f5-f608d550ead6.json
new file mode 100644
index 0000000000..b7512dc9f2
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/index_build_test/dataflow_details/3ec47efc-573a-9304-4405-8e05ae184322/b2f206e1-7a15-c94a-20f5-f608d550ead6.json
@@ -0,0 +1,69 @@
+{
+  "uuid": "b2f206e1-7a15-c94a-20f5-f608d550ead6",
+  "last_modified": 1675944504890,
+  "create_time": 1675944504890,
+  "version": "4.0.0.0",
+  "dataflow": "3ec47efc-573a-9304-4405-8e05ae184322",
+  "layout_instances": [
+    {
+      "layout_id": 1,
+      "build_job_id": "944d7367-29e5-a5bd-373c-f01b34c078d2-3ec47efc-573a-9304-4405-8e05ae184322",
+      "rows": 6,
+      "byte_size": 2021,
+      "file_count": 1,
+      "source_rows": 6,
+      "source_byte_size": 0,
+      "partition_num": 1,
+      "partition_values": [],
+      "is_ready": false,
+      "create_time": 1675944665659,
+      "multi_partition": [],
+      "abnormal_type": null
+    },
+    {
+      "layout_id": 10001,
+      "build_job_id": "944d7367-29e5-a5bd-373c-f01b34c078d2-3ec47efc-573a-9304-4405-8e05ae184322",
+      "rows": 6,
+      "byte_size": 2021,
+      "file_count": 1,
+      "source_rows": 6,
+      "source_byte_size": 0,
+      "partition_num": 1,
+      "partition_values": [],
+      "is_ready": false,
+      "create_time": 1675944665659,
+      "multi_partition": [],
+      "abnormal_type": null
+    },
+    {
+      "layout_id": 20000000001,
+      "build_job_id": "944d7367-29e5-a5bd-373c-f01b34c078d2-3ec47efc-573a-9304-4405-8e05ae184322",
+      "rows": 6,
+      "byte_size": 1739,
+      "file_count": 1,
+      "source_rows": 6,
+      "source_byte_size": 0,
+      "partition_num": 1,
+      "partition_values": [],
+      "is_ready": false,
+      "create_time": 1675944665659,
+      "multi_partition": [],
+      "abnormal_type": null
+    },
+    {
+      "layout_id": 20000010001,
+      "build_job_id": "944d7367-29e5-a5bd-373c-f01b34c078d2-3ec47efc-573a-9304-4405-8e05ae184322",
+      "rows": 6,
+      "byte_size": 1739,
+      "file_count": 1,
+      "source_rows": 6,
+      "source_byte_size": 0,
+      "partition_num": 1,
+      "partition_values": [],
+      "is_ready": false,
+      "create_time": 1675944665659,
+      "multi_partition": [],
+      "abnormal_type": null
+    }
+  ]
+}
diff --git a/src/examples/test_case_data/localmeta/metadata/index_build_test/index_plan/3ec47efc-573a-9304-4405-8e05ae184322.json b/src/examples/test_case_data/localmeta/metadata/index_build_test/index_plan/3ec47efc-573a-9304-4405-8e05ae184322.json
new file mode 100644
index 0000000000..f53bfcb30c
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/index_build_test/index_plan/3ec47efc-573a-9304-4405-8e05ae184322.json
@@ -0,0 +1,204 @@
+{
+  "uuid": "3ec47efc-573a-9304-4405-8e05ae184322",
+  "last_modified": 1675945219064,
+  "create_time": 1675944415943,
+  "version": "4.0.0.0",
+  "description": null,
+  "rule_based_index": null,
+  "indexes": [
+    {
+      "id": 10000,
+      "dimensions": [
+        24,
+        22
+      ],
+      "measures": [
+        100000,
+        100001
+      ],
+      "layouts": [
+        {
+          "id": 10001,
+          "name": null,
+          "owner": null,
+          "col_order": [
+            24,
+            22,
+            100000,
+            100001
+          ],
+          "shard_by_columns": [],
+          "partition_by_columns": [],
+          "sort_by_columns": [],
+          "storage_type": 20,
+          "update_time": 1675945215262,
+          "manual": false,
+          "auto": true,
+          "base": false,
+          "draft_version": null,
+          "index_range": null
+        }
+      ],
+      "next_layout_offset": 2
+    },
+    {
+      "id": 20000000000,
+      "dimensions": [
+        0,
+        13,
+        16,
+        24,
+        22
+      ],
+      "measures": [],
+      "layouts": [
+        {
+          "id": 20000000001,
+          "name": null,
+          "owner": null,
+          "col_order": [
+            0,
+            13,
+            16,
+            24,
+            22
+          ],
+          "shard_by_columns": [],
+          "partition_by_columns": [],
+          "sort_by_columns": [],
+          "storage_type": 20,
+          "update_time": 1675944415943,
+          "manual": false,
+          "auto": false,
+          "base": true,
+          "draft_version": null,
+          "index_range": null
+        }
+      ],
+      "next_layout_offset": 2
+    },
+    {
+      "id": 20000010000,
+      "dimensions": [
+        0,
+        13
+      ],
+      "measures": [],
+      "layouts": [
+        {
+          "id": 20000010001,
+          "name": null,
+          "owner": null,
+          "col_order": [
+            0,
+            13
+          ],
+          "shard_by_columns": [],
+          "partition_by_columns": [],
+          "sort_by_columns": [],
+          "storage_type": 20,
+          "update_time": 1675944415943,
+          "manual": false,
+          "auto": false,
+          "base": true,
+          "draft_version": null,
+          "index_range": null
+        }
+      ],
+      "next_layout_offset": 2
+    },
+    {
+      "id": 20000,
+      "dimensions": [
+        0,
+        13,
+        16,
+        24,
+        22
+      ],
+      "measures": [
+        100000,
+        100001
+      ],
+      "layouts": [
+        {
+          "id": 20001,
+          "name": null,
+          "owner": "ADMIN",
+          "col_order": [
+            0,
+            13,
+            16,
+            24,
+            22,
+            100000,
+            100001
+          ],
+          "shard_by_columns": [],
+          "partition_by_columns": [],
+          "sort_by_columns": [],
+          "storage_type": 20,
+          "update_time": 1675945219064,
+          "manual": false,
+          "auto": false,
+          "base": true,
+          "draft_version": null,
+          "index_range": null
+        }
+      ],
+      "next_layout_offset": 2
+    }
+  ],
+  "override_properties": {},
+  "to_be_deleted_indexes": [
+    {
+      "id": 0,
+      "dimensions": [
+        0,
+        13,
+        16,
+        24,
+        22
+      ],
+      "measures": [
+        100000
+      ],
+      "layouts": [
+        {
+          "id": 1,
+          "name": null,
+          "owner": null,
+          "col_order": [
+            0,
+            13,
+            16,
+            24,
+            22,
+            100000
+          ],
+          "shard_by_columns": [],
+          "partition_by_columns": [],
+          "sort_by_columns": [],
+          "storage_type": 20,
+          "update_time": 1675944415943,
+          "manual": false,
+          "auto": false,
+          "base": true,
+          "draft_version": null,
+          "index_range": null
+        }
+      ],
+      "next_layout_offset": 2
+    }
+  ],
+  "auto_merge_time_ranges": null,
+  "retention_range": 0,
+  "engine_type": 80,
+  "next_aggregation_index_id": 30000,
+  "next_table_index_id": 20000020000,
+  "agg_shard_by_columns": [],
+  "extend_partition_columns": [],
+  "layout_bucket_num": {},
+  "approved_additional_recs": 1,
+  "approved_removal_recs": 0
+}
diff --git a/src/examples/test_case_data/localmeta/metadata/index_build_test/model_desc/3ec47efc-573a-9304-4405-8e05ae184322.json b/src/examples/test_case_data/localmeta/metadata/index_build_test/model_desc/3ec47efc-573a-9304-4405-8e05ae184322.json
new file mode 100644
index 0000000000..f75ddbffb2
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/index_build_test/model_desc/3ec47efc-573a-9304-4405-8e05ae184322.json
@@ -0,0 +1,241 @@
+{
+  "uuid": "3ec47efc-573a-9304-4405-8e05ae184322",
+  "last_modified": 1675999084906,
+  "create_time": 1675999084809,
+  "version": "4.0.0.0",
+  "alias": "INDEX_BUILD_ON_SEGMENT",
+  "owner": "ADMIN",
+  "config_last_modifier": null,
+  "config_last_modified": 0,
+  "description": "",
+  "fact_table": "SSB.LINEORDER",
+  "fact_table_alias": null,
+  "management_type": "MODEL_BASED",
+  "join_tables": [
+    {
+      "table": "SSB.CUSTOMER",
+      "kind": "LOOKUP",
+      "alias": "CUSTOMER",
+      "join": {
+        "type": "INNER",
+        "primary_key": [
+          "CUSTOMER.C_CUSTKEY"
+        ],
+        "foreign_key": [
+          "LINEORDER.LO_CUSTKEY"
+        ],
+        "non_equi_join_condition": null,
+        "primary_table": null,
+        "foreign_table": null
+      },
+      "flattenable": "flatten",
+      "join_relation_type": "MANY_TO_ONE"
+    }
+  ],
+  "filter_condition": "",
+  "partition_desc": null,
+  "capacity": "MEDIUM",
+  "segment_config": {
+    "auto_merge_enabled": null,
+    "auto_merge_time_ranges": null,
+    "volatile_range": null,
+    "retention_range": null,
+    "create_empty_segment_enabled": false
+  },
+  "data_check_desc": null,
+  "semantic_version": 0,
+  "storage_type": 0,
+  "model_type": "BATCH",
+  "all_named_columns": [
+    {
+      "id": 0,
+      "name": "LO_ORDERKEY",
+      "column": "LINEORDER.LO_ORDERKEY",
+      "status": "DIMENSION"
+    },
+    {
+      "id": 1,
+      "name": "LO_PARTKEY",
+      "column": "LINEORDER.LO_PARTKEY"
+    },
+    {
+      "id": 2,
+      "name": "LO_DISCOUNT",
+      "column": "LINEORDER.LO_DISCOUNT"
+    },
+    {
+      "id": 3,
+      "name": "LO_SUPPLYCOST",
+      "column": "LINEORDER.LO_SUPPLYCOST"
+    },
+    {
+      "id": 4,
+      "name": "LO_COMMITDATE",
+      "column": "LINEORDER.LO_COMMITDATE"
+    },
+    {
+      "id": 5,
+      "name": "LO_EXTENDEDPRICE",
+      "column": "LINEORDER.LO_EXTENDEDPRICE"
+    },
+    {
+      "id": 6,
+      "name": "LO_TAX",
+      "column": "LINEORDER.LO_TAX"
+    },
+    {
+      "id": 7,
+      "name": "LO_SUPPKEY",
+      "column": "LINEORDER.LO_SUPPKEY"
+    },
+    {
+      "id": 8,
+      "name": "LO_ORDTOTALPRICE",
+      "column": "LINEORDER.LO_ORDTOTALPRICE"
+    },
+    {
+      "id": 9,
+      "name": "LO_REVENUE",
+      "column": "LINEORDER.LO_REVENUE"
+    },
+    {
+      "id": 10,
+      "name": "LO_ORDERDATE",
+      "column": "LINEORDER.LO_ORDERDATE"
+    },
+    {
+      "id": 11,
+      "name": "LO_ORDERPRIOTITY",
+      "column": "LINEORDER.LO_ORDERPRIOTITY"
+    },
+    {
+      "id": 12,
+      "name": "LO_SHIPPRIOTITY",
+      "column": "LINEORDER.LO_SHIPPRIOTITY"
+    },
+    {
+      "id": 13,
+      "name": "LO_QUANTITY",
+      "column": "LINEORDER.LO_QUANTITY",
+      "status": "DIMENSION"
+    },
+    {
+      "id": 14,
+      "name": "LO_SHIPMODE",
+      "column": "LINEORDER.LO_SHIPMODE"
+    },
+    {
+      "id": 15,
+      "name": "LO_LINENUMBER",
+      "column": "LINEORDER.LO_LINENUMBER"
+    },
+    {
+      "id": 16,
+      "name": "LO_CUSTKEY",
+      "column": "LINEORDER.LO_CUSTKEY",
+      "status": "DIMENSION"
+    },
+    {
+      "id": 17,
+      "name": "C_ADDRESS",
+      "column": "CUSTOMER.C_ADDRESS"
+    },
+    {
+      "id": 18,
+      "name": "C_NATION",
+      "column": "CUSTOMER.C_NATION"
+    },
+    {
+      "id": 19,
+      "name": "C_CITY",
+      "column": "CUSTOMER.C_CITY"
+    },
+    {
+      "id": 20,
+      "name": "C_PHONE",
+      "column": "CUSTOMER.C_PHONE"
+    },
+    {
+      "id": 21,
+      "name": "C_REGION",
+      "column": "CUSTOMER.C_REGION"
+    },
+    {
+      "id": 22,
+      "name": "C_NAME",
+      "column": "CUSTOMER.C_NAME",
+      "status": "DIMENSION"
+    },
+    {
+      "id": 23,
+      "name": "C_MKTSEGMENT",
+      "column": "CUSTOMER.C_MKTSEGMENT"
+    },
+    {
+      "id": 24,
+      "name": "C_CUSTKEY",
+      "column": "CUSTOMER.C_CUSTKEY",
+      "status": "DIMENSION"
+    }
+  ],
+  "all_measures": [
+    {
+      "name": "COUNT_ALL",
+      "function": {
+        "expression": "COUNT",
+        "parameters": [
+          {
+            "type": "constant",
+            "value": "1"
+          }
+        ],
+        "returntype": "bigint"
+      },
+      "column": null,
+      "comment": null,
+      "id": 100000,
+      "type": "NORMAL",
+      "internal_ids": []
+    },
+    {
+      "name": "SUM_LINEORDER_LO_QUANTITY",
+      "function": {
+        "expression": "SUM",
+        "parameters": [
+          {
+            "type": "column",
+            "value": "LINEORDER.LO_QUANTITY"
+          }
+        ],
+        "returntype": "bigint"
+      },
+      "column": null,
+      "comment": null,
+      "id": 100001,
+      "type": "NORMAL",
+      "internal_ids": []
+    }
+  ],
+  "recommendations_count": 0,
+  "computed_columns": [],
+  "canvas": {
+    "coordinate": {
+      "LINEORDER": {
+        "x": 752.8888617621528,
+        "y": 126.22219509548611,
+        "width": 200.0,
+        "height": 230.0
+      },
+      "CUSTOMER": {
+        "x": 745.111083984375,
+        "y": 468.44441731770837,
+        "width": 200.0,
+        "height": 230.0
+      }
+    },
+    "zoom": 9.0
+  },
+  "multi_partition_desc": null,
+  "multi_partition_key_mapping": null,
+  "fusion_id": null
+}
diff --git a/src/examples/test_case_data/localmeta/metadata/index_build_test/table/SSB.CUSTOMER.json b/src/examples/test_case_data/localmeta/metadata/index_build_test/table/SSB.CUSTOMER.json
new file mode 100644
index 0000000000..c84432550e
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/index_build_test/table/SSB.CUSTOMER.json
@@ -0,0 +1,77 @@
+{
+  "uuid": "970b1e03-75fd-6152-ce22-7c6d19bb1760",
+  "last_modified": 0,
+  "create_time": 1675998714528,
+  "version": "4.0.0.0",
+  "name": "CUSTOMER",
+  "columns": [
+    {
+      "id": "1",
+      "name": "C_CUSTKEY",
+      "datatype": "bigint",
+      "case_sensitive_name": "c_custkey"
+    },
+    {
+      "id": "2",
+      "name": "C_NAME",
+      "datatype": "varchar(4096)",
+      "case_sensitive_name": "c_name"
+    },
+    {
+      "id": "3",
+      "name": "C_ADDRESS",
+      "datatype": "varchar(4096)",
+      "case_sensitive_name": "c_address"
+    },
+    {
+      "id": "4",
+      "name": "C_CITY",
+      "datatype": "varchar(4096)",
+      "case_sensitive_name": "c_city"
+    },
+    {
+      "id": "5",
+      "name": "C_NATION",
+      "datatype": "varchar(4096)",
+      "case_sensitive_name": "c_nation"
+    },
+    {
+      "id": "6",
+      "name": "C_REGION",
+      "datatype": "varchar(4096)",
+      "case_sensitive_name": "c_region"
+    },
+    {
+      "id": "7",
+      "name": "C_PHONE",
+      "datatype": "varchar(4096)",
+      "case_sensitive_name": "c_phone"
+    },
+    {
+      "id": "8",
+      "name": "C_MKTSEGMENT",
+      "datatype": "varchar(4096)",
+      "case_sensitive_name": "c_mktsegment"
+    }
+  ],
+  "source_type": 9,
+  "table_type": "EXTERNAL",
+  "top": false,
+  "increment_loading": false,
+  "last_snapshot_path": null,
+  "last_snapshot_size": 0,
+  "snapshot_last_modified": 0,
+  "query_hit_count": 0,
+  "partition_column": null,
+  "snapshot_partitions": {},
+  "snapshot_partitions_info": {},
+  "snapshot_total_rows": 0,
+  "snapshot_partition_col": null,
+  "selected_snapshot_partition_col": null,
+  "temp_snapshot_path": null,
+  "snapshot_has_broken": false,
+  "database": "SSB",
+  "transactional": false,
+  "rangePartition": false,
+  "partition_desc": null
+}
diff --git a/src/examples/test_case_data/localmeta/metadata/index_build_test/table/SSB.LINEORDER.json b/src/examples/test_case_data/localmeta/metadata/index_build_test/table/SSB.LINEORDER.json
new file mode 100644
index 0000000000..08bda05859
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/index_build_test/table/SSB.LINEORDER.json
@@ -0,0 +1,131 @@
+{
+  "uuid": "33529927-8bc2-d0fb-33a3-562db986be6b",
+  "last_modified": 0,
+  "create_time": 1675998714544,
+  "version": "4.0.0.0",
+  "name": "LINEORDER",
+  "columns": [
+    {
+      "id": "1",
+      "name": "LO_ORDERKEY",
+      "datatype": "bigint",
+      "case_sensitive_name": "lo_orderkey"
+    },
+    {
+      "id": "2",
+      "name": "LO_LINENUMBER",
+      "datatype": "bigint",
+      "case_sensitive_name": "lo_linenumber"
+    },
+    {
+      "id": "3",
+      "name": "LO_CUSTKEY",
+      "datatype": "bigint",
+      "case_sensitive_name": "lo_custkey"
+    },
+    {
+      "id": "4",
+      "name": "LO_PARTKEY",
+      "datatype": "integer",
+      "case_sensitive_name": "lo_partkey"
+    },
+    {
+      "id": "5",
+      "name": "LO_SUPPKEY",
+      "datatype": "integer",
+      "case_sensitive_name": "lo_suppkey"
+    },
+    {
+      "id": "6",
+      "name": "LO_ORDERDATE",
+      "datatype": "date",
+      "case_sensitive_name": "lo_orderdate"
+    },
+    {
+      "id": "7",
+      "name": "LO_ORDERPRIOTITY",
+      "datatype": "varchar(4096)",
+      "case_sensitive_name": "lo_orderpriotity"
+    },
+    {
+      "id": "8",
+      "name": "LO_SHIPPRIOTITY",
+      "datatype": "integer",
+      "case_sensitive_name": "lo_shippriotity"
+    },
+    {
+      "id": "9",
+      "name": "LO_QUANTITY",
+      "datatype": "bigint",
+      "case_sensitive_name": "lo_quantity"
+    },
+    {
+      "id": "10",
+      "name": "LO_EXTENDEDPRICE",
+      "datatype": "bigint",
+      "case_sensitive_name": "lo_extendedprice"
+    },
+    {
+      "id": "11",
+      "name": "LO_ORDTOTALPRICE",
+      "datatype": "bigint",
+      "case_sensitive_name": "lo_ordtotalprice"
+    },
+    {
+      "id": "12",
+      "name": "LO_DISCOUNT",
+      "datatype": "bigint",
+      "case_sensitive_name": "lo_discount"
+    },
+    {
+      "id": "13",
+      "name": "LO_REVENUE",
+      "datatype": "bigint",
+      "case_sensitive_name": "lo_revenue"
+    },
+    {
+      "id": "14",
+      "name": "LO_SUPPLYCOST",
+      "datatype": "bigint",
+      "case_sensitive_name": "lo_supplycost"
+    },
+    {
+      "id": "15",
+      "name": "LO_TAX",
+      "datatype": "bigint",
+      "case_sensitive_name": "lo_tax"
+    },
+    {
+      "id": "16",
+      "name": "LO_COMMITDATE",
+      "datatype": "date",
+      "case_sensitive_name": "lo_commitdate"
+    },
+    {
+      "id": "17",
+      "name": "LO_SHIPMODE",
+      "datatype": "varchar(4096)",
+      "case_sensitive_name": "lo_shipmode"
+    }
+  ],
+  "source_type": 9,
+  "table_type": "EXTERNAL",
+  "top": false,
+  "increment_loading": false,
+  "last_snapshot_path": null,
+  "last_snapshot_size": 0,
+  "snapshot_last_modified": 0,
+  "query_hit_count": 0,
+  "partition_column": null,
+  "snapshot_partitions": {},
+  "snapshot_partitions_info": {},
+  "snapshot_total_rows": 0,
+  "snapshot_partition_col": null,
+  "selected_snapshot_partition_col": null,
+  "temp_snapshot_path": null,
+  "snapshot_has_broken": false,
+  "database": "SSB",
+  "transactional": false,
+  "rangePartition": false,
+  "partition_desc": null
+}
diff --git a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NIndexPlanController.java b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NIndexPlanController.java
index 24124071a5..fe5bdc8cfc 100644
--- a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NIndexPlanController.java
+++ b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NIndexPlanController.java
@@ -48,6 +48,8 @@ import org.apache.kylin.rest.response.TableIndexResponse;
 import org.apache.kylin.rest.service.FusionIndexService;
 import org.apache.kylin.rest.service.IndexPlanService;
 import org.apache.kylin.rest.service.ModelService;
+import org.apache.kylin.rest.service.params.IndexPlanParams;
+import org.apache.kylin.rest.service.params.PaginationParams;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.web.bind.annotation.DeleteMapping;
@@ -178,6 +180,7 @@ public class NIndexPlanController extends NBasicController {
     @GetMapping(value = "/index")
     public EnvelopeResponse<FusionRuleDataResult<List<IndexResponse>>> getIndex(
             @RequestParam(value = "project") String project, @RequestParam(value = "model") String modelId, //
+            @RequestParam(value = "segment_id", required = false, defaultValue = "") String segmentId,
             @RequestParam(value = "sort_by", required = false, defaultValue = "") String order,
             @RequestParam(value = "reverse", required = false, defaultValue = "false") Boolean desc,
             @RequestParam(value = "sources", required = false, defaultValue = "") List<IndexEntity.Source> sources,
@@ -189,7 +192,9 @@ public class NIndexPlanController extends NBasicController {
             @RequestParam(value = "range", required = false, defaultValue = "") List<IndexEntity.Range> range) {
         checkProjectName(project);
         checkRequiredArg(MODEL_ID, modelId);
-        val indexes = fusionIndexService.getIndexes(project, modelId, key, status, order, desc, sources, ids, range);
+        IndexPlanParams indexPlanParams = new IndexPlanParams(project, modelId, segmentId, ids, sources, status, range);
+        PaginationParams paginationParams = new PaginationParams(offset, limit, order, desc);
+        val indexes = fusionIndexService.getIndexes(indexPlanParams, paginationParams, key);
         val indexUpdateEnabled = FusionIndexService.checkUpdateIndexEnabled(project, modelId);
         return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
                 FusionRuleDataResult.get(indexes, offset, limit, indexUpdateEnabled), "");
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/response/IndexResponse.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/response/IndexResponse.java
index b437c2f857..4ae77b249a 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/response/IndexResponse.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/response/IndexResponse.java
@@ -19,6 +19,7 @@ package org.apache.kylin.rest.response;
 
 import java.util.List;
 
+import org.apache.kylin.metadata.cube.model.NDataLayout;
 import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.cube.model.IndexEntity;
 import org.apache.kylin.metadata.cube.model.IndexEntity.Source;
@@ -101,6 +102,9 @@ public class IndexResponse {
 
     }
 
+    @JsonProperty("abnormal_type")
+    private NDataLayout.AbnormalType abnormalType;
+
     @Data
     @AllArgsConstructor
     public static class ColOrderPair {
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/response/NDataSegmentResponse.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/response/NDataSegmentResponse.java
index 47eb6a6140..8af94f0868 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/response/NDataSegmentResponse.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/response/NDataSegmentResponse.java
@@ -120,14 +120,14 @@ public class NDataSegmentResponse extends NDataSegment {
         startTime = Long.parseLong(getSegRange().getStart().toString());
         endTime = Long.parseLong(getSegRange().getEnd().toString());
         storage = bytesSize;
-        indexCount = segment.getLayoutSize();
+        indexCount = segment.getWorkingLayoutSize();
         indexCountTotal = segment.getIndexPlan().getAllLayoutsSize(true);
         multiPartitionCount = segment.getMultiPartitions().size();
         hasBaseAggIndex = segment.getIndexPlan().containBaseAggLayout();
         hasBaseTableIndex = segment.getIndexPlan().containBaseTableLayout();
         if (segment.getIndexPlan().getBaseTableLayout() != null) {
             val indexPlan = segment.getDataflow().getIndexPlan();
-            long segmentFileCount = segment.getSegDetails().getLayouts().stream()
+            long segmentFileCount = segment.getSegDetails().getWorkingLayouts().stream()
                     .filter(layout -> indexPlan.getLayoutEntity(layout.getLayoutId()) != null
                             && indexPlan.getLayoutEntity(layout.getLayoutId()).isBaseIndex())
                     .mapToLong(NDataLayout::getFileCount).sum();
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/FusionIndexService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/FusionIndexService.java
index 80043d3fd5..b048cd3815 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/FusionIndexService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/FusionIndexService.java
@@ -55,6 +55,8 @@ import org.apache.kylin.rest.response.AggIndexResponse;
 import org.apache.kylin.rest.response.BuildIndexResponse;
 import org.apache.kylin.rest.response.DiffRuleBasedIndexResponse;
 import org.apache.kylin.rest.response.IndexResponse;
+import org.apache.kylin.rest.service.params.IndexPlanParams;
+import org.apache.kylin.rest.service.params.PaginationParams;
 import org.apache.kylin.streaming.manager.StreamingJobManager;
 import org.apache.kylin.streaming.metadata.StreamingJobMeta;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -195,10 +197,24 @@ public class FusionIndexService extends BasicService {
     }
 
     public List<IndexResponse> getIndexes(String project, String modelId, String key, List<IndexEntity.Status> status,
-            String orderBy, Boolean desc, List<IndexEntity.Source> sources, List<Long> ids,
-            List<IndexEntity.Range> range) {
-        List<IndexResponse> indexes = indexPlanService.getIndexes(project, modelId, key, status, orderBy, desc,
-                sources);
+                                          String orderBy, Boolean desc, List<IndexEntity.Source> sources, List<Long> ids,
+                                          List<IndexEntity.Range> range) {
+        return getIndexes(new IndexPlanParams(project, modelId, null, ids, sources, status, range),
+                new PaginationParams(null, null, orderBy, desc),
+                key);
+    }
+
+    public List<IndexResponse> getIndexes(IndexPlanParams indexPlanParams, PaginationParams paginationParams, String key) {
+        String project = indexPlanParams.getProject();
+        String modelId = indexPlanParams.getModelId();
+        List<Long> ids = indexPlanParams.getIds();
+        List<IndexEntity.Source> sources = indexPlanParams.getSources();
+        List<IndexEntity.Status> status = indexPlanParams.getStatus();
+        List<IndexEntity.Range> range = indexPlanParams.getRange();
+        String orderBy = paginationParams.getOrderBy();
+        Boolean desc = paginationParams.getReverse();
+
+        List<IndexResponse> indexes = indexPlanService.getIndexes(indexPlanParams, paginationParams, key);
         NDataModel model = getManager(NDataModelManager.class, project).getDataModelDesc(modelId);
         if (model.isFusionModel()) {
             FusionModel fusionModel = getManager(FusionModelManager.class, project).getFusionModel(modelId);
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java
index 1b9e681ef6..767b098dd5 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java
@@ -98,6 +98,8 @@ import org.apache.kylin.rest.response.IndexGraphResponse;
 import org.apache.kylin.rest.response.IndexResponse;
 import org.apache.kylin.rest.response.IndexStatResponse;
 import org.apache.kylin.rest.response.TableIndexResponse;
+import org.apache.kylin.rest.service.params.IndexPlanParams;
+import org.apache.kylin.rest.service.params.PaginationParams;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.SpringContext;
 import org.slf4j.Logger;
@@ -651,7 +653,21 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp
     }
 
     public List<IndexResponse> getIndexes(String project, String modelId, String key, List<IndexEntity.Status> status,
-            String orderBy, Boolean desc, List<IndexEntity.Source> sources) {
+                                          String orderBy, Boolean desc, List<IndexEntity.Source> sources) {
+        return getIndexes(new IndexPlanParams(project, modelId, null, null, sources, status, null),
+                new PaginationParams(null, null, orderBy, desc),
+                key);
+    }
+
+    public List<IndexResponse> getIndexes(IndexPlanParams indexPlanParams, PaginationParams paginationParams, String key) {
+        String project = indexPlanParams.getProject();
+        String modelId = indexPlanParams.getModelId();
+        String segmentId = indexPlanParams.getSegmentId();
+        List<IndexEntity.Source> sources = indexPlanParams.getSources();
+        List<IndexEntity.Status> status = indexPlanParams.getStatus();
+        String orderBy = paginationParams.getOrderBy();
+        Boolean desc = paginationParams.getReverse();
+
         aclEvaluate.checkProjectReadPermission(project);
         Set<IndexEntity.Status> statusSet = Sets.newHashSet(status);
 
@@ -662,7 +678,7 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp
         Set<Long> layoutsByRunningJobs = getLayoutsByRunningJobs(project, modelId);
         if (StringUtils.isBlank(key)) {
             return sortAndFilterLayouts(layouts.stream()
-                    .map(layoutEntity -> convertToResponse(layoutEntity, indexPlan.getModel(), layoutsByRunningJobs))
+                    .map(layoutEntity -> convertToResponse(layoutEntity, indexPlan.getModel(), layoutsByRunningJobs, segmentId))
                     .filter(indexResponse -> statusSet.isEmpty() || statusSet.contains(indexResponse.getStatus())),
                     orderBy, desc, sources);
         }
@@ -676,7 +692,7 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp
             return String.valueOf(index.getId()).equals(key.trim())
                     || !Sets.intersection(matchDimensions, colOrderSet).isEmpty()
                     || !Sets.intersection(matchMeasures, colOrderSet).isEmpty();
-        }).map(layoutEntity -> convertToResponse(layoutEntity, indexPlan.getModel(), layoutsByRunningJobs))
+        }).map(layoutEntity -> convertToResponse(layoutEntity, indexPlan.getModel(), layoutsByRunningJobs, segmentId))
                 .filter(indexResponse -> statusSet.isEmpty() || statusSet.contains(indexResponse.getStatus())), orderBy,
                 desc, sources);
     }
@@ -735,7 +751,7 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp
         for (NDataSegment seg : readySegments) {
             val lockedIndexCountInSeg = seg.getLayoutsMap().values().stream()
                     .filter(nDataLayout -> nDataLayout.getLayout().isToBeDeleted()).count();
-            if ((seg.getSegDetails().getLayouts().size() - lockedIndexCountInSeg) != allIndexCountWithoutTobeDel) {
+            if ((seg.getSegDetails().getAllLayouts().size() - lockedIndexCountInSeg) != allIndexCountWithoutTobeDel) {
                 segmentToComplementCount += 1;
             }
         }
@@ -840,11 +856,11 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp
     }
 
     private IndexResponse convertToResponse(LayoutEntity layoutEntity, NDataModel model) {
-        return convertToResponse(layoutEntity, model, Sets.newHashSet());
+        return convertToResponse(layoutEntity, model, Sets.newHashSet(), null);
     }
 
     private IndexResponse convertToResponse(LayoutEntity layoutEntity, NDataModel model,
-            Set<Long> layoutIdsOfRunningJobs) {
+            Set<Long> layoutIdsOfRunningJobs, String segmentId) {
 
         // remove all internal measures
         val colOrders = Lists.newArrayList(layoutEntity.getColOrder());
@@ -862,20 +878,32 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp
         val dataflow = dfMgr.getDataflow(layoutEntity.getIndex().getIndexPlan().getUuid());
         long dataSize = 0L;
         int readyCount = 0;
-        for (NDataSegment segment : dataflow.getSegments()) {
-            val dataCuboid = segment.getLayout(layoutEntity.getId());
-            if (dataCuboid == null) {
+        boolean hasDataInconsistent = false;
+
+        List<NDataSegment> segments = StringUtils.isBlank(segmentId) ? dataflow.getSegments()
+                : dataflow.getSegments().stream().filter(seg -> seg.getId().equals(segmentId)).collect(Collectors.toList());
+        for (NDataSegment segment : segments) {
+            NDataLayout layout = segment.getLayout(layoutEntity.getId(), true);
+            if (layout == null) {
+                continue;
+            }
+            if (layout.getAbnormalType() == NDataLayout.AbnormalType.DATA_INCONSISTENT) {
+                hasDataInconsistent = true;
                 continue;
             }
             readyCount++;
-            dataSize += dataCuboid.getByteSize();
+            dataSize += layout.getByteSize();
         }
 
         IndexEntity.Status status;
         if (readyCount <= 0) {
-            status = IndexEntity.Status.NO_BUILD;
             if (layoutIdsOfRunningJobs.contains(layoutEntity.getId())) {
                 status = IndexEntity.Status.BUILDING;
+            } else {
+                status = IndexEntity.Status.NO_BUILD;
+                if (hasDataInconsistent) {
+                    response.setAbnormalType(NDataLayout.AbnormalType.DATA_INCONSISTENT);
+                }
             }
         } else {
             status = IndexEntity.Status.ONLINE;
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
index de7d4191f4..9a917fb61c 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -1177,7 +1177,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
             boolean allToComplement, Set<Long> allIndexWithoutTobeDel, NDataSegment segment) {
         if (allToComplement) {
             // find seg that does not have all indexes(don't include tobeDeleted)
-            val segLayoutIds = segment.getSegDetails().getLayouts().stream().map(NDataLayout::getLayoutId)
+            val segLayoutIds = segment.getSegDetails().getWorkingLayouts().stream().map(NDataLayout::getLayoutId)
                     .collect(Collectors.toSet());
             return !Sets.difference(allIndexWithoutTobeDel, segLayoutIds).isEmpty();
         }
@@ -2386,7 +2386,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
             for (String segmentId : segmentIds) {
                 NDataSegment seg = dataflow.getSegment(segmentId);
                 NDataSegDetails segDetails = seg.getSegDetails();
-                List<NDataLayout> layouts = new LinkedList<>(segDetails.getLayouts());
+                List<NDataLayout> layouts = new LinkedList<>(segDetails.getAllLayouts());
                 layouts.removeIf(layout -> indexIds.contains(layout.getLayoutId()));
                 dfManger.updateDataflowDetailsLayouts(seg, layouts);
             }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/IndexPlanParams.java
similarity index 58%
copy from src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
copy to src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/IndexPlanParams.java
index a8cf9dc237..ea917984d8 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/IndexPlanParams.java
@@ -16,21 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.engine.spark.job.stage.build
+package org.apache.kylin.rest.service.params;
 
-import org.apache.kylin.engine.spark.job.SegmentJob
-import org.apache.kylin.engine.spark.job.stage.BuildParam
-import org.apache.kylin.metadata.cube.model.NDataSegment
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.kylin.metadata.cube.model.IndexEntity;
 
-class BuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam)
-  extends BuildStage(jobContext, dataSegment, buildParam) {
+import java.util.List;
 
-  override def execute(): Unit = {
-    // Build layers.
-    buildLayouts()
-    // Drain results immediately after building.
-    drain()
-  }
-
-  override def getStageName: String = "BuildLayer"
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class IndexPlanParams {
+    private String project;
+    private String modelId;
+    private String segmentId;
+    private List<Long> ids;
+    private List<IndexEntity.Source> sources;
+    private List<IndexEntity.Status> status;
+    private List<IndexEntity.Range> range;
 }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/PaginationParams.java
similarity index 58%
copy from src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
copy to src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/PaginationParams.java
index a8cf9dc237..b0b495f3bc 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/PaginationParams.java
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.engine.spark.job.stage.build
+package org.apache.kylin.rest.service.params;
 
-import org.apache.kylin.engine.spark.job.SegmentJob
-import org.apache.kylin.engine.spark.job.stage.BuildParam
-import org.apache.kylin.metadata.cube.model.NDataSegment
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-class BuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam)
-  extends BuildStage(jobContext, dataSegment, buildParam) {
-
-  override def execute(): Unit = {
-    // Build layers.
-    buildLayouts()
-    // Drain results immediately after building.
-    drain()
-  }
-
-  override def getStageName: String = "BuildLayer"
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class PaginationParams {
+    private Integer pageOffset;
+    private Integer pageSize;
+    private String orderBy;
+    private Boolean reverse;
 }
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
index 70936d305e..be0b4814f7 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
@@ -235,14 +235,14 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase {
     public void testGetReadableProjects() {
         Mockito.doReturn(true).when(aclEvaluate).hasProjectAdminPermission(Mockito.any(ProjectInstance.class));
         List<ProjectInstance> projectInstances = projectService.getReadableProjects("", false);
-        Assert.assertEquals(28, projectInstances.size());
+        Assert.assertEquals(29, projectInstances.size());
     }
 
     @Test
     public void testGetAdminProjects() throws Exception {
         Mockito.doReturn(true).when(aclEvaluate).hasProjectAdminPermission(Mockito.any(ProjectInstance.class));
         List<ProjectInstance> projectInstances = projectService.getAdminProjects();
-        Assert.assertEquals(28, projectInstances.size());
+        Assert.assertEquals(29, projectInstances.size());
     }
 
     @Test
@@ -256,7 +256,7 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase {
     public void testGetReadableProjectsHasNoPermissionProject() {
         Mockito.doReturn(true).when(aclEvaluate).hasProjectAdminPermission(Mockito.any(ProjectInstance.class));
         List<ProjectInstance> projectInstances = projectService.getReadableProjects("", false);
-        Assert.assertEquals(28, projectInstances.size());
+        Assert.assertEquals(29, projectInstances.size());
 
     }
 
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
index 2c46e619ea..0ce477be53 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
@@ -364,7 +364,7 @@ public class QueryHistoryServiceTest extends NLocalFileMetadataTestCase {
 
         // get all tables
         tableMap = queryHistoryService.getQueryHistoryTableMap(null);
-        Assert.assertEquals(28, tableMap.size());
+        Assert.assertEquals(29, tableMap.size());
 
         // not existing project
         tableMap = queryHistoryService.getQueryHistoryTableMap(Lists.newArrayList("not_existing_project"));
diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java
index 93fe27494f..c6b2848523 100644
--- a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java
@@ -217,7 +217,7 @@ public class AsyncQueryJobTest extends NLocalFileMetadataTestCase {
                 rawResourceMap.put(zipEntry.getName(), raw);
             }
         }
-        Assert.assertEquals(84, rawResourceMap.size());
+        Assert.assertEquals(85, rawResourceMap.size());
     }
 
     private void testKylinConfig(FileSystem workingFileSystem, FileStatus metaFileStatus) throws IOException {
diff --git a/src/spark-project/engine-spark/pom.xml b/src/spark-project/engine-spark/pom.xml
index b22aefa720..6af33ecf15 100644
--- a/src/spark-project/engine-spark/pom.xml
+++ b/src/spark-project/engine-spark/pom.xml
@@ -177,6 +177,11 @@
             <artifactId>junit-jupiter-api</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.junit.vintage</groupId>
             <artifactId>junit-vintage-engine</artifactId>
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index cb395e88ba..a7b72aa242 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -35,7 +35,9 @@ import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -71,6 +73,7 @@ import org.apache.kylin.engine.spark.job.UdfManager;
 import org.apache.kylin.engine.spark.scheduler.ClusterMonitor;
 import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
 import org.apache.kylin.engine.spark.utils.SparkConfHelper;
+import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
@@ -118,6 +121,7 @@ public abstract class SparkApplication implements Application {
     protected String project;
     protected int layoutSize = -1;
     protected BuildJobInfos infos;
+    protected ConcurrentHashMap<String, Boolean> skipFollowingStagesMap = new ConcurrentHashMap<>();
     /**
      * path for spark app args on HDFS
      */
@@ -215,6 +219,17 @@ public abstract class SparkApplication implements Application {
         return clusterManager.getBuildTrackingUrl(sparkSession);
     }
 
+    public void setSkipFollowingStages(String segmentId) {
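+        // Mark this segment so that its remaining build stages are skipped.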
+        skipFollowingStagesMap.put(segmentId, true);
+    }
+
+    public boolean isSkipFollowingStages(String segmentId) {
+        if (segmentId == null) {
+            return false;
+        }
+        return Optional.ofNullable(skipFollowingStagesMap.get(segmentId)).orElse(false);
+    }
+
     private String tryReplaceHostAddress(String url) {
         String originHost = null;
         try {
@@ -446,7 +461,7 @@ public abstract class SparkApplication implements Application {
 
     protected void waiteForResourceSuccess() throws Exception {
         val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null);
-        waiteForResource.onStageFinished(true);
+        waiteForResource.onStageFinished(ExecutableState.SUCCEED);
         infos.recordStageId("");
     }
 
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/builder/PartitionDictionaryBuilderHelper.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/builder/PartitionDictionaryBuilderHelper.java
index cd4fcf341e..2d376b51bc 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/builder/PartitionDictionaryBuilderHelper.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/builder/PartitionDictionaryBuilderHelper.java
@@ -49,7 +49,7 @@ public class PartitionDictionaryBuilderHelper extends DictionaryBuilderHelper {
                     .filter(partition -> !partition.getStatus().equals(PartitionStatusEnum.READY))
                     .collect(Collectors.toSet());
             if (CollectionUtils.isEmpty(newPartitions)) {
-                for (NDataLayout cuboid : seg.getSegDetails().getLayouts()) {
+                for (NDataLayout cuboid : seg.getSegDetails().getWorkingLayouts()) {
                     buildedLayouts.add(cuboid.getLayout());
                 }
             }
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/ExecutableAddCuboidHandler.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/ExecutableAddCuboidHandler.java
index 0f1b68cd2d..43e9e521f5 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/ExecutableAddCuboidHandler.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/ExecutableAddCuboidHandler.java
@@ -18,21 +18,24 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import com.google.common.base.Preconditions;
-import lombok.val;
+import java.util.LinkedHashSet;
+import java.util.Optional;
+import java.util.Set;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.spark.merger.AfterBuildResourceMerger;
 import org.apache.kylin.job.execution.DefaultExecutableOnModel;
 import org.apache.kylin.job.execution.ExecutableHandler;
+import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.LinkedHashSet;
-import java.util.Optional;
-import java.util.Set;
+import com.google.common.base.Preconditions;
+
+import lombok.val;
 
 public class ExecutableAddCuboidHandler extends ExecutableHandler {
     private static final Logger logger = LoggerFactory.getLogger(ExecutableAddCuboidHandler.class);
@@ -59,6 +62,28 @@ public class ExecutableAddCuboidHandler extends ExecutableHandler {
                 .filter(task -> ((NSparkExecutable) task).needMergeMetadata())
                 .forEach(task -> ((NSparkExecutable) task).mergerMetadata(merger));
 
+        tryRemoveToBeDeletedLayouts(project, modelId, jobId, executable);
+        markDFStatus();
+    }
+
+    @Override
+    public void handleDiscardOrSuicidal() {
+        val job = getExecutable();
+        // anyTargetSegmentExists && checkCuttingInJobByModel need restart job
+        if (!(job.checkCuttingInJobByModel() && job.checkAnyTargetSegmentAndPartitionExists())) {
+            return;
+        }
+    }
+
+    private void tryRemoveToBeDeletedLayouts(String project, String modelId, String jobId, DefaultExecutableOnModel executable) {
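+        // Remove the to-be-deleted layouts only when this cubing job's Spark cubing step finished in SUCCEED.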
+        if (!(executable instanceof NSparkCubingJob)) {
+            return;
+        }
+        NSparkCubingJob job = (NSparkCubingJob) executable;
+        if (job.getSparkCubingStep().getStatus() != ExecutableState.SUCCEED) {
+            return;
+        }
+
         Optional.ofNullable(executable.getParams()).ifPresent(params -> {
             String toBeDeletedLayoutIdsStr = params.get(NBatchConstants.P_TO_BE_DELETED_LAYOUT_IDS);
             if (StringUtils.isNotBlank(toBeDeletedLayoutIdsStr)) {
@@ -72,16 +97,5 @@ public class ExecutableAddCuboidHandler extends ExecutableHandler {
                         copyForWrite -> copyForWrite.removeLayouts(toBeDeletedLayoutIds, true, true));
             }
         });
-        markDFStatus();
     }
-
-    @Override
-    public void handleDiscardOrSuicidal() {
-        val job = getExecutable();
-        // anyTargetSegmentExists && checkCuttingInJobByModel need restart job
-        if (!(job.checkCuttingInJobByModel() && job.checkAnyTargetSegmentAndPartitionExists())) {
-            return;
-        }
-    }
-
 }
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
index 62ea6ec1e7..8f633509a4 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
@@ -18,23 +18,30 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import com.google.common.collect.Sets;
-import lombok.NoArgsConstructor;
-import lombok.val;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.engine.spark.merger.MetadataMerger;
 import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.job.execution.StageBase;
 import org.apache.kylin.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.LinkedHashSet;
-import java.util.Set;
+import com.google.common.collect.Sets;
+
+import lombok.NoArgsConstructor;
+import lombok.val;
 
 @NoArgsConstructor
 public class NSparkCubingStep extends NSparkExecutable {
@@ -101,6 +108,29 @@ public class NSparkCubingStep extends NSparkExecutable {
         return result;
     }
 
+    @Override
+    protected ExecutableState adjustState(ExecutableState originalState) {
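+        // Surface WARNING for the whole step when any stage of any segment ended in WARNING.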
+        if (hasWarningStage()) {
+            return ExecutableState.WARNING;
+        }
+        return super.adjustState(originalState);
+    }
+
+    protected boolean hasWarningStage() {
+        NExecutableManager executableManager = getManager();
+        Map<String, List<StageBase>> stagesMap = getStagesMap();
+        for (Map.Entry<String, List<StageBase>> entry : stagesMap.entrySet()) {
+            String segmentId = entry.getKey();
+            List<StageBase> stages = entry.getValue();
+            boolean hasWarning = stages.stream()
+                    .anyMatch(stage -> executableManager.getOutput(stage.getId(), segmentId).getState() == ExecutableState.WARNING);
+            if (hasWarning) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public static class Mockup {
         public static void main(String[] args) {
             logger.info(Mockup.class + ".main() invoked, args: " + Arrays.toString(args));
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java
index f0582567f0..d5efe0722a 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java
@@ -110,7 +110,7 @@ public class AfterBuildResourceMerger extends SparkJobMetadataMerger {
         theSeg.setStatus(SegmentStatusEnum.READY);
         dfUpdate.setToUpdateSegs(theSeg);
         dfUpdate.setToRemoveSegs(toRemoveSegments.toArray(new NDataSegment[toRemoveSegments.size()]));
-        dfUpdate.setToAddOrUpdateLayouts(theSeg.getSegDetails().getLayouts().toArray(new NDataLayout[0]));
+        dfUpdate.setToAddOrUpdateLayouts(theSeg.getSegDetails().getWorkingLayouts().toArray(new NDataLayout[0]));
 
         localDataflowManager.updateDataflow(dfUpdate);
         updateIndexPlan(flowName, remoteStore);
@@ -147,7 +147,7 @@ public class AfterBuildResourceMerger extends SparkJobMetadataMerger {
             }
             remoteSeg.setLastBuildTime(remoteSeg.getSegDetails().getLastModified());
             for (long layoutId : availableLayoutIds) {
-                NDataLayout dataCuboid = remoteSeg.getLayout(layoutId);
+                NDataLayout dataCuboid = remoteSeg.getLayout(layoutId, true);
                 Preconditions.checkNotNull(dataCuboid);
                 addCuboids.add(dataCuboid);
             }
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
index 3935675a82..8aa4073ae7 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
@@ -83,7 +83,7 @@ public class AfterMergeOrRefreshResourceMerger extends SparkJobMetadataMerger {
                 partition.setLastBuildTime(lastBuildTime);
             });
             mergedSegment.setLastBuildTime(lastBuildTime);
-            toUpdateCuboids.addAll(new ArrayList<>(mergedSegment.getSegDetails().getLayouts()));
+            toUpdateCuboids.addAll(new ArrayList<>(mergedSegment.getSegDetails().getWorkingLayouts()));
         } else {
             mergedSegment = upsertSegmentPartition(localSegment, remoteSegment, partitions);
             for (val segId : segmentIds) {
@@ -158,7 +158,7 @@ public class AfterMergeOrRefreshResourceMerger extends SparkJobMetadataMerger {
                 mergedSegment.setStatus(SegmentStatusEnum.WARNING);
             }
         }
-        toUpdateCuboids.addAll(new ArrayList<>(mergedSegment.getSegDetails().getLayouts()));
+        toUpdateCuboids.addAll(new ArrayList<>(mergedSegment.getSegDetails().getWorkingLayouts()));
 
         update.setToAddOrUpdateLayouts(toUpdateCuboids.toArray(new NDataLayout[0]));
         update.setToRemoveSegs(toRemoveSegments.toArray(new NDataSegment[0]));
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
index c179c7d947..698de6d745 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
@@ -138,7 +138,7 @@ public class DictionaryBuilderHelper {
 
         List<LayoutEntity> buildedLayouts = Lists.newArrayList();
         if (seg.getSegDetails() != null) {
-            for (NDataLayout cuboid : seg.getSegDetails().getLayouts()) {
+            for (NDataLayout cuboid : seg.getSegDetails().getWorkingLayouts()) {
                 buildedLayouts.add(cuboid.getLayout());
             }
         }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala
index 4b2f70329a..b5b193c527 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala
@@ -24,10 +24,12 @@ import org.apache.spark.application.RetryInfo
 import org.apache.spark.sql.execution.SparkPlan
 
 import java.util
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
 
 class BuildJobInfos {
   // BUILD
-  private val seg2cuboidsNumPerLayer: util.Map[String, util.List[Int]] = new util.HashMap[String, util.List[Int]]
+  private val seg2cuboidsNumPerLayer: util.Map[String, AtomicInteger] = new ConcurrentHashMap[String, AtomicInteger]()
 
   private val seg2SpanningTree: java.util.Map[String, NSpanningTree] = new util.HashMap[String, NSpanningTree]
 
@@ -157,22 +159,19 @@ class BuildJobInfos {
   }
 
   def recordCuboidsNumPerLayer(segId: String, num: Int): Unit = {
-    if (seg2cuboidsNumPerLayer.containsKey(segId)) {
-      seg2cuboidsNumPerLayer.get(segId).add(num)
-    } else {
-      val nums = new util.LinkedList[Int]()
-      nums.add(num)
-      seg2cuboidsNumPerLayer.put(segId, nums)
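+    // Lazily create the per-segment counter, then accumulate atomically.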
+    var counter = seg2cuboidsNumPerLayer.get(segId)
+    if (counter == null) {
+      seg2cuboidsNumPerLayer.putIfAbsent(segId, new AtomicInteger())
+      counter = seg2cuboidsNumPerLayer.get(segId)
     }
+    counter.addAndGet(num)
   }
 
   def clearCuboidsNumPerLayer(segId: String): Unit = {
-    if (seg2cuboidsNumPerLayer.containsKey(segId)) {
-      seg2cuboidsNumPerLayer.get(segId).clear()
-    }
+    seg2cuboidsNumPerLayer.remove(segId)
   }
 
-  def getSeg2cuboidsNumPerLayer: util.Map[String, util.List[Int]] = {
+  def getSeg2cuboidsNumPerLayer: util.Map[String, AtomicInteger] = {
     seg2cuboidsNumPerLayer
   }
 
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/DFMergeJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/DFMergeJob.java
index f3bc59837b..7109e0a297 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/DFMergeJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/DFMergeJob.java
@@ -73,7 +73,7 @@ public class DFMergeJob extends SparkApplication {
         // collect layouts need to merge
         Map<Long, DFLayoutMergeAssist> mergeCuboidsAssist = Maps.newConcurrentMap();
         for (NDataSegment seg : mergingSegments) {
-            for (NDataLayout cuboid : seg.getSegDetails().getLayouts()) {
+            for (NDataLayout cuboid : seg.getSegDetails().getWorkingLayouts()) {
                 long layoutId = cuboid.getLayoutId();
 
                 DFLayoutMergeAssist assist = mergeCuboidsAssist.get(layoutId);
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/PartitionExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/PartitionExec.scala
index 887b252c01..f9a1fdf075 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/PartitionExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/PartitionExec.scala
@@ -91,6 +91,10 @@ private[job] trait PartitionExec {
     }
     logInfo(s"Segment $segmentId drained layout partition: " + //
       s"${results.asScala.map(lp => s"(${lp.layoutId} ${lp.partitionId})").mkString("[", ",", "]")}")
+
+    val buildJobInfos = KylinBuildEnv.get().buildJobInfos
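+    // Record the number of drained layout partitions for this layer in the build job infos.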
+    buildJobInfos.recordCuboidsNumPerLayer(segmentId, results.size())
+
     class DFUpdate extends UnitOfWork.Callback[Int] {
       override def process(): Int = {
         // Merge into the newest data segment.
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
index e3c930c883..caf4835827 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
@@ -29,10 +29,14 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.engine.spark.builder.SnapshotBuilder;
+import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
+import org.apache.kylin.engine.spark.job.SegmentJob;
+import org.apache.kylin.engine.spark.job.SparkJobConstants;
 import org.apache.kylin.engine.spark.job.exec.BuildExec;
 import org.apache.kylin.engine.spark.job.stage.BuildParam;
 import org.apache.kylin.engine.spark.job.stage.StageExec;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
+import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
@@ -88,7 +92,7 @@ public class SegmentBuildJob extends SegmentJob {
             checkDateFormatIfExist(project, dataflowId);
         }
         val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null);
-        waiteForResource.onStageFinished(true);
+        waiteForResource.onStageFinished(ExecutableState.SUCCEED);
         infos.recordStageId("");
     }
 
@@ -135,8 +139,6 @@ public class SegmentBuildJob extends SegmentJob {
     protected void build() throws IOException {
         Stream<NDataSegment> segmentStream = config.isSegmentParallelBuildEnabled() ? //
                 readOnlySegments.parallelStream() : readOnlySegments.stream();
-        AtomicLong finishedSegmentCount = new AtomicLong(0);
-        val segmentsCount = readOnlySegments.size();
         segmentStream.forEach(seg -> {
             try (KylinConfig.SetAndUnsetThreadLocalConfig autoCloseConfig = KylinConfig
                     .setAndUnsetThreadLocalConfig(config)) {
@@ -156,14 +158,10 @@ public class SegmentBuildJob extends SegmentJob {
 
                 GATHER_FLAT_TABLE_STATS.createStage(this, seg, buildParam, exec);
                 BUILD_LAYER.createStage(this, seg, buildParam, exec);
+                REFRESH_COLUMN_BYTES.createStage(this, seg, buildParam, exec);
 
                 buildSegment(seg, exec);
 
-                val refreshColumnBytes = REFRESH_COLUMN_BYTES.createStage(this, seg, buildParam, exec);
-                refreshColumnBytes.toWorkWithoutFinally();
-                if (finishedSegmentCount.incrementAndGet() < segmentsCount) {
-                    refreshColumnBytes.onStageFinished(true);
-                }
             } catch (IOException e) {
                 Throwables.propagate(e);
             }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
index e0f635acad..61675ac4a8 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
@@ -18,25 +18,24 @@
 
 package org.apache.kylin.engine.spark.job
 
-import java.util
-import java.util.Objects
-import java.util.concurrent.{BlockingQueue, ForkJoinPool, LinkedBlockingQueue, TimeUnit}
-
 import com.google.common.collect.{Lists, Queues}
-import org.apache.kylin.engine.spark.job.SegmentExec.{LayoutResult, ResultType, SourceStats}
-import org.apache.kylin.engine.spark.job.stage.merge.MergeStage
-import org.apache.kylin.engine.spark.scheduler.JobRuntime
-import org.apache.kylin.metadata.cube.model._
-import org.apache.kylin.metadata.model.NDataModel
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.kylin.common.persistence.transaction.UnitOfWork
 import org.apache.kylin.common.{KapConfig, KylinConfig}
-import org.apache.kylin.metadata.model.TblColRef
+import org.apache.kylin.engine.spark.job.SegmentExec.{LayoutResult, ResultType, SourceStats, filterSuccessfulLayoutResult}
+import org.apache.kylin.engine.spark.job.stage.merge.MergeStage
+import org.apache.kylin.engine.spark.scheduler.JobRuntime
+import org.apache.kylin.metadata.cube.model.NDataLayout.AbnormalType
+import org.apache.kylin.metadata.cube.model._
+import org.apache.kylin.metadata.model.{NDataModel, TblColRef}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.datasource.storage.{StorageListener, StorageStoreFactory, WriteTaskStats}
 import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
 import org.apache.spark.tracker.BuildContext
 
+import java.util
+import java.util.Objects
+import java.util.concurrent.{BlockingQueue, ForkJoinPool, LinkedBlockingQueue, TimeUnit}
 import scala.collection.JavaConverters._
 import scala.collection.parallel.ForkJoinTaskSupport
 
@@ -202,6 +201,9 @@ trait SegmentExec extends Logging {
     logInfo(s"Segment $segmentId drained layouts: " + //
       s"${results.asScala.map(_.layoutId).mkString("[", ",", "]")}")
 
+    val buildJobInfos = KylinBuildEnv.get().buildJobInfos
+    buildJobInfos.recordCuboidsNumPerLayer(segmentId, results.asScala.count(filterSuccessfulLayoutResult))
+
     class DFUpdate extends UnitOfWork.Callback[Int] {
       override def process(): Int = {
 
@@ -227,6 +229,7 @@ trait SegmentExec extends Logging {
           dataLayout.setPartitionValues(taskStats.partitionValues)
           dataLayout.setFileCount(taskStats.numFiles)
           dataLayout.setByteSize(taskStats.numBytes)
+          dataLayout.setAbnormalType(lr.abnormalType)
           dataLayout
         }
         logInfo(s"Segment $segmentId update the data layouts $dataLayouts")
@@ -290,7 +293,13 @@ trait SegmentExec extends Logging {
     val storagePath = NSparkCubingUtil.getStoragePath(segment, layout.getId)
     val taskStats = saveWithStatistics(layout, layoutDS, storagePath, readableDesc, storageListener)
     val sourceStats = newSourceStats(layout, taskStats)
-    pipe.offer(LayoutResult(layout.getId, taskStats, sourceStats))
+    pipe.offer(LayoutResult(layout.getId, taskStats, sourceStats, null))
+  }
+
+  protected final def newEmptyDataLayout(layout: LayoutEntity, abnormalType: AbnormalType): Unit = {
+    val taskStats = WriteTaskStats(0, 0, 0, 0, 0, 0, new util.ArrayList[String]())
+    val sourceStats = SourceStats(0)
+    pipe.offer(LayoutResult(layout.getId, taskStats, sourceStats, abnormalType))
   }
 
   protected def newSourceStats(layout: LayoutEntity, taskStats: WriteTaskStats): SourceStats = {
@@ -417,6 +426,11 @@ object SegmentExec {
 
   case class SourceStats(rows: Long)
 
-  case class LayoutResult(layoutId: java.lang.Long, stats: WriteTaskStats, sourceStats: SourceStats) extends ResultType
+  case class LayoutResult(layoutId: java.lang.Long, stats: WriteTaskStats, sourceStats: SourceStats,
+                          abnormalType: NDataLayout.AbnormalType) extends ResultType
+
+  protected def filterSuccessfulLayoutResult(layoutResult: LayoutResult): Boolean = {
+    layoutResult.abnormalType == null
+  }
 
 }
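
Note on the hunk above: each drained LayoutResult now carries the layout's AbnormalType, and only results whose type is null are counted toward recordCuboidsNumPerLayer. A minimal, self-contained sketch of that filtering (simplified stand-in types, not the production SegmentExec classes):

    // Sketch only: mirrors how SegmentExec counts successful layouts for progress.
    object LayoutResultCountSketch {
      // Simplified stand-in for SegmentExec.LayoutResult; abnormalType is null on success.
      final case class LayoutResult(layoutId: Long, abnormalType: String)

      def filterSuccessful(r: LayoutResult): Boolean = r.abnormalType == null

      def main(args: Array[String]): Unit = {
        val results = Seq(
          LayoutResult(1L, null),
          LayoutResult(10001L, null),
          LayoutResult(20001L, "DATA_INCONSISTENT"))
        // Only the two normal layouts are reported as finished cuboids.
        println(results.count(filterSuccessful)) // 2
      }
    }
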
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentJob.java
index 28174f8d4f..0fd435bea0 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentJob.java
@@ -247,4 +247,8 @@ public abstract class SegmentJob extends SparkApplication {
     public BuildContext getBuildContext() {
         return buildContext;
     }
+
+    public IndexPlan getIndexPlan() {
+        return indexPlan;
+    }
 }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentMergeJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentMergeJob.java
index 9bb9e12999..9660cf4102 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentMergeJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentMergeJob.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.spark.job.exec.MergeExec;
 import org.apache.kylin.engine.spark.job.stage.BuildParam;
+import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.spark.tracker.BuildContext;
 
@@ -72,7 +73,7 @@ public class SegmentMergeJob extends SegmentJob {
                 mergeColumnBytes.toWorkWithoutFinally();
 
                 if (finishedSegmentCount.incrementAndGet() < segmentsCount) {
-                    mergeColumnBytes.onStageFinished(true);
+                    mergeColumnBytes.onStageFinished(ExecutableState.SUCCEED);
                 }
             } catch (IOException e) {
                 Throwables.propagate(e);
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala
index 7f99feff77..8eca14faca 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.spark.job.stage
 
 import com.google.common.base.Throwables
+import io.kyligence.kap.guava20.shaded.common.util.concurrent.RateLimiter
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.JsonUtil
 import org.apache.kylin.job.execution.ExecutableState
@@ -44,98 +45,74 @@ trait StageExec extends Logging {
 
   def execute(): Unit
 
-  def onStageStart(): Unit = {
-    val taskId = getId
-    val segmentId = getSegmentId
-    val project = getJobContext.getProject
-    val status = ExecutableState.RUNNING.toString
-    val errMsg = null
-    val updateInfo: util.HashMap[String, String] = null
-
-    updateStageInfo(taskId, segmentId, project, status, errMsg, updateInfo)
-  }
-
-  def updateStageInfo(taskId: String, segmentId: String, project: String, status: String,
-                      errMsg: String, updateInfo: util.HashMap[String, String]): Unit = {
-    val context = getJobContext
-
-    val url = "/kylin/api/jobs/stage/status"
-
-    val payload: util.HashMap[String, Object] = new util.HashMap[String, Object](6)
-    payload.put("task_id", taskId)
-    payload.put("segment_id", segmentId)
-    payload.put("project", project)
-    payload.put("status", status)
-    payload.put("err_msg", errMsg)
-    payload.put("update_info", updateInfo)
-    val json = JsonUtil.writeValueAsString(payload)
-    val params = new util.HashMap[String, String]()
-    val config = KylinConfig.getInstanceFromEnv
-    params.put(ParamsConstants.TIME_OUT, config.getUpdateJobInfoTimeout.toString)
-    params.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true))
-    context.getReport.updateSparkJobInfo(params, url, json)
+  def createRateLimiter(permitsPerSecond: Double = 0.1): RateLimiter = {
+    RateLimiter.create(permitsPerSecond)
   }
 
-  def onStageFinished(result: Boolean): Unit = {
-    val taskId = getId
-    val segmentId = getSegmentId
-    val project = getJobContext.getProject
-    val status = if (result) ExecutableState.SUCCEED.toString else ExecutableState.ERROR.toString
-    val errMsg = null
-    val updateInfo: util.HashMap[String, String] = null
-
-    updateStageInfo(taskId, segmentId, project, status, errMsg, updateInfo)
+  def onStageStart(): Unit = {
+    if (getJobContext.isSkipFollowingStages(getSegmentId)) {
+      return
+    }
+    updateStageInfo(ExecutableState.RUNNING.toString, null, null)
   }
 
-  def onBuildLayoutSuccess(layoutCount: Int): Unit = {
-    val taskId = getId
-    val segmentId = getSegmentId
-    val project = getJobContext.getProject
-    val status = null
-    val errMsg = null
-    val updateInfo: util.HashMap[String, String] = new util.HashMap[String, String]
-    updateInfo.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, String.valueOf(layoutCount))
-
-    updateStageInfo(taskId, segmentId, project, status, errMsg, updateInfo)
+  def onStageFinished(state: ExecutableState = ExecutableState.SUCCEED): Unit = {
+    updateStageInfo(state.toString, null, null)
   }
 
   def onStageSkipped(): Unit = {
-    val taskId = getId
-    val segmentId = getSegmentId
-    val project = getJobContext.getProject
-    val status = ExecutableState.SKIP.toString
-    val errMsg = null
-    val updateInfo: util.HashMap[String, String] = null
-
-    updateStageInfo(taskId, segmentId, project, status, errMsg, updateInfo)
+    updateStageInfo(ExecutableState.SKIP.toString, null, null)
   }
 
   def toWork(): Unit = {
-    onStageStart()
-    var result: Boolean = false
-    try {
-      execute()
-      result = true
-    } catch {
-      case throwable: Throwable =>
-        KylinBuildEnv.get().buildJobInfos.recordSegmentId(getSegmentId)
-        KylinBuildEnv.get().buildJobInfos.recordStageId(getId)
-        Throwables.propagate(throwable)
-    } finally onStageFinished(result)
+    toWork0()
   }
 
   def toWorkWithoutFinally(): Unit = {
+    toWork0(false)
+  }
+
+  def toWork0(doFinally: Boolean = true): Unit = {
     onStageStart()
+    var state: ExecutableState = ExecutableState.SUCCEED
     try {
+      if (getJobContext.isSkipFollowingStages(getSegmentId)) {
+        state = ExecutableState.SKIP
+        return
+      }
       execute()
     } catch {
       case throwable: Throwable =>
+        state = ExecutableState.ERROR
         KylinBuildEnv.get().buildJobInfos.recordSegmentId(getSegmentId)
         KylinBuildEnv.get().buildJobInfos.recordStageId(getId)
         Throwables.propagate(throwable)
+    } finally {
+      if (doFinally) {
+        onStageFinished(state)
+      }
     }
   }
 
+  def updateStageInfo(status: String, errMsg: String, updateInfo: util.Map[String, String]): Unit = {
+    val context = getJobContext
+
+    val url = "/kylin/api/jobs/stage/status"
+
+    val payload: util.HashMap[String, Object] = new util.HashMap[String, Object](6)
+    payload.put("task_id", getId)
+    payload.put("segment_id", getSegmentId)
+    payload.put("project", context.getProject)
+    payload.put("status", status)
+    payload.put("err_msg", errMsg)
+    payload.put("update_info", updateInfo)
+    val json = JsonUtil.writeValueAsString(payload)
+    val params = new util.HashMap[String, String]()
+    val config = KylinConfig.getInstanceFromEnv
+    params.put(ParamsConstants.TIME_OUT, config.getUpdateJobInfoTimeout.toString)
+    params.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(context.getProject, true))
+    context.getReport.updateSparkJobInfo(params, url, json)
+  }
 
   def setId(id: String): Unit = {
     this.id = id
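
For context, the refactoring above folds the stage lifecycle into toWork0: start, execute (or skip when the job marks following stages as skippable), then finish with an ExecutableState of SUCCEED, SKIP, or ERROR. A minimal sketch of that control flow, with illustrative names only and no job context or REST reporting:

    // Sketch of the new StageExec.toWork0 lifecycle (simplified, assumed names).
    object StageLifecycleSketch {
      sealed trait State
      case object Succeed extends State
      case object Skip    extends State
      case object Error   extends State

      def runStage(skipFollowing: Boolean, body: () => Unit,
                   report: State => Unit, doFinally: Boolean = true): Unit = {
        var state: State = Succeed
        try {
          if (skipFollowing) { state = Skip; return }
          body()
        } catch {
          case t: Throwable => state = Error; throw t
        } finally {
          if (doFinally) report(state) // toWorkWithoutFinally() passes doFinally = false
        }
      }

      def main(args: Array[String]): Unit = {
        runStage(false, () => println("step runs"),  s => println(s"finished: $s"))
        runStage(true,  () => println("never runs"), s => println(s"finished: $s"))
      }
    }
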
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
index a8cf9dc237..11e71d081a 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
@@ -18,14 +18,22 @@
 
 package org.apache.kylin.engine.spark.job.stage.build
 
-import org.apache.kylin.engine.spark.job.SegmentJob
+import io.kyligence.kap.guava20.shaded.common.util.concurrent.RateLimiter
 import org.apache.kylin.engine.spark.job.stage.BuildParam
-import org.apache.kylin.metadata.cube.model.NDataSegment
+import org.apache.kylin.engine.spark.job.{KylinBuildEnv, SegmentJob}
+import org.apache.kylin.metadata.cube.model.{NBatchConstants, NDataSegment}
+
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
 
 class BuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam)
   extends BuildStage(jobContext, dataSegment, buildParam) {
 
+  private val rateLimiter: RateLimiter = createRateLimiter()
+
   override def execute(): Unit = {
+    // Start an independent thread that drains results at a fixed rate
+    scheduleCheckpoint()
     // Build layers.
     buildLayouts()
     // Drain results immediately after building.
@@ -33,4 +41,15 @@ class BuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam:
   }
 
   override def getStageName: String = "BuildLayer"
+
+  override protected def drain(timeout: Long, unit: TimeUnit): Unit = {
+    super.drain(timeout, unit)
+
+    val buildJobInfos = KylinBuildEnv.get().buildJobInfos
+    val layoutCount = buildJobInfos.getSeg2cuboidsNumPerLayer.get(segmentId)
+    if (rateLimiter.tryAcquire() && layoutCount != null) {
+      updateStageInfo(null, null, mapAsJavaMap(Map(NBatchConstants.P_INDEX_SUCCESS_COUNT ->
+        String.valueOf(layoutCount.get()))))
+    }
+  }
 }
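
The drain override above throttles progress reporting: createRateLimiter() defaults to 0.1 permits per second, so tryAcquire succeeds at most roughly once every 10 seconds and the P_INDEX_SUCCESS_COUNT update is sent at that cadence rather than on every drain. A small sketch of the pattern (plain Guava RateLimiter here; the diff uses the shaded io.kyligence.kap.guava20 class with the same semantics):

    import com.google.common.util.concurrent.RateLimiter

    object ProgressThrottleSketch {
      def main(args: Array[String]): Unit = {
        val limiter = RateLimiter.create(0.1) // ~1 permit every 10 seconds
        for (drainRound <- 1 to 5) {
          val layoutCount = drainRound * 3 // stand-in for seg2cuboidsNumPerLayer
          if (limiter.tryAcquire()) {
            // The real code posts NBatchConstants.P_INDEX_SUCCESS_COUNT via updateStageInfo.
            println(s"report progress: $layoutCount layouts finished")
          }
          Thread.sleep(100)
        }
      }
    }
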
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala
index 414335d9fa..dba42cca96 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala
@@ -148,12 +148,6 @@ abstract class BuildStage(private val jobContext: SegmentJob,
 
   override protected def recordTaskInfo(t: Task): Unit = {
     logInfo(s"Segment $segmentId submit task: ${t.getTaskDesc}")
-    KylinBuildEnv.get().buildJobInfos.recordCuboidsNumPerLayer(segmentId, 1)
-  }
-
-  override protected def reportTaskProgress(): Unit = {
-    val layoutCount = KylinBuildEnv.get().buildJobInfos.getSeg2cuboidsNumPerLayer.get(segmentId).asScala.sum
-    onBuildLayoutSuccess(layoutCount)
   }
 
   protected def buildStatistics(): Statistics = {
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala
index 6668593a42..a5faa5374e 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala
@@ -28,8 +28,6 @@ class GatherFlatTableStats(jobContext: SegmentJob, dataSegment: NDataSegment, bu
   extends BuildStage(jobContext, dataSegment, buildParam) {
 
   override def execute(): Unit = {
-    scheduleCheckpoint()
-
     // Build flat table?
     if (buildParam.getSpanningTree.fromFlatTable()) {
       // Collect statistics for flat table.
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala
index ba62f0bfd9..823079a59b 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala
@@ -18,26 +18,153 @@
 
 package org.apache.kylin.engine.spark.job.stage.build
 
-import org.apache.kylin.engine.spark.job.SegmentJob
+import com.google.common.collect.{Maps, Queues}
 import org.apache.kylin.engine.spark.job.stage.BuildParam
-import org.apache.kylin.metadata.cube.model.NDataSegment
+import org.apache.kylin.engine.spark.job.stage.build.GenerateFlatTable._
+import org.apache.kylin.engine.spark.job.{SanityChecker, SegmentJob}
+import org.apache.kylin.job.execution.ExecutableState
+import org.apache.kylin.metadata.cube.model._
+import org.apache.spark.sql.datasource.storage.StorageStoreUtils
 import org.apache.spark.sql.{Dataset, Row}
 
+import scala.collection.JavaConverters._
+
 class GenerateFlatTable(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam)
   extends FlatTableAndDictBase(jobContext, dataSegment, buildParam) {
 
-  override def execute(): Unit = {
-    val flatTable: Dataset[Row] = generateFlatTable()
-    buildParam.setFlatTable(flatTable)
-    val flatTablePart: Dataset[Row] = generateFlatTablePart()
-    buildParam.setFlatTablePart(flatTablePart)
+  private var dataCountCheckGood: Boolean = true
 
+  override def execute(): Unit = {
+    buildParam.setFlatTable(generateFlatTable())
+    buildParam.setFlatTablePart(generateFlatTablePart())
     buildParam.setBuildFlatTable(this)
 
-    if (buildParam.isSkipGenerateFlatTable) {
-      onStageSkipped()
+    if (!checkDataCountPass()) {
+      drainEmptyLayoutOnDataCountCheckFailed()
+      return
     }
   }
 
   override def getStageName: String = "GenerateFlatTable"
+
+  override def onStageFinished(state: ExecutableState): Unit = {
+    if (dataCountCheckGood) {
+      super.onStageFinished(state)
+    } else {
+      updateStageInfo(ExecutableState.WARNING.toString, null,
+        Maps.newHashMap(Map(NBatchConstants.P_WARNING_CODE -> NDataLayout.AbnormalType.DATA_INCONSISTENT.name()).asJava))
+    }
+  }
+
+  private def checkDataCountPass(): Boolean = {
+    if (jobContext.getConfig.isDataCountCheckEnabled) {
+      logInfo(s"segment ${dataSegment.getId} dataCountCheck is enabled")
+      val result = checkDataCount()
+      logInfo(s"segment ${dataSegment.getId} dataCountCheck result: ${result}")
+      return result
+    }
+    logInfo(s"segment ${dataSegment.getId} dataCountCheck is not enabled")
+    true
+  }
+
+  private def drainEmptyLayoutOnDataCountCheckFailed(): Unit = {
+    dataCountCheckGood = false
+    jobContext.setSkipFollowingStages(dataSegment.getId)
+    readOnlyLayouts.asScala.foreach(newEmptyDataLayout(_, NDataLayout.AbnormalType.DATA_INCONSISTENT))
+    drain()
+  }
+
+  /**
+   * Data count check for segment layouts before index building. It performs two checks:
+   * <p>Check1: the count sums of the existing layouts (not the layouts under construction)
+   * must all be equal.
+   * <p>Check2: the count sum of those layouts must equal the count of the flat table.
+   *
+   * <p>As a special case for check1, unequal count sums between agg layouts and table layouts
+   * are tolerated when <tt>kylin.build.allow-non-strict-count-check</tt> is enabled; this
+   * should not be used in most cases.
+   *
+   * <p>Check2 is skipped when all the new layouts can be covered by existing layouts.
+   *
+   * <p>For agg layouts the count sum is taken from the count measure; for table layouts it is
+   * simply the number of data rows.
+   *
+   * @return <tt>true</tt> if the data count check passes, <tt>false</tt> otherwise
+   */
+  private def checkDataCount(): Boolean = {
+    val layouts = dataSegment.getSegDetails.getWorkingLayouts.asScala.map(lay => jobContext.getIndexPlan.getLayoutEntity(lay.getLayoutId))
+    val tasks = layouts.map(layout => new DataCountCheckTask(layout, StorageStoreUtils.toDF(dataSegment, layout, sparkSession)))
+    val resultsQueue = Queues.newLinkedBlockingQueue[DataCountCheckResult]()
+
+    if (layouts.isEmpty) return true
+
+    // Start calculating data count
+    slowStartExec(tasks.iterator, (task: DataCountCheckTask) => {
+      val layout = task.layout
+      val ds = task.ds
+      resultsQueue.offer(new DataCountCheckResult(layout, SanityChecker.getCount(ds, layout)))
+    })
+
+    val reduceFor: (DataCountCheckResult => Boolean) => Long = layoutCountReducer(resultsQueue.asScala)
+    val aggLayoutCount = reduceFor(result => !IndexEntity.isTableIndex(result.layout.getId))
+    val tableLayoutCount = reduceFor(result => IndexEntity.isTableIndex(result.layout.getId))
+
+    // Counts must be consistent across all agg layouts and across all table layouts
+    if (isInvalidCount(aggLayoutCount) || isInvalidCount(tableLayoutCount)) {
+      logWarning(s"segment ${dataSegment.getId} dataCountCheck check1 failed, " +
+        s"count number in agg layouts or table layouts are not same, " +
+        s"agg layouts count: ${aggLayoutCount}, table layouts count: ${tableLayoutCount}")
+      return false
+    }
+    // Agg layout count and table layout count should be the same unless non-strict count check is enabled
+    if (bothLayoutsExist(aggLayoutCount, tableLayoutCount)
+      && (aggLayoutCount != tableLayoutCount && !config.isNonStrictCountCheckAllowed)) {
+      logWarning(s"segment ${dataSegment.getId} dataCountCheck check1 failed, " +
+        s"count number between agg layouts and table layouts are not same, " +
+        s"agg layouts count: ${aggLayoutCount}, table layouts count: ${tableLayoutCount}")
+      return false
+    }
+    // Agg layout count equals table layout count; compare it with the flat table count, or simply return true
+    val layoutCount = if (isLayoutExists(aggLayoutCount)) aggLayoutCount else tableLayoutCount
+    if (isLayoutExists(layoutCount) && buildParam.getSpanningTree.fromFlatTable) {
+      val flatTableCount = buildParam.getFlatTable.count
+      val check2Result = layoutCount == flatTableCount
+      if (!check2Result) {
+        logWarning(s"segment ${dataSegment.getId} dataCountCheck check2 failed, " +
+        s"layouts count: ${layoutCount}, flat table count: ${flatTableCount}")
+      }
+      check2Result
+    } else {
+      true
+    }
+  }
+
+  private def layoutCountReducer(results: Iterable[DataCountCheckResult])(filter: DataCountCheckResult => Boolean): Long = {
+    results.filter(filter)
+      .map(_.count)
+      .reduceOption((prev, cur) => if (prev == cur) cur else InvalidCountFlag)
+      .getOrElse(LayoutNonExistsFlag)
+  }
+
+  sealed class DataCountCheckTask(val layout: LayoutEntity, val ds: Dataset[Row]) extends Task {
+    override def getTaskDesc: String = s"layout ${layout.getId} data count check"
+  }
+
+  sealed class DataCountCheckResult(val layout: LayoutEntity, val count: Long)
+}
+
+object GenerateFlatTable {
+  val InvalidCountFlag: Long = SanityChecker.SKIP_FLAG
+  val LayoutNonExistsFlag: Long = -2L
+
+  def isInvalidCount(count: Long): Boolean = {
+    count == InvalidCountFlag
+  }
+
+  def isLayoutExists(count: Long): Boolean = {
+    count > LayoutNonExistsFlag
+  }
+
+  def bothLayoutsExist(aggLayoutCount: Long, tableLayoutCount: Long): Boolean = {
+    aggLayoutCount > LayoutNonExistsFlag && tableLayoutCount > LayoutNonExistsFlag
+  }
 }
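
A worked example of the count reduction used by checkDataCount above: layoutCountReducer folds each layout's count with an equality check, so any mismatch collapses the group to InvalidCountFlag, and an empty group yields LayoutNonExistsFlag. The sketch below is simplified and uses -1L as a stand-in for SanityChecker.SKIP_FLAG, whose actual value is not shown in this diff:

    object CountReductionSketch {
      val InvalidCountFlag: Long = -1L      // stand-in for SanityChecker.SKIP_FLAG
      val LayoutNonExistsFlag: Long = -2L

      def reduceCounts(counts: Seq[Long]): Long =
        counts.reduceOption((prev, cur) => if (prev == cur) cur else InvalidCountFlag)
          .getOrElse(LayoutNonExistsFlag)

      def main(args: Array[String]): Unit = {
        println(reduceCounts(Seq(6L, 6L, 6L)))  // 6  -> all layouts of this kind agree
        println(reduceCounts(Seq(6L, 5L, 6L)))  // -1 -> inconsistent, check1 fails
        println(reduceCounts(Seq.empty))        // -2 -> no layouts of this kind exist
      }
    }

Check1 then requires both reductions to be valid and, unless non-strict checking is allowed, the agg and table layout counts to match; check2 compares that layout count with the flat table count when the flat table is actually built.
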
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala
index 995bb588cc..621424403a 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala
@@ -18,12 +18,19 @@
 
 package org.apache.kylin.engine.spark.job.stage.build.partition
 
-import org.apache.kylin.engine.spark.job.SegmentJob
+import io.kyligence.kap.guava20.shaded.common.util.concurrent.RateLimiter
 import org.apache.kylin.engine.spark.job.stage.BuildParam
-import org.apache.kylin.metadata.cube.model.NDataSegment
+import org.apache.kylin.engine.spark.job.{KylinBuildEnv, SegmentJob}
+import org.apache.kylin.metadata.cube.model.{NBatchConstants, NDataSegment}
+
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
 
 class PartitionBuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam)
   extends PartitionBuildStage(jobContext, dataSegment, buildParam) {
+
+  private val rateLimiter: RateLimiter = createRateLimiter()
+
   override def execute(): Unit = {
     // Build layers.
     buildLayouts()
@@ -32,4 +39,16 @@ class PartitionBuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, bui
   }
 
   override def getStageName: String = "PartitionBuildLayer"
+
+  override protected def drain(timeout: Long, unit: TimeUnit): Unit = {
+    super.drain(timeout, unit)
+
+    val buildJobInfos = KylinBuildEnv.get().buildJobInfos
+    val layoutCountTotal = buildJobInfos.getSeg2cuboidsNumPerLayer.get(segmentId)
+    if (rateLimiter.tryAcquire() && layoutCountTotal != null) {
+      val layoutCount = layoutCountTotal.get() / partitions.size()
+      updateStageInfo(null, null, mapAsJavaMap(Map(NBatchConstants.P_INDEX_SUCCESS_COUNT ->
+        String.valueOf(layoutCount))))
+    }
+  }
 }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildStage.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildStage.scala
index 331151ecae..6d2bb42355 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildStage.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildStage.scala
@@ -67,12 +67,6 @@ abstract class PartitionBuildStage(jobContext: SegmentJob, dataSegment: NDataSeg
 
   override protected def columnIdFunc(colRef: TblColRef): String = flatTableDesc.getColumnIdAsString(colRef)
 
-
-  override protected def reportTaskProgress(): Unit = {
-    val layoutCount = KylinBuildEnv.get().buildJobInfos.getSeg2cuboidsNumPerLayer.get(segmentId).asScala.sum
-    onBuildLayoutSuccess(layoutCount / partitions.size())
-  }
-
   override protected def buildLayouts(): Unit = {
 
     val taskIter = new BuildTaskIterator[PartitionBuildTask] {
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStage.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStage.scala
index 0ac0a14701..54aebff88c 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStage.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStage.scala
@@ -78,7 +78,7 @@ abstract class MergeStage(private val jobContext: SegmentJob,
     // Cleanup previous potentially left temp layout data.
     cleanupLayoutTempData(dataSegment, jobContext.getReadOnlyLayouts.asScala.toSeq)
 
-    val tasks = unmerged.flatMap(segment => segment.getSegDetails.getLayouts.asScala) //
+    val tasks = unmerged.flatMap(segment => segment.getSegDetails.getWorkingLayouts.asScala) //
       .groupBy(_.getLayoutId).values.map(LayoutMergeTask)
     slowStartExec(tasks.iterator, mergeLayout)
   }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeStage.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeStage.scala
index b31f25049c..b932331a8a 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeStage.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeStage.scala
@@ -50,7 +50,7 @@ abstract class PartitionMergeStage(private val jobContext: SegmentJob,
 
   override protected def mergeIndices(): Unit = {
     val tasks = unmerged.flatMap(segment =>
-      segment.getSegDetails.getLayouts.asScala.flatMap(layout =>
+      segment.getSegDetails.getWorkingLayouts.asScala.flatMap(layout =>
         layout.getMultiPartition.asScala.map(partition => (layout, partition))
       )).groupBy(tp => (tp._1.getLayoutId, tp._2.getPartitionId)).values.map(PartitionMergeTask)
     slowStartExec(tasks.iterator, mergePartition)
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTableWithSparkSessionTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTableWithSparkSessionTest.java
new file mode 100644
index 0000000000..8099dc7d4d
--- /dev/null
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTableWithSparkSessionTest.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job.stage.build;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KapConfig;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.spark.IndexDataConstructor;
+import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
+import org.apache.kylin.engine.spark.job.NSparkCubingJob;
+import org.apache.kylin.engine.spark.job.NSparkCubingUtil;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.cube.model.IndexPlan;
+import org.apache.kylin.metadata.cube.model.LayoutEntity;
+import org.apache.kylin.metadata.cube.model.NDataLayout;
+import org.apache.kylin.metadata.cube.model.NDataSegment;
+import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.datasource.storage.StorageStore;
+import org.apache.spark.sql.datasource.storage.StorageStoreFactory;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import io.kyligence.kap.guava20.shaded.common.collect.Sets;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@SuppressWarnings("unchecked")
+public class GenerateFlatTableWithSparkSessionTest extends NLocalWithSparkSessionTest {
+
+    @Getter
+    @AllArgsConstructor
+    static class ColumnStruct {
+        private Long rowId;
+        private String rowName;
+        private DataType type;
+    }
+
+    private static final LinkedHashMap<Long, ColumnStruct> DIM_SCHEMAS = new LinkedHashMap<>();
+    private static final LinkedHashMap<Long, ColumnStruct> MEASURE_SCHEMAS = new LinkedHashMap<>();
+    static {
+        DIM_SCHEMAS.put(0L, new ColumnStruct(0L, "LO_ORDERKEY", DataTypes.LongType));
+        DIM_SCHEMAS.put(13L, new ColumnStruct(13L, "LO_QUANTITY", DataTypes.LongType));
+        DIM_SCHEMAS.put(16L, new ColumnStruct(16L, "LO_CUSTKEY", DataTypes.LongType));
+        DIM_SCHEMAS.put(22L, new ColumnStruct(22L, "C_NAME", DataTypes.StringType));
+        DIM_SCHEMAS.put(24L, new ColumnStruct(24L, "C_CUSTKEY", DataTypes.LongType));
+
+        MEASURE_SCHEMAS.put(100000L, new ColumnStruct(100000L, "COUNT_ALL", DataTypes.LongType));
+        MEASURE_SCHEMAS.put(100001L, new ColumnStruct(100001L, "SUM_LINEORDER_LO_QUANTITY", DataTypes.LongType));
+    }
+
+    private static final String MODEL_ID = "3ec47efc-573a-9304-4405-8e05ae184322";
+    private static final String SEGMENT_ID = "b2f206e1-7a15-c94a-20f5-f608d550ead6";
+    private static final long LAYOUT_ID_TO_BUILD = 20001L;
+
+    private KylinConfig config;
+    private NIndexPlanManager indexPlanManager;
+    private NDataflowManager dataflowManager;
+
+    @Override
+    public String getProject() {
+        return "index_build_test";
+    }
+
+    @BeforeAll
+    public static void beforeClass() {
+        NLocalWithSparkSessionTest.beforeClass();
+    }
+
+    @AfterAll
+    public static void afterClass() {
+        NLocalWithSparkSessionTest.afterClass();
+    }
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+
+        overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1");
+        overwriteSystemProp("kylin.engine.persist-flattable-threshold", "0");
+        overwriteSystemProp("kylin.engine.persist-flatview", "true");
+
+        NDefaultScheduler.destroyInstance();
+        NDefaultScheduler scheduler = NDefaultScheduler.getInstance(getProject());
+        scheduler.init(new JobEngineConfig(getTestConfig()));
+        if (!scheduler.hasStarted()) {
+            throw new RuntimeException("scheduler has not been started");
+        }
+
+        config = getTestConfig();
+        indexPlanManager = NIndexPlanManager.getInstance(config, getProject());
+        dataflowManager = NDataflowManager.getInstance(config, getProject());
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        super.tearDown();
+        NDefaultScheduler.destroyInstance();
+    }
+
+    private Object[] initDataCountCheckTest() {
+        // Enable data count check
+        overwriteSystemProp("kylin.build.data-count-check-enabled", "true");
+        // Index Plan
+        IndexPlan indexPlan = indexPlanManager.getIndexPlan(MODEL_ID);
+        // Segment
+        NDataflow dataflow = dataflowManager.getDataflow(MODEL_ID);
+        NDataSegment segment = dataflow.getSegment(SEGMENT_ID);
+        // Layouts to build
+        Set<Long> layoutIDs = Sets.newHashSet(LAYOUT_ID_TO_BUILD);
+        Set<LayoutEntity> readOnlyLayouts = Collections.unmodifiableSet(NSparkCubingUtil.toLayouts(indexPlan, layoutIDs));
+
+        return new Object[] {indexPlan, dataflow, segment, readOnlyLayouts};
+    }
+
+    @Test
+    void testCheckDataCount_Good() throws Exception {
+        Object[] objs = initDataCountCheckTest();
+        IndexPlan indexPlan = (IndexPlan) objs[0];
+        NDataflow dataflow = (NDataflow) objs[1];
+        NDataSegment segment = (NDataSegment) objs[2];
+        Set<LayoutEntity> readOnlyLayouts = (Set<LayoutEntity>) objs[3];
+
+        // Prepare flat table parquet
+        prepareFlatTableParquet(dataflow, segment, false);
+        // Prepare already existing layouts' parquet
+        prepareLayoutsParquet(indexPlan, segment, false, false, false);
+        // Execute job
+        ExecutableState state = executeJob(segment, readOnlyLayouts);
+
+        // Verify
+        assertEquals(ExecutableState.SUCCEED, state);
+        NDataflow dataflowForVerify = dataflowManager.getDataflow(MODEL_ID);
+        NDataSegment segmentForVerify = dataflowForVerify.getSegment(SEGMENT_ID);
+        NDataLayout layoutForVerify = segmentForVerify.getLayout(LAYOUT_ID_TO_BUILD, true);
+        assertNotNull(layoutForVerify);
+        assertNull(layoutForVerify.getAbnormalType());
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {
+            "1,10001",
+            "20000000001,20000010001",
+            "1,10001,20000000001,20000010001"
+    })
+    void testCheckDataCount_Good_WithNonExistingLayouts(String nonExistingLayouts) throws Exception {
+        // Enable data count check
+        overwriteSystemProp("kylin.build.data-count-check-enabled", "true");
+
+        Set<Long> nonExistingLayoutsSet = Arrays.stream(nonExistingLayouts.split(",")).map(Long::parseLong)
+                .collect(Collectors.toSet());
+
+        IndexPlan indexPlan = indexPlanManager.getIndexPlan(MODEL_ID);
+        // Layouts to build
+        Set<Long> layoutIDs = Sets.newHashSet(LAYOUT_ID_TO_BUILD);
+        Set<LayoutEntity> readOnlyLayouts = Collections.unmodifiableSet(NSparkCubingUtil.toLayouts(indexPlan, layoutIDs));
+        // Remove all layouts for test
+        indexPlanManager.updateIndexPlan(MODEL_ID, copyForWrite -> {
+            copyForWrite.removeLayouts(nonExistingLayoutsSet, true, true);
+        });
+        dataflowManager.reloadAll();
+        NDataflow dataflow = dataflowManager.getDataflow(MODEL_ID);
+        NDataSegment segment = dataflow.getSegment(SEGMENT_ID);
+
+        // Prepare flat table parquet
+        prepareFlatTableParquet(dataflow, segment, false);
+        // Prepare already existing layouts' parquet
+        prepareLayoutsParquet(indexPlan, segment, false, false, false,
+                nonExistingLayoutsSet);
+        // Execute job
+        ExecutableState state = executeJob(segment, readOnlyLayouts);
+
+        // Verify
+        assertEquals(ExecutableState.SUCCEED, state);
+        NDataflow dataflowForVerify = dataflowManager.getDataflow(MODEL_ID);
+        NDataSegment segmentForVerify = dataflowForVerify.getSegment(SEGMENT_ID);
+        NDataLayout layoutForVerify = segmentForVerify.getLayout(LAYOUT_ID_TO_BUILD, true);
+        assertNotNull(layoutForVerify);
+        assertNull(layoutForVerify.getAbnormalType());
+    }
+
+    @Test
+    void testCheckDataCount_Good_WithNonStrictCountCheckEnabled() throws Exception {
+        Object[] objs = initDataCountCheckTest();
+        IndexPlan indexPlan = (IndexPlan) objs[0];
+        NDataflow dataflow = (NDataflow) objs[1];
+        NDataSegment segment = (NDataSegment) objs[2];
+        Set<LayoutEntity> readOnlyLayouts = (Set<LayoutEntity>) objs[3];
+
+        overwriteSystemProp("kylin.build.allow-non-strict-count-check", "true");
+
+        // Prepare flat table parquet
+        prepareFlatTableParquet(dataflow, segment, false);
+        // Prepare already existing layouts' parquet
+        prepareLayoutsParquet(indexPlan, segment, false, false, true);
+        // Execute job
+        ExecutableState state = executeJob(segment, readOnlyLayouts);
+
+        // Verify
+        assertEquals(ExecutableState.SUCCEED, state);
+        NDataflow dataflowForVerify = dataflowManager.getDataflow(MODEL_ID);
+        NDataSegment segmentForVerify = dataflowForVerify.getSegment(SEGMENT_ID);
+        NDataLayout layoutForVerify = segmentForVerify.getLayout(LAYOUT_ID_TO_BUILD, true);
+        assertNotNull(layoutForVerify);
+        assertNull(layoutForVerify.getAbnormalType());
+    }
+
+    @ParameterizedTest
+    @CsvSource({
+            "true, false, false",
+            "false, true, false",
+            "false, false, true"
+    })
+    void testCheckDataCount_Failed(boolean loseOneAggLayoutRecord,
+                                   boolean loseOneTableLayoutRecord,
+                                   boolean loseTwoTableLayoutRecords) throws Exception {
+        Object[] objs = initDataCountCheckTest();
+        IndexPlan indexPlan = (IndexPlan) objs[0];
+        NDataflow dataflow = (NDataflow) objs[1];
+        NDataSegment segment = (NDataSegment) objs[2];
+        Set<LayoutEntity> readOnlyLayouts = (Set<LayoutEntity>) objs[3];
+
+        // Prepare flat table parquet
+        prepareFlatTableParquet(dataflow, segment, false);
+        // Prepare already existing layouts' parquet
+        prepareLayoutsParquet(indexPlan, segment, loseOneAggLayoutRecord, loseOneTableLayoutRecord, loseTwoTableLayoutRecords);
+        // Execute job
+        ExecutableState state = executeJob(segment, readOnlyLayouts);
+
+        // Verify
+        assertEquals(ExecutableState.SUCCEED, state);
+        NDataflow dataflowForVerify = dataflowManager.getDataflow(MODEL_ID);
+        NDataSegment segmentForVerify = dataflowForVerify.getSegment(SEGMENT_ID);
+        NDataLayout layoutForVerify = segmentForVerify.getLayout(LAYOUT_ID_TO_BUILD, true);
+        assertNotNull(layoutForVerify);
+        assertEquals(NDataLayout.AbnormalType.DATA_INCONSISTENT, layoutForVerify.getAbnormalType());
+    }
+
+    private void prepareFlatTableParquet(NDataflow dataflow, NDataSegment segment, boolean loseOneRecordForTest) {
+        List<StructField> fields = new ArrayList<>();
+        for (ColumnStruct cs : DIM_SCHEMAS.values()) {
+            StructField field = DataTypes.createStructField(String.valueOf(cs.getRowId()), cs.getType(), true);
+            fields.add(field);
+        }
+        StructType schema = DataTypes.createStructType(fields);
+
+        List<Row> rows = new ArrayList<>();
+        rows.add(RowFactory.create(1L, 5L, 1L, "Customer#000000001", 1L));
+        rows.add(RowFactory.create(2L, 15L, 1L, "Customer#000000001", 1L));
+        rows.add(RowFactory.create(3L, 5L, 2L, "Customer#000000002", 2L));
+        rows.add(RowFactory.create(4L, 15L, 3L, "Customer#000000003", 3L));
+        rows.add(RowFactory.create(5L, 5L, 5L, "Customer#000000005", 5L));
+        if (!loseOneRecordForTest) {
+            rows.add(RowFactory.create(6L, 15L, 5L, "Customer#000000005", 5L));
+        }
+        Dataset<Row> flatTableDS = ss.createDataFrame(rows, schema);
+
+        ss.sessionState().conf().setLocalProperty("spark.sql.sources.repartitionWritingDataSource", "true");
+        flatTableDS.write().mode(SaveMode.Overwrite).parquet(config.getFlatTableDir(getProject(), dataflow.getId(), segment.getId()).toString());
+    }
+
+    private void prepareLayoutsParquet(IndexPlan indexPlan, NDataSegment segment,
+                                       boolean loseOneAggLayoutRecordForTest,
+                                       boolean loseOneTableLayoutRecordForTest,
+                                       boolean loseTwoTableLayoutRecordsForTest) {
+        prepareLayoutsParquet(indexPlan, segment, loseOneAggLayoutRecordForTest, loseOneTableLayoutRecordForTest,
+                loseTwoTableLayoutRecordsForTest, null);
+    }
+
+    private void prepareLayoutsParquet(IndexPlan indexPlan, NDataSegment segment,
+                                       boolean loseOneAggLayoutRecordForTest,
+                                       boolean loseOneTableLayoutRecordForTest,
+                                       boolean loseTwoTableLayoutRecordsForTest,
+                                       Set<Long> nonExistingLayouts) {
+        StorageStore store = StorageStoreFactory.create(1);
+
+        // Prepare layout 1
+        {
+            List<StructField> fields = new ArrayList<>();
+            for (ColumnStruct cs : toColumnStructs("0 13 16 24 22 100000")) {
+                StructField field = DataTypes.createStructField(String.valueOf(cs.getRowId()), cs.getType(), true);
+                fields.add(field);
+            }
+            StructType schema = DataTypes.createStructType(fields);
+
+            List<Row> rows = new ArrayList<>();
+            rows.add(RowFactory.create(1L, 5L, 1L, 1L, "Customer#000000001", 1L));
+            rows.add(RowFactory.create(2L, 15L, 1L, 1L, "Customer#000000001", 1L));
+            rows.add(RowFactory.create(3L, 5L, 2L, 2L, "Customer#000000002", 1L));
+            rows.add(RowFactory.create(4L, 15L, 3L, 3L, "Customer#000000003", 1L));
+            rows.add(RowFactory.create(5L, 5L, 5L, 5L, "Customer#000000005", 1L));
+            if (!loseOneAggLayoutRecordForTest) {
+                rows.add(RowFactory.create(6L, 15L, 5L, 5L, "Customer#000000005", 1L));
+            }
+
+            Dataset<Row> layoutDS = ss.createDataFrame(rows, schema);
+
+            final long layoutId = 1L;
+            LayoutEntity layoutEntity = indexPlan.getLayoutEntity(layoutId);
+            if (CollectionUtils.isEmpty(nonExistingLayouts) || !nonExistingLayouts.contains(layoutId)) {
+                store.save(layoutEntity, new Path(NSparkCubingUtil.getStoragePath(segment, layoutEntity.getId())), KapConfig.wrap(config), layoutDS);
+            }
+        }
+        // Prepare layout 10001
+        {
+            List<StructField> fields = new ArrayList<>();
+            for (ColumnStruct cs : toColumnStructs("24 22 100000 100001")) {
+                StructField field = DataTypes.createStructField(String.valueOf(cs.getRowId()), cs.getType(), true);
+                fields.add(field);
+            }
+            StructType schema = DataTypes.createStructType(fields);
+
+            List<Row> rows = new ArrayList<>();
+            rows.add(RowFactory.create(1L, "Customer#000000001", 2L, 20L));
+            rows.add(RowFactory.create(2L, "Customer#000000002", 1L, 5L));
+            rows.add(RowFactory.create(3L, "Customer#000000003", 1L, 15L));
+            rows.add(RowFactory.create(5L, "Customer#000000005", 2L, 20L));
+            Dataset<Row> layoutDS = ss.createDataFrame(rows, schema);
+
+            final long layoutId = 10001L;
+            LayoutEntity layoutEntity = indexPlan.getLayoutEntity(layoutId);
+            if (CollectionUtils.isEmpty(nonExistingLayouts) || !nonExistingLayouts.contains(layoutId)) {
+                store.save(layoutEntity, new Path(NSparkCubingUtil.getStoragePath(segment, layoutEntity.getId())), KapConfig.wrap(config), layoutDS);
+            }
+        }
+        // Prepare layout 20000000001
+        {
+            List<StructField> fields = new ArrayList<>();
+            for (ColumnStruct cs : toColumnStructs("0 13 16 24 22")) {
+                StructField field = DataTypes.createStructField(String.valueOf(cs.getRowId()), cs.getType(), true);
+                fields.add(field);
+            }
+            StructType schema = DataTypes.createStructType(fields);
+
+            List<Row> rows = new ArrayList<>();
+            rows.add(RowFactory.create(1L, 5L, 1L, 1L, "Customer#000000001"));
+            rows.add(RowFactory.create(2L, 15L, 1L, 1L, "Customer#000000001"));
+            rows.add(RowFactory.create(3L, 5L, 2L, 2L, "Customer#000000002"));
+            rows.add(RowFactory.create(4L, 15L, 3L, 3L, "Customer#000000003"));
+            rows.add(RowFactory.create(5L, 5L, 5L, 5L, "Customer#000000005"));
+            if (!loseOneTableLayoutRecordForTest && !loseTwoTableLayoutRecordsForTest) {
+                rows.add(RowFactory.create(6L, 15L, 5L, 5L, "Customer#000000005"));
+            }
+            Dataset<Row> layoutDS = ss.createDataFrame(rows, schema);
+
+            final long layoutId = 20_000_000_001L;
+            LayoutEntity layoutEntity = indexPlan.getLayoutEntity(layoutId);
+            if (CollectionUtils.isEmpty(nonExistingLayouts) || !nonExistingLayouts.contains(layoutId)) {
+                store.save(layoutEntity, new Path(NSparkCubingUtil.getStoragePath(segment, layoutEntity.getId())), KapConfig.wrap(config), layoutDS);
+            }
+        }
+        // Prepare layout 20000010001
+        {
+            List<StructField> fields = new ArrayList<>();
+            for (ColumnStruct cs : toColumnStructs("0 13")) {
+                StructField field = DataTypes.createStructField(String.valueOf(cs.getRowId()), cs.getType(), true);
+                fields.add(field);
+            }
+            StructType schema = DataTypes.createStructType(fields);
+
+            List<Row> rows = new ArrayList<>();
+            rows.add(RowFactory.create(1L, 5L));
+            rows.add(RowFactory.create(2L, 15L));
+            rows.add(RowFactory.create(3L, 5L));
+            rows.add(RowFactory.create(4L, 15L));
+            rows.add(RowFactory.create(5L, 5L));
+            if (!loseTwoTableLayoutRecordsForTest) {
+                rows.add(RowFactory.create(6L, 15L));
+            }
+            Dataset<Row> layoutDS = ss.createDataFrame(rows, schema);
+
+            final long layoutId = 20_000_010_001L;
+            LayoutEntity layoutEntity = indexPlan.getLayoutEntity(layoutId);
+            if (CollectionUtils.isEmpty(nonExistingLayouts) || !nonExistingLayouts.contains(layoutId)) {
+                store.save(layoutEntity, new Path(NSparkCubingUtil.getStoragePath(segment, layoutEntity.getId())), KapConfig.wrap(config), layoutDS);
+            }
+        }
+    }
+
+    private List<ColumnStruct> toColumnStructs(String columnIds) {
+        return Arrays.stream(columnIds.split(" ")).map(id -> {
+            ColumnStruct cs = DIM_SCHEMAS.get(Long.valueOf(id));
+            if (cs == null) {
+                cs = MEASURE_SCHEMAS.get(Long.valueOf(id));
+            }
+            return cs;
+        }).collect(Collectors.toList());
+    }
+
+    private ExecutableState executeJob(NDataSegment segment, Set<LayoutEntity> readOnlyLayouts) throws InterruptedException {
+        NExecutableManager executableManager = NExecutableManager.getInstance(config, getProject());
+
+        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(segment), readOnlyLayouts, "ADMIN", null);
+        executableManager.addJob(job);
+        return IndexDataConstructor.wait(job);
+    }
+}
diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/DataflowCleanerCLI.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/DataflowCleanerCLI.java
index 22f2e5f93a..a6390150a1 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/DataflowCleanerCLI.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/DataflowCleanerCLI.java
@@ -78,7 +78,7 @@ public class DataflowCleanerCLI {
         val layoutIds = getLayouts(dataflow);
         val toBeRemoved = Sets.<Long> newHashSet();
         for (NDataSegment segment : dataflow.getSegments()) {
-            toBeRemoved.addAll(segment.getSegDetails().getLayouts().stream().map(NDataLayout::getLayoutId)
+            toBeRemoved.addAll(segment.getSegDetails().getAllLayouts().stream().map(NDataLayout::getLayoutId)
                     .filter(id -> !layoutIds.contains(id)).collect(Collectors.toSet()));
         }
         dataflowManager.removeLayouts(dataflow, Lists.newArrayList(toBeRemoved));