You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/06/08 08:52:16 UTC

incubator-kylin git commit: KYLIN-816 Allow gap in cube segments, mainly for streaming case

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 d4e5120f3 -> 44b3c6a3a


KYLIN-816 Allow gap in cube segments, mainly for streaming case

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/44b3c6a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/44b3c6a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/44b3c6a3

Branch: refs/heads/0.8.0
Commit: 44b3c6a3ada15f9785df035ef557d3bfed1454fa
Parents: d4e5120
Author: shaofengshi <sh...@apache.org>
Authored: Mon Jun 8 14:52:01 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jun 8 14:52:01 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/cube/CubeInstance.java     |   4 +
 .../java/org/apache/kylin/cube/CubeManager.java |  46 +++++---
 .../java/org/apache/kylin/cube/CubeSegment.java |   2 +-
 .../org/apache/kylin/cube/CubeManagerTest.java  | 104 ++++++++++++++++++-
 .../org/apache/kylin/cube/CubeSegmentsTest.java |  34 +++++-
 5 files changed, 168 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/44b3c6a3/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index d744bdf..f62904e 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -409,6 +409,10 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
         return autoMergeTimeRanges;
     }
 
+    public void setAutoMergeTimeRanges(long[] autoMergeTimeRanges) {
+        this.autoMergeTimeRanges = autoMergeTimeRanges;
+    }
+
     public boolean needAutoMerge() {
         if (!this.getDescriptor().getModel().getPartitionDesc().isPartitioned())
             return false;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/44b3c6a3/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 5fc5584..b990d92 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -342,19 +342,22 @@ public class CubeManager implements IRealizationProvider {
         return appendSegments(cube, endDate, true, true);
     }
 
-    public CubeSegment appendSegments(CubeInstance cube, long endDate, boolean checkNoBuilding, boolean saveChange) throws IOException {
-        if (checkNoBuilding)
-            checkNoBuildingSegment(cube);
-
-        CubeSegment newSegment;
+    public CubeSegment appendSegments(CubeInstance cube, long endDate, boolean strictChecking, boolean saveChange) throws IOException {
+        long startDate = 0;
         if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned()) {
-            long startDate = calculateStartDateForAppendSegment(cube);
-            newSegment = newSegment(cube, startDate, endDate);
+            startDate = calculateStartDateForAppendSegment(cube);
         } else {
-            newSegment = newSegment(cube, 0, Long.MAX_VALUE);
+            endDate = Long.MAX_VALUE;
         }
+        return appendSegments(cube, startDate, endDate, strictChecking, saveChange);
+    }
 
-        validateNewSegments(cube, newSegment);
+    public CubeSegment appendSegments(CubeInstance cube, long startDate,  long endDate, boolean strictChecking, boolean saveChange) throws IOException {
+        if (strictChecking)
+            checkNoBuildingSegment(cube);
+
+        CubeSegment newSegment = newSegment(cube, startDate, endDate);
+        validateNewSegments(cube, strictChecking, newSegment);
 
         if (saveChange) {
 
@@ -546,20 +549,29 @@ public class CubeManager implements IRealizationProvider {
         for (int i = timeRanges.length - 1; i >= 0; i--) {
             long toMergeRange = timeRanges[i];
             long currentRange = 0;
+            long lastEndTime = 0;
             List<CubeSegment> toMergeSegments = Lists.newArrayList();
             for (CubeSegment segment : readySegments) {
                 long thisSegmentRange = segment.getDateRangeEnd() - segment.getDateRangeStart();
 
-                if (thisSegmentRange >= toMergeRange) {
+                if (thisSegmentRange >= toMergeRange ) {
                     // this segment and its previous segments will not be merged
                     toMergeSegments.clear();
                     currentRange = 0;
+                    lastEndTime = segment.getDateRangeEnd();
                     continue;
                 }
 
+                if (segment.getDateRangeStart() != lastEndTime && toMergeSegments.isEmpty() == false) {
+                    // a gap exists; give up the small segments collected before the gap
+                    toMergeSegments.clear();
+                    currentRange = 0;
+                }
+
                 currentRange += thisSegmentRange;
                 if (currentRange < toMergeRange) {
                     toMergeSegments.add(segment);
+                    lastEndTime = segment.getDateRangeEnd();
                 } else {
                     // merge
                     toMergeSegments.add(segment);
@@ -576,7 +588,7 @@ public class CubeManager implements IRealizationProvider {
     }
 
     public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment... newSegments) throws IOException {
-        List<CubeSegment> tobe = calculateToBeSegments(cube);
+        List<CubeSegment> tobe = calculateToBeSegments(cube, false);
 
         for (CubeSegment seg : newSegments) {
             if (tobe.contains(seg) == false)
@@ -610,7 +622,11 @@ public class CubeManager implements IRealizationProvider {
     }
 
     public void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) {
-        List<CubeSegment> tobe = calculateToBeSegments(cube, newSegments);
+        validateNewSegments(cube, true, newSegments);
+    }
+
+    public void validateNewSegments(CubeInstance cube, boolean strictChecking, CubeSegment... newSegments) {
+        List<CubeSegment> tobe = calculateToBeSegments(cube, strictChecking, newSegments);
         List<CubeSegment> newList = Arrays.asList(newSegments);
         if (tobe.containsAll(newList) == false) {
             throw new IllegalStateException("For cube " + cube + ", the new segments " + newList + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe);
@@ -623,7 +639,7 @@ public class CubeManager implements IRealizationProvider {
      * - Favors new segments over the old
      * - Favors big segments over the small
      */
-    private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment... newSegments) {
+    private List<CubeSegment> calculateToBeSegments(CubeInstance cube, boolean strictChecking, CubeSegment... newSegments) {
         CubeDesc cubeDesc = cube.getDescriptor();
         PartitionDesc partDesc = cubeDesc.getModel().getPartitionDesc();
 
@@ -638,7 +654,7 @@ public class CubeManager implements IRealizationProvider {
 
         // check first segment start time
         CubeSegment firstSeg = tobe.get(0);
-        if (firstSeg.getDateRangeStart() != partDesc.getPartitionDateStart()) {
+        if (strictChecking && firstSeg.getDateRangeStart() != partDesc.getPartitionDateStart()) {
             throw new IllegalStateException("For " + cube + ", the first segment, " + firstSeg + ", must start at " + partDesc.getPartitionDateStart());
         }
         firstSeg.validate();
@@ -680,7 +696,7 @@ public class CubeManager implements IRealizationProvider {
             }
 
             // if i, j in sequence
-            if (is.getDateRangeEnd() == js.getDateRangeStart()) {
+            if ((!strictChecking && is.getDateRangeEnd() <= js.getDateRangeStart() || strictChecking && is.getDateRangeEnd() == js.getDateRangeStart())) {
                 i++;
                 j++;
                 continue;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/44b3c6a3/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index fbc014b..035b5eb 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -268,7 +268,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware {
 
     public void validate() {
         if (cubeInstance.getDescriptor().getModel().getPartitionDesc().isPartitioned() && dateRangeStart >= dateRangeEnd)
-            throw new IllegalStateException("dateRangeStart(" + dateRangeStart + ") must be greater than dateRangeEnd(" + dateRangeEnd + ") in segment " + this);
+            throw new IllegalStateException("dateRangeStart(" + dateRangeStart + ") must be smaller than dateRangeEnd(" + dateRangeEnd + ") in segment " + this);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/44b3c6a3/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index 3b0bd3f..ddf89d9 100644
--- a/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -23,6 +23,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.junit.After;
@@ -84,14 +85,107 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
     }
 
     @Test
-    public void testAutoMerge() throws Exception {
-        CubeManager cubeManager = CubeManager.getInstance(getTestConfig());
-        CubeInstance cube = cubeManager.getCube("test_kylin_cube_with_slr_ready_2_segments");
-        CubeSegment newSeg = cubeManager.autoMergeCubeSegments(cube);
+    public void testAutoMergeNormal() throws Exception {
+        CubeManager mgr = CubeManager.getInstance(getTestConfig());
+        CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+
+        cube.setAutoMergeTimeRanges(new long[] {2000, 6000});
+        mgr.updateCube(new CubeBuilder(cube));
+
+        assertTrue(cube.needAutoMerge());
+
+        // no segment at first
+        assertEquals(0, cube.getSegments().size());
+
+        // append first
+        CubeSegment seg1 = mgr.appendSegments(cube, 1000);
+        seg1.setStatus(SegmentStatusEnum.READY);
+
+
+        CubeSegment seg2 = mgr.appendSegments(cube, 2000);
+        seg2.setStatus(SegmentStatusEnum.READY);
+
+        CubeBuilder cubeBuilder = new CubeBuilder(cube);
+
+        mgr.updateCube(cubeBuilder);
+
+        assertEquals(2, cube.getSegments().size());
+
+        CubeSegment mergedSeg = mgr.autoMergeCubeSegments(cube);
+
+        assertTrue(mergedSeg != null);
 
-        assertNotNull(newSeg);
     }
 
+
+
+    @Test
+    public void testAutoMergeWithGap() throws Exception {
+        CubeManager mgr = CubeManager.getInstance(getTestConfig());
+        CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+
+        cube.setAutoMergeTimeRanges(new long[] {2000, 6000});
+        mgr.updateCube(new CubeBuilder(cube));
+
+        assertTrue(cube.needAutoMerge());
+
+        // no segment at first
+        assertEquals(0, cube.getSegments().size());
+
+        // append first
+        CubeSegment seg1 = mgr.appendSegments(cube, 1000);
+        seg1.setStatus(SegmentStatusEnum.READY);
+
+
+        CubeSegment seg3 = mgr.appendSegments(cube, 2000, 4000, false, false);
+        seg3.setStatus(SegmentStatusEnum.READY);
+
+        CubeBuilder cubeBuilder = new CubeBuilder(cube);
+        cubeBuilder.setToAddSegs(seg3);
+        cubeBuilder.setToUpdateSegs(seg1);
+
+        mgr.updateCube(cubeBuilder);
+
+        assertEquals(2, cube.getSegments().size());
+
+        CubeSegment mergedSeg = mgr.autoMergeCubeSegments(cube);
+
+        assertTrue(mergedSeg == null);
+
+        // append a new seg which will be merged
+
+        CubeSegment seg4 = mgr.appendSegments(cube, 4000, 8000, false, false);
+        seg4.setStatus(SegmentStatusEnum.READY);
+
+        cubeBuilder = new CubeBuilder(cube);
+        cubeBuilder.setToAddSegs(seg4);
+
+        mgr.updateCube(cubeBuilder);
+
+        assertEquals(3, cube.getSegments().size());
+
+        mergedSeg = mgr.autoMergeCubeSegments(cube);
+
+        assertTrue(mergedSeg != null);
+        assertTrue(mergedSeg.getDateRangeStart() == 2000 && mergedSeg.getDateRangeEnd() == 8000);
+
+
+        // fill the gap
+
+        CubeSegment seg2 = mgr.appendSegments(cube, 1000, 2000, true, true);
+        seg2.setStatus(SegmentStatusEnum.READY);
+
+        assertEquals(4, cube.getSegments().size());
+
+        mergedSeg = mgr.autoMergeCubeSegments(cube);
+
+        assertTrue(mergedSeg != null);
+        assertTrue(mergedSeg.getDateRangeStart() == 0 && mergedSeg.getDateRangeEnd() == 8000);
+    }
+
+
+
+
     public CubeDescManager getCubeDescManager() {
         return CubeDescManager.getInstance(getTestConfig());
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/44b3c6a3/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
----------------------------------------------------------------------
diff --git a/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
index 0e00899..a7dbb74 100644
--- a/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
+++ b/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
@@ -158,7 +158,39 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase {
         assertEquals(merge, cube.getSegments().get(1));
         assertEquals(seg2, cube.getSegments().get(2));
     }
-    
+
+
+    @Test
+    public void testAllowGap()  throws IOException {
+
+        CubeManager mgr = mgr();
+        CubeInstance cube = mgr.getCube("test_kylin_cube_without_slr_left_join_empty");
+
+        // no segment at first
+        assertEquals(0, cube.getSegments().size());
+
+        // append first
+        CubeSegment seg1 = mgr.appendSegments(cube, 1000);
+        seg1.setStatus(SegmentStatusEnum.READY);
+
+        assertEquals(1, cube.getSegments().size());
+
+        CubeSegment seg3 = mgr.appendSegments(cube, 2000, 3000, false, false);
+        seg3.setStatus(SegmentStatusEnum.READY);
+        CubeBuilder builder = new CubeBuilder(cube).setToAddSegs(seg3);
+
+        mgr.updateCube(builder);
+        assertEquals(2, cube.getSegments().size());
+
+        CubeSegment seg2 = mgr.appendSegments(cube, 1000, 2000, false, false);
+        builder = new CubeBuilder(cube).setToAddSegs(seg2);
+        mgr.updateCube(builder);
+        assertEquals(3, cube.getSegments().size());
+
+    }
+
+
+
     private void discard(Object o) {
         // throw away input parameter
     }