You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/02 13:03:03 UTC

[kylin] branch master updated: KYLIN-4273 Make cube planner works for real-time streaming job

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a8d1b0  KYLIN-4273 Make cube planner works for real-time streaming job
8a8d1b0 is described below

commit 8a8d1b0a081b0040284bb612254bce5007cf0729
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Thu Nov 28 19:38:20 2019 +0800

    KYLIN-4273 Make cube planner works for real-time streaming job
---
 .../kylin/stream/coordinator/Coordinator.java      | 36 +++++++++++++++++-
 .../coordinator/coordinate/BuildJobSubmitter.java  | 44 +++++++++++++++++++---
 .../coordinate/BuildJobSubmitterTest.java          | 16 ++++----
 .../coordinator/coordinate/StreamingTestBase.java  |  9 +++++
 4 files changed, 91 insertions(+), 14 deletions(-)

diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
index 46a9bcf..938c0b4 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
@@ -56,6 +56,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.StreamingCubingEngine;
+import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -1133,12 +1134,20 @@ public class Coordinator implements CoordinatorClient {
     private List<String> findSegmentsCanBuild(String cubeName) {
         List<String> result = Lists.newArrayList();
         CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        // in optimization
+        if (isInOptimize(cubeInstance)) {
+            return result;
+        }
+        int allowMaxBuildingSegments = cubeInstance.getConfig().getMaxBuildingSegments();
         CubeSegment latestHistoryReadySegment = cubeInstance.getLatestReadySegment();
         long minSegmentStart = -1;
         if (latestHistoryReadySegment != null) {
             minSegmentStart = latestHistoryReadySegment.getTSRange().end.v;
+        } else {
+            // there is no ready segment, to make cube planner work, only 1 segment can build
+            logger.info("there is no ready segments for cube:{}, so only allow 1 segment build concurrently", cubeName);
+            allowMaxBuildingSegments = 1;
         }
-        int allowMaxBuildingSegments = cubeInstance.getConfig().getMaxBuildingSegments();
 
         CubeAssignment assignments = streamMetadataStore.getAssignmentsByCube(cubeName);
         Set<Integer> cubeAssignedReplicaSets = assignments.getReplicaSetIDs();
@@ -1214,6 +1223,31 @@ public class Coordinator implements CoordinatorClient {
         return result;
     }
 
+    private boolean isInOptimize(CubeInstance cube) {
+        Segments<CubeSegment> readyPendingSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING);
+        if (readyPendingSegments.size() > 0) {
+            logger.info("The cube {} has READY_PENDING segments {}. It's not allowed for building",
+                cube.getName(), readyPendingSegments);
+            return true;
+        }
+        Segments<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.NEW);
+        for (CubeSegment newSegment : newSegments) {
+            String jobId = newSegment.getLastBuildJobID();
+            if (jobId == null) {
+                continue;
+            }
+            AbstractExecutable job = getExecutableManager().getJob(jobId);
+            if (job != null && job instanceof CubingJob) {
+                CubingJob cubingJob = (CubingJob) job;
+                if (CubingJob.CubingJobTypeEnum.OPTIMIZE.toString().equals(cubingJob.getJobType())) {
+                    logger.info("The cube {} is in optimization. It's not allowed to build new segments during optimization.", cube.getName());
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     /**
      * <pre>
      *     When all replica sets have uploaded their local segment cache to remote, we can mark
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java
index 9e32f46..6f5bb0d 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java
@@ -27,10 +27,12 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.StreamingCubingEngine;
+import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.stream.coordinator.StreamingCubeInfo;
 import org.apache.kylin.stream.coordinator.coordinate.annotations.NonSideEffect;
 import org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicIdempotent;
@@ -148,7 +150,7 @@ public class BuildJobSubmitter implements Runnable {
         if (checkTimes % 100 == 1) {
             logger.info("Force traverse all cubes periodically.");
             for (StreamingCubeInfo cubeInfo : coordinator.getEnableStreamingCubes()) {
-                List<String> segmentList = checkSegmentBuidJobFromMetadata(cubeInfo.getCubeName());
+                List<String> segmentList = checkSegmentBuildJobFromMetadata(cubeInfo.getCubeName());
                 for (String segmentName : segmentList) {
                     submitSegmentBuildJob(cubeInfo.getCubeName(), segmentName);
                 }
@@ -218,7 +220,7 @@ public class BuildJobSubmitter implements Runnable {
         Iterator<String> iterator = cubeCheckList.iterator();
         while (iterator.hasNext()) {
             String cubeName = iterator.next();
-            List<String> segmentList = checkSegmentBuidJobFromMetadata(cubeName);
+            List<String> segmentList = checkSegmentBuildJobFromMetadata(cubeName);
             boolean allSubmited = true;
             for (String segmentName : segmentList) {
                 boolean ok = submitSegmentBuildJob(cubeName, segmentName);
@@ -241,16 +243,23 @@ public class BuildJobSubmitter implements Runnable {
      * @return list of segment which could be submitted a segment build job
      */
     @NonSideEffect
-    List<String> checkSegmentBuidJobFromMetadata(String cubeName) {
+    List<String> checkSegmentBuildJobFromMetadata(String cubeName) {
         List<String> result = Lists.newArrayList();
         CubeInstance cubeInstance = coordinator.getCubeManager().getCube(cubeName);
+        // in optimization
+        if (isInOptimize(cubeInstance)) {
+            return result;
+        }
+        int allowMaxBuildingSegments = cubeInstance.getConfig().getMaxBuildingSegments();
         CubeSegment latestHistoryReadySegment = cubeInstance.getLatestReadySegment();
-
         long minSegmentStart = -1;
         if (latestHistoryReadySegment != null) {
             minSegmentStart = latestHistoryReadySegment.getTSRange().end.v;
+        } else {
+            // there is no ready segment, to make cube planner work, only 1 segment can build
+            logger.info("there is no ready segments for cube:{}, so only allow 1 segment build concurrently", cubeName);
+            allowMaxBuildingSegments = 1;
         }
-        int allowMaxBuildingSegments = cubeInstance.getConfig().getMaxBuildingSegments();
 
         CubeAssignment assignments = coordinator.getStreamMetadataStore().getAssignmentsByCube(cubeName);
         Set<Integer> cubeAssignedReplicaSets = assignments.getReplicaSetIDs();
@@ -308,6 +317,31 @@ public class BuildJobSubmitter implements Runnable {
         return result;
     }
 
+    private boolean isInOptimize(CubeInstance cube) {
+        Segments<CubeSegment> readyPendingSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING);
+        if (readyPendingSegments.size() > 0) {
+            logger.info("The cube {} has READY_PENDING segments {}. It's not allowed for building",
+                cube.getName(), readyPendingSegments);
+            return true;
+        }
+        Segments<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.NEW);
+        for (CubeSegment newSegment : newSegments) {
+            String jobId = newSegment.getLastBuildJobID();
+            if (jobId == null) {
+                continue;
+            }
+            AbstractExecutable job = coordinator.getExecutableManager().getJob(jobId);
+            if (job != null && job instanceof CubingJob) {
+                CubingJob cubingJob = (CubingJob) job;
+                if (CubingJob.CubingJobTypeEnum.OPTIMIZE.toString().equals(cubingJob.getJobType())) {
+                    logger.info("The cube {} is in optimization. It's not allowed to build new segments during optimization.", cube.getName());
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     /**
      * Submit a build job for streaming segment
      *
diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java
index 95a0f05..044534c 100644
--- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java
+++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java
@@ -103,7 +103,7 @@ public class BuildJobSubmitterTest extends StreamingTestBase {
         assertEquals(0, buildJobSubmitter.getCubeCheckList().size());
     }
 
-    void prepareTestCheckSegmentBuidJobFromMetadata() {
+    void prepareTestCheckSegmentBuildJobFromMetadata() {
         CubeSegment cubeSegment = stubCubSegment(SegmentStatusEnum.NEW, 100L, 200L);
         CubeInstance cubeInstance = stubCubeInstance(cubeSegment);
         config = stubKylinConfig();
@@ -124,24 +124,24 @@ public class BuildJobSubmitterTest extends StreamingTestBase {
     }
 
     @Test
-    public void testCheckSegmentBuidJobFromMetadata() {
-        prepareTestCheckSegmentBuidJobFromMetadata();
+    public void testCheckSegmentBuildJobFromMetadata() {
+        prepareTestCheckSegmentBuildJobFromMetadata();
         BuildJobSubmitter buildJobSubmitter = new BuildJobSubmitter(streamingCoordinator);
         buildJobSubmitter.restore();
-        List<String> segmentReadyList = buildJobSubmitter.checkSegmentBuidJobFromMetadata(cubeName2);
+        List<String> segmentReadyList = buildJobSubmitter.checkSegmentBuildJobFromMetadata(cubeName2);
         assertEquals(1, segmentReadyList.size());
 
-        segmentReadyList = buildJobSubmitter.checkSegmentBuidJobFromMetadata(cubeName3);
+        segmentReadyList = buildJobSubmitter.checkSegmentBuildJobFromMetadata(cubeName3);
         assertEquals(1, segmentReadyList.size());
     }
 
     @Test
-    public void testCheckSegmentBuidJobFromMetadata1() {
-        prepareTestCheckSegmentBuidJobFromMetadata();
+    public void testCheckSegmentBuildJobFromMetadata1() {
+        prepareTestCheckSegmentBuildJobFromMetadata();
         BuildJobSubmitter buildJobSubmitter = new BuildJobSubmitter(streamingCoordinator);
         buildJobSubmitter.restore();
 
-        List<String> segmentReadyList = buildJobSubmitter.checkSegmentBuidJobFromMetadata(cubeName4);
+        List<String> segmentReadyList = buildJobSubmitter.checkSegmentBuildJobFromMetadata(cubeName4);
         verify(executableManager, times(1)).resumeJob(eq(mockBuildJob4));
         assertEquals(0, segmentReadyList.size());
     }
diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java
index 5308d79..c7b1ac5 100644
--- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java
+++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java
@@ -315,14 +315,23 @@ public class StreamingTestBase extends LocalFileMetadataTestCase {
 
     CubeInstance stubCubeInstance(CubeSegment cubSegment) {
         CubeInstance cubeInstance = mock(CubeInstance.class);
+        CubeSegment readySegment = stubCubSegment(SegmentStatusEnum.READY, 0L, 1L);
         when(cubeInstance.latestCopyForWrite()).thenReturn(cubeInstance);
         @SuppressWarnings("unchecked")
         Segments<CubeSegment> segmentSegments = mock(Segments.class, RETURNS_DEEP_STUBS);
 
+        Segments<CubeSegment> optimizedSegments = mock(Segments.class, RETURNS_DEEP_STUBS);
+
         when(segmentSegments.size()).thenReturn(1);
         when(cubeInstance.getBuildingSegments()).thenReturn(segmentSegments);
         when(cubeInstance.getName()).thenReturn(cubeName1);
         when(cubeInstance.getSegment(anyString(), Matchers.any())).thenReturn(cubSegment);
+
+        when(optimizedSegments.size()).thenReturn(0);
+        when(cubeInstance.getLatestReadySegment()).thenReturn(readySegment);
+        when(cubeInstance.getSegments(SegmentStatusEnum.READY_PENDING)).thenReturn(optimizedSegments);
+        when(cubeInstance.getSegments(SegmentStatusEnum.NEW)).thenReturn(segmentSegments);
+
         return cubeInstance;
     }