You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/02 13:03:03 UTC
[kylin] branch master updated: KYLIN-4273 Make cube planner works for real-time streaming job
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 8a8d1b0 KYLIN-4273 Make cube planner works for real-time streaming job
8a8d1b0 is described below
commit 8a8d1b0a081b0040284bb612254bce5007cf0729
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Thu Nov 28 19:38:20 2019 +0800
KYLIN-4273 Make cube planner works for real-time streaming job
---
.../kylin/stream/coordinator/Coordinator.java | 36 +++++++++++++++++-
.../coordinator/coordinate/BuildJobSubmitter.java | 44 +++++++++++++++++++---
.../coordinate/BuildJobSubmitterTest.java | 16 ++++----
.../coordinator/coordinate/StreamingTestBase.java | 9 +++++
4 files changed, 91 insertions(+), 14 deletions(-)
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
index 46a9bcf..938c0b4 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
@@ -56,6 +56,7 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.StreamingCubingEngine;
+import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
@@ -1133,12 +1134,20 @@ public class Coordinator implements CoordinatorClient {
private List<String> findSegmentsCanBuild(String cubeName) {
List<String> result = Lists.newArrayList();
CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+ // The cube is being optimized: return no segments so that nothing new is built meanwhile
+ if (isInOptimize(cubeInstance)) {
+ return result;
+ }
+ int allowMaxBuildingSegments = cubeInstance.getConfig().getMaxBuildingSegments();
CubeSegment latestHistoryReadySegment = cubeInstance.getLatestReadySegment();
long minSegmentStart = -1;
if (latestHistoryReadySegment != null) {
minSegmentStart = latestHistoryReadySegment.getTSRange().end.v;
+ } else {
+ // No ready segment exists yet; for the cube planner to work, allow only one segment to build at a time
+ logger.info("there is no ready segments for cube:{}, so only allow 1 segment build concurrently", cubeName);
+ allowMaxBuildingSegments = 1;
}
- int allowMaxBuildingSegments = cubeInstance.getConfig().getMaxBuildingSegments();
CubeAssignment assignments = streamMetadataStore.getAssignmentsByCube(cubeName);
Set<Integer> cubeAssignedReplicaSets = assignments.getReplicaSetIDs();
@@ -1214,6 +1223,31 @@ public class Coordinator implements CoordinatorClient {
return result;
}
+ private boolean isInOptimize(CubeInstance cube) {
+ Segments<CubeSegment> readyPendingSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING);
+ if (readyPendingSegments.size() > 0) {
+ logger.info("The cube {} has READY_PENDING segments {}. It's not allowed for building",
+ cube.getName(), readyPendingSegments);
+ return true;
+ }
+ Segments<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.NEW);
+ for (CubeSegment newSegment : newSegments) {
+ String jobId = newSegment.getLastBuildJobID();
+ if (jobId == null) {
+ continue;
+ }
+ AbstractExecutable job = getExecutableManager().getJob(jobId);
+ if (job != null && job instanceof CubingJob) {
+ CubingJob cubingJob = (CubingJob) job;
+ if (CubingJob.CubingJobTypeEnum.OPTIMIZE.toString().equals(cubingJob.getJobType())) {
+ logger.info("The cube {} is in optimization. It's not allowed to build new segments during optimization.", cube.getName());
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
/**
* <pre>
* When all replica sets have uploaded their local segment cache to remote, we can mark
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java
index 9e32f46..6f5bb0d 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java
@@ -27,10 +27,12 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.StreamingCubingEngine;
+import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.stream.coordinator.StreamingCubeInfo;
import org.apache.kylin.stream.coordinator.coordinate.annotations.NonSideEffect;
import org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicIdempotent;
@@ -148,7 +150,7 @@ public class BuildJobSubmitter implements Runnable {
if (checkTimes % 100 == 1) {
logger.info("Force traverse all cubes periodically.");
for (StreamingCubeInfo cubeInfo : coordinator.getEnableStreamingCubes()) {
- List<String> segmentList = checkSegmentBuidJobFromMetadata(cubeInfo.getCubeName());
+ List<String> segmentList = checkSegmentBuildJobFromMetadata(cubeInfo.getCubeName());
for (String segmentName : segmentList) {
submitSegmentBuildJob(cubeInfo.getCubeName(), segmentName);
}
@@ -218,7 +220,7 @@ public class BuildJobSubmitter implements Runnable {
Iterator<String> iterator = cubeCheckList.iterator();
while (iterator.hasNext()) {
String cubeName = iterator.next();
- List<String> segmentList = checkSegmentBuidJobFromMetadata(cubeName);
+ List<String> segmentList = checkSegmentBuildJobFromMetadata(cubeName);
boolean allSubmited = true;
for (String segmentName : segmentList) {
boolean ok = submitSegmentBuildJob(cubeName, segmentName);
@@ -241,16 +243,23 @@ public class BuildJobSubmitter implements Runnable {
* @return list of segment which could be submitted a segment build job
*/
@NonSideEffect
- List<String> checkSegmentBuidJobFromMetadata(String cubeName) {
+ List<String> checkSegmentBuildJobFromMetadata(String cubeName) {
List<String> result = Lists.newArrayList();
CubeInstance cubeInstance = coordinator.getCubeManager().getCube(cubeName);
+ // The cube is being optimized: return no segments so that nothing new is built meanwhile
+ if (isInOptimize(cubeInstance)) {
+ return result;
+ }
+ int allowMaxBuildingSegments = cubeInstance.getConfig().getMaxBuildingSegments();
CubeSegment latestHistoryReadySegment = cubeInstance.getLatestReadySegment();
-
long minSegmentStart = -1;
if (latestHistoryReadySegment != null) {
minSegmentStart = latestHistoryReadySegment.getTSRange().end.v;
+ } else {
+ // No ready segment exists yet; for the cube planner to work, allow only one segment to build at a time
+ logger.info("there is no ready segments for cube:{}, so only allow 1 segment build concurrently", cubeName);
+ allowMaxBuildingSegments = 1;
}
- int allowMaxBuildingSegments = cubeInstance.getConfig().getMaxBuildingSegments();
CubeAssignment assignments = coordinator.getStreamMetadataStore().getAssignmentsByCube(cubeName);
Set<Integer> cubeAssignedReplicaSets = assignments.getReplicaSetIDs();
@@ -308,6 +317,31 @@ public class BuildJobSubmitter implements Runnable {
return result;
}
+ private boolean isInOptimize(CubeInstance cube) {
+ Segments<CubeSegment> readyPendingSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING);
+ if (readyPendingSegments.size() > 0) {
+ logger.info("The cube {} has READY_PENDING segments {}. It's not allowed for building",
+ cube.getName(), readyPendingSegments);
+ return true;
+ }
+ Segments<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.NEW);
+ for (CubeSegment newSegment : newSegments) {
+ String jobId = newSegment.getLastBuildJobID();
+ if (jobId == null) {
+ continue;
+ }
+ AbstractExecutable job = coordinator.getExecutableManager().getJob(jobId);
+ if (job != null && job instanceof CubingJob) {
+ CubingJob cubingJob = (CubingJob) job;
+ if (CubingJob.CubingJobTypeEnum.OPTIMIZE.toString().equals(cubingJob.getJobType())) {
+ logger.info("The cube {} is in optimization. It's not allowed to build new segments during optimization.", cube.getName());
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
/**
* Submit a build job for streaming segment
*
diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java
index 95a0f05..044534c 100644
--- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java
+++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java
@@ -103,7 +103,7 @@ public class BuildJobSubmitterTest extends StreamingTestBase {
assertEquals(0, buildJobSubmitter.getCubeCheckList().size());
}
- void prepareTestCheckSegmentBuidJobFromMetadata() {
+ void prepareTestCheckSegmentBuildJobFromMetadata() {
CubeSegment cubeSegment = stubCubSegment(SegmentStatusEnum.NEW, 100L, 200L);
CubeInstance cubeInstance = stubCubeInstance(cubeSegment);
config = stubKylinConfig();
@@ -124,24 +124,24 @@ public class BuildJobSubmitterTest extends StreamingTestBase {
}
@Test
- public void testCheckSegmentBuidJobFromMetadata() {
- prepareTestCheckSegmentBuidJobFromMetadata();
+ public void testCheckSegmentBuildJobFromMetadata() {
+ prepareTestCheckSegmentBuildJobFromMetadata();
BuildJobSubmitter buildJobSubmitter = new BuildJobSubmitter(streamingCoordinator);
buildJobSubmitter.restore();
- List<String> segmentReadyList = buildJobSubmitter.checkSegmentBuidJobFromMetadata(cubeName2);
+ List<String> segmentReadyList = buildJobSubmitter.checkSegmentBuildJobFromMetadata(cubeName2);
assertEquals(1, segmentReadyList.size());
- segmentReadyList = buildJobSubmitter.checkSegmentBuidJobFromMetadata(cubeName3);
+ segmentReadyList = buildJobSubmitter.checkSegmentBuildJobFromMetadata(cubeName3);
assertEquals(1, segmentReadyList.size());
}
@Test
- public void testCheckSegmentBuidJobFromMetadata1() {
- prepareTestCheckSegmentBuidJobFromMetadata();
+ public void testCheckSegmentBuildJobFromMetadata1() {
+ prepareTestCheckSegmentBuildJobFromMetadata();
BuildJobSubmitter buildJobSubmitter = new BuildJobSubmitter(streamingCoordinator);
buildJobSubmitter.restore();
- List<String> segmentReadyList = buildJobSubmitter.checkSegmentBuidJobFromMetadata(cubeName4);
+ List<String> segmentReadyList = buildJobSubmitter.checkSegmentBuildJobFromMetadata(cubeName4);
verify(executableManager, times(1)).resumeJob(eq(mockBuildJob4));
assertEquals(0, segmentReadyList.size());
}
diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java
index 5308d79..c7b1ac5 100644
--- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java
+++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java
@@ -315,14 +315,23 @@ public class StreamingTestBase extends LocalFileMetadataTestCase {
CubeInstance stubCubeInstance(CubeSegment cubSegment) {
CubeInstance cubeInstance = mock(CubeInstance.class);
+ CubeSegment readySegment = stubCubSegment(SegmentStatusEnum.READY, 0L, 1L);
when(cubeInstance.latestCopyForWrite()).thenReturn(cubeInstance);
@SuppressWarnings("unchecked")
Segments<CubeSegment> segmentSegments = mock(Segments.class, RETURNS_DEEP_STUBS);
+ Segments<CubeSegment> optimizedSegments = mock(Segments.class, RETURNS_DEEP_STUBS);
+
when(segmentSegments.size()).thenReturn(1);
when(cubeInstance.getBuildingSegments()).thenReturn(segmentSegments);
when(cubeInstance.getName()).thenReturn(cubeName1);
when(cubeInstance.getSegment(anyString(), Matchers.any())).thenReturn(cubSegment);
+
+ when(optimizedSegments.size()).thenReturn(0);
+ when(cubeInstance.getLatestReadySegment()).thenReturn(readySegment);
+ when(cubeInstance.getSegments(SegmentStatusEnum.READY_PENDING)).thenReturn(optimizedSegments);
+ when(cubeInstance.getSegments(SegmentStatusEnum.NEW)).thenReturn(segmentSegments);
+
return cubeInstance;
}