You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/06/27 07:29:47 UTC
[kylin] branch master updated: Add rule for submit build job:
forbid submitting appending segment in the lambda stream cube (#228)
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 9b473d1 Add rule for submit build job: forbid submitting appending segment in the lambda stream cube (#228)
9b473d1 is described below
commit 9b473d14a18044794ce0f23047df715e7ed34d0a
Author: kliu3 <li...@apache.org>
AuthorDate: Fri Apr 16 14:26:17 2021 +0800
Add rule for submit build job: forbid submitting appending segment in the lambda stream cube (#228)
---
.../main/java/org/apache/kylin/cube/CubeInstance.java | 4 ++++
.../org/apache/kylin/rest/service/JobService.java | 19 +++++++++++++++++++
2 files changed, 23 insertions(+)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 7c81e4f..ee98b54 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -773,4 +773,8 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
throw new IllegalStateException("No segment's last build job ID equals " + jobID);
}
+ /**
+  * Tells whether this cube is a stream lambda cube.
+  *
+  * @return the value of {@code isLambdaTable()} on the table descriptor of the
+  *         model's root fact table
+  */
+ public boolean isStreamLambdaCube() {
+     // The answer is taken entirely from the root fact table's descriptor.
+     final boolean lambdaTable = getModel().getRootFactTable().getTableDesc().isLambdaTable();
+     return lambdaTable;
+ }
+
}
\ No newline at end of file
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index a5f44b5..41c74f7 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -237,6 +237,9 @@ public class JobService extends BasicService implements InitializingBean {
checkCubeDescSignature(cube);
checkAllowBuilding(cube);
+ if (cube.isStreamLambdaCube() && buildType == CubeBuildTypeEnum.BUILD) {
+ checkStreamLambdaBuildingSegment(cube, tsRange);
+ }
if (buildType == CubeBuildTypeEnum.BUILD || buildType == CubeBuildTypeEnum.REFRESH) {
checkAllowParallelBuilding(cube);
@@ -435,6 +438,22 @@ public class JobService extends BasicService implements InitializingBean {
}
}
+ /**
+  * Validates the time range of a build request against the cube's latest ready segment.
+  * <p>
+  * The request is rejected when the range is missing or inverted, or when its end lies
+  * beyond the end of the latest ready segment (i.e. the request would append a segment).
+  * When the cube has no ready segment, only the range sanity check applies.
+  *
+  * @param cube    the cube the build job is submitted for
+  * @param tsRange the requested time range; may be {@code null}, which is rejected
+  * @throws BadRequestException if {@code tsRange} is {@code null}, if its end is before
+  *                             its start, or if its end is after the end of the latest
+  *                             ready segment
+  */
+ private void checkStreamLambdaBuildingSegment(CubeInstance cube, TSRange tsRange) {
+     // Guard: a missing range or one whose end precedes its start can never be built.
+     if (tsRange == null || tsRange.end.v < tsRange.start.v) {
+         // Fix: separator space added before "for" so the range and the text don't run together.
+         throw new BadRequestException("The tsRange is invalid " + tsRange +
+                 " for the cube " + cube.getName() +
+                 ". It's not allowed to submit a build job");
+     }
+
+     CubeSegment latestReadySegment = cube.getLatestReadySegment();
+     if (latestReadySegment == null) {
+         // Nothing to compare against, so the request passes this check.
+         return;
+     }
+
+     // A range ending after the latest ready segment would append a new segment.
+     if (tsRange.end.v > latestReadySegment.getTSRange().end.v) {
+         // Fix: separator space added after the cube name.
+         throw new BadRequestException(
+                 "The stream cube " + cube.getName() + " can't be submitted the appending segment. " +
+                 "The latest ready segment tsrange is " + latestReadySegment.getTSRange() +
+                 ", and the appending segment tsrange is " + tsRange);
+     }
+ }
+
private void checkAllowParallelBuilding(CubeInstance cube) {
if (cube.getConfig().isCubePlannerEnabled()) {
if (cube.getCuboids() == null) {