You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text export.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/12 09:27:59 UTC

[kylin] 03/17: KYLIN-5390 Build segment support for overlap segments

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit a9de3a2a27169a9695684a7dd1c2f7c5f807e10d
Author: sibingzhang <74...@users.noreply.github.com>
AuthorDate: Wed Nov 9 14:49:15 2022 +0800

    KYLIN-5390 Build segment support for overlap segments
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 ++
 .../resources/kylin_error_msg_conf_cn.properties   |  2 +-
 .../resources/kylin_error_msg_conf_en.properties   |  2 +-
 .../apache/kylin/common/KylinConfigBaseTest.java   |  9 +++
 .../org/apache/kylin/job/common/SegmentUtil.java   | 18 ++---
 .../org/apache/kylin/job/common/SegmentsTest.java  | 53 ++++++++++++--
 .../kylin/rest/controller/SegmentController.java   |  3 +-
 .../kylin/rest/service/ModelBuildService.java      | 18 ++++-
 .../apache/kylin/rest/service/ModelService.java    | 53 +++++++++-----
 .../kylin/rest/service/ModelServiceTest.java       | 80 +++++++++++++++++++++-
 .../spark/merger/AfterBuildResourceMerger.java     |  6 +-
 11 files changed, 209 insertions(+), 39 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 64606df279..309ce19ced 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3670,6 +3670,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.second-storage.wait-lock-timeout", "180"));
     }
 
+    public boolean isBuildSegmentOverlapEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.build.segment-overlap-enabled", FALSE));
+    }
+
     public boolean getDDLEnabled(){
         return Boolean.parseBoolean(getOptional("kylin.source.ddl.enabled", FALSE));
     }
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
index 759514ee40..7fab056ac7 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
@@ -40,7 +40,7 @@ KE-010025201=无法找到相关 Cube。
 
 ## 100222XX Segment
 KE-010022201=无法刷新,Segment 范围 “%s” 超出了加载数据的范围 “%s”。请修改后重试。
-KE-010022202=无法构建,待构建的范围和已构建的范围在 “%s” 到 “%s” 之间存在重合。请修改后重试。
+KE-010022202=无法构建,待构建的范围和已构建的范围 “%s” 存在重合。请修改后重试。
 KE-010022203=无法刷新,请选择 Segment 后再试。
 KE-010022204=无法刷新,部分 Segment 正在构建。请稍后再试。
 KE-010022205=无法刷新,所选 Segment 范围为空。请重新选择后再试。
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
index 02ecccdb25..967a6c7d8c 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
@@ -40,7 +40,7 @@ KE-010025201=Can't find the cube.
 
 ## 100222XX Segment
 KE-010022201=Can't refresh. The segment range "%s" exceeds the range of loaded data, which is "%s". Please modify and try again.
-KE-010022202=Can't build segment. The specified data range overlaps with the built segments from "%s" to "%s". Please modify and try again.
+KE-010022202=Can't build segment. The specified data range overlaps with the built segments "%s". Please modify and try again.
 KE-010022203=Can't refresh. Please select segment and try again.
 KE-010022204=Can't refresh, some segments are being built. Please try again later.
 KE-010022205=Can't refresh, the selected segment range is empty. Please reselect and try again.
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index 16f653d46b..f2c3cb1a2f 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -1380,6 +1380,15 @@ class KylinConfigBaseTest {
         val sub2 = config.getSubstitutor();
         Assertions.assertSame(sub1, sub2);
     }
+
+    @Test
+    void testIsBuildSegmentOverlapEnabled() {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        config.setProperty("kylin.build.segment-overlap-enabled", "false");
+        assertFalse(config.isBuildSegmentOverlapEnabled());
+        config.setProperty("kylin.build.segment-overlap-enabled", "true");
+        assertTrue(config.isBuildSegmentOverlapEnabled());
+    }
 }
 
 class EnvironmentUpdateUtils {
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java
index e04777ef7f..0185cee866 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.kylin.job.common;
 
+import static org.apache.kylin.job.execution.JobTypeEnum.INC_BUILD;
 import static org.apache.kylin.job.execution.JobTypeEnum.INDEX_BUILD;
 import static org.apache.kylin.job.execution.JobTypeEnum.SUB_PARTITION_BUILD;
 
@@ -30,13 +31,13 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.metadata.cube.model.NDataSegment;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.cube.model.PartitionStatusEnum;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.SegmentStatusEnumToDisplay;
 import org.apache.kylin.metadata.model.Segments;
-import org.apache.kylin.metadata.cube.model.NDataSegment;
-import org.apache.kylin.metadata.cube.model.NDataflowManager;
-import org.apache.kylin.metadata.cube.model.PartitionStatusEnum;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
@@ -64,12 +65,13 @@ public class SegmentUtil {
         Segments<T> overlapSegs = segments.getSegmentsByRange(segment.getSegRange());
         overlapSegs.remove(segment);
         if (SegmentStatusEnum.NEW == segment.getStatus()) {
-            if (CollectionUtils.isEmpty(overlapSegs)) {
-                return SegmentStatusEnumToDisplay.LOADING;
+            if (!CollectionUtils.isEmpty(overlapSegs)
+                    && overlapSegs.get(0).getSegRange().entireOverlaps(segment.getSegRange())) {
+                return SegmentStatusEnumToDisplay.REFRESHING;
             }
 
-            if (overlapSegs.get(0).getSegRange().entireOverlaps(segment.getSegRange())) {
-                return SegmentStatusEnumToDisplay.REFRESHING;
+            if (CollectionUtils.isEmpty(overlapSegs) || anyIndexJobRunning(segment, executables)) {
+                return SegmentStatusEnumToDisplay.LOADING;
             }
 
             return SegmentStatusEnumToDisplay.MERGING;
@@ -102,7 +104,7 @@ public class SegmentUtil {
     protected static <T extends ISegment> boolean anyIndexJobRunning(T segment) {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         NExecutableManager execManager = NExecutableManager.getInstance(kylinConfig, segment.getModel().getProject());
-        val executables = execManager.listExecByJobTypeAndStatus(ExecutableState::isRunning, INDEX_BUILD,
+        val executables = execManager.listExecByJobTypeAndStatus(ExecutableState::isRunning, INDEX_BUILD, INC_BUILD,
                 SUB_PARTITION_BUILD);
         return executables.stream().anyMatch(task -> task.getSegmentIds().contains(segment.getId()));
     }
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java b/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java
index 97eab4cdb8..587e688e21 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java
@@ -19,19 +19,23 @@
 package org.apache.kylin.job.common;
 
 import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.job.execution.DefaultExecutable;
+import org.apache.kylin.junit.TimeZoneTestRunner;
+import org.apache.kylin.metadata.cube.model.NBatchConstants;
+import org.apache.kylin.metadata.cube.model.NDataSegment;
+import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.SegmentStatusEnumToDisplay;
 import org.apache.kylin.metadata.model.Segments;
-import org.apache.kylin.junit.TimeZoneTestRunner;
-import org.apache.kylin.metadata.cube.model.NDataSegment;
-import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
 
+import com.google.common.collect.Lists;
+
 import lombok.val;
 
 @RunWith(TimeZoneTestRunner.class)
@@ -59,6 +63,41 @@ public class SegmentsTest {
         Assert.assertEquals(status, SegmentStatusEnumToDisplay.LOADING);
     }
 
+    @Test
+    public void testGetSegmentStatusToDisplay_Loading() {
+        Segments segments = new Segments();
+        val seg = NDataSegment.empty();
+        seg.setId("1");
+        seg.setSegmentRange(new SegmentRange.TimePartitionedSegmentRange(0L, 10L));
+        seg.setStatus(SegmentStatusEnum.READY);
+        segments.add(seg);
+
+        val seg2 = NDataSegment.empty();
+        seg2.setId("2");
+        seg2.setSegmentRange(new SegmentRange.TimePartitionedSegmentRange(0L, 15L));
+        seg2.setStatus(SegmentStatusEnum.NEW);
+
+        val job = new DefaultExecutable();
+        job.setParam(NBatchConstants.P_SEGMENT_IDS, "2");
+
+        SegmentStatusEnumToDisplay status = SegmentUtil.getSegmentStatusToDisplay(segments, seg2,
+                Lists.newArrayList(job));
+        Assert.assertEquals(SegmentStatusEnumToDisplay.LOADING, status);
+
+        val seg3 = NDataSegment.empty();
+        seg3.setId("3");
+        seg3.setSegmentRange(new SegmentRange.TimePartitionedSegmentRange(11L, 12L));
+        seg3.setStatus(SegmentStatusEnum.NEW);
+        segments.add(seg3);
+
+        val job2 = new DefaultExecutable();
+        job2.setParam(NBatchConstants.P_SEGMENT_IDS, "3");
+
+        SegmentStatusEnumToDisplay status2 = SegmentUtil.getSegmentStatusToDisplay(segments, seg3,
+                Lists.newArrayList(job2));
+        Assert.assertEquals(SegmentStatusEnumToDisplay.LOADING, status2);
+    }
+
     @Test
     public void testGetSegmentStatusToDisplay_Ready() {
         Segments segments = new Segments();
@@ -157,15 +196,21 @@ public class SegmentsTest {
         newSeg.setStatus(SegmentStatusEnum.NEW);
         segments.add(newSeg);
 
+        Mockito.mockStatic(SegmentUtil.class);
+        Mockito.when(SegmentUtil.getSegmentStatusToDisplay(segments, newSeg, null)).thenCallRealMethod();
+        Mockito.when(SegmentUtil.anyIndexJobRunning(newSeg)).thenReturn(false);
         SegmentStatusEnumToDisplay status = SegmentUtil.getSegmentStatusToDisplay(segments, newSeg, null);
         Assert.assertEquals(status, SegmentStatusEnumToDisplay.MERGING);
 
+        Mockito.when(SegmentUtil.getSegmentStatusToDisplay(segments, seg, null)).thenCallRealMethod();
+        Mockito.when(SegmentUtil.anyIndexJobRunning(seg)).thenReturn(false);
         SegmentStatusEnumToDisplay status2 = SegmentUtil.getSegmentStatusToDisplay(segments, seg, null);
         Assert.assertEquals(status2, SegmentStatusEnumToDisplay.LOCKED);
 
+        Mockito.when(SegmentUtil.getSegmentStatusToDisplay(segments, seg2, null)).thenCallRealMethod();
+        Mockito.when(SegmentUtil.anyIndexJobRunning(seg2)).thenReturn(false);
         SegmentStatusEnumToDisplay status3 = SegmentUtil.getSegmentStatusToDisplay(segments, seg2, null);
         Assert.assertEquals(status3, SegmentStatusEnumToDisplay.LOCKED);
-
     }
 
     public NDataSegment newReadySegment(Long startTime, Long endTime) {
diff --git a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/SegmentController.java b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/SegmentController.java
index 5f722bd282..d1f706e275 100644
--- a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/SegmentController.java
+++ b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/SegmentController.java
@@ -166,7 +166,8 @@ public class SegmentController extends BaseController {
                 modelId);
         validateDataRange(buildSegmentsRequest.getStart(), buildSegmentsRequest.getEnd(), partitionColumnFormat);
         val res = modelService.checkSegHoleExistIfNewRangeBuild(buildSegmentsRequest.getProject(), modelId,
-                buildSegmentsRequest.getStart(), buildSegmentsRequest.getEnd());
+                buildSegmentsRequest.getStart(), buildSegmentsRequest.getEnd(),
+                buildSegmentsRequest.isBuildAllIndexes(), buildSegmentsRequest.getBatchIndexIds());
         return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, res, "");
     }
 
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
index 93724c8c26..6d4e17b81b 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
@@ -25,6 +25,7 @@ import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CONCURR
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_ABANDON;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_DUPLICATE;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_EMPTY;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_BUILD_RANGE_OVERLAP;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -34,6 +35,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
 import java.util.Set;
+import java.util.StringJoiner;
 import java.util.stream.Collectors;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -371,7 +373,9 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil
         Preconditions.checkArgument(!PushDownUtil.needPushdown(params.getStart(), params.getEnd()),
                 "Load data must set start and end date");
         val segmentRangeToBuild = SourceFactory.getSource(table).getSegmentRange(params.getStart(), params.getEnd());
-        modelService.checkSegmentToBuildOverlapsBuilt(project, modelId, segmentRangeToBuild);
+        List<NDataSegment> overlapSegments = modelService.checkSegmentToBuildOverlapsBuilt(project,
+                modelDescInTransaction, segmentRangeToBuild, params.isNeedBuild(), params.getBatchIndexIds());
+        buildSegmentOverlapExceptionInfo(overlapSegments);
         modelService.saveDateFormatIfNotExist(project, modelId, params.getPartitionColFormat());
         checkMultiPartitionBuildParam(modelDescInTransaction, params);
         NDataSegment newSegment = getManager(NDataflowManager.class, project).appendSegment(df, segmentRangeToBuild,
@@ -394,6 +398,18 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil
                 .licenseCheckWrap(project, () -> jobManager.addSegmentJob(jobParam)));
     }
 
+    private void buildSegmentOverlapExceptionInfo(List<NDataSegment> overlapSegments) {
+        if (CollectionUtils.isEmpty(overlapSegments)) {
+            return;
+        }
+
+        StringJoiner joiner = new StringJoiner(",", "[", "]");
+        for (NDataSegment seg : overlapSegments) {
+            joiner.add(seg.getName());
+        }
+        throw new KylinException(SEGMENT_BUILD_RANGE_OVERLAP, joiner.toString());
+    }
+
     public void checkMultiPartitionBuildParam(NDataModel model, IncrementBuildSegmentParams params) {
         if (!model.isMultiPartitionModel()) {
             return;
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
index 07cab15306..e4af48b957 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -57,7 +57,6 @@ import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NAME_
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NAME_NOT_EXIST;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.PARAMETER_INVALID_SUPPORT_LIST;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.PROJECT_NOT_EXIST;
-import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_BUILD_RANGE_OVERLAP;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_LOCKED;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_MERGE_CONTAINS_GAPS;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_NOT_EXIST_ID;
@@ -929,7 +928,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         NExecutableManager execManager = NExecutableManager.getInstance(kylinConfig, project);
         return execManager.listPartialExec(path -> StringUtils.endsWith(path, modelId), ExecutableState::isRunning,
-                INDEX_BUILD, JobTypeEnum.SUB_PARTITION_BUILD);
+                INDEX_BUILD, INC_BUILD, JobTypeEnum.SUB_PARTITION_BUILD);
     }
 
     public List<NDataSegmentResponse> getSegmentsResponse(String modelId, String project, String start, String end,
@@ -2295,7 +2294,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
     }
 
     public SegmentCheckResponse checkSegHoleExistIfNewRangeBuild(String project, String modelId, String start,
-            String end) {
+            String end, boolean isBuildAllIndexes, List<Long> batchIndexIds) {
         aclEvaluate.checkProjectOperationPermission(project);
         Preconditions.checkArgument(!PushDownUtil.needPushdown(start, end), "Load data must set start and end date");
         NDataModel dataModelDesc = getManager(NDataModelManager.class, project).getDataModelDesc(modelId);
@@ -2304,18 +2303,17 @@ public class ModelService extends AbstractModelService implements TableModelSupp
         SegmentRange segmentRangeToBuild = SourceFactory.getSource(table).getSegmentRange(start, end);
         List<NDataSegment> segmentGaps = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                 .checkHoleIfNewSegBuild(modelId, segmentRangeToBuild);
-
-        Segments<NDataSegment> segments = getSegmentsByRange(modelId, project, "0", "" + Long.MAX_VALUE);
-        val overlapSegments = segments.stream().filter(seg -> seg.getSegRange().overlaps(segmentRangeToBuild))
-                .map(seg -> new SegmentRangeResponse(seg.getTSRange().getStart(), seg.getTSRange().getEnd()))
+        List<NDataSegment> overlapSegments = checkSegmentToBuildOverlapsBuilt(project, dataModelDesc,
+                segmentRangeToBuild, isBuildAllIndexes, batchIndexIds);
+        val overlapSegmentResponses = overlapSegments.stream().map(
+                segment -> new SegmentRangeResponse(segment.getTSRange().getStart(), segment.getTSRange().getEnd()))
                 .collect(Collectors.toList());
-
         SegmentCheckResponse segmentCheckResponse = new SegmentCheckResponse();
         val segHoles = segmentGaps.stream()
                 .map(seg -> new SegmentRangeResponse(seg.getTSRange().getStart(), seg.getTSRange().getEnd()))
                 .collect(Collectors.toList());
         segmentCheckResponse.setSegmentHoles(segHoles);
-        segmentCheckResponse.setOverlapSegments(overlapSegments);
+        segmentCheckResponse.setOverlapSegments(overlapSegmentResponses);
         return segmentCheckResponse;
     }
 
@@ -2563,17 +2561,36 @@ public class ModelService extends AbstractModelService implements TableModelSupp
         datamodelManager.updateDataModelDesc(modelUpdate);
     }
 
-    public void checkSegmentToBuildOverlapsBuilt(String project, String model, SegmentRange segmentRangeToBuild) {
-        Segments<NDataSegment> segments = getSegmentsByRange(model, project, "0", "" + Long.MAX_VALUE);
-        if (!CollectionUtils.isEmpty(segments)) {
-            for (NDataSegment existedSegment : segments) {
-                if (existedSegment.getSegRange().overlaps(segmentRangeToBuild)) {
-                    throw new KylinException(SEGMENT_BUILD_RANGE_OVERLAP,
-                            existedSegment.getSegRange().getStart().toString(),
-                            existedSegment.getSegRange().getEnd().toString());
-                }
+    public List<NDataSegment> checkSegmentToBuildOverlapsBuilt(String project, NDataModel model,
+            SegmentRange<Long> segmentRangeToBuild, boolean isBuildAllIndexes, List<Long> batchIndexIds) {
+        boolean isOverlap;
+        Segments<NDataSegment> segments = getSegmentsByRange(model.getId(), project, "0", "" + Long.MAX_VALUE);
+        List<NDataSegment> overlapsBuiltSegment = Lists.newArrayListWithCapacity(segments.size());
+
+        if (CollectionUtils.isEmpty(segments)) {
+            return overlapsBuiltSegment;
+        }
+
+        boolean buildSegmentOverlapEnable = getIndexPlan(model.getId(), project).getConfig()
+                .isBuildSegmentOverlapEnabled();
+        boolean isBuildAllIndexesFinally = batchIndexIds == null || batchIndexIds.size() == 0
+                || batchIndexIds.size() == getIndexPlan(model.getId(), project).getAllIndexes().size();
+
+        for (NDataSegment existedSegment : segments) {
+            if (buildSegmentOverlapEnable && NDataModel.ModelType.BATCH == model.getModelType()
+                    && !model.isMultiPartitionModel() && isBuildAllIndexes && isBuildAllIndexesFinally
+                    && !SecondStorageUtil.isModelEnable(project, model.getId())) {
+                isOverlap = existedSegment.getSegRange().overlaps(segmentRangeToBuild)
+                        && !segmentRangeToBuild.contains(existedSegment.getSegRange());
+            } else {
+                isOverlap = existedSegment.getSegRange().overlaps(segmentRangeToBuild);
+            }
+            if (isOverlap) {
+                overlapsBuiltSegment.add(existedSegment);
             }
         }
+
+        return overlapsBuiltSegment;
     }
 
     public ComputedColumnUsageResponse getComputedColumnUsages(String project) {
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
index e9dc0f22c3..8d3a4462b2 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
@@ -3889,19 +3889,93 @@ public class ModelServiceTest extends SourceTestCase {
         Assert.assertEquals(0, res.getOverlapSegments().size());
         Assert.assertEquals(1, res.getSegmentHoles().size());
 
-        res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "20000", "30000");
+        res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "20000", "30000", true, null);
         Assert.assertEquals(0, res.getOverlapSegments().size());
         Assert.assertEquals(3, res.getSegmentHoles().size());
 
-        res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "1", "10");
+        res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "1", "10", true, null);
         Assert.assertEquals(0, res.getOverlapSegments().size());
         Assert.assertEquals(1, res.getSegmentHoles().size());
 
-        res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "1", "5");
+        res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "1", "5", true, null);
         Assert.assertEquals(0, res.getOverlapSegments().size());
         Assert.assertEquals(2, res.getSegmentHoles().size());
     }
 
+    @Test
+    public void testCheckSegmentToBuildOverlapsBuilt() throws IOException {
+        KylinConfig kylinConfig = getTestConfig();
+        final String defaultProject = getProject();
+        final String streamingProject = "streaming_test";
+        NDataModelManager modelManager = NDataModelManager.getInstance(kylinConfig, defaultProject);
+
+        kylinConfig.setProperty("kylin.build.segment-overlap-enabled", "true");
+
+        List<NDataSegment> overlapSegments = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+                modelManager.getDataModelDesc("b780e4e4-69af-449e-b09f-05c90dfa04b6"),
+                new SegmentRange.TimePartitionedSegmentRange(1604188800000L, 1604361600000L), true, null);
+        Assert.assertEquals(3, overlapSegments.size());
+
+        val streamingModelManager = NDataModelManager.getInstance(getTestConfig(), streamingProject);
+        List<NDataSegment> overlapSegments2 = modelService.checkSegmentToBuildOverlapsBuilt(streamingProject,
+                streamingModelManager.getDataModelDesc("e78a89dd-847f-4574-8afa-8768b4228b74"),
+                new SegmentRange.KafkaOffsetPartitionedSegmentRange(1613957110000L, 1613957130000L), true, null);
+        Assert.assertEquals(2, overlapSegments2.size());
+
+        String modelId = "abe3bf1a-c4bc-458d-8278-7ea8b00f5e96";
+        NDataModel dataModelDesc = modelManager.getDataModelDesc(modelId);
+        List<NDataSegment> overlapSegments3 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+                dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1509891513770L), true,
+                null);
+        Assert.assertEquals(0, overlapSegments3.size());
+
+        List<NDataSegment> overlapSegments4 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+                dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1609891513770L), true,
+                null);
+        Assert.assertEquals(0, overlapSegments4.size());
+
+        List<NDataSegment> overlapSegments5 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+                dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1609891513770L), false,
+                null);
+        Assert.assertEquals(1, overlapSegments5.size());
+
+        List<NDataSegment> overlapSegments6 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+                dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1609891513770L), true,
+                Lists.newArrayList());
+        Assert.assertEquals(0, overlapSegments6.size());
+
+        List<NDataSegment> overlapSegments7 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+                dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1609891513770L), true,
+                Lists.newArrayList(10000L));
+        Assert.assertEquals(1, overlapSegments7.size());
+
+        List<NDataSegment> overlapSegments8 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+                dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513780L, 1509891513760L), true,
+                null);
+        Assert.assertEquals(1, overlapSegments8.size());
+
+        MockSecondStorage.mock(defaultProject, new ArrayList<>(), this);
+        val indexPlanManager = NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), defaultProject);
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+            indexPlanManager.updateIndexPlan(modelId, indexPlan -> {
+                indexPlan.createAndAddBaseIndex(indexPlan.getModel());
+            });
+            return null;
+        }, defaultProject);
+        SecondStorageUtil.initModelMetaData(defaultProject, modelId);
+        Assert.assertTrue(SecondStorageUtil.isModelEnable(defaultProject, modelId));
+        List<NDataSegment> overlapSegments31 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+                dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1509891513770L), true,
+                null);
+        Assert.assertEquals(1, overlapSegments31.size());
+
+        kylinConfig.setProperty("kylin.build.segment-overlap-enabled", "false");
+        List<NDataSegment> overlapSegments9 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+                dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1509891513770L), true,
+                null);
+        Assert.assertEquals(1, overlapSegments9.size());
+    }
+
     @Test
     public void testUpdateModelOwner() throws IOException {
         String project = "default";
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java
index 1913f45ba2..117d381df8 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java
@@ -25,16 +25,16 @@ import java.util.Set;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.engine.spark.ExecutableUtils;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.JobTypeEnum;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.engine.spark.ExecutableUtils;
 import org.apache.kylin.metadata.cube.model.NDataLayout;
 import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
 import org.apache.kylin.metadata.cube.model.PartitionStatusEnum;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -91,6 +91,7 @@ public class AfterBuildResourceMerger extends SparkJobMetadataMerger {
 
         val dfUpdate = new NDataflowUpdate(flowName);
         val theSeg = remoteDataflow.getSegment(segmentId);
+        val toRemoveSegments = remoteDataflowManager.getToRemoveSegs(remoteDataflow, theSeg);
 
         if (theSeg.getModel().isMultiPartitionModel()) {
             final long lastBuildTime = System.currentTimeMillis();
@@ -107,6 +108,7 @@ public class AfterBuildResourceMerger extends SparkJobMetadataMerger {
 
         theSeg.setStatus(SegmentStatusEnum.READY);
         dfUpdate.setToUpdateSegs(theSeg);
+        dfUpdate.setToRemoveSegs(toRemoveSegments.toArray(new NDataSegment[toRemoveSegments.size()]));
         dfUpdate.setToAddOrUpdateLayouts(theSeg.getSegDetails().getLayouts().toArray(new NDataLayout[0]));
 
         localDataflowManager.updateDataflow(dfUpdate);