You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/05/06 06:59:12 UTC
[kylin] 11/38: KYLIN-5527 Data count check for segment layout building
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 59a2e21034f7ccbf859e3aee94668a5ce8a4ec1a
Author: Yinghao Lin <39...@users.noreply.github.com>
AuthorDate: Tue Feb 21 11:35:14 2023 +0800
KYLIN-5527 Data count check for segment layout building
---
.../kylin/rest/service/AccessServiceTest.java | 4 +-
.../kylin/rest/service/AclTCRServiceTest.java | 2 +-
.../org/apache/kylin/common/KylinConfigBase.java | 12 +
.../org/apache/kylin/common/util/JsonUtil.java | 1 +
.../apache/kylin/job/constant/JobStatusEnum.java | 6 +
.../kylin/job/execution/AbstractExecutable.java | 10 +-
.../kylin/job/execution/DefaultExecutable.java | 45 ++-
.../kylin/job/execution/ExecutableState.java | 11 +-
.../apache/kylin/job/execution/ExecuteResult.java | 23 +-
.../kylin/job/execution/NExecutableManager.java | 7 +
.../job/execution/NExecutableManagerTest.java | 21 +
.../kylin/metadata/cube/model/NBatchConstants.java | 1 +
.../kylin/metadata/cube/model/NDataLayout.java | 19 +-
.../kylin/metadata/cube/model/NDataSegDetails.java | 33 +-
.../kylin/metadata/cube/model/NDataSegment.java | 63 ++-
.../metadata/cube/model/NDataflowManager.java | 4 +-
.../metadata/cube/model/SegmentPartition.java | 2 +-
.../org/apache/kylin/metadata/model/ISegment.java | 2 +
.../metadata/project/NProjectManagerTest.java | 2 +-
.../kylin/metrics/HdfsCapacityMetricsTest.java | 4 +-
.../kylin/rest/response/ExecutableResponse.java | 6 +-
.../rest/response/ExecutableStepResponse.java | 3 +
.../org/apache/kylin/rest/service/JobService.java | 8 +-
.../org/apache/kylin/rest/service/StageTest.java | 3 +
.../metadata/_global/project/index_build_test.json | 40 ++
.../3ec47efc-573a-9304-4405-8e05ae184322.json | 70 ++++
.../b2f206e1-7a15-c94a-20f5-f608d550ead6.json | 69 ++++
.../3ec47efc-573a-9304-4405-8e05ae184322.json | 204 ++++++++++
.../3ec47efc-573a-9304-4405-8e05ae184322.json | 241 +++++++++++
.../index_build_test/table/SSB.CUSTOMER.json | 77 ++++
.../index_build_test/table/SSB.LINEORDER.json | 131 ++++++
.../rest/controller/NIndexPlanController.java | 7 +-
.../apache/kylin/rest/response/IndexResponse.java | 4 +
.../kylin/rest/response/NDataSegmentResponse.java | 4 +-
.../kylin/rest/service/FusionIndexService.java | 24 +-
.../kylin/rest/service/IndexPlanService.java | 50 ++-
.../apache/kylin/rest/service/ModelService.java | 4 +-
.../rest/service/params/IndexPlanParams.java} | 31 +-
.../rest/service/params/PaginationParams.java} | 27 +-
.../kylin/rest/service/ProjectServiceTest.java | 6 +-
.../rest/service/QueryHistoryServiceTest.java | 2 +-
.../kylin/query/engine/AsyncQueryJobTest.java | 2 +-
src/spark-project/engine-spark/pom.xml | 5 +
.../engine/spark/application/SparkApplication.java | 17 +-
.../builder/PartitionDictionaryBuilderHelper.java | 2 +-
.../spark/job/ExecutableAddCuboidHandler.java | 46 ++-
.../kylin/engine/spark/job/NSparkCubingStep.java | 42 +-
.../spark/merger/AfterBuildResourceMerger.java | 4 +-
.../merger/AfterMergeOrRefreshResourceMerger.java | 4 +-
.../spark/builder/DictionaryBuilderHelper.java | 2 +-
.../kylin/engine/spark/job/BuildJobInfos.scala | 21 +-
.../apache/kylin/engine/spark/job/DFMergeJob.java | 2 +-
.../kylin/engine/spark/job/PartitionExec.scala | 4 +
.../kylin/engine/spark/job/SegmentBuildJob.java | 14 +-
.../kylin/engine/spark/job/SegmentExec.scala | 38 +-
.../apache/kylin/engine/spark/job/SegmentJob.java | 4 +
.../kylin/engine/spark/job/SegmentMergeJob.java | 3 +-
.../kylin/engine/spark/job/stage/StageExec.scala | 113 +++---
.../engine/spark/job/stage/build/BuildLayer.scala | 23 +-
.../engine/spark/job/stage/build/BuildStage.scala | 6 -
.../job/stage/build/GatherFlatTableStats.scala | 2 -
.../spark/job/stage/build/GenerateFlatTable.scala | 145 ++++++-
.../build/partition/PartitionBuildLayer.scala | 23 +-
.../build/partition/PartitionBuildStage.scala | 6 -
.../engine/spark/job/stage/merge/MergeStage.scala | 2 +-
.../merge/partition/PartitionMergeStage.scala | 2 +-
.../GenerateFlatTableWithSparkSessionTest.java | 444 +++++++++++++++++++++
.../kylin/tool/garbage/DataflowCleanerCLI.java | 2 +-
68 files changed, 1976 insertions(+), 285 deletions(-)
diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java
index b0f9142756..7fc2665b6f 100644
--- a/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java
@@ -596,14 +596,14 @@ public class AccessServiceTest extends NLocalFileMetadataTestCase {
@Test
public void testGetGrantedProjectsOfUser() throws IOException {
List<String> result = accessService.getGrantedProjectsOfUser("ADMIN");
- assertEquals(28, result.size());
+ assertEquals(29, result.size());
}
@Test
public void testGetGrantedProjectsOfUserOrGroup() throws IOException {
// admin user
List<String> result = accessService.getGrantedProjectsOfUserOrGroup("ADMIN", true);
- assertEquals(28, result.size());
+ assertEquals(29, result.size());
// normal user
result = accessService.getGrantedProjectsOfUserOrGroup("ANALYST", true);
diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java
index 9a0ee62d7d..20e40e74e6 100644
--- a/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java
@@ -1316,7 +1316,7 @@ public class AclTCRServiceTest extends NLocalFileMetadataTestCase {
Mockito.when(userService.isGlobalAdmin("ADMIN")).thenReturn(true);
List<SidPermissionWithAclResponse> responses = accessService.getUserOrGroupAclPermissions(projects, "ADMIN",
true);
- Assert.assertEquals(28, responses.size());
+ Assert.assertEquals(29, responses.size());
Assert.assertTrue(responses.stream().allMatch(response -> "ADMIN".equals(response.getProjectPermission())));
// test normal group
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f20dca320b..32af04760a 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -980,6 +980,10 @@ public abstract class KylinConfigBase implements Serializable {
return !getBuildConf().isEmpty() && !getWritingClusterWorkingDir().isEmpty();
}
+ public String getWriteClusterWorkingDir() {
+ return getOptional("kylin.env.write-hdfs-working-dir", "");
+ }
+
public String getWritingClusterWorkingDir() {
return getOptional(WRITING_CLUSTER_WORKING_DIR, "");
}
@@ -3800,6 +3804,14 @@ public abstract class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.source.ddl.logical-view-catchup-interval", "60"));
}
+ public boolean isDataCountCheckEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.build.data-count-check-enabled", FALSE));
+ }
+
+ public boolean isNonStrictCountCheckAllowed() {
+ return Boolean.parseBoolean(getOptional("kylin.build.allow-non-strict-count-check", FALSE));
+ }
+
// ============================================================================
// Cost based index Planner
// ============================================================================
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java b/src/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
index d74c7ca1d1..fb0b46826c 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
@@ -60,6 +60,7 @@ public class JsonUtil {
static {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
.setConfig(mapper.getSerializationConfig().withView(PersistenceView.class));
mapper.setFilterProvider(simpleFilterProvider);
indentMapper.configure(SerializationFeature.INDENT_OUTPUT, true)
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java b/src/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
index 73f634bc85..59b0d14934 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
@@ -100,6 +100,12 @@ public enum JobStatusEnum {
public boolean checkAction(JobActionEnum actionEnum) {
return false;
}
+ },
+ WARNING(2048) {
+ @Override
+ public boolean checkAction(JobActionEnum actionEnum) {
+ return false;
+ }
};
public abstract boolean checkAction(JobActionEnum actionEnum);
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index bfe6f1fb6a..0f8de4e055 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -285,7 +285,9 @@ public abstract class AbstractExecutable implements Executable {
MetricsGroup.hostTagCounterInc(MetricsName.JOB_STEP_ATTEMPTED, MetricsCategory.PROJECT, project, retry);
if (result.succeed()) {
wrapWithCheckQuit(() -> {
- updateJobOutput(project, getId(), ExecutableState.SUCCEED, result.getExtraInfo(), result.output(),
+ ExecutableState state = adjustState(ExecutableState.SUCCEED);
+ logger.info("Job {} adjust future state from {} to {}", getId(), ExecutableState.SUCCEED.name(), state.name());
+ updateJobOutput(project, getId(), state, result.getExtraInfo(), result.output(),
null);
});
} else if (result.skip()) {
@@ -304,6 +306,10 @@ public abstract class AbstractExecutable implements Executable {
}
}
+ protected ExecutableState adjustState(ExecutableState originalState) {
+ return originalState;
+ }
+
protected void onExecuteStopHook() {
onExecuteErrorHook(getId());
}
@@ -337,7 +343,7 @@ public abstract class AbstractExecutable implements Executable {
//The output will be stored in HDFS,not in RS
if (this instanceof ChainedStageExecutable) {
- if (newStatus == ExecutableState.SUCCEED) {
+ if (newStatus.isNotBad()) {
executableManager.makeStageSuccess(jobId);
} else if (newStatus == ExecutableState.ERROR) {
executableManager.makeStageError(jobId);
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java
index 5a4e92e58c..78cb903ad1 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java
@@ -184,7 +184,7 @@ public class DefaultExecutable extends AbstractExecutable implements ChainedExec
private void executeStep(Executable executable, ExecutableContext context) throws ExecuteException {
if (executable.isRunnable()) {
executable.execute(context);
- } else if (ExecutableState.SUCCEED == executable.getStatus()) {
+ } else if (executable.getStatus().isNotBad()) {
logger.info("step {} is already succeed, skip it.", executable.getDisplayName());
} else {
throw new IllegalStateException("invalid subtask state, sub task:" + executable.getDisplayName()
@@ -289,27 +289,28 @@ public class DefaultExecutable extends AbstractExecutable implements ChainedExec
logger.info("Sub-task finished {}, state: {}", task.getDisplayName(), task.getStatus());
boolean taskSucceed = false;
switch (task.getStatus()) {
- case RUNNING:
- hasError = true;
- break;
- case ERROR:
- hasError = true;
- break;
- case DISCARDED:
- hasDiscarded = true;
- break;
- case SUICIDAL:
- hasSuicidal = true;
- break;
- case PAUSED:
- hasPaused = true;
- break;
- case SKIP:
- case SUCCEED:
- taskSucceed = true;
- break;
- default:
- break;
+ case RUNNING:
+ hasError = true;
+ break;
+ case ERROR:
+ hasError = true;
+ break;
+ case DISCARDED:
+ hasDiscarded = true;
+ break;
+ case SUICIDAL:
+ hasSuicidal = true;
+ break;
+ case PAUSED:
+ hasPaused = true;
+ break;
+ case SUCCEED:
+ case SKIP:
+ case WARNING:
+ taskSucceed = true;
+ break;
+ default:
+ break;
}
allSucceed &= taskSucceed;
}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
index 9016ee8a40..40999107ce 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
@@ -33,7 +33,7 @@ import com.google.common.collect.Multimaps;
*/
public enum ExecutableState {
- READY, RUNNING, ERROR, PAUSED, DISCARDED, SUCCEED, SUICIDAL, SKIP;
+ READY, RUNNING, ERROR, PAUSED, DISCARDED, SUCCEED, SUICIDAL, SKIP, WARNING;
private static Multimap<ExecutableState, ExecutableState> VALID_STATE_TRANSFER;
@@ -60,6 +60,7 @@ public enum ExecutableState {
VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.SUICIDAL);
VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.PAUSED);
VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.SKIP);
+ VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.WARNING);
VALID_STATE_TRANSFER.put(ExecutableState.PAUSED, ExecutableState.DISCARDED);
VALID_STATE_TRANSFER.put(ExecutableState.PAUSED, ExecutableState.SUICIDAL);
@@ -94,6 +95,12 @@ public enum ExecutableState {
this == READY;//restart case
}
+ public boolean isNotBad() {
+ return this == SUCCEED
+ || this == SKIP
+ || this == WARNING;
+ }
+
public static boolean isValidStateTransfer(ExecutableState from, ExecutableState to) {
return VALID_STATE_TRANSFER.containsEntry(from, to);
}
@@ -115,6 +122,8 @@ public enum ExecutableState {
case SUICIDAL:
case DISCARDED:
return JobStatusEnum.DISCARDED;
+ case WARNING:
+ return JobStatusEnum.WARNING;
default:
throw new RuntimeException("invalid state:" + this);
}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
index c9567f1b10..86c60b35dc 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
@@ -47,17 +47,18 @@ public final class ExecuteResult {
private ExecuteResult(State state, String output, Throwable throwable) {
Preconditions.checkArgument(state != null, "state cannot be null");
- if (state == State.SUCCEED) {
- Preconditions.checkNotNull(output);
- Preconditions.checkState(throwable == null);
- } else if (state == State.SKIP) {
- Preconditions.checkNotNull(output);
- Preconditions.checkState(throwable == null);
- } else if (state == State.ERROR) {
- Preconditions.checkNotNull(throwable);
- Preconditions.checkState(output == null);
- } else {
- throw new IllegalStateException();
+ switch (state) {
+ case SUCCEED:
+ case SKIP:
+ Preconditions.checkNotNull(output);
+ Preconditions.checkState(throwable == null);
+ break;
+ case ERROR:
+ Preconditions.checkNotNull(throwable);
+ Preconditions.checkState(output == null);
+ break;
+ default:
+ throw new IllegalStateException();
}
this.state = state;
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
index b22dee724f..4f1b192f1d 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
@@ -1238,6 +1238,7 @@ public class NExecutableManager {
// DISCARDED must not be transferred to any others status
if ((oldStatus == ExecutableState.PAUSED && newStatus == ExecutableState.ERROR)
|| (oldStatus == ExecutableState.SKIP && newStatus == ExecutableState.SUCCEED)
+ || (oldStatus == ExecutableState.WARNING && newStatus == ExecutableState.SUCCEED)
|| oldStatus == ExecutableState.DISCARDED) {
return false;
}
@@ -1253,6 +1254,12 @@ public class NExecutableManager {
final int indexSuccessCount = Integer
.parseInt(map.getOrDefault(NBatchConstants.P_INDEX_SUCCESS_COUNT, "0"));
info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, String.valueOf(indexSuccessCount));
+
+ // Add warning_code to stage output info if exists
+ String warningCode;
+ if ((warningCode = map.get(NBatchConstants.P_WARNING_CODE)) != null) {
+ info.put(NBatchConstants.P_WARNING_CODE, warningCode);
+ }
});
jobOutput.setInfo(info);
jobOutput.setLastModified(System.currentTimeMillis());
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
index 4ba31b27c1..3f05c47032 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
@@ -162,6 +162,27 @@ public class NExecutableManagerTest extends NLocalFileMetadataTestCase {
assertJobEqual(job, anotherJob);
}
+ @Test
+ public void testExecutableStateCorrectness() {
+ assertTrue(ExecutableState.READY.isProgressing());
+ assertTrue(ExecutableState.RUNNING.isProgressing());
+
+ assertTrue(ExecutableState.SUCCEED.isFinalState());
+ assertTrue(ExecutableState.DISCARDED.isFinalState());
+ assertTrue(ExecutableState.SUICIDAL.isFinalState());
+
+ assertTrue(ExecutableState.ERROR.isNotProgressing());
+ assertTrue(ExecutableState.PAUSED.isNotProgressing());
+
+ assertTrue(ExecutableState.DISCARDED.isStoppedNonVoluntarily());
+ assertTrue(ExecutableState.PAUSED.isStoppedNonVoluntarily());
+ assertTrue(ExecutableState.READY.isStoppedNonVoluntarily());
+
+ assertTrue(ExecutableState.SUCCEED.isNotBad());
+ assertTrue(ExecutableState.SKIP.isNotBad());
+ assertTrue(ExecutableState.WARNING.isNotBad());
+ }
+
@Test
public void testValidStateTransfer() {
SucceedTestExecutable job = new SucceedTestExecutable();
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
index 754769280b..bcca549182 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
@@ -55,6 +55,7 @@ public interface NBatchConstants {
/** use for stage calculate exec ratio */
String P_INDEX_COUNT = "indexCount";
String P_INDEX_SUCCESS_COUNT = "indexSuccessCount";
+ String P_WARNING_CODE = "warning_code";
/** value like : { "segmentId1": 1223, "segmentId2": 1223 } */
String P_WAITE_TIME = "waiteTime";
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayout.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayout.java
index d979ece2f8..08a99459e1 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayout.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayout.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -43,6 +44,10 @@ import lombok.val;
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class NDataLayout implements Serializable {
+ public enum AbnormalType {
+ DATA_INCONSISTENT
+ }
+
public static NDataLayout newDataLayout(NDataflow df, String segId, long layoutId) {
return newDataLayout(NDataSegDetails.newSegDetails(df, segId), layoutId);
}
@@ -54,6 +59,13 @@ public class NDataLayout implements Serializable {
return r;
}
+ public static boolean filterWorkingLayout(NDataLayout layout) {
+ if (layout == null) {
+ return false;
+ }
+ return Objects.isNull(layout.getAbnormalType());
+ }
+
// ============================================================================
/**
@@ -97,6 +109,11 @@ public class NDataLayout implements Serializable {
@JsonProperty("multi_partition")
private List<LayoutPartition> multiPartition = new ArrayList<>();
+ @Getter
+ @Setter
+ @JsonProperty("abnormal_type")
+ private AbnormalType abnormalType;
+
public NDataLayout() {
this.createTime = System.currentTimeMillis();
}
@@ -260,7 +277,7 @@ public class NDataLayout implements Serializable {
if (segDetails == null || !segDetails.isCachedAndShared())
return false;
- for (NDataLayout cached : segDetails.getLayouts()) {
+ for (NDataLayout cached : segDetails.getWorkingLayouts()) {
if (cached == this)
return true;
}
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegDetails.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegDetails.java
index 5607e918c5..e9866f3509 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegDetails.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegDetails.java
@@ -21,6 +21,7 @@ package org.apache.kylin.metadata.cube.model;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.persistence.RootPersistentEntity;
@@ -114,18 +115,40 @@ public class NDataSegDetails extends RootPersistentEntity implements Serializabl
public long getTotalRowCount() {
long count = 0L;
- for (NDataLayout cuboid : getLayouts()) {
+ for (NDataLayout cuboid : getWorkingLayouts()) {
count += cuboid.getRows();
}
return count;
}
+ /**
+ * @deprecated Deprecated because of non-working layouts were added.
+ * <p>Use {@link NDataSegDetails#getWorkingLayouts} or {@link NDataSegDetails#getAllLayouts} instead.
+ */
+ @Deprecated
public List<NDataLayout> getLayouts() {
- return isCachedAndShared() ? ImmutableList.copyOf(layouts) : layouts;
+ return getAllLayouts();
+ }
+
+ public List<NDataLayout> getWorkingLayouts() {
+ List<NDataLayout> workingLayouts = getLayouts0(false);
+ return isCachedAndShared() ? ImmutableList.copyOf(workingLayouts) : workingLayouts;
+ }
+
+ public List<NDataLayout> getAllLayouts() {
+ List<NDataLayout> allLayouts = getLayouts0(true);
+ return isCachedAndShared() ? ImmutableList.copyOf(allLayouts) : allLayouts;
+ }
+
+ private List<NDataLayout> getLayouts0(boolean includingNonWorkingLayouts) {
+ if (includingNonWorkingLayouts) {
+ return layouts;
+ }
+ return layouts.stream().filter(NDataLayout::filterWorkingLayout).collect(Collectors.toList());
}
public NDataLayout getLayoutById(long layoutId) {
- for (NDataLayout cuboid : getLayouts()) {
+ for (NDataLayout cuboid : getAllLayouts()) {
if (cuboid.getLayoutId() == layoutId)
return cuboid;
}
@@ -156,8 +179,8 @@ public class NDataSegDetails extends RootPersistentEntity implements Serializabl
if (another == this)
return false;
- List<NDataLayout> currentSortedLayouts = getSortedLayouts(getLayouts());
- List<NDataLayout> anotherSortedLayouts = getSortedLayouts(another.getLayouts());
+ List<NDataLayout> currentSortedLayouts = getSortedLayouts(getAllLayouts());
+ List<NDataLayout> anotherSortedLayouts = getSortedLayouts(another.getAllLayouts());
int size = currentSortedLayouts.size();
if (size != anotherSortedLayouts.size())
return false;
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java
index ddb81e407a..00c2d105a7 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java
@@ -275,7 +275,20 @@ public class NDataSegment implements ISegment, Serializable {
return getLayoutInfo().getLayoutSize();
}
+ @Override
+ public int getWorkingLayoutSize() {
+ return (int) getLayoutInfo().getLayoutsMap().values().stream()
+ .filter(NDataLayout::filterWorkingLayout).count();
+ }
+
public NDataLayout getLayout(long layoutId) {
+ return getLayout(layoutId, false);
+ }
+
+ public NDataLayout getLayout(long layoutId, boolean includingNonWorkingLayout) {
+ if (includingNonWorkingLayout) {
+ return getAllLayoutsMap().get(layoutId);
+ }
return getLayoutsMap().get(layoutId);
}
@@ -283,6 +296,10 @@ public class NDataSegment implements ISegment, Serializable {
return getLayoutInfo().getLayoutsMap();
}
+ public Map<Long, NDataLayout> getAllLayoutsMap() {
+ return getLayoutInfo().getAllLayoutsMap();
+ }
+
public Set<Long> getLayoutIds() {
return getLayoutInfo().getLayoutIds();
}
@@ -314,7 +331,8 @@ public class NDataSegment implements ISegment, Serializable {
// not required by spark cubing
private NDataSegDetails segDetails;
// not required by spark cubing
- private Map<Long, NDataLayout> layoutsMap = Collections.emptyMap();
+ private Map<Long, NDataLayout> allLayoutsMap = Collections.emptyMap();
+ private Map<Long, NDataLayout> workingLayoutsMap = Collections.emptyMap();
/**
* for each layout, partition id -> bucket id
*/
@@ -329,7 +347,10 @@ public class NDataSegment implements ISegment, Serializable {
}
public LayoutInfo(Map<Long, NDataLayout> layoutsMap) {
- this.layoutsMap = layoutsMap;
+ this.allLayoutsMap = layoutsMap;
+ this.workingLayoutsMap = layoutsMap.entrySet().stream()
+ .filter(entry -> NDataLayout.filterWorkingLayout(entry.getValue()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private LayoutInfo(boolean loadDetail) {
@@ -344,37 +365,45 @@ public class NDataSegment implements ISegment, Serializable {
IndexPlan indexPlan = dataflow.getIndexPlan();
if (!indexPlan.isBroken()) {
- List<NDataLayout> filteredCuboids = segDetails.getLayouts().stream()
+ List<NDataLayout> filteredCuboids = segDetails.getAllLayouts().stream()
.filter(dataLayout -> dataLayout.getLayout() != null).collect(Collectors.toList());
segDetails.setLayouts(filteredCuboids);
}
segDetails.setCachedAndShared(dataflow.isCachedAndShared());
- List<NDataLayout> cuboids = segDetails.getLayouts();
- layoutsMap = new HashMap<>(cuboids.size());
- for (NDataLayout cuboid : cuboids) {
- layoutsMap.put(cuboid.getLayoutId(), cuboid);
- Map<Long, Long> cuboidBucketMap = Maps.newHashMap();
- cuboid.getMultiPartition().forEach(dataPartition -> cuboidBucketMap.put(dataPartition.getPartitionId(),
- dataPartition.getBucketId()));
- partitionBucketMap.put(cuboid.getLayoutId(), cuboidBucketMap);
+ List<NDataLayout> allLayouts = segDetails.getAllLayouts();
+ allLayoutsMap = Maps.newHashMap();
+ workingLayoutsMap = Maps.newHashMap();
+ for (NDataLayout layout : allLayouts) {
+ allLayoutsMap.put(layout.getLayoutId(), layout);
+ if (NDataLayout.filterWorkingLayout(layout)) {
+ workingLayoutsMap.put(layout.getLayoutId(), layout);
+ Map<Long, Long> cuboidBucketMap = Maps.newHashMap();
+ layout.getMultiPartition().forEach(dataPartition -> cuboidBucketMap.put(dataPartition.getPartitionId(),
+ dataPartition.getBucketId()));
+ partitionBucketMap.put(layout.getLayoutId(), cuboidBucketMap);
+ }
}
}
public int getLayoutSize() {
- return layoutsMap.size();
+ return workingLayoutsMap.size();
}
public NDataLayout getLayout(long layoutId) {
- return layoutsMap.get(layoutId);
+ return workingLayoutsMap.get(layoutId);
}
public Map<Long, NDataLayout> getLayoutsMap() {
- return layoutsMap;
+ return workingLayoutsMap;
+ }
+
+ public Map<Long, NDataLayout> getAllLayoutsMap() {
+ return allLayoutsMap;
}
public Set<Long> getLayoutIds() {
- return layoutsMap.keySet();
+ return workingLayoutsMap.keySet();
}
public List<Long> getMultiPartitionIds() {
@@ -390,8 +419,8 @@ public class NDataSegment implements ISegment, Serializable {
}
public boolean isAlreadyBuilt(long layoutId) {
- if (Objects.nonNull(layoutsMap) && layoutsMap.containsKey(layoutId)) {
- return layoutsMap.get(layoutId).isReady();
+ if (Objects.nonNull(workingLayoutsMap) && workingLayoutsMap.containsKey(layoutId)) {
+ return workingLayoutsMap.get(layoutId).isReady();
}
return false;
}
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java
index f22dd1838d..875201fe65 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java
@@ -730,7 +730,7 @@ public class NDataflowManager implements IRealizationProvider {
private void updateSegmentStatus(NDataSegment seg) {
NDataSegDetails segDetails = NDataSegDetailsManager.getInstance(seg.getConfig(), project).getForSegment(seg);
- if (seg.getStatus() == SegmentStatusEnum.WARNING && segDetails != null && segDetails.getLayouts().isEmpty()) {
+ if (seg.getStatus() == SegmentStatusEnum.WARNING && segDetails != null && segDetails.getAllLayouts().isEmpty()) {
seg.setStatus(SegmentStatusEnum.READY);
}
}
@@ -830,7 +830,7 @@ public class NDataflowManager implements IRealizationProvider {
}
val affectedLayouts = Lists.newArrayList();
for (NDataSegment segment : updateSegments) {
- val layouts = segment.getSegDetails().getLayouts();
+ val layouts = segment.getSegDetails().getAllLayouts();
layouts.forEach(dataLayout -> {
if (dataLayout.removeMultiPartition(toBeDeletedPartIds)) {
affectedLayouts.add(dataLayout);
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/SegmentPartition.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/SegmentPartition.java
index dfe2cd16f9..0b4b8ca873 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/SegmentPartition.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/SegmentPartition.java
@@ -142,7 +142,7 @@ public class SegmentPartition implements Serializable {
return 0;
}
storageSize = dataSegment.getSegDetails() //
- .getLayouts().stream() //
+ .getWorkingLayouts().stream() //
.flatMap(layout -> layout.getMultiPartition().stream()) //
.filter(partition -> partition.getPartitionId() == partitionId) //
.mapToLong(LayoutPartition::getByteSize).sum();
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
index 46f72933ef..146eead0f4 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
@@ -43,6 +43,8 @@ public interface ISegment extends Comparable<ISegment> {
public int getLayoutSize();
+ public int getWorkingLayoutSize();
+
public NDataModel getModel();
public SegmentStatusEnum getStatus();
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java
index 0d2e6cbaf3..f3ed44b83f 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java
@@ -68,7 +68,7 @@ public class NProjectManagerTest extends NLocalFileMetadataTestCase {
}
val projects = projectManager.listAllProjects();
- Assert.assertEquals(28, projects.size());
+ Assert.assertEquals(29, projects.size());
Assert.assertTrue(projects.stream().noneMatch(p -> p.getName().equals("test")));
}
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metrics/HdfsCapacityMetricsTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metrics/HdfsCapacityMetricsTest.java
index ce18241853..28a8c5fada 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metrics/HdfsCapacityMetricsTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metrics/HdfsCapacityMetricsTest.java
@@ -90,7 +90,7 @@ public class HdfsCapacityMetricsTest extends NLocalFileMetadataTestCase {
}
Assert.assertTrue(hdfsCapacityMetrics.getWorkingDirCapacity().isEmpty());
hdfsCapacityMetrics.writeHdfsMetrics();
- Assert.assertEquals(28, hdfsCapacityMetrics.getWorkingDirCapacity().size());
+ Assert.assertEquals(29, hdfsCapacityMetrics.getWorkingDirCapacity().size());
}
@@ -139,4 +139,4 @@ public class HdfsCapacityMetricsTest extends NLocalFileMetadataTestCase {
HdfsCapacityMetrics hdfsCapacityMetrics = new HdfsCapacityMetrics(getTestConfig());
Assert.assertEquals(0L, (long) hdfsCapacityMetrics.getHdfsCapacityByProject("kylin"));
}
-}
\ No newline at end of file
+}
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
index c974b4caeb..36719e6abb 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
@@ -249,7 +249,8 @@ public class ExecutableResponse implements Comparable<ExecutableResponse> {
continue;
}
}
- if (ExecutableState.SUCCEED == task.getStatus() || ExecutableState.SKIP == task.getStatus()) {
+
+ if (task.getStatus().isNotBad()) {
successSteps++;
}
}
@@ -279,7 +280,8 @@ public class ExecutableResponse implements Comparable<ExecutableResponse> {
var successStages = 0D;
for (StageBase stage : stageBases) {
if (ExecutableState.SUCCEED == stage.getStatus(segmentId)
- || stage.getStatus(segmentId) == ExecutableState.SKIP) {
+ || ExecutableState.SKIP == stage.getStatus(segmentId)
+ || ExecutableState.WARNING == stage.getStatus(segmentId)) {
successStages += 1;
continue;
}
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableStepResponse.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableStepResponse.java
index edd980320f..c0c9f1282b 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableStepResponse.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableStepResponse.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.Maps;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.constant.JobStepCmdTypeEnum;
@@ -152,5 +153,7 @@ public class ExecutableStepResponse {
private long execEndTime;
@JsonProperty("stage")
private List<ExecutableStepResponse> stage;
+ @JsonProperty("info")
+ private Map<String, String> info = Maps.newHashMap();
}
}
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
index eb668435fa..dad56e01d3 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -829,6 +829,12 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl
val stepCount = stageResponses.isEmpty() ? 1 : stageResponses.size();
val stepRatio = (float) ExecutableResponse.calculateSuccessStage(task, segmentId, stageBases, true) / stepCount;
segmentSubStages.setStepRatio(stepRatio);
+
+ // Put warning message into segment_sub_stages.info if exists
+ Optional<ExecutableStepResponse> warningStageRes = stageResponses.stream().filter(stageRes ->
+ stageRes.getStatus() == JobStatusEnum.WARNING).findFirst();
+ warningStageRes.ifPresent(res -> segmentSubStages.getInfo().put(NBatchConstants.P_WARNING_CODE,
+ res.getInfo().getOrDefault(NBatchConstants.P_WARNING_CODE, null)));
}
private void setStage(List<ExecutableStepResponse> responses, ExecutableStepResponse newResponse) {
@@ -844,7 +850,7 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl
*/
Set<JobStatusEnum> jobStatusEnums = Sets.newHashSet(JobStatusEnum.ERROR, JobStatusEnum.STOPPED,
JobStatusEnum.DISCARDED);
- Set<JobStatusEnum> jobFinishOrSkip = Sets.newHashSet(JobStatusEnum.FINISHED, JobStatusEnum.SKIP);
+ Set<JobStatusEnum> jobFinishOrSkip = Sets.newHashSet(JobStatusEnum.FINISHED, JobStatusEnum.SKIP, JobStatusEnum.WARNING);
if (oldResponse.getStatus() != newResponse.getStatus()
&& !jobStatusEnums.contains(oldResponse.getStatus())) {
if (jobStatusEnums.contains(newResponse.getStatus())) {
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
index b69c00eb9b..e565a194fd 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
@@ -549,6 +549,9 @@ public class StageTest extends NLocalFileMetadataTestCase {
result = ExecutableState.DISCARDED.toJobStatus();
Assert.assertEquals(JobStatusEnum.DISCARDED, result);
+
+ result = ExecutableState.WARNING.toJobStatus();
+ Assert.assertEquals(JobStatusEnum.WARNING, result);
}
@Test
diff --git a/src/examples/test_case_data/localmeta/metadata/_global/project/index_build_test.json b/src/examples/test_case_data/localmeta/metadata/_global/project/index_build_test.json
new file mode 100644
index 0000000000..7c1aadeca6
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/_global/project/index_build_test.json
@@ -0,0 +1,40 @@
+{
+ "uuid": "63b21fc0-1bbd-3570-b2ac-54fbb0fb8181",
+ "last_modified": 1675944232432,
+ "create_time": 1675944228905,
+ "version": "4.0.0.0",
+ "name": "index_build_test",
+ "owner": "ADMIN",
+ "status": "ENABLED",
+ "create_time_utc": 1675944228905,
+ "default_database": "DEFAULT",
+ "description": null,
+ "principal": null,
+ "keytab": null,
+ "maintain_model_type": "MANUAL_MAINTAIN",
+ "override_kylin_properties": {
+ "kylin.metadata.semi-automatic-mode": "true",
+ "kylin.query.metadata.expose-computed-column": "true",
+ "kylin.source.default": "9"
+ },
+ "segment_config": {
+ "auto_merge_enabled": false,
+ "auto_merge_time_ranges": [
+ "WEEK",
+ "MONTH",
+ "QUARTER",
+ "YEAR"
+ ],
+ "volatile_range": {
+ "volatile_range_number": 0,
+ "volatile_range_enabled": false,
+ "volatile_range_type": "DAY"
+ },
+ "retention_range": {
+ "retention_range_number": 1,
+ "retention_range_enabled": false,
+ "retention_range_type": "MONTH"
+ },
+ "create_empty_segment_enabled": false
+ }
+}
diff --git a/src/examples/test_case_data/localmeta/metadata/index_build_test/dataflow/3ec47efc-573a-9304-4405-8e05ae184322.json b/src/examples/test_case_data/localmeta/metadata/index_build_test/dataflow/3ec47efc-573a-9304-4405-8e05ae184322.json
new file mode 100644
index 0000000000..d020817c44
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/index_build_test/dataflow/3ec47efc-573a-9304-4405-8e05ae184322.json
@@ -0,0 +1,70 @@
+{
+ "uuid": "3ec47efc-573a-9304-4405-8e05ae184322",
+ "last_modified": 1675944717956,
+ "create_time": 1675944415945,
+ "version": "4.0.0.0",
+ "status": "ONLINE",
+ "last_status": null,
+ "cost": 50,
+ "query_hit_count": 0,
+ "last_query_time": 0,
+ "layout_query_hit_count": {},
+ "segments": [
+ {
+ "id": "b2f206e1-7a15-c94a-20f5-f608d550ead6",
+ "name": "FULL_BUILD",
+ "create_time_utc": 1675944504890,
+ "status": "READY",
+ "segRange": {
+ "@class": "org.apache.kylin.metadata.model.SegmentRange$TimePartitionedSegmentRange",
+ "date_range_start": 0,
+ "date_range_end": 9223372036854775807
+ },
+ "timeRange": null,
+ "dimension_range_info_map": {
+ "0": {
+ "min": "1",
+ "max": "6"
+ },
+ "24": {
+ "min": "1",
+ "max": "5"
+ },
+ "13": {
+ "min": "5",
+ "max": "15"
+ },
+ "22": {
+ "min": "Customer#000000001",
+ "max": "Customer#000000005"
+ },
+ "16": {
+ "min": "1",
+ "max": "5"
+ }
+ },
+ "parameters": null,
+ "dictionaries": null,
+ "snapshots": null,
+ "last_build_time": 1675944668815,
+ "source_count": 6,
+ "source_bytes_size": 27732,
+ "column_source_bytes": {
+ "SSB.CUSTOMER.C_NAME": 108,
+ "SSB.LINEORDER.LO_QUANTITY": 9,
+ "SSB.LINEORDER.LO_ORDERKEY": 6,
+ "SSB.LINEORDER.LO_CUSTKEY": 6,
+ "SSB.CUSTOMER.C_CUSTKEY": 6
+ },
+ "ori_snapshot_size": {},
+ "additionalInfo": {},
+ "is_realtime_segment": false,
+ "is_snapshot_ready": false,
+ "is_dict_ready": true,
+ "is_flat_table_ready": true,
+ "is_fact_view_ready": false,
+ "multi_partitions": [],
+ "max_bucket_id": -1
+ }
+ ]
+}
diff --git a/src/examples/test_case_data/localmeta/metadata/index_build_test/dataflow_details/3ec47efc-573a-9304-4405-8e05ae184322/b2f206e1-7a15-c94a-20f5-f608d550ead6.json b/src/examples/test_case_data/localmeta/metadata/index_build_test/dataflow_details/3ec47efc-573a-9304-4405-8e05ae184322/b2f206e1-7a15-c94a-20f5-f608d550ead6.json
new file mode 100644
index 0000000000..b7512dc9f2
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/index_build_test/dataflow_details/3ec47efc-573a-9304-4405-8e05ae184322/b2f206e1-7a15-c94a-20f5-f608d550ead6.json
@@ -0,0 +1,69 @@
+{
+ "uuid": "b2f206e1-7a15-c94a-20f5-f608d550ead6",
+ "last_modified": 1675944504890,
+ "create_time": 1675944504890,
+ "version": "4.0.0.0",
+ "dataflow": "3ec47efc-573a-9304-4405-8e05ae184322",
+ "layout_instances": [
+ {
+ "layout_id": 1,
+ "build_job_id": "944d7367-29e5-a5bd-373c-f01b34c078d2-3ec47efc-573a-9304-4405-8e05ae184322",
+ "rows": 6,
+ "byte_size": 2021,
+ "file_count": 1,
+ "source_rows": 6,
+ "source_byte_size": 0,
+ "partition_num": 1,
+ "partition_values": [],
+ "is_ready": false,
+ "create_time": 1675944665659,
+ "multi_partition": [],
+ "abnormal_type": null
+ },
+ {
+ "layout_id": 10001,
+ "build_job_id": "944d7367-29e5-a5bd-373c-f01b34c078d2-3ec47efc-573a-9304-4405-8e05ae184322",
+ "rows": 6,
+ "byte_size": 2021,
+ "file_count": 1,
+ "source_rows": 6,
+ "source_byte_size": 0,
+ "partition_num": 1,
+ "partition_values": [],
+ "is_ready": false,
+ "create_time": 1675944665659,
+ "multi_partition": [],
+ "abnormal_type": null
+ },
+ {
+ "layout_id": 20000000001,
+ "build_job_id": "944d7367-29e5-a5bd-373c-f01b34c078d2-3ec47efc-573a-9304-4405-8e05ae184322",
+ "rows": 6,
+ "byte_size": 1739,
+ "file_count": 1,
+ "source_rows": 6,
+ "source_byte_size": 0,
+ "partition_num": 1,
+ "partition_values": [],
+ "is_ready": false,
+ "create_time": 1675944665659,
+ "multi_partition": [],
+ "abnormal_type": null
+ },
+ {
+ "layout_id": 20000010001,
+ "build_job_id": "944d7367-29e5-a5bd-373c-f01b34c078d2-3ec47efc-573a-9304-4405-8e05ae184322",
+ "rows": 6,
+ "byte_size": 1739,
+ "file_count": 1,
+ "source_rows": 6,
+ "source_byte_size": 0,
+ "partition_num": 1,
+ "partition_values": [],
+ "is_ready": false,
+ "create_time": 1675944665659,
+ "multi_partition": [],
+ "abnormal_type": null
+ }
+ ]
+}
diff --git a/src/examples/test_case_data/localmeta/metadata/index_build_test/index_plan/3ec47efc-573a-9304-4405-8e05ae184322.json b/src/examples/test_case_data/localmeta/metadata/index_build_test/index_plan/3ec47efc-573a-9304-4405-8e05ae184322.json
new file mode 100644
index 0000000000..f53bfcb30c
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/index_build_test/index_plan/3ec47efc-573a-9304-4405-8e05ae184322.json
@@ -0,0 +1,204 @@
+{
+ "uuid": "3ec47efc-573a-9304-4405-8e05ae184322",
+ "last_modified": 1675945219064,
+ "create_time": 1675944415943,
+ "version": "4.0.0.0",
+ "description": null,
+ "rule_based_index": null,
+ "indexes": [
+ {
+ "id": 10000,
+ "dimensions": [
+ 24,
+ 22
+ ],
+ "measures": [
+ 100000,
+ 100001
+ ],
+ "layouts": [
+ {
+ "id": 10001,
+ "name": null,
+ "owner": null,
+ "col_order": [
+ 24,
+ 22,
+ 100000,
+ 100001
+ ],
+ "shard_by_columns": [],
+ "partition_by_columns": [],
+ "sort_by_columns": [],
+ "storage_type": 20,
+ "update_time": 1675945215262,
+ "manual": false,
+ "auto": true,
+ "base": false,
+ "draft_version": null,
+ "index_range": null
+ }
+ ],
+ "next_layout_offset": 2
+ },
+ {
+ "id": 20000000000,
+ "dimensions": [
+ 0,
+ 13,
+ 16,
+ 24,
+ 22
+ ],
+ "measures": [],
+ "layouts": [
+ {
+ "id": 20000000001,
+ "name": null,
+ "owner": null,
+ "col_order": [
+ 0,
+ 13,
+ 16,
+ 24,
+ 22
+ ],
+ "shard_by_columns": [],
+ "partition_by_columns": [],
+ "sort_by_columns": [],
+ "storage_type": 20,
+ "update_time": 1675944415943,
+ "manual": false,
+ "auto": false,
+ "base": true,
+ "draft_version": null,
+ "index_range": null
+ }
+ ],
+ "next_layout_offset": 2
+ },
+ {
+ "id": 20000010000,
+ "dimensions": [
+ 0,
+ 13
+ ],
+ "measures": [],
+ "layouts": [
+ {
+ "id": 20000010001,
+ "name": null,
+ "owner": null,
+ "col_order": [
+ 0,
+ 13
+ ],
+ "shard_by_columns": [],
+ "partition_by_columns": [],
+ "sort_by_columns": [],
+ "storage_type": 20,
+ "update_time": 1675944415943,
+ "manual": false,
+ "auto": false,
+ "base": true,
+ "draft_version": null,
+ "index_range": null
+ }
+ ],
+ "next_layout_offset": 2
+ },
+ {
+ "id": 20000,
+ "dimensions": [
+ 0,
+ 13,
+ 16,
+ 24,
+ 22
+ ],
+ "measures": [
+ 100000,
+ 100001
+ ],
+ "layouts": [
+ {
+ "id": 20001,
+ "name": null,
+ "owner": "ADMIN",
+ "col_order": [
+ 0,
+ 13,
+ 16,
+ 24,
+ 22,
+ 100000,
+ 100001
+ ],
+ "shard_by_columns": [],
+ "partition_by_columns": [],
+ "sort_by_columns": [],
+ "storage_type": 20,
+ "update_time": 1675945219064,
+ "manual": false,
+ "auto": false,
+ "base": true,
+ "draft_version": null,
+ "index_range": null
+ }
+ ],
+ "next_layout_offset": 2
+ }
+ ],
+ "override_properties": {},
+ "to_be_deleted_indexes": [
+ {
+ "id": 0,
+ "dimensions": [
+ 0,
+ 13,
+ 16,
+ 24,
+ 22
+ ],
+ "measures": [
+ 100000
+ ],
+ "layouts": [
+ {
+ "id": 1,
+ "name": null,
+ "owner": null,
+ "col_order": [
+ 0,
+ 13,
+ 16,
+ 24,
+ 22,
+ 100000
+ ],
+ "shard_by_columns": [],
+ "partition_by_columns": [],
+ "sort_by_columns": [],
+ "storage_type": 20,
+ "update_time": 1675944415943,
+ "manual": false,
+ "auto": false,
+ "base": true,
+ "draft_version": null,
+ "index_range": null
+ }
+ ],
+ "next_layout_offset": 2
+ }
+ ],
+ "auto_merge_time_ranges": null,
+ "retention_range": 0,
+ "engine_type": 80,
+ "next_aggregation_index_id": 30000,
+ "next_table_index_id": 20000020000,
+ "agg_shard_by_columns": [],
+ "extend_partition_columns": [],
+ "layout_bucket_num": {},
+ "approved_additional_recs": 1,
+ "approved_removal_recs": 0
+}
diff --git a/src/examples/test_case_data/localmeta/metadata/index_build_test/model_desc/3ec47efc-573a-9304-4405-8e05ae184322.json b/src/examples/test_case_data/localmeta/metadata/index_build_test/model_desc/3ec47efc-573a-9304-4405-8e05ae184322.json
new file mode 100644
index 0000000000..f75ddbffb2
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/index_build_test/model_desc/3ec47efc-573a-9304-4405-8e05ae184322.json
@@ -0,0 +1,241 @@
+{
+ "uuid": "3ec47efc-573a-9304-4405-8e05ae184322",
+ "last_modified": 1675999084906,
+ "create_time": 1675999084809,
+ "version": "4.0.0.0",
+ "alias": "INDEX_BUILD_ON_SEGMENT",
+ "owner": "ADMIN",
+ "config_last_modifier": null,
+ "config_last_modified": 0,
+ "description": "",
+ "fact_table": "SSB.LINEORDER",
+ "fact_table_alias": null,
+ "management_type": "MODEL_BASED",
+ "join_tables": [
+ {
+ "table": "SSB.CUSTOMER",
+ "kind": "LOOKUP",
+ "alias": "CUSTOMER",
+ "join": {
+ "type": "INNER",
+ "primary_key": [
+ "CUSTOMER.C_CUSTKEY"
+ ],
+ "foreign_key": [
+ "LINEORDER.LO_CUSTKEY"
+ ],
+ "non_equi_join_condition": null,
+ "primary_table": null,
+ "foreign_table": null
+ },
+ "flattenable": "flatten",
+ "join_relation_type": "MANY_TO_ONE"
+ }
+ ],
+ "filter_condition": "",
+ "partition_desc": null,
+ "capacity": "MEDIUM",
+ "segment_config": {
+ "auto_merge_enabled": null,
+ "auto_merge_time_ranges": null,
+ "volatile_range": null,
+ "retention_range": null,
+ "create_empty_segment_enabled": false
+ },
+ "data_check_desc": null,
+ "semantic_version": 0,
+ "storage_type": 0,
+ "model_type": "BATCH",
+ "all_named_columns": [
+ {
+ "id": 0,
+ "name": "LO_ORDERKEY",
+ "column": "LINEORDER.LO_ORDERKEY",
+ "status": "DIMENSION"
+ },
+ {
+ "id": 1,
+ "name": "LO_PARTKEY",
+ "column": "LINEORDER.LO_PARTKEY"
+ },
+ {
+ "id": 2,
+ "name": "LO_DISCOUNT",
+ "column": "LINEORDER.LO_DISCOUNT"
+ },
+ {
+ "id": 3,
+ "name": "LO_SUPPLYCOST",
+ "column": "LINEORDER.LO_SUPPLYCOST"
+ },
+ {
+ "id": 4,
+ "name": "LO_COMMITDATE",
+ "column": "LINEORDER.LO_COMMITDATE"
+ },
+ {
+ "id": 5,
+ "name": "LO_EXTENDEDPRICE",
+ "column": "LINEORDER.LO_EXTENDEDPRICE"
+ },
+ {
+ "id": 6,
+ "name": "LO_TAX",
+ "column": "LINEORDER.LO_TAX"
+ },
+ {
+ "id": 7,
+ "name": "LO_SUPPKEY",
+ "column": "LINEORDER.LO_SUPPKEY"
+ },
+ {
+ "id": 8,
+ "name": "LO_ORDTOTALPRICE",
+ "column": "LINEORDER.LO_ORDTOTALPRICE"
+ },
+ {
+ "id": 9,
+ "name": "LO_REVENUE",
+ "column": "LINEORDER.LO_REVENUE"
+ },
+ {
+ "id": 10,
+ "name": "LO_ORDERDATE",
+ "column": "LINEORDER.LO_ORDERDATE"
+ },
+ {
+ "id": 11,
+ "name": "LO_ORDERPRIOTITY",
+ "column": "LINEORDER.LO_ORDERPRIOTITY"
+ },
+ {
+ "id": 12,
+ "name": "LO_SHIPPRIOTITY",
+ "column": "LINEORDER.LO_SHIPPRIOTITY"
+ },
+ {
+ "id": 13,
+ "name": "LO_QUANTITY",
+ "column": "LINEORDER.LO_QUANTITY",
+ "status": "DIMENSION"
+ },
+ {
+ "id": 14,
+ "name": "LO_SHIPMODE",
+ "column": "LINEORDER.LO_SHIPMODE"
+ },
+ {
+ "id": 15,
+ "name": "LO_LINENUMBER",
+ "column": "LINEORDER.LO_LINENUMBER"
+ },
+ {
+ "id": 16,
+ "name": "LO_CUSTKEY",
+ "column": "LINEORDER.LO_CUSTKEY",
+ "status": "DIMENSION"
+ },
+ {
+ "id": 17,
+ "name": "C_ADDRESS",
+ "column": "CUSTOMER.C_ADDRESS"
+ },
+ {
+ "id": 18,
+ "name": "C_NATION",
+ "column": "CUSTOMER.C_NATION"
+ },
+ {
+ "id": 19,
+ "name": "C_CITY",
+ "column": "CUSTOMER.C_CITY"
+ },
+ {
+ "id": 20,
+ "name": "C_PHONE",
+ "column": "CUSTOMER.C_PHONE"
+ },
+ {
+ "id": 21,
+ "name": "C_REGION",
+ "column": "CUSTOMER.C_REGION"
+ },
+ {
+ "id": 22,
+ "name": "C_NAME",
+ "column": "CUSTOMER.C_NAME",
+ "status": "DIMENSION"
+ },
+ {
+ "id": 23,
+ "name": "C_MKTSEGMENT",
+ "column": "CUSTOMER.C_MKTSEGMENT"
+ },
+ {
+ "id": 24,
+ "name": "C_CUSTKEY",
+ "column": "CUSTOMER.C_CUSTKEY",
+ "status": "DIMENSION"
+ }
+ ],
+ "all_measures": [
+ {
+ "name": "COUNT_ALL",
+ "function": {
+ "expression": "COUNT",
+ "parameters": [
+ {
+ "type": "constant",
+ "value": "1"
+ }
+ ],
+ "returntype": "bigint"
+ },
+ "column": null,
+ "comment": null,
+ "id": 100000,
+ "type": "NORMAL",
+ "internal_ids": []
+ },
+ {
+ "name": "SUM_LINEORDER_LO_QUANTITY",
+ "function": {
+ "expression": "SUM",
+ "parameters": [
+ {
+ "type": "column",
+ "value": "LINEORDER.LO_QUANTITY"
+ }
+ ],
+ "returntype": "bigint"
+ },
+ "column": null,
+ "comment": null,
+ "id": 100001,
+ "type": "NORMAL",
+ "internal_ids": []
+ }
+ ],
+ "recommendations_count": 0,
+ "computed_columns": [],
+ "canvas": {
+ "coordinate": {
+ "LINEORDER": {
+ "x": 752.8888617621528,
+ "y": 126.22219509548611,
+ "width": 200.0,
+ "height": 230.0
+ },
+ "CUSTOMER": {
+ "x": 745.111083984375,
+ "y": 468.44441731770837,
+ "width": 200.0,
+ "height": 230.0
+ }
+ },
+ "zoom": 9.0
+ },
+ "multi_partition_desc": null,
+ "multi_partition_key_mapping": null,
+ "fusion_id": null
+}
diff --git a/src/examples/test_case_data/localmeta/metadata/index_build_test/table/SSB.CUSTOMER.json b/src/examples/test_case_data/localmeta/metadata/index_build_test/table/SSB.CUSTOMER.json
new file mode 100644
index 0000000000..c84432550e
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/index_build_test/table/SSB.CUSTOMER.json
@@ -0,0 +1,77 @@
+{
+ "uuid": "970b1e03-75fd-6152-ce22-7c6d19bb1760",
+ "last_modified": 0,
+ "create_time": 1675998714528,
+ "version": "4.0.0.0",
+ "name": "CUSTOMER",
+ "columns": [
+ {
+ "id": "1",
+ "name": "C_CUSTKEY",
+ "datatype": "bigint",
+ "case_sensitive_name": "c_custkey"
+ },
+ {
+ "id": "2",
+ "name": "C_NAME",
+ "datatype": "varchar(4096)",
+ "case_sensitive_name": "c_name"
+ },
+ {
+ "id": "3",
+ "name": "C_ADDRESS",
+ "datatype": "varchar(4096)",
+ "case_sensitive_name": "c_address"
+ },
+ {
+ "id": "4",
+ "name": "C_CITY",
+ "datatype": "varchar(4096)",
+ "case_sensitive_name": "c_city"
+ },
+ {
+ "id": "5",
+ "name": "C_NATION",
+ "datatype": "varchar(4096)",
+ "case_sensitive_name": "c_nation"
+ },
+ {
+ "id": "6",
+ "name": "C_REGION",
+ "datatype": "varchar(4096)",
+ "case_sensitive_name": "c_region"
+ },
+ {
+ "id": "7",
+ "name": "C_PHONE",
+ "datatype": "varchar(4096)",
+ "case_sensitive_name": "c_phone"
+ },
+ {
+ "id": "8",
+ "name": "C_MKTSEGMENT",
+ "datatype": "varchar(4096)",
+ "case_sensitive_name": "c_mktsegment"
+ }
+ ],
+ "source_type": 9,
+ "table_type": "EXTERNAL",
+ "top": false,
+ "increment_loading": false,
+ "last_snapshot_path": null,
+ "last_snapshot_size": 0,
+ "snapshot_last_modified": 0,
+ "query_hit_count": 0,
+ "partition_column": null,
+ "snapshot_partitions": {},
+ "snapshot_partitions_info": {},
+ "snapshot_total_rows": 0,
+ "snapshot_partition_col": null,
+ "selected_snapshot_partition_col": null,
+ "temp_snapshot_path": null,
+ "snapshot_has_broken": false,
+ "database": "SSB",
+ "transactional": false,
+ "rangePartition": false,
+ "partition_desc": null
+}
diff --git a/src/examples/test_case_data/localmeta/metadata/index_build_test/table/SSB.LINEORDER.json b/src/examples/test_case_data/localmeta/metadata/index_build_test/table/SSB.LINEORDER.json
new file mode 100644
index 0000000000..08bda05859
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/metadata/index_build_test/table/SSB.LINEORDER.json
@@ -0,0 +1,131 @@
+{
+ "uuid": "33529927-8bc2-d0fb-33a3-562db986be6b",
+ "last_modified": 0,
+ "create_time": 1675998714544,
+ "version": "4.0.0.0",
+ "name": "LINEORDER",
+ "columns": [
+ {
+ "id": "1",
+ "name": "LO_ORDERKEY",
+ "datatype": "bigint",
+ "case_sensitive_name": "lo_orderkey"
+ },
+ {
+ "id": "2",
+ "name": "LO_LINENUMBER",
+ "datatype": "bigint",
+ "case_sensitive_name": "lo_linenumber"
+ },
+ {
+ "id": "3",
+ "name": "LO_CUSTKEY",
+ "datatype": "bigint",
+ "case_sensitive_name": "lo_custkey"
+ },
+ {
+ "id": "4",
+ "name": "LO_PARTKEY",
+ "datatype": "integer",
+ "case_sensitive_name": "lo_partkey"
+ },
+ {
+ "id": "5",
+ "name": "LO_SUPPKEY",
+ "datatype": "integer",
+ "case_sensitive_name": "lo_suppkey"
+ },
+ {
+ "id": "6",
+ "name": "LO_ORDERDATE",
+ "datatype": "date",
+ "case_sensitive_name": "lo_orderdate"
+ },
+ {
+ "id": "7",
+ "name": "LO_ORDERPRIOTITY",
+ "datatype": "varchar(4096)",
+ "case_sensitive_name": "lo_orderpriotity"
+ },
+ {
+ "id": "8",
+ "name": "LO_SHIPPRIOTITY",
+ "datatype": "integer",
+ "case_sensitive_name": "lo_shippriotity"
+ },
+ {
+ "id": "9",
+ "name": "LO_QUANTITY",
+ "datatype": "bigint",
+ "case_sensitive_name": "lo_quantity"
+ },
+ {
+ "id": "10",
+ "name": "LO_EXTENDEDPRICE",
+ "datatype": "bigint",
+ "case_sensitive_name": "lo_extendedprice"
+ },
+ {
+ "id": "11",
+ "name": "LO_ORDTOTALPRICE",
+ "datatype": "bigint",
+ "case_sensitive_name": "lo_ordtotalprice"
+ },
+ {
+ "id": "12",
+ "name": "LO_DISCOUNT",
+ "datatype": "bigint",
+ "case_sensitive_name": "lo_discount"
+ },
+ {
+ "id": "13",
+ "name": "LO_REVENUE",
+ "datatype": "bigint",
+ "case_sensitive_name": "lo_revenue"
+ },
+ {
+ "id": "14",
+ "name": "LO_SUPPLYCOST",
+ "datatype": "bigint",
+ "case_sensitive_name": "lo_supplycost"
+ },
+ {
+ "id": "15",
+ "name": "LO_TAX",
+ "datatype": "bigint",
+ "case_sensitive_name": "lo_tax"
+ },
+ {
+ "id": "16",
+ "name": "LO_COMMITDATE",
+ "datatype": "date",
+ "case_sensitive_name": "lo_commitdate"
+ },
+ {
+ "id": "17",
+ "name": "LO_SHIPMODE",
+ "datatype": "varchar(4096)",
+ "case_sensitive_name": "lo_shipmode"
+ }
+ ],
+ "source_type": 9,
+ "table_type": "EXTERNAL",
+ "top": false,
+ "increment_loading": false,
+ "last_snapshot_path": null,
+ "last_snapshot_size": 0,
+ "snapshot_last_modified": 0,
+ "query_hit_count": 0,
+ "partition_column": null,
+ "snapshot_partitions": {},
+ "snapshot_partitions_info": {},
+ "snapshot_total_rows": 0,
+ "snapshot_partition_col": null,
+ "selected_snapshot_partition_col": null,
+ "temp_snapshot_path": null,
+ "snapshot_has_broken": false,
+ "database": "SSB",
+ "transactional": false,
+ "rangePartition": false,
+ "partition_desc": null
+}
diff --git a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NIndexPlanController.java b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NIndexPlanController.java
index 24124071a5..fe5bdc8cfc 100644
--- a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NIndexPlanController.java
+++ b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NIndexPlanController.java
@@ -48,6 +48,8 @@ import org.apache.kylin.rest.response.TableIndexResponse;
import org.apache.kylin.rest.service.FusionIndexService;
import org.apache.kylin.rest.service.IndexPlanService;
import org.apache.kylin.rest.service.ModelService;
+import org.apache.kylin.rest.service.params.IndexPlanParams;
+import org.apache.kylin.rest.service.params.PaginationParams;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.DeleteMapping;
@@ -178,6 +180,7 @@ public class NIndexPlanController extends NBasicController {
@GetMapping(value = "/index")
public EnvelopeResponse<FusionRuleDataResult<List<IndexResponse>>> getIndex(
@RequestParam(value = "project") String project, @RequestParam(value = "model") String modelId, //
+ @RequestParam(value = "segment_id", required = false, defaultValue = "") String segmentId,
@RequestParam(value = "sort_by", required = false, defaultValue = "") String order,
@RequestParam(value = "reverse", required = false, defaultValue = "false") Boolean desc,
@RequestParam(value = "sources", required = false, defaultValue = "") List<IndexEntity.Source> sources,
@@ -189,7 +192,9 @@ public class NIndexPlanController extends NBasicController {
@RequestParam(value = "range", required = false, defaultValue = "") List<IndexEntity.Range> range) {
checkProjectName(project);
checkRequiredArg(MODEL_ID, modelId);
- val indexes = fusionIndexService.getIndexes(project, modelId, key, status, order, desc, sources, ids, range);
+ IndexPlanParams indexPlanParams = new IndexPlanParams(project, modelId, segmentId, ids, sources, status, range);
+ PaginationParams paginationParams = new PaginationParams(offset, limit, order, desc);
+ val indexes = fusionIndexService.getIndexes(indexPlanParams, paginationParams, key);
val indexUpdateEnabled = FusionIndexService.checkUpdateIndexEnabled(project, modelId);
return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
FusionRuleDataResult.get(indexes, offset, limit, indexUpdateEnabled), "");
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/response/IndexResponse.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/response/IndexResponse.java
index b437c2f857..4ae77b249a 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/response/IndexResponse.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/response/IndexResponse.java
@@ -19,6 +19,7 @@ package org.apache.kylin.rest.response;
import java.util.List;
+import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.metadata.cube.model.IndexEntity;
import org.apache.kylin.metadata.cube.model.IndexEntity.Source;
@@ -101,6 +102,9 @@ public class IndexResponse {
}
+ @JsonProperty("abnormal_type")
+ private NDataLayout.AbnormalType abnormalType;
+
@Data
@AllArgsConstructor
public static class ColOrderPair {
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/response/NDataSegmentResponse.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/response/NDataSegmentResponse.java
index 47eb6a6140..8af94f0868 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/response/NDataSegmentResponse.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/response/NDataSegmentResponse.java
@@ -120,14 +120,14 @@ public class NDataSegmentResponse extends NDataSegment {
startTime = Long.parseLong(getSegRange().getStart().toString());
endTime = Long.parseLong(getSegRange().getEnd().toString());
storage = bytesSize;
- indexCount = segment.getLayoutSize();
+ indexCount = segment.getWorkingLayoutSize();
indexCountTotal = segment.getIndexPlan().getAllLayoutsSize(true);
multiPartitionCount = segment.getMultiPartitions().size();
hasBaseAggIndex = segment.getIndexPlan().containBaseAggLayout();
hasBaseTableIndex = segment.getIndexPlan().containBaseTableLayout();
if (segment.getIndexPlan().getBaseTableLayout() != null) {
val indexPlan = segment.getDataflow().getIndexPlan();
- long segmentFileCount = segment.getSegDetails().getLayouts().stream()
+ long segmentFileCount = segment.getSegDetails().getWorkingLayouts().stream()
.filter(layout -> indexPlan.getLayoutEntity(layout.getLayoutId()) != null
&& indexPlan.getLayoutEntity(layout.getLayoutId()).isBaseIndex())
.mapToLong(NDataLayout::getFileCount).sum();
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/FusionIndexService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/FusionIndexService.java
index 80043d3fd5..b048cd3815 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/FusionIndexService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/FusionIndexService.java
@@ -55,6 +55,8 @@ import org.apache.kylin.rest.response.AggIndexResponse;
import org.apache.kylin.rest.response.BuildIndexResponse;
import org.apache.kylin.rest.response.DiffRuleBasedIndexResponse;
import org.apache.kylin.rest.response.IndexResponse;
+import org.apache.kylin.rest.service.params.IndexPlanParams;
+import org.apache.kylin.rest.service.params.PaginationParams;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.springframework.beans.factory.annotation.Autowired;
@@ -195,10 +197,24 @@ public class FusionIndexService extends BasicService {
}
public List<IndexResponse> getIndexes(String project, String modelId, String key, List<IndexEntity.Status> status,
- String orderBy, Boolean desc, List<IndexEntity.Source> sources, List<Long> ids,
- List<IndexEntity.Range> range) {
- List<IndexResponse> indexes = indexPlanService.getIndexes(project, modelId, key, status, orderBy, desc,
- sources);
+ String orderBy, Boolean desc, List<IndexEntity.Source> sources, List<Long> ids,
+ List<IndexEntity.Range> range) {
+ return getIndexes(new IndexPlanParams(project, modelId, null, ids, sources, status, range),
+ new PaginationParams(null, null, orderBy, desc),
+ key);
+ }
+
+ public List<IndexResponse> getIndexes(IndexPlanParams indexPlanParams, PaginationParams paginationParams, String key) {
+ String project = indexPlanParams.getProject();
+ String modelId = indexPlanParams.getModelId();
+ List<Long> ids = indexPlanParams.getIds();
+ List<IndexEntity.Source> sources = indexPlanParams.getSources();
+ List<IndexEntity.Status> status = indexPlanParams.getStatus();
+ List<IndexEntity.Range> range = indexPlanParams.getRange();
+ String orderBy = paginationParams.getOrderBy();
+ Boolean desc = paginationParams.getReverse();
+
+ List<IndexResponse> indexes = indexPlanService.getIndexes(indexPlanParams, paginationParams, key);
NDataModel model = getManager(NDataModelManager.class, project).getDataModelDesc(modelId);
if (model.isFusionModel()) {
FusionModel fusionModel = getManager(FusionModelManager.class, project).getFusionModel(modelId);
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java
index 1b9e681ef6..767b098dd5 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java
@@ -98,6 +98,8 @@ import org.apache.kylin.rest.response.IndexGraphResponse;
import org.apache.kylin.rest.response.IndexResponse;
import org.apache.kylin.rest.response.IndexStatResponse;
import org.apache.kylin.rest.response.TableIndexResponse;
+import org.apache.kylin.rest.service.params.IndexPlanParams;
+import org.apache.kylin.rest.service.params.PaginationParams;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.util.SpringContext;
import org.slf4j.Logger;
@@ -651,7 +653,21 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp
}
public List<IndexResponse> getIndexes(String project, String modelId, String key, List<IndexEntity.Status> status,
- String orderBy, Boolean desc, List<IndexEntity.Source> sources) {
+ String orderBy, Boolean desc, List<IndexEntity.Source> sources) {
+ return getIndexes(new IndexPlanParams(project, modelId, null, null, sources, status, null),
+ new PaginationParams(null, null, orderBy, desc),
+ key);
+ }
+
+ public List<IndexResponse> getIndexes(IndexPlanParams indexPlanParams, PaginationParams paginationParams, String key) {
+ String project = indexPlanParams.getProject();
+ String modelId = indexPlanParams.getModelId();
+ String segmentId = indexPlanParams.getSegmentId();
+ List<IndexEntity.Source> sources = indexPlanParams.getSources();
+ List<IndexEntity.Status> status = indexPlanParams.getStatus();
+ String orderBy = paginationParams.getOrderBy();
+ Boolean desc = paginationParams.getReverse();
+
aclEvaluate.checkProjectReadPermission(project);
Set<IndexEntity.Status> statusSet = Sets.newHashSet(status);
@@ -662,7 +678,7 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp
Set<Long> layoutsByRunningJobs = getLayoutsByRunningJobs(project, modelId);
if (StringUtils.isBlank(key)) {
return sortAndFilterLayouts(layouts.stream()
- .map(layoutEntity -> convertToResponse(layoutEntity, indexPlan.getModel(), layoutsByRunningJobs))
+ .map(layoutEntity -> convertToResponse(layoutEntity, indexPlan.getModel(), layoutsByRunningJobs, segmentId))
.filter(indexResponse -> statusSet.isEmpty() || statusSet.contains(indexResponse.getStatus())),
orderBy, desc, sources);
}
@@ -676,7 +692,7 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp
return String.valueOf(index.getId()).equals(key.trim())
|| !Sets.intersection(matchDimensions, colOrderSet).isEmpty()
|| !Sets.intersection(matchMeasures, colOrderSet).isEmpty();
- }).map(layoutEntity -> convertToResponse(layoutEntity, indexPlan.getModel(), layoutsByRunningJobs))
+ }).map(layoutEntity -> convertToResponse(layoutEntity, indexPlan.getModel(), layoutsByRunningJobs, segmentId))
.filter(indexResponse -> statusSet.isEmpty() || statusSet.contains(indexResponse.getStatus())), orderBy,
desc, sources);
}
@@ -735,7 +751,7 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp
for (NDataSegment seg : readySegments) {
val lockedIndexCountInSeg = seg.getLayoutsMap().values().stream()
.filter(nDataLayout -> nDataLayout.getLayout().isToBeDeleted()).count();
- if ((seg.getSegDetails().getLayouts().size() - lockedIndexCountInSeg) != allIndexCountWithoutTobeDel) {
+ if ((seg.getSegDetails().getAllLayouts().size() - lockedIndexCountInSeg) != allIndexCountWithoutTobeDel) {
segmentToComplementCount += 1;
}
}
@@ -840,11 +856,11 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp
}
private IndexResponse convertToResponse(LayoutEntity layoutEntity, NDataModel model) {
- return convertToResponse(layoutEntity, model, Sets.newHashSet());
+ return convertToResponse(layoutEntity, model, Sets.newHashSet(), null);
}
private IndexResponse convertToResponse(LayoutEntity layoutEntity, NDataModel model,
- Set<Long> layoutIdsOfRunningJobs) {
+ Set<Long> layoutIdsOfRunningJobs, String segmentId) {
// remove all internal measures
val colOrders = Lists.newArrayList(layoutEntity.getColOrder());
@@ -862,20 +878,32 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp
val dataflow = dfMgr.getDataflow(layoutEntity.getIndex().getIndexPlan().getUuid());
long dataSize = 0L;
int readyCount = 0;
- for (NDataSegment segment : dataflow.getSegments()) {
- val dataCuboid = segment.getLayout(layoutEntity.getId());
- if (dataCuboid == null) {
+ boolean hasDataInconsistent = false;
+
+ List<NDataSegment> segments = StringUtils.isBlank(segmentId) ? dataflow.getSegments()
+ : dataflow.getSegments().stream().filter(seg -> seg.getId().equals(segmentId)).collect(Collectors.toList());
+ for (NDataSegment segment : segments) {
+ NDataLayout layout = segment.getLayout(layoutEntity.getId(), true);
+ if (layout == null) {
+ continue;
+ }
+ if (layout.getAbnormalType() == NDataLayout.AbnormalType.DATA_INCONSISTENT) {
+ hasDataInconsistent = true;
continue;
}
readyCount++;
- dataSize += dataCuboid.getByteSize();
+ dataSize += layout.getByteSize();
}
IndexEntity.Status status;
if (readyCount <= 0) {
- status = IndexEntity.Status.NO_BUILD;
if (layoutIdsOfRunningJobs.contains(layoutEntity.getId())) {
status = IndexEntity.Status.BUILDING;
+ } else {
+ status = IndexEntity.Status.NO_BUILD;
+ if (hasDataInconsistent) {
+ response.setAbnormalType(NDataLayout.AbnormalType.DATA_INCONSISTENT);
+ }
}
} else {
status = IndexEntity.Status.ONLINE;
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
index de7d4191f4..9a917fb61c 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -1177,7 +1177,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
boolean allToComplement, Set<Long> allIndexWithoutTobeDel, NDataSegment segment) {
if (allToComplement) {
// find seg that does not have all indexes(don't include tobeDeleted)
- val segLayoutIds = segment.getSegDetails().getLayouts().stream().map(NDataLayout::getLayoutId)
+ val segLayoutIds = segment.getSegDetails().getWorkingLayouts().stream().map(NDataLayout::getLayoutId)
.collect(Collectors.toSet());
return !Sets.difference(allIndexWithoutTobeDel, segLayoutIds).isEmpty();
}
@@ -2386,7 +2386,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
for (String segmentId : segmentIds) {
NDataSegment seg = dataflow.getSegment(segmentId);
NDataSegDetails segDetails = seg.getSegDetails();
- List<NDataLayout> layouts = new LinkedList<>(segDetails.getLayouts());
+ List<NDataLayout> layouts = new LinkedList<>(segDetails.getAllLayouts());
layouts.removeIf(layout -> indexIds.contains(layout.getLayoutId()));
dfManger.updateDataflowDetailsLayouts(seg, layouts);
}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/IndexPlanParams.java
similarity index 58%
copy from src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
copy to src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/IndexPlanParams.java
index a8cf9dc237..ea917984d8 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/IndexPlanParams.java
@@ -16,21 +16,24 @@
* limitations under the License.
*/
-package org.apache.kylin.engine.spark.job.stage.build
+package org.apache.kylin.rest.service.params;
-import org.apache.kylin.engine.spark.job.SegmentJob
-import org.apache.kylin.engine.spark.job.stage.BuildParam
-import org.apache.kylin.metadata.cube.model.NDataSegment
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.kylin.metadata.cube.model.IndexEntity;
-class BuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam)
- extends BuildStage(jobContext, dataSegment, buildParam) {
+import java.util.List;
- override def execute(): Unit = {
- // Build layers.
- buildLayouts()
- // Drain results immediately after building.
- drain()
- }
-
- override def getStageName: String = "BuildLayer"
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class IndexPlanParams {
+ private String project;
+ private String modelId;
+ private String segmentId;
+ private List<Long> ids;
+ private List<IndexEntity.Source> sources;
+ private List<IndexEntity.Status> status;
+ private List<IndexEntity.Range> range;
}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/PaginationParams.java
similarity index 58%
copy from src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
copy to src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/PaginationParams.java
index a8cf9dc237..b0b495f3bc 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/PaginationParams.java
@@ -16,21 +16,18 @@
* limitations under the License.
*/
-package org.apache.kylin.engine.spark.job.stage.build
+package org.apache.kylin.rest.service.params;
-import org.apache.kylin.engine.spark.job.SegmentJob
-import org.apache.kylin.engine.spark.job.stage.BuildParam
-import org.apache.kylin.metadata.cube.model.NDataSegment
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
-class BuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam)
- extends BuildStage(jobContext, dataSegment, buildParam) {
-
- override def execute(): Unit = {
- // Build layers.
- buildLayouts()
- // Drain results immediately after building.
- drain()
- }
-
- override def getStageName: String = "BuildLayer"
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class PaginationParams {
+ private Integer pageOffset;
+ private Integer pageSize;
+ private String orderBy;
+ private Boolean reverse;
}
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
index 70936d305e..be0b4814f7 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
@@ -235,14 +235,14 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase {
public void testGetReadableProjects() {
Mockito.doReturn(true).when(aclEvaluate).hasProjectAdminPermission(Mockito.any(ProjectInstance.class));
List<ProjectInstance> projectInstances = projectService.getReadableProjects("", false);
- Assert.assertEquals(28, projectInstances.size());
+ Assert.assertEquals(29, projectInstances.size());
}
@Test
public void testGetAdminProjects() throws Exception {
Mockito.doReturn(true).when(aclEvaluate).hasProjectAdminPermission(Mockito.any(ProjectInstance.class));
List<ProjectInstance> projectInstances = projectService.getAdminProjects();
- Assert.assertEquals(28, projectInstances.size());
+ Assert.assertEquals(29, projectInstances.size());
}
@Test
@@ -256,7 +256,7 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase {
public void testGetReadableProjectsHasNoPermissionProject() {
Mockito.doReturn(true).when(aclEvaluate).hasProjectAdminPermission(Mockito.any(ProjectInstance.class));
List<ProjectInstance> projectInstances = projectService.getReadableProjects("", false);
- Assert.assertEquals(28, projectInstances.size());
+ Assert.assertEquals(29, projectInstances.size());
}
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
index 2c46e619ea..0ce477be53 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
@@ -364,7 +364,7 @@ public class QueryHistoryServiceTest extends NLocalFileMetadataTestCase {
// get all tables
tableMap = queryHistoryService.getQueryHistoryTableMap(null);
- Assert.assertEquals(28, tableMap.size());
+ Assert.assertEquals(29, tableMap.size());
// not existing project
tableMap = queryHistoryService.getQueryHistoryTableMap(Lists.newArrayList("not_existing_project"));
diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java
index 93fe27494f..c6b2848523 100644
--- a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java
@@ -217,7 +217,7 @@ public class AsyncQueryJobTest extends NLocalFileMetadataTestCase {
rawResourceMap.put(zipEntry.getName(), raw);
}
}
- Assert.assertEquals(84, rawResourceMap.size());
+ Assert.assertEquals(85, rawResourceMap.size());
}
private void testKylinConfig(FileSystem workingFileSystem, FileStatus metaFileStatus) throws IOException {
diff --git a/src/spark-project/engine-spark/pom.xml b/src/spark-project/engine-spark/pom.xml
index b22aefa720..6af33ecf15 100644
--- a/src/spark-project/engine-spark/pom.xml
+++ b/src/spark-project/engine-spark/pom.xml
@@ -177,6 +177,11 @@
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index cb395e88ba..a7b72aa242 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -35,7 +35,9 @@ import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -71,6 +73,7 @@ import org.apache.kylin.engine.spark.job.UdfManager;
import org.apache.kylin.engine.spark.scheduler.ClusterMonitor;
import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
import org.apache.kylin.engine.spark.utils.SparkConfHelper;
+import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.cube.model.NBatchConstants;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NDataModelManager;
@@ -118,6 +121,7 @@ public abstract class SparkApplication implements Application {
protected String project;
protected int layoutSize = -1;
protected BuildJobInfos infos;
+ protected ConcurrentHashMap<String, Boolean> skipFollowingStagesMap = new ConcurrentHashMap<>();
/**
* path for spark app args on HDFS
*/
@@ -215,6 +219,17 @@ public abstract class SparkApplication implements Application {
return clusterManager.getBuildTrackingUrl(sparkSession);
}
+ public void setSkipFollowingStages(String segmentId) {
+ skipFollowingStagesMap.put(segmentId, true);
+ }
+
+ public boolean isSkipFollowingStages(String segmentId) {
+ if (segmentId == null) {
+ return false;
+ }
+ return Optional.ofNullable(skipFollowingStagesMap.get(segmentId)).orElse(false);
+ }
+
private String tryReplaceHostAddress(String url) {
String originHost = null;
try {
@@ -446,7 +461,7 @@ public abstract class SparkApplication implements Application {
protected void waiteForResourceSuccess() throws Exception {
val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null);
- waiteForResource.onStageFinished(true);
+ waiteForResource.onStageFinished(ExecutableState.SUCCEED);
infos.recordStageId("");
}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/builder/PartitionDictionaryBuilderHelper.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/builder/PartitionDictionaryBuilderHelper.java
index cd4fcf341e..2d376b51bc 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/builder/PartitionDictionaryBuilderHelper.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/builder/PartitionDictionaryBuilderHelper.java
@@ -49,7 +49,7 @@ public class PartitionDictionaryBuilderHelper extends DictionaryBuilderHelper {
.filter(partition -> !partition.getStatus().equals(PartitionStatusEnum.READY))
.collect(Collectors.toSet());
if (CollectionUtils.isEmpty(newPartitions)) {
- for (NDataLayout cuboid : seg.getSegDetails().getLayouts()) {
+ for (NDataLayout cuboid : seg.getSegDetails().getWorkingLayouts()) {
buildedLayouts.add(cuboid.getLayout());
}
}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/ExecutableAddCuboidHandler.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/ExecutableAddCuboidHandler.java
index 0f1b68cd2d..43e9e521f5 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/ExecutableAddCuboidHandler.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/ExecutableAddCuboidHandler.java
@@ -18,21 +18,24 @@
package org.apache.kylin.engine.spark.job;
-import com.google.common.base.Preconditions;
-import lombok.val;
+import java.util.LinkedHashSet;
+import java.util.Optional;
+import java.util.Set;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.spark.merger.AfterBuildResourceMerger;
import org.apache.kylin.job.execution.DefaultExecutableOnModel;
import org.apache.kylin.job.execution.ExecutableHandler;
+import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.cube.model.NBatchConstants;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.LinkedHashSet;
-import java.util.Optional;
-import java.util.Set;
+import com.google.common.base.Preconditions;
+
+import lombok.val;
public class ExecutableAddCuboidHandler extends ExecutableHandler {
private static final Logger logger = LoggerFactory.getLogger(ExecutableAddCuboidHandler.class);
@@ -59,6 +62,28 @@ public class ExecutableAddCuboidHandler extends ExecutableHandler {
.filter(task -> ((NSparkExecutable) task).needMergeMetadata())
.forEach(task -> ((NSparkExecutable) task).mergerMetadata(merger));
+ tryRemoveToBeDeletedLayouts(project, modelId, jobId, executable);
+ markDFStatus();
+ }
+
+ @Override
+ public void handleDiscardOrSuicidal() {
+ val job = getExecutable();
+ // anyTargetSegmentExists && checkCuttingInJobByModel need restart job
+ if (!(job.checkCuttingInJobByModel() && job.checkAnyTargetSegmentAndPartitionExists())) {
+ return;
+ }
+ }
+
+ private void tryRemoveToBeDeletedLayouts(String project, String modelId, String jobId, DefaultExecutableOnModel executable) {
+ if (!(executable instanceof NSparkCubingJob)) {
+ return;
+ }
+ NSparkCubingJob job = (NSparkCubingJob) executable;
+ if (job.getSparkCubingStep().getStatus() != ExecutableState.SUCCEED) {
+ return;
+ }
+
Optional.ofNullable(executable.getParams()).ifPresent(params -> {
String toBeDeletedLayoutIdsStr = params.get(NBatchConstants.P_TO_BE_DELETED_LAYOUT_IDS);
if (StringUtils.isNotBlank(toBeDeletedLayoutIdsStr)) {
@@ -72,16 +97,5 @@ public class ExecutableAddCuboidHandler extends ExecutableHandler {
copyForWrite -> copyForWrite.removeLayouts(toBeDeletedLayoutIds, true, true));
}
});
- markDFStatus();
}
-
- @Override
- public void handleDiscardOrSuicidal() {
- val job = getExecutable();
- // anyTargetSegmentExists && checkCuttingInJobByModel need restart job
- if (!(job.checkCuttingInJobByModel() && job.checkAnyTargetSegmentAndPartitionExists())) {
- return;
- }
- }
-
}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
index 62ea6ec1e7..8f633509a4 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
@@ -18,23 +18,30 @@
package org.apache.kylin.engine.spark.job;
-import com.google.common.collect.Sets;
-import lombok.NoArgsConstructor;
-import lombok.val;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.spark.merger.MetadataMerger;
import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.job.execution.StageBase;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.LinkedHashSet;
-import java.util.Set;
+import com.google.common.collect.Sets;
+
+import lombok.NoArgsConstructor;
+import lombok.val;
@NoArgsConstructor
public class NSparkCubingStep extends NSparkExecutable {
@@ -101,6 +108,29 @@ public class NSparkCubingStep extends NSparkExecutable {
return result;
}
+ @Override
+ protected ExecutableState adjustState(ExecutableState originalState) {
+ if (hasWarningStage()) {
+ return ExecutableState.WARNING;
+ }
+ return super.adjustState(originalState);
+ }
+
+ protected boolean hasWarningStage() {
+ NExecutableManager executableManager = getManager();
+ Map<String, List<StageBase>> stagesMap = getStagesMap();
+ for (Map.Entry<String, List<StageBase>> entry : stagesMap.entrySet()) {
+ String segmentId = entry.getKey();
+ List<StageBase> stages = entry.getValue();
+ boolean hasWarning = stages.stream()
+ .anyMatch(stage -> executableManager.getOutput(stage.getId(), segmentId).getState() == ExecutableState.WARNING);
+ if (hasWarning) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public static class Mockup {
public static void main(String[] args) {
logger.info(Mockup.class + ".main() invoked, args: " + Arrays.toString(args));
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java
index f0582567f0..d5efe0722a 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java
@@ -110,7 +110,7 @@ public class AfterBuildResourceMerger extends SparkJobMetadataMerger {
theSeg.setStatus(SegmentStatusEnum.READY);
dfUpdate.setToUpdateSegs(theSeg);
dfUpdate.setToRemoveSegs(toRemoveSegments.toArray(new NDataSegment[toRemoveSegments.size()]));
- dfUpdate.setToAddOrUpdateLayouts(theSeg.getSegDetails().getLayouts().toArray(new NDataLayout[0]));
+ dfUpdate.setToAddOrUpdateLayouts(theSeg.getSegDetails().getWorkingLayouts().toArray(new NDataLayout[0]));
localDataflowManager.updateDataflow(dfUpdate);
updateIndexPlan(flowName, remoteStore);
@@ -147,7 +147,7 @@ public class AfterBuildResourceMerger extends SparkJobMetadataMerger {
}
remoteSeg.setLastBuildTime(remoteSeg.getSegDetails().getLastModified());
for (long layoutId : availableLayoutIds) {
- NDataLayout dataCuboid = remoteSeg.getLayout(layoutId);
+ NDataLayout dataCuboid = remoteSeg.getLayout(layoutId, true);
Preconditions.checkNotNull(dataCuboid);
addCuboids.add(dataCuboid);
}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
index 3935675a82..8aa4073ae7 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
@@ -83,7 +83,7 @@ public class AfterMergeOrRefreshResourceMerger extends SparkJobMetadataMerger {
partition.setLastBuildTime(lastBuildTime);
});
mergedSegment.setLastBuildTime(lastBuildTime);
- toUpdateCuboids.addAll(new ArrayList<>(mergedSegment.getSegDetails().getLayouts()));
+ toUpdateCuboids.addAll(new ArrayList<>(mergedSegment.getSegDetails().getWorkingLayouts()));
} else {
mergedSegment = upsertSegmentPartition(localSegment, remoteSegment, partitions);
for (val segId : segmentIds) {
@@ -158,7 +158,7 @@ public class AfterMergeOrRefreshResourceMerger extends SparkJobMetadataMerger {
mergedSegment.setStatus(SegmentStatusEnum.WARNING);
}
}
- toUpdateCuboids.addAll(new ArrayList<>(mergedSegment.getSegDetails().getLayouts()));
+ toUpdateCuboids.addAll(new ArrayList<>(mergedSegment.getSegDetails().getWorkingLayouts()));
update.setToAddOrUpdateLayouts(toUpdateCuboids.toArray(new NDataLayout[0]));
update.setToRemoveSegs(toRemoveSegments.toArray(new NDataSegment[0]));
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
index c179c7d947..698de6d745 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
@@ -138,7 +138,7 @@ public class DictionaryBuilderHelper {
List<LayoutEntity> buildedLayouts = Lists.newArrayList();
if (seg.getSegDetails() != null) {
- for (NDataLayout cuboid : seg.getSegDetails().getLayouts()) {
+ for (NDataLayout cuboid : seg.getSegDetails().getWorkingLayouts()) {
buildedLayouts.add(cuboid.getLayout());
}
}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala
index 4b2f70329a..b5b193c527 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala
@@ -24,10 +24,12 @@ import org.apache.spark.application.RetryInfo
import org.apache.spark.sql.execution.SparkPlan
import java.util
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
class BuildJobInfos {
// BUILD
- private val seg2cuboidsNumPerLayer: util.Map[String, util.List[Int]] = new util.HashMap[String, util.List[Int]]
+ private val seg2cuboidsNumPerLayer: util.Map[String, AtomicInteger] = new ConcurrentHashMap[String, AtomicInteger]()
private val seg2SpanningTree: java.util.Map[String, NSpanningTree] = new util.HashMap[String, NSpanningTree]
@@ -157,22 +159,19 @@ class BuildJobInfos {
}
def recordCuboidsNumPerLayer(segId: String, num: Int): Unit = {
- if (seg2cuboidsNumPerLayer.containsKey(segId)) {
- seg2cuboidsNumPerLayer.get(segId).add(num)
- } else {
- val nums = new util.LinkedList[Int]()
- nums.add(num)
- seg2cuboidsNumPerLayer.put(segId, nums)
+ var counter = seg2cuboidsNumPerLayer.get(segId)
+ if (counter == null) {
+ seg2cuboidsNumPerLayer.putIfAbsent(segId, new AtomicInteger())
+ counter = seg2cuboidsNumPerLayer.get(segId)
}
+ counter.addAndGet(num)
}
def clearCuboidsNumPerLayer(segId: String): Unit = {
- if (seg2cuboidsNumPerLayer.containsKey(segId)) {
- seg2cuboidsNumPerLayer.get(segId).clear()
- }
+ seg2cuboidsNumPerLayer.remove(segId)
}
- def getSeg2cuboidsNumPerLayer: util.Map[String, util.List[Int]] = {
+ def getSeg2cuboidsNumPerLayer: util.Map[String, AtomicInteger] = {
seg2cuboidsNumPerLayer
}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/DFMergeJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/DFMergeJob.java
index f3bc59837b..7109e0a297 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/DFMergeJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/DFMergeJob.java
@@ -73,7 +73,7 @@ public class DFMergeJob extends SparkApplication {
// collect layouts need to merge
Map<Long, DFLayoutMergeAssist> mergeCuboidsAssist = Maps.newConcurrentMap();
for (NDataSegment seg : mergingSegments) {
- for (NDataLayout cuboid : seg.getSegDetails().getLayouts()) {
+ for (NDataLayout cuboid : seg.getSegDetails().getWorkingLayouts()) {
long layoutId = cuboid.getLayoutId();
DFLayoutMergeAssist assist = mergeCuboidsAssist.get(layoutId);
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/PartitionExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/PartitionExec.scala
index 887b252c01..f9a1fdf075 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/PartitionExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/PartitionExec.scala
@@ -91,6 +91,10 @@ private[job] trait PartitionExec {
}
logInfo(s"Segment $segmentId drained layout partition: " + //
s"${results.asScala.map(lp => s"(${lp.layoutId} ${lp.partitionId})").mkString("[", ",", "]")}")
+
+ val buildJobInfos = KylinBuildEnv.get().buildJobInfos
+ buildJobInfos.recordCuboidsNumPerLayer(segmentId, results.size())
+
class DFUpdate extends UnitOfWork.Callback[Int] {
override def process(): Int = {
// Merge into the newest data segment.
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
index e3c930c883..caf4835827 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
@@ -29,10 +29,14 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.spark.builder.SnapshotBuilder;
+import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
+import org.apache.kylin.engine.spark.job.SegmentJob;
+import org.apache.kylin.engine.spark.job.SparkJobConstants;
import org.apache.kylin.engine.spark.job.exec.BuildExec;
import org.apache.kylin.engine.spark.job.stage.BuildParam;
import org.apache.kylin.engine.spark.job.stage.StageExec;
import org.apache.kylin.metadata.cube.model.NBatchConstants;
+import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
@@ -88,7 +92,7 @@ public class SegmentBuildJob extends SegmentJob {
checkDateFormatIfExist(project, dataflowId);
}
val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null);
- waiteForResource.onStageFinished(true);
+ waiteForResource.onStageFinished(ExecutableState.SUCCEED);
infos.recordStageId("");
}
@@ -135,8 +139,6 @@ public class SegmentBuildJob extends SegmentJob {
protected void build() throws IOException {
Stream<NDataSegment> segmentStream = config.isSegmentParallelBuildEnabled() ? //
readOnlySegments.parallelStream() : readOnlySegments.stream();
- AtomicLong finishedSegmentCount = new AtomicLong(0);
- val segmentsCount = readOnlySegments.size();
segmentStream.forEach(seg -> {
try (KylinConfig.SetAndUnsetThreadLocalConfig autoCloseConfig = KylinConfig
.setAndUnsetThreadLocalConfig(config)) {
@@ -156,14 +158,10 @@ public class SegmentBuildJob extends SegmentJob {
GATHER_FLAT_TABLE_STATS.createStage(this, seg, buildParam, exec);
BUILD_LAYER.createStage(this, seg, buildParam, exec);
+ REFRESH_COLUMN_BYTES.createStage(this, seg, buildParam, exec);
buildSegment(seg, exec);
- val refreshColumnBytes = REFRESH_COLUMN_BYTES.createStage(this, seg, buildParam, exec);
- refreshColumnBytes.toWorkWithoutFinally();
- if (finishedSegmentCount.incrementAndGet() < segmentsCount) {
- refreshColumnBytes.onStageFinished(true);
- }
} catch (IOException e) {
Throwables.propagate(e);
}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
index e0f635acad..61675ac4a8 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
@@ -18,25 +18,24 @@
package org.apache.kylin.engine.spark.job
-import java.util
-import java.util.Objects
-import java.util.concurrent.{BlockingQueue, ForkJoinPool, LinkedBlockingQueue, TimeUnit}
-
import com.google.common.collect.{Lists, Queues}
-import org.apache.kylin.engine.spark.job.SegmentExec.{LayoutResult, ResultType, SourceStats}
-import org.apache.kylin.engine.spark.job.stage.merge.MergeStage
-import org.apache.kylin.engine.spark.scheduler.JobRuntime
-import org.apache.kylin.metadata.cube.model._
-import org.apache.kylin.metadata.model.NDataModel
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.kylin.common.persistence.transaction.UnitOfWork
import org.apache.kylin.common.{KapConfig, KylinConfig}
-import org.apache.kylin.metadata.model.TblColRef
+import org.apache.kylin.engine.spark.job.SegmentExec.{LayoutResult, ResultType, SourceStats, filterSuccessfulLayoutResult}
+import org.apache.kylin.engine.spark.job.stage.merge.MergeStage
+import org.apache.kylin.engine.spark.scheduler.JobRuntime
+import org.apache.kylin.metadata.cube.model.NDataLayout.AbnormalType
+import org.apache.kylin.metadata.cube.model._
+import org.apache.kylin.metadata.model.{NDataModel, TblColRef}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.datasource.storage.{StorageListener, StorageStoreFactory, WriteTaskStats}
import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import org.apache.spark.tracker.BuildContext
+import java.util
+import java.util.Objects
+import java.util.concurrent.{BlockingQueue, ForkJoinPool, LinkedBlockingQueue, TimeUnit}
import scala.collection.JavaConverters._
import scala.collection.parallel.ForkJoinTaskSupport
@@ -202,6 +201,9 @@ trait SegmentExec extends Logging {
logInfo(s"Segment $segmentId drained layouts: " + //
s"${results.asScala.map(_.layoutId).mkString("[", ",", "]")}")
+ val buildJobInfos = KylinBuildEnv.get().buildJobInfos
+ buildJobInfos.recordCuboidsNumPerLayer(segmentId, results.asScala.count(filterSuccessfulLayoutResult))
+
class DFUpdate extends UnitOfWork.Callback[Int] {
override def process(): Int = {
@@ -227,6 +229,7 @@ trait SegmentExec extends Logging {
dataLayout.setPartitionValues(taskStats.partitionValues)
dataLayout.setFileCount(taskStats.numFiles)
dataLayout.setByteSize(taskStats.numBytes)
+ dataLayout.setAbnormalType(lr.abnormalType)
dataLayout
}
logInfo(s"Segment $segmentId update the data layouts $dataLayouts")
@@ -290,7 +293,13 @@ trait SegmentExec extends Logging {
val storagePath = NSparkCubingUtil.getStoragePath(segment, layout.getId)
val taskStats = saveWithStatistics(layout, layoutDS, storagePath, readableDesc, storageListener)
val sourceStats = newSourceStats(layout, taskStats)
- pipe.offer(LayoutResult(layout.getId, taskStats, sourceStats))
+ pipe.offer(LayoutResult(layout.getId, taskStats, sourceStats, null))
+ }
+
+ protected final def newEmptyDataLayout(layout: LayoutEntity, abnormalType: AbnormalType): Unit = {
+ val taskStats = WriteTaskStats(0, 0, 0, 0, 0, 0, new util.ArrayList[String]())
+ val sourceStats = SourceStats(0)
+ pipe.offer(LayoutResult(layout.getId, taskStats, sourceStats, abnormalType))
}
protected def newSourceStats(layout: LayoutEntity, taskStats: WriteTaskStats): SourceStats = {
@@ -417,6 +426,11 @@ object SegmentExec {
case class SourceStats(rows: Long)
- case class LayoutResult(layoutId: java.lang.Long, stats: WriteTaskStats, sourceStats: SourceStats) extends ResultType
+ case class LayoutResult(layoutId: java.lang.Long, stats: WriteTaskStats, sourceStats: SourceStats,
+ abnormalType: NDataLayout.AbnormalType) extends ResultType
+
+ protected def filterSuccessfulLayoutResult(layoutResult: LayoutResult): Boolean = {
+ layoutResult.abnormalType == null
+ }
}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentJob.java
index 28174f8d4f..0fd435bea0 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentJob.java
@@ -247,4 +247,8 @@ public abstract class SegmentJob extends SparkApplication {
public BuildContext getBuildContext() {
return buildContext;
}
+
+ public IndexPlan getIndexPlan() {
+ return indexPlan;
+ }
}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentMergeJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentMergeJob.java
index 9bb9e12999..9660cf4102 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentMergeJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentMergeJob.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.spark.job.exec.MergeExec;
import org.apache.kylin.engine.spark.job.stage.BuildParam;
+import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.spark.tracker.BuildContext;
@@ -72,7 +73,7 @@ public class SegmentMergeJob extends SegmentJob {
mergeColumnBytes.toWorkWithoutFinally();
if (finishedSegmentCount.incrementAndGet() < segmentsCount) {
- mergeColumnBytes.onStageFinished(true);
+ mergeColumnBytes.onStageFinished(ExecutableState.SUCCEED);
}
} catch (IOException e) {
Throwables.propagate(e);
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala
index 7f99feff77..8eca14faca 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala
@@ -19,6 +19,7 @@
package org.apache.kylin.engine.spark.job.stage
import com.google.common.base.Throwables
+import io.kyligence.kap.guava20.shaded.common.util.concurrent.RateLimiter
import org.apache.kylin.common.KylinConfig
import org.apache.kylin.common.util.JsonUtil
import org.apache.kylin.job.execution.ExecutableState
@@ -44,98 +45,74 @@ trait StageExec extends Logging {
def execute(): Unit
- def onStageStart(): Unit = {
- val taskId = getId
- val segmentId = getSegmentId
- val project = getJobContext.getProject
- val status = ExecutableState.RUNNING.toString
- val errMsg = null
- val updateInfo: util.HashMap[String, String] = null
-
- updateStageInfo(taskId, segmentId, project, status, errMsg, updateInfo)
- }
-
- def updateStageInfo(taskId: String, segmentId: String, project: String, status: String,
- errMsg: String, updateInfo: util.HashMap[String, String]): Unit = {
- val context = getJobContext
-
- val url = "/kylin/api/jobs/stage/status"
-
- val payload: util.HashMap[String, Object] = new util.HashMap[String, Object](6)
- payload.put("task_id", taskId)
- payload.put("segment_id", segmentId)
- payload.put("project", project)
- payload.put("status", status)
- payload.put("err_msg", errMsg)
- payload.put("update_info", updateInfo)
- val json = JsonUtil.writeValueAsString(payload)
- val params = new util.HashMap[String, String]()
- val config = KylinConfig.getInstanceFromEnv
- params.put(ParamsConstants.TIME_OUT, config.getUpdateJobInfoTimeout.toString)
- params.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true))
- context.getReport.updateSparkJobInfo(params, url, json)
+ def createRateLimiter(permitsPerSecond: Double = 0.1): RateLimiter = {
+ RateLimiter.create(permitsPerSecond)
}
- def onStageFinished(result: Boolean): Unit = {
- val taskId = getId
- val segmentId = getSegmentId
- val project = getJobContext.getProject
- val status = if (result) ExecutableState.SUCCEED.toString else ExecutableState.ERROR.toString
- val errMsg = null
- val updateInfo: util.HashMap[String, String] = null
-
- updateStageInfo(taskId, segmentId, project, status, errMsg, updateInfo)
+ def onStageStart(): Unit = {
+ if (getJobContext.isSkipFollowingStages(getSegmentId)) {
+ return
+ }
+ updateStageInfo(ExecutableState.RUNNING.toString, null, null)
}
- def onBuildLayoutSuccess(layoutCount: Int): Unit = {
- val taskId = getId
- val segmentId = getSegmentId
- val project = getJobContext.getProject
- val status = null
- val errMsg = null
- val updateInfo: util.HashMap[String, String] = new util.HashMap[String, String]
- updateInfo.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, String.valueOf(layoutCount))
-
- updateStageInfo(taskId, segmentId, project, status, errMsg, updateInfo)
+ def onStageFinished(state: ExecutableState = ExecutableState.SUCCEED): Unit = {
+ updateStageInfo(state.toString, null, null)
}
def onStageSkipped(): Unit = {
- val taskId = getId
- val segmentId = getSegmentId
- val project = getJobContext.getProject
- val status = ExecutableState.SKIP.toString
- val errMsg = null
- val updateInfo: util.HashMap[String, String] = null
-
- updateStageInfo(taskId, segmentId, project, status, errMsg, updateInfo)
+ updateStageInfo(ExecutableState.SKIP.toString, null, null)
}
def toWork(): Unit = {
- onStageStart()
- var result: Boolean = false
- try {
- execute()
- result = true
- } catch {
- case throwable: Throwable =>
- KylinBuildEnv.get().buildJobInfos.recordSegmentId(getSegmentId)
- KylinBuildEnv.get().buildJobInfos.recordStageId(getId)
- Throwables.propagate(throwable)
- } finally onStageFinished(result)
+ toWork0()
}
def toWorkWithoutFinally(): Unit = {
+ toWork0(false)
+ }
+
+ def toWork0(doFinally: Boolean = true): Unit = {
onStageStart()
+ var state: ExecutableState = ExecutableState.SUCCEED
try {
+ if (getJobContext.isSkipFollowingStages(getSegmentId)) {
+ state = ExecutableState.SKIP
+ return
+ }
execute()
} catch {
case throwable: Throwable =>
+ state = ExecutableState.ERROR
KylinBuildEnv.get().buildJobInfos.recordSegmentId(getSegmentId)
KylinBuildEnv.get().buildJobInfos.recordStageId(getId)
Throwables.propagate(throwable)
+ } finally {
+ if (doFinally) {
+ onStageFinished(state)
+ }
}
}
+ def updateStageInfo(status: String, errMsg: String, updateInfo: util.Map[String, String]): Unit = {
+ val context = getJobContext
+
+ val url = "/kylin/api/jobs/stage/status"
+
+ val payload: util.HashMap[String, Object] = new util.HashMap[String, Object](6)
+ payload.put("task_id", getId)
+ payload.put("segment_id", getSegmentId)
+ payload.put("project", context.getProject)
+ payload.put("status", status)
+ payload.put("err_msg", errMsg)
+ payload.put("update_info", updateInfo)
+ val json = JsonUtil.writeValueAsString(payload)
+ val params = new util.HashMap[String, String]()
+ val config = KylinConfig.getInstanceFromEnv
+ params.put(ParamsConstants.TIME_OUT, config.getUpdateJobInfoTimeout.toString)
+ params.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(context.getProject, true))
+ context.getReport.updateSparkJobInfo(params, url, json)
+ }
def setId(id: String): Unit = {
this.id = id
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
index a8cf9dc237..11e71d081a 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala
@@ -18,14 +18,22 @@
package org.apache.kylin.engine.spark.job.stage.build
-import org.apache.kylin.engine.spark.job.SegmentJob
+import io.kyligence.kap.guava20.shaded.common.util.concurrent.RateLimiter
import org.apache.kylin.engine.spark.job.stage.BuildParam
-import org.apache.kylin.metadata.cube.model.NDataSegment
+import org.apache.kylin.engine.spark.job.{KylinBuildEnv, SegmentJob}
+import org.apache.kylin.metadata.cube.model.{NBatchConstants, NDataSegment}
+
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
class BuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam)
extends BuildStage(jobContext, dataSegment, buildParam) {
+ private val rateLimiter: RateLimiter = createRateLimiter()
+
override def execute(): Unit = {
+ // Start an independent thread doing drain at a fixed rate
+ scheduleCheckpoint()
// Build layers.
buildLayouts()
// Drain results immediately after building.
@@ -33,4 +41,15 @@ class BuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam:
}
override def getStageName: String = "BuildLayer"
+
+ override protected def drain(timeout: Long, unit: TimeUnit): Unit = {
+ super.drain(timeout, unit)
+
+ val buildJobInfos = KylinBuildEnv.get().buildJobInfos
+ val layoutCount = buildJobInfos.getSeg2cuboidsNumPerLayer.get(segmentId)
+ if (rateLimiter.tryAcquire() && layoutCount != null) {
+ updateStageInfo(null, null, mapAsJavaMap(Map(NBatchConstants.P_INDEX_SUCCESS_COUNT ->
+ String.valueOf(layoutCount.get()))))
+ }
+ }
}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala
index 414335d9fa..dba42cca96 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala
@@ -148,12 +148,6 @@ abstract class BuildStage(private val jobContext: SegmentJob,
override protected def recordTaskInfo(t: Task): Unit = {
logInfo(s"Segment $segmentId submit task: ${t.getTaskDesc}")
- KylinBuildEnv.get().buildJobInfos.recordCuboidsNumPerLayer(segmentId, 1)
- }
-
- override protected def reportTaskProgress(): Unit = {
- val layoutCount = KylinBuildEnv.get().buildJobInfos.getSeg2cuboidsNumPerLayer.get(segmentId).asScala.sum
- onBuildLayoutSuccess(layoutCount)
}
protected def buildStatistics(): Statistics = {
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala
index 6668593a42..a5faa5374e 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala
@@ -28,8 +28,6 @@ class GatherFlatTableStats(jobContext: SegmentJob, dataSegment: NDataSegment, bu
extends BuildStage(jobContext, dataSegment, buildParam) {
override def execute(): Unit = {
- scheduleCheckpoint()
-
// Build flat table?
if (buildParam.getSpanningTree.fromFlatTable()) {
// Collect statistics for flat table.
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala
index ba62f0bfd9..823079a59b 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala
@@ -18,26 +18,153 @@
package org.apache.kylin.engine.spark.job.stage.build
-import org.apache.kylin.engine.spark.job.SegmentJob
+import com.google.common.collect.{Maps, Queues}
import org.apache.kylin.engine.spark.job.stage.BuildParam
-import org.apache.kylin.metadata.cube.model.NDataSegment
+import org.apache.kylin.engine.spark.job.stage.build.GenerateFlatTable._
+import org.apache.kylin.engine.spark.job.{SanityChecker, SegmentJob}
+import org.apache.kylin.job.execution.ExecutableState
+import org.apache.kylin.metadata.cube.model._
+import org.apache.spark.sql.datasource.storage.StorageStoreUtils
import org.apache.spark.sql.{Dataset, Row}
+import scala.collection.JavaConverters._
+
class GenerateFlatTable(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam)
extends FlatTableAndDictBase(jobContext, dataSegment, buildParam) {
- override def execute(): Unit = {
- val flatTable: Dataset[Row] = generateFlatTable()
- buildParam.setFlatTable(flatTable)
- val flatTablePart: Dataset[Row] = generateFlatTablePart()
- buildParam.setFlatTablePart(flatTablePart)
+ private var dataCountCheckGood: Boolean = true
+ override def execute(): Unit = {
+ buildParam.setFlatTable(generateFlatTable())
+ buildParam.setFlatTablePart(generateFlatTablePart())
buildParam.setBuildFlatTable(this)
- if (buildParam.isSkipGenerateFlatTable) {
- onStageSkipped()
+ if (!checkDataCountPass()) {
+ drainEmptyLayoutOnDataCountCheckFailed()
+ return
}
}
override def getStageName: String = "GenerateFlatTable"
+
+ override def onStageFinished(state: ExecutableState): Unit = {
+ if (dataCountCheckGood) {
+ super.onStageFinished(state)
+ } else {
+ updateStageInfo(ExecutableState.WARNING.toString, null,
+ Maps.newHashMap(Map(NBatchConstants.P_WARNING_CODE -> NDataLayout.AbnormalType.DATA_INCONSISTENT.name()).asJava))
+ }
+ }
+
+ private def checkDataCountPass(): Boolean = {
+ if (jobContext.getConfig.isDataCountCheckEnabled) {
+ logInfo(s"segment ${dataSegment.getId} dataCountCheck is enabled")
+ val result = checkDataCount()
+ logInfo(s"segment ${dataSegment.getId} dataCountCheck result: ${result}")
+ return result
+ }
+ logInfo(s"segment ${dataSegment.getId} dataCountCheck is not enabled")
+ true
+ }
+
+ private def drainEmptyLayoutOnDataCountCheckFailed(): Unit = {
+ dataCountCheckGood = false
+ jobContext.setSkipFollowingStages(dataSegment.getId)
+ readOnlyLayouts.asScala.foreach(newEmptyDataLayout(_, NDataLayout.AbnormalType.DATA_INCONSISTENT))
+ drain()
+ }
+
+ /**
+ * Data count check for segment layouts before index building, including 2 checks:
+ * <p>Check1: Equality check for sum value of count among existing layouts
+ * (not the layouts that are under construction)
+ * <p>Check2: Equality check for sum value of count between layouts above and flat table
+ *
+ * <p>Special case for check1 is to allow un-equality count sum between agg layouts and table layouts
+ * when <tt>kylin.build.allow-non-strict-count-check</tt> enabled, and should not be used in most cases.
+ *
+ * <p>Check2 will not execute when all the new layouts can be covered by existing layouts in index building.
+ *
+ * <p>Count measure will be used in calculating sum value of count for agg layouts,
+ * and for table layouts simply counting data row numbers.
+ *
+ * @return <tt>true</tt> if data count check pass, <tt>false</tt> otherwise
+ */
+ private def checkDataCount(): Boolean = {
+ val layouts = dataSegment.getSegDetails.getWorkingLayouts.asScala.map(lay => jobContext.getIndexPlan.getLayoutEntity(lay.getLayoutId))
+ val tasks = layouts.map(layout => new DataCountCheckTask(layout, StorageStoreUtils.toDF(dataSegment, layout, sparkSession)))
+ val resultsQueue = Queues.newLinkedBlockingQueue[DataCountCheckResult]()
+
+ if (layouts.isEmpty) return true
+
+ // Start calculating data count
+ slowStartExec(tasks.iterator, (task: DataCountCheckTask) => {
+ val layout = task.layout
+ val ds = task.ds
+ resultsQueue.offer(new DataCountCheckResult(layout, SanityChecker.getCount(ds, layout)))
+ })
+
+ val reduceFor: (DataCountCheckResult => Boolean) => Long = layoutCountReducer(resultsQueue.asScala)
+ val aggLayoutCount = reduceFor(result => !IndexEntity.isTableIndex(result.layout.getId))
+ val tableLayoutCount = reduceFor(result => IndexEntity.isTableIndex(result.layout.getId))
+
+ // All agg layouts count or table layouts count must be same
+ if (isInvalidCount(aggLayoutCount) || isInvalidCount(tableLayoutCount)) {
+ logWarning(s"segment ${dataSegment.getId} dataCountCheck check1 failed, " +
+ s"count number in agg layouts or table layouts are not same, " +
+ s"agg layouts count: ${aggLayoutCount}, table layouts count: ${tableLayoutCount}")
+ return false
+ }
+ // Count number between agg layout and table layout should be same when non-strict count check is not enabled
+ if (bothLayoutsExist(aggLayoutCount, tableLayoutCount)
+ && (aggLayoutCount != tableLayoutCount && !config.isNonStrictCountCheckAllowed)) {
+ logWarning(s"segment ${dataSegment.getId} dataCountCheck check1 failed, " +
+ s"count number between agg layouts and table layouts are not same, " +
+ s"agg layouts count: ${aggLayoutCount}, table layouts count: ${tableLayoutCount}")
+ return false
+ }
+ // Agg layout count equals table layout count, comparing with flat table count or simply return true
+ val layoutCount = if (isLayoutExists(aggLayoutCount)) aggLayoutCount else tableLayoutCount
+ if (isLayoutExists(layoutCount) && buildParam.getSpanningTree.fromFlatTable) {
+ val flatTableCount = buildParam.getFlatTable.count
+ val check2Result = layoutCount == flatTableCount
+ if (!check2Result) {
+ logWarning(s"segment ${dataSegment.getId} dataCountCheck check2 failed, " +
+ s"layouts count: ${layoutCount}, flat table count: ${flatTableCount}")
+ }
+ check2Result
+ } else {
+ true
+ }
+ }
+
+ private def layoutCountReducer(results: Iterable[DataCountCheckResult])(filter: DataCountCheckResult => Boolean): Long = {
+ results.filter(filter)
+ .map(_.count)
+ .reduceOption((prev, cur) => if (prev == cur) cur else InvalidCountFlag)
+ .getOrElse(LayoutNonExistsFlag)
+ }
+
+ sealed class DataCountCheckTask(val layout: LayoutEntity, val ds: Dataset[Row]) extends Task {
+ override def getTaskDesc: String = s"layout ${layout.getId} data count check"
+ }
+
+ sealed class DataCountCheckResult(val layout: LayoutEntity, val count: Long)
+}
+
+object GenerateFlatTable {
+ val InvalidCountFlag: Long = SanityChecker.SKIP_FLAG
+ val LayoutNonExistsFlag: Long = -2L
+
+ def isInvalidCount(count: Long): Boolean = {
+ count == InvalidCountFlag
+ }
+
+ def isLayoutExists(count: Long): Boolean = {
+ count > LayoutNonExistsFlag
+ }
+
+ def bothLayoutsExist(aggLayoutCount: Long, tableLayoutCount: Long): Boolean = {
+ aggLayoutCount > LayoutNonExistsFlag && tableLayoutCount > LayoutNonExistsFlag
+ }
}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala
index 995bb588cc..621424403a 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala
@@ -18,12 +18,19 @@
package org.apache.kylin.engine.spark.job.stage.build.partition
-import org.apache.kylin.engine.spark.job.SegmentJob
+import io.kyligence.kap.guava20.shaded.common.util.concurrent.RateLimiter
import org.apache.kylin.engine.spark.job.stage.BuildParam
-import org.apache.kylin.metadata.cube.model.NDataSegment
+import org.apache.kylin.engine.spark.job.{KylinBuildEnv, SegmentJob}
+import org.apache.kylin.metadata.cube.model.{NBatchConstants, NDataSegment}
+
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
class PartitionBuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam)
extends PartitionBuildStage(jobContext, dataSegment, buildParam) {
+
+ private val rateLimiter: RateLimiter = createRateLimiter()
+
override def execute(): Unit = {
// Build layers.
buildLayouts()
@@ -32,4 +39,16 @@ class PartitionBuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, bui
}
override def getStageName: String = "PartitionBuildLayer"
+
+ override protected def drain(timeout: Long, unit: TimeUnit): Unit = {
+ super.drain(timeout, unit)
+
+ val buildJobInfos = KylinBuildEnv.get().buildJobInfos
+ val layoutCountTotal = buildJobInfos.getSeg2cuboidsNumPerLayer.get(segmentId)
+ if (rateLimiter.tryAcquire() && layoutCountTotal != null) {
+ val layoutCount = layoutCountTotal.get() / partitions.size()
+ updateStageInfo(null, null, mapAsJavaMap(Map(NBatchConstants.P_INDEX_SUCCESS_COUNT ->
+ String.valueOf(layoutCount))))
+ }
+ }
}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildStage.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildStage.scala
index 331151ecae..6d2bb42355 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildStage.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildStage.scala
@@ -67,12 +67,6 @@ abstract class PartitionBuildStage(jobContext: SegmentJob, dataSegment: NDataSeg
override protected def columnIdFunc(colRef: TblColRef): String = flatTableDesc.getColumnIdAsString(colRef)
-
- override protected def reportTaskProgress(): Unit = {
- val layoutCount = KylinBuildEnv.get().buildJobInfos.getSeg2cuboidsNumPerLayer.get(segmentId).asScala.sum
- onBuildLayoutSuccess(layoutCount / partitions.size())
- }
-
override protected def buildLayouts(): Unit = {
val taskIter = new BuildTaskIterator[PartitionBuildTask] {
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStage.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStage.scala
index 0ac0a14701..54aebff88c 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStage.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStage.scala
@@ -78,7 +78,7 @@ abstract class MergeStage(private val jobContext: SegmentJob,
// Cleanup previous potentially left temp layout data.
cleanupLayoutTempData(dataSegment, jobContext.getReadOnlyLayouts.asScala.toSeq)
- val tasks = unmerged.flatMap(segment => segment.getSegDetails.getLayouts.asScala) //
+ val tasks = unmerged.flatMap(segment => segment.getSegDetails.getWorkingLayouts.asScala) //
.groupBy(_.getLayoutId).values.map(LayoutMergeTask)
slowStartExec(tasks.iterator, mergeLayout)
}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeStage.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeStage.scala
index b31f25049c..b932331a8a 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeStage.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeStage.scala
@@ -50,7 +50,7 @@ abstract class PartitionMergeStage(private val jobContext: SegmentJob,
override protected def mergeIndices(): Unit = {
val tasks = unmerged.flatMap(segment =>
- segment.getSegDetails.getLayouts.asScala.flatMap(layout =>
+ segment.getSegDetails.getWorkingLayouts.asScala.flatMap(layout =>
layout.getMultiPartition.asScala.map(partition => (layout, partition))
)).groupBy(tp => (tp._1.getLayoutId, tp._2.getPartitionId)).values.map(PartitionMergeTask)
slowStartExec(tasks.iterator, mergePartition)
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTableWithSparkSessionTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTableWithSparkSessionTest.java
new file mode 100644
index 0000000000..8099dc7d4d
--- /dev/null
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTableWithSparkSessionTest.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job.stage.build;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KapConfig;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.spark.IndexDataConstructor;
+import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
+import org.apache.kylin.engine.spark.job.NSparkCubingJob;
+import org.apache.kylin.engine.spark.job.NSparkCubingUtil;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.cube.model.IndexPlan;
+import org.apache.kylin.metadata.cube.model.LayoutEntity;
+import org.apache.kylin.metadata.cube.model.NDataLayout;
+import org.apache.kylin.metadata.cube.model.NDataSegment;
+import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.datasource.storage.StorageStore;
+import org.apache.spark.sql.datasource.storage.StorageStoreFactory;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import io.kyligence.kap.guava20.shaded.common.collect.Sets;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@SuppressWarnings("unchecked")
+public class GenerateFlatTableWithSparkSessionTest extends NLocalWithSparkSessionTest {
+
+ @Getter
+ @AllArgsConstructor
+ static class ColumnStruct {
+ private Long rowId;
+ private String rowName;
+ private DataType type;
+ }
+
+ private static final LinkedHashMap<Long, ColumnStruct> DIM_SCHEMAS = new LinkedHashMap<>();
+ private static final LinkedHashMap<Long, ColumnStruct> MEASURE_SCHEMAS = new LinkedHashMap<>();
+ static {
+ DIM_SCHEMAS.put(0L, new ColumnStruct(0L, "LO_ORDERKEY", DataTypes.LongType));
+ DIM_SCHEMAS.put(13L, new ColumnStruct(13L, "LO_QUANTITY", DataTypes.LongType));
+ DIM_SCHEMAS.put(16L, new ColumnStruct(16L, "LO_CUSTKEY", DataTypes.LongType));
+ DIM_SCHEMAS.put(22L, new ColumnStruct(22L, "C_NAME", DataTypes.StringType));
+ DIM_SCHEMAS.put(24L, new ColumnStruct(24L, "C_CUSTKEY", DataTypes.LongType));
+
+ MEASURE_SCHEMAS.put(100000L, new ColumnStruct(100000L, "COUNT_ALL", DataTypes.LongType));
+ MEASURE_SCHEMAS.put(100001L, new ColumnStruct(100001L, "SUM_LINEORDER_LO_QUANTITY", DataTypes.LongType));
+ }
+
+ private static final String MODEL_ID = "3ec47efc-573a-9304-4405-8e05ae184322";
+ private static final String SEGMENT_ID = "b2f206e1-7a15-c94a-20f5-f608d550ead6";
+ private static final long LAYOUT_ID_TO_BUILD = 20001L;
+
+ private KylinConfig config;
+ private NIndexPlanManager indexPlanManager;
+ private NDataflowManager dataflowManager;
+
+ @Override
+ public String getProject() {
+ return "index_build_test";
+ }
+
+ @BeforeAll
+ public static void beforeClass() {
+ NLocalWithSparkSessionTest.beforeClass();
+ }
+
+ @AfterAll
+ public static void afterClass() {
+ NLocalWithSparkSessionTest.afterClass();
+ }
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+
+ overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1");
+ overwriteSystemProp("kylin.engine.persist-flattable-threshold", "0");
+ overwriteSystemProp("kylin.engine.persist-flatview", "true");
+
+ NDefaultScheduler.destroyInstance();
+ NDefaultScheduler scheduler = NDefaultScheduler.getInstance(getProject());
+ scheduler.init(new JobEngineConfig(getTestConfig()));
+ if (!scheduler.hasStarted()) {
+ throw new RuntimeException("scheduler has not been started");
+ }
+
+ config = getTestConfig();
+ indexPlanManager = NIndexPlanManager.getInstance(config, getProject());
+ dataflowManager = NDataflowManager.getInstance(config, getProject());
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ super.tearDown();
+ NDefaultScheduler.destroyInstance();
+ }
+
+ private Object[] initDataCountCheckTest() {
+ // Enable data count check
+ overwriteSystemProp("kylin.build.data-count-check-enabled", "true");
+ // Index Plan
+ IndexPlan indexPlan = indexPlanManager.getIndexPlan(MODEL_ID);
+ // Segment
+ NDataflow dataflow = dataflowManager.getDataflow(MODEL_ID);
+ NDataSegment segment = dataflow.getSegment(SEGMENT_ID);
+ // Layouts to build
+ Set<Long> layoutIDs = Sets.newHashSet(LAYOUT_ID_TO_BUILD);
+ Set<LayoutEntity> readOnlyLayouts = Collections.unmodifiableSet(NSparkCubingUtil.toLayouts(indexPlan, layoutIDs));
+
+ return new Object[] {indexPlan, dataflow, segment, readOnlyLayouts};
+ }
+
+ @Test
+ void testCheckDataCount_Good() throws Exception {
+ Object[] objs = initDataCountCheckTest();
+ IndexPlan indexPlan = (IndexPlan) objs[0];
+ NDataflow dataflow = (NDataflow) objs[1];
+ NDataSegment segment = (NDataSegment) objs[2];
+ Set<LayoutEntity> readOnlyLayouts = (Set<LayoutEntity>) objs[3];
+
+ // Prepare flat table parquet
+ prepareFlatTableParquet(dataflow, segment, false);
+ // Prepare already existing layouts' parquet
+ prepareLayoutsParquet(indexPlan, segment, false, false, false);
+ // Execute job
+ ExecutableState state = executeJob(segment, readOnlyLayouts);
+
+ // Verify
+ assertEquals(ExecutableState.SUCCEED, state);
+ NDataflow dataflowForVerify = dataflowManager.getDataflow(MODEL_ID);
+ NDataSegment segmentForVerify = dataflowForVerify.getSegment(SEGMENT_ID);
+ NDataLayout layoutForVerify = segmentForVerify.getLayout(LAYOUT_ID_TO_BUILD, true);
+ assertNotNull(layoutForVerify);
+ assertNull(layoutForVerify.getAbnormalType());
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ "1,10001",
+ "20000000001,20000010001",
+ "1,10001,20000000001,20000010001"
+ })
+ void testCheckDataCount_Good_WithNonExistingLayouts(String nonExistingLayouts) throws Exception {
+ // Enable data count check
+ overwriteSystemProp("kylin.build.data-count-check-enabled", "true");
+
+ Set<Long> nonExistingLayoutsSet = Arrays.stream(nonExistingLayouts.split(",")).map(Long::parseLong)
+ .collect(Collectors.toSet());
+
+ IndexPlan indexPlan = indexPlanManager.getIndexPlan(MODEL_ID);
+ // Layouts to build
+ Set<Long> layoutIDs = Sets.newHashSet(LAYOUT_ID_TO_BUILD);
+ Set<LayoutEntity> readOnlyLayouts = Collections.unmodifiableSet(NSparkCubingUtil.toLayouts(indexPlan, layoutIDs));
+ // Remove all layouts for test
+ indexPlanManager.updateIndexPlan(MODEL_ID, copyForWrite -> {
+ copyForWrite.removeLayouts(nonExistingLayoutsSet, true, true);
+ });
+ dataflowManager.reloadAll();
+ NDataflow dataflow = dataflowManager.getDataflow(MODEL_ID);
+ NDataSegment segment = dataflow.getSegment(SEGMENT_ID);
+
+ // Prepare flat table parquet
+ prepareFlatTableParquet(dataflow, segment, false);
+ // Prepare already existing layouts' parquet
+ prepareLayoutsParquet(indexPlan, segment, false, false, false,
+ nonExistingLayoutsSet);
+ // Execute job
+ ExecutableState state = executeJob(segment, readOnlyLayouts);
+
+ // Verify
+ assertEquals(ExecutableState.SUCCEED, state);
+ NDataflow dataflowForVerify = dataflowManager.getDataflow(MODEL_ID);
+ NDataSegment segmentForVerify = dataflowForVerify.getSegment(SEGMENT_ID);
+ NDataLayout layoutForVerify = segmentForVerify.getLayout(LAYOUT_ID_TO_BUILD, true);
+ assertNotNull(layoutForVerify);
+ assertNull(layoutForVerify.getAbnormalType());
+ }
+
+ @Test
+ void testCheckDataCount_Good_WithNonStrictCountCheckEnabled() throws Exception {
+ Object[] objs = initDataCountCheckTest();
+ IndexPlan indexPlan = (IndexPlan) objs[0];
+ NDataflow dataflow = (NDataflow) objs[1];
+ NDataSegment segment = (NDataSegment) objs[2];
+ Set<LayoutEntity> readOnlyLayouts = (Set<LayoutEntity>) objs[3];
+
+ overwriteSystemProp("kylin.build.allow-non-strict-count-check", "true");
+
+ // Prepare flat table parquet
+ prepareFlatTableParquet(dataflow, segment, false);
+ // Prepare already existing layouts' parquet
+ prepareLayoutsParquet(indexPlan, segment, false, false, true);
+ // Execute job
+ ExecutableState state = executeJob(segment, readOnlyLayouts);
+
+ // Verify
+ assertEquals(ExecutableState.SUCCEED, state);
+ NDataflow dataflowForVerify = dataflowManager.getDataflow(MODEL_ID);
+ NDataSegment segmentForVerify = dataflowForVerify.getSegment(SEGMENT_ID);
+ NDataLayout layoutForVerify = segmentForVerify.getLayout(LAYOUT_ID_TO_BUILD, true);
+ assertNotNull(layoutForVerify);
+ assertNull(layoutForVerify.getAbnormalType());
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "true, false, false",
+ "false, true, false",
+ "false, false, true"
+ })
+ void testCheckDataCount_Failed(boolean loseOneAggLayoutRecord,
+ boolean loseOneTableLayoutRecord,
+ boolean loseTwoTableLayoutRecords) throws Exception {
+ Object[] objs = initDataCountCheckTest();
+ IndexPlan indexPlan = (IndexPlan) objs[0];
+ NDataflow dataflow = (NDataflow) objs[1];
+ NDataSegment segment = (NDataSegment) objs[2];
+ Set<LayoutEntity> readOnlyLayouts = (Set<LayoutEntity>) objs[3];
+
+ // Prepare flat table parquet
+ prepareFlatTableParquet(dataflow, segment, false);
+ // Prepare already existing layouts' parquet
+ prepareLayoutsParquet(indexPlan, segment, loseOneAggLayoutRecord, loseOneTableLayoutRecord, loseTwoTableLayoutRecords);
+ // Execute job
+ ExecutableState state = executeJob(segment, readOnlyLayouts);
+
+ // Verify
+ assertEquals(ExecutableState.SUCCEED, state);
+ NDataflow dataflowForVerify = dataflowManager.getDataflow(MODEL_ID);
+ NDataSegment segmentForVerify = dataflowForVerify.getSegment(SEGMENT_ID);
+ NDataLayout layoutForVerify = segmentForVerify.getLayout(LAYOUT_ID_TO_BUILD, true);
+ assertNotNull(layoutForVerify);
+ assertEquals(NDataLayout.AbnormalType.DATA_INCONSISTENT, layoutForVerify.getAbnormalType());
+ }
+
+ private void prepareFlatTableParquet(NDataflow dataflow, NDataSegment segment, boolean loseOneRecordForTest) {
+ List<StructField> fields = new ArrayList<>();
+ for (ColumnStruct cs : DIM_SCHEMAS.values()) {
+ StructField field = DataTypes.createStructField(String.valueOf(cs.getRowId()), cs.getType(), true);
+ fields.add(field);
+ }
+ StructType schema = DataTypes.createStructType(fields);
+
+ List<Row> rows = new ArrayList<>();
+ rows.add(RowFactory.create(1L, 5L, 1L, "Customer#000000001", 1L));
+ rows.add(RowFactory.create(2L, 15L, 1L, "Customer#000000001", 1L));
+ rows.add(RowFactory.create(3L, 5L, 2L, "Customer#000000002", 2L));
+ rows.add(RowFactory.create(4L, 15L, 3L, "Customer#000000003", 3L));
+ rows.add(RowFactory.create(5L, 5L, 5L, "Customer#000000005", 5L));
+ if (!loseOneRecordForTest) {
+ rows.add(RowFactory.create(6L, 15L, 5L, "Customer#000000005", 5L));
+ }
+ Dataset<Row> flatTableDS = ss.createDataFrame(rows, schema);
+
+ ss.sessionState().conf().setLocalProperty("spark.sql.sources.repartitionWritingDataSource", "true");
+ flatTableDS.write().mode(SaveMode.Overwrite).parquet(config.getFlatTableDir(getProject(), dataflow.getId(), segment.getId()).toString());
+ }
+
+ private void prepareLayoutsParquet(IndexPlan indexPlan, NDataSegment segment,
+ boolean loseOneAggLayoutRecordForTest,
+ boolean loseOneTableLayoutRecordForTest,
+ boolean loseTwoTableLayoutRecordsForTest) {
+ prepareLayoutsParquet(indexPlan, segment, loseOneAggLayoutRecordForTest, loseOneTableLayoutRecordForTest,
+ loseTwoTableLayoutRecordsForTest, null);
+ }
+
+ private void prepareLayoutsParquet(IndexPlan indexPlan, NDataSegment segment,
+ boolean loseOneAggLayoutRecordForTest,
+ boolean loseOneTableLayoutRecordForTest,
+ boolean loseTwoTableLayoutRecordsForTest,
+ Set<Long> nonExistingLayouts) {
+ StorageStore store = StorageStoreFactory.create(1);
+
+ // Prepare layout 1
+ {
+ List<StructField> fields = new ArrayList<>();
+ for (ColumnStruct cs : toColumnStructs("0 13 16 24 22 100000")) {
+ StructField field = DataTypes.createStructField(String.valueOf(cs.getRowId()), cs.getType(), true);
+ fields.add(field);
+ }
+ StructType schema = DataTypes.createStructType(fields);
+
+ List<Row> rows = new ArrayList<>();
+ rows.add(RowFactory.create(1L, 5L, 1L, 1L, "Customer#000000001", 1L));
+ rows.add(RowFactory.create(2L, 15L, 1L, 1L, "Customer#000000001", 1L));
+ rows.add(RowFactory.create(3L, 5L, 2L, 2L, "Customer#000000002", 1L));
+ rows.add(RowFactory.create(4L, 15L, 3L, 3L, "Customer#000000003", 1L));
+ rows.add(RowFactory.create(5L, 5L, 5L, 5L, "Customer#000000005", 1L));
+ if (!loseOneAggLayoutRecordForTest) {
+ rows.add(RowFactory.create(6L, 15L, 5L, 5L, "Customer#000000005", 1L));
+ }
+
+ Dataset<Row> layoutDS = ss.createDataFrame(rows, schema);
+
+ final long layoutId = 1L;
+ LayoutEntity layoutEntity = indexPlan.getLayoutEntity(layoutId);
+ if (CollectionUtils.isEmpty(nonExistingLayouts) || !nonExistingLayouts.contains(layoutId)) {
+ store.save(layoutEntity, new Path(NSparkCubingUtil.getStoragePath(segment, layoutEntity.getId())), KapConfig.wrap(config), layoutDS);
+ }
+ }
+ // Prepare layout 10001
+ {
+ List<StructField> fields = new ArrayList<>();
+ for (ColumnStruct cs : toColumnStructs("24 22 100000 100001")) {
+ StructField field = DataTypes.createStructField(String.valueOf(cs.getRowId()), cs.getType(), true);
+ fields.add(field);
+ }
+ StructType schema = DataTypes.createStructType(fields);
+
+ List<Row> rows = new ArrayList<>();
+ rows.add(RowFactory.create(1L, "Customer#000000001", 2L, 20L));
+ rows.add(RowFactory.create(2L, "Customer#000000002", 1L, 5L));
+ rows.add(RowFactory.create(3L, "Customer#000000003", 1L, 15L));
+ rows.add(RowFactory.create(5L, "Customer#000000005", 2L, 20L));
+ Dataset<Row> layoutDS = ss.createDataFrame(rows, schema);
+
+ final long layoutId = 10001L;
+ LayoutEntity layoutEntity = indexPlan.getLayoutEntity(layoutId);
+ if (CollectionUtils.isEmpty(nonExistingLayouts) || !nonExistingLayouts.contains(layoutId)) {
+ store.save(layoutEntity, new Path(NSparkCubingUtil.getStoragePath(segment, layoutEntity.getId())), KapConfig.wrap(config), layoutDS);
+ }
+ }
+ // Prepare layout 20000000001
+ {
+ List<StructField> fields = new ArrayList<>();
+ for (ColumnStruct cs : toColumnStructs("0 13 16 24 22")) {
+ StructField field = DataTypes.createStructField(String.valueOf(cs.getRowId()), cs.getType(), true);
+ fields.add(field);
+ }
+ StructType schema = DataTypes.createStructType(fields);
+
+ List<Row> rows = new ArrayList<>();
+ rows.add(RowFactory.create(1L, 5L, 1L, 1L, "Customer#000000001"));
+ rows.add(RowFactory.create(2L, 15L, 1L, 1L, "Customer#000000001"));
+ rows.add(RowFactory.create(3L, 5L, 2L, 2L, "Customer#000000002"));
+ rows.add(RowFactory.create(4L, 15L, 3L, 3L, "Customer#000000003"));
+ rows.add(RowFactory.create(5L, 5L, 5L, 5L, "Customer#000000005"));
+ if (!loseOneTableLayoutRecordForTest && !loseTwoTableLayoutRecordsForTest) {
+ rows.add(RowFactory.create(6L, 15L, 5L, 5L, "Customer#000000005"));
+ }
+ Dataset<Row> layoutDS = ss.createDataFrame(rows, schema);
+
+ final long layoutId = 20_000_000_001L;
+ LayoutEntity layoutEntity = indexPlan.getLayoutEntity(layoutId);
+ if (CollectionUtils.isEmpty(nonExistingLayouts) || !nonExistingLayouts.contains(layoutId)) {
+ store.save(layoutEntity, new Path(NSparkCubingUtil.getStoragePath(segment, layoutEntity.getId())), KapConfig.wrap(config), layoutDS);
+ }
+ }
+ // Prepare layout 20000010001
+ {
+ List<StructField> fields = new ArrayList<>();
+ for (ColumnStruct cs : toColumnStructs("0 13")) {
+ StructField field = DataTypes.createStructField(String.valueOf(cs.getRowId()), cs.getType(), true);
+ fields.add(field);
+ }
+ StructType schema = DataTypes.createStructType(fields);
+
+ List<Row> rows = new ArrayList<>();
+ rows.add(RowFactory.create(1L, 5L));
+ rows.add(RowFactory.create(2L, 15L));
+ rows.add(RowFactory.create(3L, 5L));
+ rows.add(RowFactory.create(4L, 15L));
+ rows.add(RowFactory.create(5L, 5L));
+ if (!loseTwoTableLayoutRecordsForTest) {
+ rows.add(RowFactory.create(6L, 15L));
+ }
+ Dataset<Row> layoutDS = ss.createDataFrame(rows, schema);
+
+ final long layoutId = 20_000_010_001L;
+ LayoutEntity layoutEntity = indexPlan.getLayoutEntity(layoutId);
+ if (CollectionUtils.isEmpty(nonExistingLayouts) || !nonExistingLayouts.contains(layoutId)) {
+ store.save(layoutEntity, new Path(NSparkCubingUtil.getStoragePath(segment, layoutEntity.getId())), KapConfig.wrap(config), layoutDS);
+ }
+ }
+ }
+
+ private List<ColumnStruct> toColumnStructs(String columnIds) {
+ return Arrays.stream(columnIds.split(" ")).map(id -> {
+ ColumnStruct cs = DIM_SCHEMAS.get(Long.valueOf(id));
+ if (cs == null) {
+ cs = MEASURE_SCHEMAS.get(Long.valueOf(id));
+ }
+ return cs;
+ }).collect(Collectors.toList());
+ }
+
+ private ExecutableState executeJob(NDataSegment segment, Set<LayoutEntity> readOnlyLayouts) throws InterruptedException {
+ NExecutableManager executableManager = NExecutableManager.getInstance(config, getProject());
+
+ NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(segment), readOnlyLayouts, "ADMIN", null);
+ executableManager.addJob(job);
+ return IndexDataConstructor.wait(job);
+ }
+}
diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/DataflowCleanerCLI.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/DataflowCleanerCLI.java
index 22f2e5f93a..a6390150a1 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/DataflowCleanerCLI.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/DataflowCleanerCLI.java
@@ -78,7 +78,7 @@ public class DataflowCleanerCLI {
val layoutIds = getLayouts(dataflow);
val toBeRemoved = Sets.<Long> newHashSet();
for (NDataSegment segment : dataflow.getSegments()) {
- toBeRemoved.addAll(segment.getSegDetails().getLayouts().stream().map(NDataLayout::getLayoutId)
+ toBeRemoved.addAll(segment.getSegDetails().getAllLayouts().stream().map(NDataLayout::getLayoutId)
.filter(id -> !layoutIds.contains(id)).collect(Collectors.toSet()));
}
dataflowManager.removeLayouts(dataflow, Lists.newArrayList(toBeRemoved));