You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/12 09:27:59 UTC
[kylin] 03/17: KYLIN-5390 Build segment support for overlap segments
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit a9de3a2a27169a9695684a7dd1c2f7c5f807e10d
Author: sibingzhang <74...@users.noreply.github.com>
AuthorDate: Wed Nov 9 14:49:15 2022 +0800
KYLIN-5390 Build segment support for overlap segments
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 ++
.../resources/kylin_error_msg_conf_cn.properties | 2 +-
.../resources/kylin_error_msg_conf_en.properties | 2 +-
.../apache/kylin/common/KylinConfigBaseTest.java | 9 +++
.../org/apache/kylin/job/common/SegmentUtil.java | 18 ++---
.../org/apache/kylin/job/common/SegmentsTest.java | 53 ++++++++++++--
.../kylin/rest/controller/SegmentController.java | 3 +-
.../kylin/rest/service/ModelBuildService.java | 18 ++++-
.../apache/kylin/rest/service/ModelService.java | 53 +++++++++-----
.../kylin/rest/service/ModelServiceTest.java | 80 +++++++++++++++++++++-
.../spark/merger/AfterBuildResourceMerger.java | 6 +-
11 files changed, 209 insertions(+), 39 deletions(-)
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 64606df279..309ce19ced 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3670,6 +3670,10 @@ public abstract class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.second-storage.wait-lock-timeout", "180"));
}
+ public boolean isBuildSegmentOverlapEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.build.segment-overlap-enabled", FALSE));
+ }
+
public boolean getDDLEnabled(){
return Boolean.parseBoolean(getOptional("kylin.source.ddl.enabled", FALSE));
}
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
index 759514ee40..7fab056ac7 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
@@ -40,7 +40,7 @@ KE-010025201=无法找到相关 Cube。
## 100222XX Segment
KE-010022201=无法刷新,Segment 范围 “%s” 超出了加载数据的范围 “%s”。请修改后重试。
-KE-010022202=无法构建,待构建的范围和已构建的范围在 “%s” 到 “%s” 之间存在重合。请修改后重试。
+KE-010022202=无法构建,待构建的范围和已构建的范围 “%s” 存在重合。请修改后重试。
KE-010022203=无法刷新,请选择 Segment 后再试。
KE-010022204=无法刷新,部分 Segment 正在构建。请稍后再试。
KE-010022205=无法刷新,所选 Segment 范围为空。请重新选择后再试。
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
index 02ecccdb25..967a6c7d8c 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
@@ -40,7 +40,7 @@ KE-010025201=Can't find the cube.
## 100222XX Segment
KE-010022201=Can't refresh. The segment range "%s" exceeds the range of loaded data, which is "%s". Please modify and try again.
-KE-010022202=Can't build segment. The specified data range overlaps with the built segments from "%s" to "%s". Please modify and try again.
+KE-010022202=Can't build segment. The specified data range overlaps with the built segments "%s". Please modify and try again.
KE-010022203=Can't refresh. Please select segment and try again.
KE-010022204=Can't refresh, some segments are being built. Please try again later.
KE-010022205=Can't refresh, the selected segment range is empty. Please reselect and try again.
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index 16f653d46b..f2c3cb1a2f 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -1380,6 +1380,15 @@ class KylinConfigBaseTest {
val sub2 = config.getSubstitutor();
Assertions.assertSame(sub1, sub2);
}
+
+ @Test
+ void testIsBuildSegmentOverlapEnabled() {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ config.setProperty("kylin.build.segment-overlap-enabled", "false");
+ assertFalse(config.isBuildSegmentOverlapEnabled());
+ config.setProperty("kylin.build.segment-overlap-enabled", "true");
+ assertTrue(config.isBuildSegmentOverlapEnabled());
+ }
}
class EnvironmentUpdateUtils {
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java
index e04777ef7f..0185cee866 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java
@@ -17,6 +17,7 @@
*/
package org.apache.kylin.job.common;
+import static org.apache.kylin.job.execution.JobTypeEnum.INC_BUILD;
import static org.apache.kylin.job.execution.JobTypeEnum.INDEX_BUILD;
import static org.apache.kylin.job.execution.JobTypeEnum.SUB_PARTITION_BUILD;
@@ -30,13 +31,13 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.metadata.cube.model.NDataSegment;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.cube.model.PartitionStatusEnum;
import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.SegmentStatusEnumToDisplay;
import org.apache.kylin.metadata.model.Segments;
-import org.apache.kylin.metadata.cube.model.NDataSegment;
-import org.apache.kylin.metadata.cube.model.NDataflowManager;
-import org.apache.kylin.metadata.cube.model.PartitionStatusEnum;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
@@ -64,12 +65,13 @@ public class SegmentUtil {
Segments<T> overlapSegs = segments.getSegmentsByRange(segment.getSegRange());
overlapSegs.remove(segment);
if (SegmentStatusEnum.NEW == segment.getStatus()) {
- if (CollectionUtils.isEmpty(overlapSegs)) {
- return SegmentStatusEnumToDisplay.LOADING;
+ if (!CollectionUtils.isEmpty(overlapSegs)
+ && overlapSegs.get(0).getSegRange().entireOverlaps(segment.getSegRange())) {
+ return SegmentStatusEnumToDisplay.REFRESHING;
}
- if (overlapSegs.get(0).getSegRange().entireOverlaps(segment.getSegRange())) {
- return SegmentStatusEnumToDisplay.REFRESHING;
+ if (CollectionUtils.isEmpty(overlapSegs) || anyIndexJobRunning(segment, executables)) {
+ return SegmentStatusEnumToDisplay.LOADING;
}
return SegmentStatusEnumToDisplay.MERGING;
@@ -102,7 +104,7 @@ public class SegmentUtil {
protected static <T extends ISegment> boolean anyIndexJobRunning(T segment) {
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
NExecutableManager execManager = NExecutableManager.getInstance(kylinConfig, segment.getModel().getProject());
- val executables = execManager.listExecByJobTypeAndStatus(ExecutableState::isRunning, INDEX_BUILD,
+ val executables = execManager.listExecByJobTypeAndStatus(ExecutableState::isRunning, INDEX_BUILD, INC_BUILD,
SUB_PARTITION_BUILD);
return executables.stream().anyMatch(task -> task.getSegmentIds().contains(segment.getId()));
}
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java b/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java
index 97eab4cdb8..587e688e21 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java
@@ -19,19 +19,23 @@
package org.apache.kylin.job.common;
import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.job.execution.DefaultExecutable;
+import org.apache.kylin.junit.TimeZoneTestRunner;
+import org.apache.kylin.metadata.cube.model.NBatchConstants;
+import org.apache.kylin.metadata.cube.model.NDataSegment;
+import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.SegmentStatusEnumToDisplay;
import org.apache.kylin.metadata.model.Segments;
-import org.apache.kylin.junit.TimeZoneTestRunner;
-import org.apache.kylin.metadata.cube.model.NDataSegment;
-import org.apache.kylin.metadata.cube.model.NDataflow;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
+import com.google.common.collect.Lists;
+
import lombok.val;
@RunWith(TimeZoneTestRunner.class)
@@ -59,6 +63,41 @@ public class SegmentsTest {
Assert.assertEquals(status, SegmentStatusEnumToDisplay.LOADING);
}
+ @Test
+ public void testGetSegmentStatusToDisplay_Loading() {
+ Segments segments = new Segments();
+ val seg = NDataSegment.empty();
+ seg.setId("1");
+ seg.setSegmentRange(new SegmentRange.TimePartitionedSegmentRange(0L, 10L));
+ seg.setStatus(SegmentStatusEnum.READY);
+ segments.add(seg);
+
+ val seg2 = NDataSegment.empty();
+ seg2.setId("2");
+ seg2.setSegmentRange(new SegmentRange.TimePartitionedSegmentRange(0L, 15L));
+ seg2.setStatus(SegmentStatusEnum.NEW);
+
+ val job = new DefaultExecutable();
+ job.setParam(NBatchConstants.P_SEGMENT_IDS, "2");
+
+ SegmentStatusEnumToDisplay status = SegmentUtil.getSegmentStatusToDisplay(segments, seg2,
+ Lists.newArrayList(job));
+ Assert.assertEquals(SegmentStatusEnumToDisplay.LOADING, status);
+
+ val seg3 = NDataSegment.empty();
+ seg3.setId("3");
+ seg3.setSegmentRange(new SegmentRange.TimePartitionedSegmentRange(11L, 12L));
+ seg3.setStatus(SegmentStatusEnum.NEW);
+ segments.add(seg3);
+
+ val job2 = new DefaultExecutable();
+ job2.setParam(NBatchConstants.P_SEGMENT_IDS, "3");
+
+ SegmentStatusEnumToDisplay status2 = SegmentUtil.getSegmentStatusToDisplay(segments, seg3,
+ Lists.newArrayList(job2));
+ Assert.assertEquals(SegmentStatusEnumToDisplay.LOADING, status2);
+ }
+
@Test
public void testGetSegmentStatusToDisplay_Ready() {
Segments segments = new Segments();
@@ -157,15 +196,21 @@ public class SegmentsTest {
newSeg.setStatus(SegmentStatusEnum.NEW);
segments.add(newSeg);
+ Mockito.mockStatic(SegmentUtil.class);
+ Mockito.when(SegmentUtil.getSegmentStatusToDisplay(segments, newSeg, null)).thenCallRealMethod();
+ Mockito.when(SegmentUtil.anyIndexJobRunning(newSeg)).thenReturn(false);
SegmentStatusEnumToDisplay status = SegmentUtil.getSegmentStatusToDisplay(segments, newSeg, null);
Assert.assertEquals(status, SegmentStatusEnumToDisplay.MERGING);
+ Mockito.when(SegmentUtil.getSegmentStatusToDisplay(segments, seg, null)).thenCallRealMethod();
+ Mockito.when(SegmentUtil.anyIndexJobRunning(seg)).thenReturn(false);
SegmentStatusEnumToDisplay status2 = SegmentUtil.getSegmentStatusToDisplay(segments, seg, null);
Assert.assertEquals(status2, SegmentStatusEnumToDisplay.LOCKED);
+ Mockito.when(SegmentUtil.getSegmentStatusToDisplay(segments, seg2, null)).thenCallRealMethod();
+ Mockito.when(SegmentUtil.anyIndexJobRunning(seg2)).thenReturn(false);
SegmentStatusEnumToDisplay status3 = SegmentUtil.getSegmentStatusToDisplay(segments, seg2, null);
Assert.assertEquals(status3, SegmentStatusEnumToDisplay.LOCKED);
-
}
public NDataSegment newReadySegment(Long startTime, Long endTime) {
diff --git a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/SegmentController.java b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/SegmentController.java
index 5f722bd282..d1f706e275 100644
--- a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/SegmentController.java
+++ b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/SegmentController.java
@@ -166,7 +166,8 @@ public class SegmentController extends BaseController {
modelId);
validateDataRange(buildSegmentsRequest.getStart(), buildSegmentsRequest.getEnd(), partitionColumnFormat);
val res = modelService.checkSegHoleExistIfNewRangeBuild(buildSegmentsRequest.getProject(), modelId,
- buildSegmentsRequest.getStart(), buildSegmentsRequest.getEnd());
+ buildSegmentsRequest.getStart(), buildSegmentsRequest.getEnd(),
+ buildSegmentsRequest.isBuildAllIndexes(), buildSegmentsRequest.getBatchIndexIds());
return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, res, "");
}
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
index 93724c8c26..6d4e17b81b 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
@@ -25,6 +25,7 @@ import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CONCURR
import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_ABANDON;
import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_DUPLICATE;
import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_EMPTY;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_BUILD_RANGE_OVERLAP;
import java.io.IOException;
import java.util.ArrayList;
@@ -34,6 +35,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
+import java.util.StringJoiner;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
@@ -371,7 +373,9 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil
Preconditions.checkArgument(!PushDownUtil.needPushdown(params.getStart(), params.getEnd()),
"Load data must set start and end date");
val segmentRangeToBuild = SourceFactory.getSource(table).getSegmentRange(params.getStart(), params.getEnd());
- modelService.checkSegmentToBuildOverlapsBuilt(project, modelId, segmentRangeToBuild);
+ List<NDataSegment> overlapSegments = modelService.checkSegmentToBuildOverlapsBuilt(project,
+ modelDescInTransaction, segmentRangeToBuild, params.isNeedBuild(), params.getBatchIndexIds());
+ buildSegmentOverlapExceptionInfo(overlapSegments);
modelService.saveDateFormatIfNotExist(project, modelId, params.getPartitionColFormat());
checkMultiPartitionBuildParam(modelDescInTransaction, params);
NDataSegment newSegment = getManager(NDataflowManager.class, project).appendSegment(df, segmentRangeToBuild,
@@ -394,6 +398,18 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil
.licenseCheckWrap(project, () -> jobManager.addSegmentJob(jobParam)));
}
+ private void buildSegmentOverlapExceptionInfo(List<NDataSegment> overlapSegments) {
+ if (CollectionUtils.isEmpty(overlapSegments)) {
+ return;
+ }
+
+ StringJoiner joiner = new StringJoiner(",", "[", "]");
+ for (NDataSegment seg : overlapSegments) {
+ joiner.add(seg.getName());
+ }
+ throw new KylinException(SEGMENT_BUILD_RANGE_OVERLAP, joiner.toString());
+ }
+
public void checkMultiPartitionBuildParam(NDataModel model, IncrementBuildSegmentParams params) {
if (!model.isMultiPartitionModel()) {
return;
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
index 07cab15306..e4af48b957 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -57,7 +57,6 @@ import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NAME_
import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NAME_NOT_EXIST;
import static org.apache.kylin.common.exception.code.ErrorCodeServer.PARAMETER_INVALID_SUPPORT_LIST;
import static org.apache.kylin.common.exception.code.ErrorCodeServer.PROJECT_NOT_EXIST;
-import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_BUILD_RANGE_OVERLAP;
import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_LOCKED;
import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_MERGE_CONTAINS_GAPS;
import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_NOT_EXIST_ID;
@@ -929,7 +928,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
NExecutableManager execManager = NExecutableManager.getInstance(kylinConfig, project);
return execManager.listPartialExec(path -> StringUtils.endsWith(path, modelId), ExecutableState::isRunning,
- INDEX_BUILD, JobTypeEnum.SUB_PARTITION_BUILD);
+ INDEX_BUILD, INC_BUILD, JobTypeEnum.SUB_PARTITION_BUILD);
}
public List<NDataSegmentResponse> getSegmentsResponse(String modelId, String project, String start, String end,
@@ -2295,7 +2294,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
}
public SegmentCheckResponse checkSegHoleExistIfNewRangeBuild(String project, String modelId, String start,
- String end) {
+ String end, boolean isBuildAllIndexes, List<Long> batchIndexIds) {
aclEvaluate.checkProjectOperationPermission(project);
Preconditions.checkArgument(!PushDownUtil.needPushdown(start, end), "Load data must set start and end date");
NDataModel dataModelDesc = getManager(NDataModelManager.class, project).getDataModelDesc(modelId);
@@ -2304,18 +2303,17 @@ public class ModelService extends AbstractModelService implements TableModelSupp
SegmentRange segmentRangeToBuild = SourceFactory.getSource(table).getSegmentRange(start, end);
List<NDataSegment> segmentGaps = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
.checkHoleIfNewSegBuild(modelId, segmentRangeToBuild);
-
- Segments<NDataSegment> segments = getSegmentsByRange(modelId, project, "0", "" + Long.MAX_VALUE);
- val overlapSegments = segments.stream().filter(seg -> seg.getSegRange().overlaps(segmentRangeToBuild))
- .map(seg -> new SegmentRangeResponse(seg.getTSRange().getStart(), seg.getTSRange().getEnd()))
+ List<NDataSegment> overlapSegments = checkSegmentToBuildOverlapsBuilt(project, dataModelDesc,
+ segmentRangeToBuild, isBuildAllIndexes, batchIndexIds);
+ val overlapSegmentResponses = overlapSegments.stream().map(
+ segment -> new SegmentRangeResponse(segment.getTSRange().getStart(), segment.getTSRange().getEnd()))
.collect(Collectors.toList());
-
SegmentCheckResponse segmentCheckResponse = new SegmentCheckResponse();
val segHoles = segmentGaps.stream()
.map(seg -> new SegmentRangeResponse(seg.getTSRange().getStart(), seg.getTSRange().getEnd()))
.collect(Collectors.toList());
segmentCheckResponse.setSegmentHoles(segHoles);
- segmentCheckResponse.setOverlapSegments(overlapSegments);
+ segmentCheckResponse.setOverlapSegments(overlapSegmentResponses);
return segmentCheckResponse;
}
@@ -2563,17 +2561,36 @@ public class ModelService extends AbstractModelService implements TableModelSupp
datamodelManager.updateDataModelDesc(modelUpdate);
}
- public void checkSegmentToBuildOverlapsBuilt(String project, String model, SegmentRange segmentRangeToBuild) {
- Segments<NDataSegment> segments = getSegmentsByRange(model, project, "0", "" + Long.MAX_VALUE);
- if (!CollectionUtils.isEmpty(segments)) {
- for (NDataSegment existedSegment : segments) {
- if (existedSegment.getSegRange().overlaps(segmentRangeToBuild)) {
- throw new KylinException(SEGMENT_BUILD_RANGE_OVERLAP,
- existedSegment.getSegRange().getStart().toString(),
- existedSegment.getSegRange().getEnd().toString());
- }
+ public List<NDataSegment> checkSegmentToBuildOverlapsBuilt(String project, NDataModel model,
+ SegmentRange<Long> segmentRangeToBuild, boolean isBuildAllIndexes, List<Long> batchIndexIds) {
+ boolean isOverlap;
+ Segments<NDataSegment> segments = getSegmentsByRange(model.getId(), project, "0", "" + Long.MAX_VALUE);
+ List<NDataSegment> overlapsBuiltSegment = Lists.newArrayListWithCapacity(segments.size());
+
+ if (CollectionUtils.isEmpty(segments)) {
+ return overlapsBuiltSegment;
+ }
+
+ boolean buildSegmentOverlapEnable = getIndexPlan(model.getId(), project).getConfig()
+ .isBuildSegmentOverlapEnabled();
+ boolean isBuildAllIndexesFinally = batchIndexIds == null || batchIndexIds.size() == 0
+ || batchIndexIds.size() == getIndexPlan(model.getId(), project).getAllIndexes().size();
+
+ for (NDataSegment existedSegment : segments) {
+ if (buildSegmentOverlapEnable && NDataModel.ModelType.BATCH == model.getModelType()
+ && !model.isMultiPartitionModel() && isBuildAllIndexes && isBuildAllIndexesFinally
+ && !SecondStorageUtil.isModelEnable(project, model.getId())) {
+ isOverlap = existedSegment.getSegRange().overlaps(segmentRangeToBuild)
+ && !segmentRangeToBuild.contains(existedSegment.getSegRange());
+ } else {
+ isOverlap = existedSegment.getSegRange().overlaps(segmentRangeToBuild);
+ }
+ if (isOverlap) {
+ overlapsBuiltSegment.add(existedSegment);
}
}
+
+ return overlapsBuiltSegment;
}
public ComputedColumnUsageResponse getComputedColumnUsages(String project) {
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
index e9dc0f22c3..8d3a4462b2 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
@@ -3889,19 +3889,93 @@ public class ModelServiceTest extends SourceTestCase {
Assert.assertEquals(0, res.getOverlapSegments().size());
Assert.assertEquals(1, res.getSegmentHoles().size());
- res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "20000", "30000");
+ res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "20000", "30000", true, null);
Assert.assertEquals(0, res.getOverlapSegments().size());
Assert.assertEquals(3, res.getSegmentHoles().size());
- res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "1", "10");
+ res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "1", "10", true, null);
Assert.assertEquals(0, res.getOverlapSegments().size());
Assert.assertEquals(1, res.getSegmentHoles().size());
- res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "1", "5");
+ res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "1", "5", true, null);
Assert.assertEquals(0, res.getOverlapSegments().size());
Assert.assertEquals(2, res.getSegmentHoles().size());
}
+ @Test
+ public void testCheckSegmentToBuildOverlapsBuilt() throws IOException {
+ KylinConfig kylinConfig = getTestConfig();
+ final String defaultProject = getProject();
+ final String streamingProject = "streaming_test";
+ NDataModelManager modelManager = NDataModelManager.getInstance(kylinConfig, defaultProject);
+
+ kylinConfig.setProperty("kylin.build.segment-overlap-enabled", "true");
+
+ List<NDataSegment> overlapSegments = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+ modelManager.getDataModelDesc("b780e4e4-69af-449e-b09f-05c90dfa04b6"),
+ new SegmentRange.TimePartitionedSegmentRange(1604188800000L, 1604361600000L), true, null);
+ Assert.assertEquals(3, overlapSegments.size());
+
+ val streamingModelManager = NDataModelManager.getInstance(getTestConfig(), streamingProject);
+ List<NDataSegment> overlapSegments2 = modelService.checkSegmentToBuildOverlapsBuilt(streamingProject,
+ streamingModelManager.getDataModelDesc("e78a89dd-847f-4574-8afa-8768b4228b74"),
+ new SegmentRange.KafkaOffsetPartitionedSegmentRange(1613957110000L, 1613957130000L), true, null);
+ Assert.assertEquals(2, overlapSegments2.size());
+
+ String modelId = "abe3bf1a-c4bc-458d-8278-7ea8b00f5e96";
+ NDataModel dataModelDesc = modelManager.getDataModelDesc(modelId);
+ List<NDataSegment> overlapSegments3 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+ dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1509891513770L), true,
+ null);
+ Assert.assertEquals(0, overlapSegments3.size());
+
+ List<NDataSegment> overlapSegments4 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+ dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1609891513770L), true,
+ null);
+ Assert.assertEquals(0, overlapSegments4.size());
+
+ List<NDataSegment> overlapSegments5 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+ dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1609891513770L), false,
+ null);
+ Assert.assertEquals(1, overlapSegments5.size());
+
+ List<NDataSegment> overlapSegments6 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+ dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1609891513770L), true,
+ Lists.newArrayList());
+ Assert.assertEquals(0, overlapSegments6.size());
+
+ List<NDataSegment> overlapSegments7 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+ dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1609891513770L), true,
+ Lists.newArrayList(10000L));
+ Assert.assertEquals(1, overlapSegments7.size());
+
+ List<NDataSegment> overlapSegments8 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+ dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513780L, 1509891513760L), true,
+ null);
+ Assert.assertEquals(1, overlapSegments8.size());
+
+ MockSecondStorage.mock(defaultProject, new ArrayList<>(), this);
+ val indexPlanManager = NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), defaultProject);
+ EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+ indexPlanManager.updateIndexPlan(modelId, indexPlan -> {
+ indexPlan.createAndAddBaseIndex(indexPlan.getModel());
+ });
+ return null;
+ }, defaultProject);
+ SecondStorageUtil.initModelMetaData(defaultProject, modelId);
+ Assert.assertTrue(SecondStorageUtil.isModelEnable(defaultProject, modelId));
+ List<NDataSegment> overlapSegments31 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+ dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1509891513770L), true,
+ null);
+ Assert.assertEquals(1, overlapSegments31.size());
+
+ kylinConfig.setProperty("kylin.build.segment-overlap-enabled", "false");
+ List<NDataSegment> overlapSegments9 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject,
+ dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1509891513770L), true,
+ null);
+ Assert.assertEquals(1, overlapSegments9.size());
+ }
+
@Test
public void testUpdateModelOwner() throws IOException {
String project = "default";
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java
index 1913f45ba2..117d381df8 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java
@@ -25,16 +25,16 @@ import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.engine.spark.ExecutableUtils;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.JobTypeEnum;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.engine.spark.ExecutableUtils;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.PartitionStatusEnum;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -91,6 +91,7 @@ public class AfterBuildResourceMerger extends SparkJobMetadataMerger {
val dfUpdate = new NDataflowUpdate(flowName);
val theSeg = remoteDataflow.getSegment(segmentId);
+ val toRemoveSegments = remoteDataflowManager.getToRemoveSegs(remoteDataflow, theSeg);
if (theSeg.getModel().isMultiPartitionModel()) {
final long lastBuildTime = System.currentTimeMillis();
@@ -107,6 +108,7 @@ public class AfterBuildResourceMerger extends SparkJobMetadataMerger {
theSeg.setStatus(SegmentStatusEnum.READY);
dfUpdate.setToUpdateSegs(theSeg);
+ dfUpdate.setToRemoveSegs(toRemoveSegments.toArray(new NDataSegment[toRemoveSegments.size()]));
dfUpdate.setToAddOrUpdateLayouts(theSeg.getSegDetails().getLayouts().toArray(new NDataLayout[0]));
localDataflowManager.updateDataflow(dfUpdate);