You are viewing a plain-text version of this content. The link to the canonical version (originally attached to the word "here") was not preserved in this copy.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/06/08 08:52:16 UTC
incubator-kylin git commit: KYLIN-816 Allow gap in cube segments, mainly for the streaming case
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 d4e5120f3 -> 44b3c6a3a
KYLIN-816 Allow gap in cube segments, mainly for streaming case
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/44b3c6a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/44b3c6a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/44b3c6a3
Branch: refs/heads/0.8.0
Commit: 44b3c6a3ada15f9785df035ef557d3bfed1454fa
Parents: d4e5120
Author: shaofengshi <sh...@apache.org>
Authored: Mon Jun 8 14:52:01 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jun 8 14:52:01 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/cube/CubeInstance.java | 4 +
.../java/org/apache/kylin/cube/CubeManager.java | 46 +++++---
.../java/org/apache/kylin/cube/CubeSegment.java | 2 +-
.../org/apache/kylin/cube/CubeManagerTest.java | 104 ++++++++++++++++++-
.../org/apache/kylin/cube/CubeSegmentsTest.java | 34 +++++-
5 files changed, 168 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/44b3c6a3/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index d744bdf..f62904e 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -409,6 +409,10 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
return autoMergeTimeRanges;
}
+ public void setAutoMergeTimeRanges(long[] autoMergeTimeRanges) {
+ this.autoMergeTimeRanges = autoMergeTimeRanges;
+ }
+
public boolean needAutoMerge() {
if (!this.getDescriptor().getModel().getPartitionDesc().isPartitioned())
return false;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/44b3c6a3/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 5fc5584..b990d92 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -342,19 +342,22 @@ public class CubeManager implements IRealizationProvider {
return appendSegments(cube, endDate, true, true);
}
- public CubeSegment appendSegments(CubeInstance cube, long endDate, boolean checkNoBuilding, boolean saveChange) throws IOException {
- if (checkNoBuilding)
- checkNoBuildingSegment(cube);
-
- CubeSegment newSegment;
+ public CubeSegment appendSegments(CubeInstance cube, long endDate, boolean strictChecking, boolean saveChange) throws IOException {
+ long startDate = 0;
if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned()) {
- long startDate = calculateStartDateForAppendSegment(cube);
- newSegment = newSegment(cube, startDate, endDate);
+ startDate = calculateStartDateForAppendSegment(cube);
} else {
- newSegment = newSegment(cube, 0, Long.MAX_VALUE);
+ endDate = Long.MAX_VALUE;
}
+ return appendSegments(cube, startDate, endDate, strictChecking, saveChange);
+ }
- validateNewSegments(cube, newSegment);
+ public CubeSegment appendSegments(CubeInstance cube, long startDate, long endDate, boolean strictChecking, boolean saveChange) throws IOException {
+ if (strictChecking)
+ checkNoBuildingSegment(cube);
+
+ CubeSegment newSegment = newSegment(cube, startDate, endDate);
+ validateNewSegments(cube, strictChecking, newSegment);
if (saveChange) {
@@ -546,20 +549,29 @@ public class CubeManager implements IRealizationProvider {
for (int i = timeRanges.length - 1; i >= 0; i--) {
long toMergeRange = timeRanges[i];
long currentRange = 0;
+ long lastEndTime = 0;
List<CubeSegment> toMergeSegments = Lists.newArrayList();
for (CubeSegment segment : readySegments) {
long thisSegmentRange = segment.getDateRangeEnd() - segment.getDateRangeStart();
- if (thisSegmentRange >= toMergeRange) {
+ if (thisSegmentRange >= toMergeRange ) {
// this segment and its previous segments will not be merged
toMergeSegments.clear();
currentRange = 0;
+ lastEndTime = segment.getDateRangeEnd();
continue;
}
+ if (segment.getDateRangeStart() != lastEndTime && toMergeSegments.isEmpty() == false) {
+ // gap exists, give up the small segments before the gap;
+ toMergeSegments.clear();
+ currentRange = 0;
+ }
+
currentRange += thisSegmentRange;
if (currentRange < toMergeRange) {
toMergeSegments.add(segment);
+ lastEndTime = segment.getDateRangeEnd();
} else {
// merge
toMergeSegments.add(segment);
@@ -576,7 +588,7 @@ public class CubeManager implements IRealizationProvider {
}
public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment... newSegments) throws IOException {
- List<CubeSegment> tobe = calculateToBeSegments(cube);
+ List<CubeSegment> tobe = calculateToBeSegments(cube, false);
for (CubeSegment seg : newSegments) {
if (tobe.contains(seg) == false)
@@ -610,7 +622,11 @@ public class CubeManager implements IRealizationProvider {
}
public void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) {
- List<CubeSegment> tobe = calculateToBeSegments(cube, newSegments);
+ validateNewSegments(cube, true, newSegments);
+ }
+
+ public void validateNewSegments(CubeInstance cube, boolean strictChecking, CubeSegment... newSegments) {
+ List<CubeSegment> tobe = calculateToBeSegments(cube, strictChecking, newSegments);
List<CubeSegment> newList = Arrays.asList(newSegments);
if (tobe.containsAll(newList) == false) {
throw new IllegalStateException("For cube " + cube + ", the new segments " + newList + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe);
@@ -623,7 +639,7 @@ public class CubeManager implements IRealizationProvider {
* - Favors new segments over the old
* - Favors big segments over the small
*/
- private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment... newSegments) {
+ private List<CubeSegment> calculateToBeSegments(CubeInstance cube, boolean strictChecking, CubeSegment... newSegments) {
CubeDesc cubeDesc = cube.getDescriptor();
PartitionDesc partDesc = cubeDesc.getModel().getPartitionDesc();
@@ -638,7 +654,7 @@ public class CubeManager implements IRealizationProvider {
// check first segment start time
CubeSegment firstSeg = tobe.get(0);
- if (firstSeg.getDateRangeStart() != partDesc.getPartitionDateStart()) {
+ if (strictChecking && firstSeg.getDateRangeStart() != partDesc.getPartitionDateStart()) {
throw new IllegalStateException("For " + cube + ", the first segment, " + firstSeg + ", must start at " + partDesc.getPartitionDateStart());
}
firstSeg.validate();
@@ -680,7 +696,7 @@ public class CubeManager implements IRealizationProvider {
}
// if i, j in sequence
- if (is.getDateRangeEnd() == js.getDateRangeStart()) {
+ if ((!strictChecking && is.getDateRangeEnd() <= js.getDateRangeStart() || strictChecking && is.getDateRangeEnd() == js.getDateRangeStart())) {
i++;
j++;
continue;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/44b3c6a3/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index fbc014b..035b5eb 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -268,7 +268,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware {
public void validate() {
if (cubeInstance.getDescriptor().getModel().getPartitionDesc().isPartitioned() && dateRangeStart >= dateRangeEnd)
- throw new IllegalStateException("dateRangeStart(" + dateRangeStart + ") must be greater than dateRangeEnd(" + dateRangeEnd + ") in segment " + this);
+ throw new IllegalStateException("dateRangeStart(" + dateRangeStart + ") must be smaller than dateRangeEnd(" + dateRangeEnd + ") in segment " + this);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/44b3c6a3/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index 3b0bd3f..ddf89d9 100644
--- a/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -23,6 +23,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.junit.After;
@@ -84,14 +85,107 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
}
@Test
- public void testAutoMerge() throws Exception {
- CubeManager cubeManager = CubeManager.getInstance(getTestConfig());
- CubeInstance cube = cubeManager.getCube("test_kylin_cube_with_slr_ready_2_segments");
- CubeSegment newSeg = cubeManager.autoMergeCubeSegments(cube);
+ public void testAutoMergeNormal() throws Exception {
+ CubeManager mgr = CubeManager.getInstance(getTestConfig());
+ CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+
+ cube.setAutoMergeTimeRanges(new long[] {2000, 6000});
+ mgr.updateCube(new CubeBuilder(cube));
+
+ assertTrue(cube.needAutoMerge());
+
+ // no segment at first
+ assertEquals(0, cube.getSegments().size());
+
+ // append first
+ CubeSegment seg1 = mgr.appendSegments(cube, 1000);
+ seg1.setStatus(SegmentStatusEnum.READY);
+
+
+ CubeSegment seg2 = mgr.appendSegments(cube, 2000);
+ seg2.setStatus(SegmentStatusEnum.READY);
+
+ CubeBuilder cubeBuilder = new CubeBuilder(cube);
+
+ mgr.updateCube(cubeBuilder);
+
+ assertEquals(2, cube.getSegments().size());
+
+ CubeSegment mergedSeg = mgr.autoMergeCubeSegments(cube);
+
+ assertTrue(mergedSeg != null);
- assertNotNull(newSeg);
}
+
+
+ @Test
+ public void testAutoMergeWithGap() throws Exception {
+ CubeManager mgr = CubeManager.getInstance(getTestConfig());
+ CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+
+ cube.setAutoMergeTimeRanges(new long[] {2000, 6000});
+ mgr.updateCube(new CubeBuilder(cube));
+
+ assertTrue(cube.needAutoMerge());
+
+ // no segment at first
+ assertEquals(0, cube.getSegments().size());
+
+ // append first
+ CubeSegment seg1 = mgr.appendSegments(cube, 1000);
+ seg1.setStatus(SegmentStatusEnum.READY);
+
+
+ CubeSegment seg3 = mgr.appendSegments(cube, 2000, 4000, false, false);
+ seg3.setStatus(SegmentStatusEnum.READY);
+
+ CubeBuilder cubeBuilder = new CubeBuilder(cube);
+ cubeBuilder.setToAddSegs(seg3);
+ cubeBuilder.setToUpdateSegs(seg1);
+
+ mgr.updateCube(cubeBuilder);
+
+ assertEquals(2, cube.getSegments().size());
+
+ CubeSegment mergedSeg = mgr.autoMergeCubeSegments(cube);
+
+ assertTrue(mergedSeg == null);
+
+ // append a new seg which will be merged
+
+ CubeSegment seg4 = mgr.appendSegments(cube, 4000, 8000, false, false);
+ seg4.setStatus(SegmentStatusEnum.READY);
+
+ cubeBuilder = new CubeBuilder(cube);
+ cubeBuilder.setToAddSegs(seg4);
+
+ mgr.updateCube(cubeBuilder);
+
+ assertEquals(3, cube.getSegments().size());
+
+ mergedSeg = mgr.autoMergeCubeSegments(cube);
+
+ assertTrue(mergedSeg != null);
+ assertTrue(mergedSeg.getDateRangeStart() == 2000 && mergedSeg.getDateRangeEnd() == 8000);
+
+
+ // fill the gap
+
+ CubeSegment seg2 = mgr.appendSegments(cube, 1000, 2000, true, true);
+ seg2.setStatus(SegmentStatusEnum.READY);
+
+ assertEquals(4, cube.getSegments().size());
+
+ mergedSeg = mgr.autoMergeCubeSegments(cube);
+
+ assertTrue(mergedSeg != null);
+ assertTrue(mergedSeg.getDateRangeStart() == 0 && mergedSeg.getDateRangeEnd() == 8000);
+ }
+
+
+
+
public CubeDescManager getCubeDescManager() {
return CubeDescManager.getInstance(getTestConfig());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/44b3c6a3/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
----------------------------------------------------------------------
diff --git a/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
index 0e00899..a7dbb74 100644
--- a/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
+++ b/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
@@ -158,7 +158,39 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase {
assertEquals(merge, cube.getSegments().get(1));
assertEquals(seg2, cube.getSegments().get(2));
}
-
+
+
+ @Test
+ public void testAllowGap() throws IOException {
+
+ CubeManager mgr = mgr();
+ CubeInstance cube = mgr.getCube("test_kylin_cube_without_slr_left_join_empty");
+
+ // no segment at first
+ assertEquals(0, cube.getSegments().size());
+
+ // append first
+ CubeSegment seg1 = mgr.appendSegments(cube, 1000);
+ seg1.setStatus(SegmentStatusEnum.READY);
+
+ assertEquals(1, cube.getSegments().size());
+
+ CubeSegment seg3 = mgr.appendSegments(cube, 2000, 3000, false, false);
+ seg3.setStatus(SegmentStatusEnum.READY);
+ CubeBuilder builder = new CubeBuilder(cube).setToAddSegs(seg3);
+
+ mgr.updateCube(builder);
+ assertEquals(2, cube.getSegments().size());
+
+ CubeSegment seg2 = mgr.appendSegments(cube, 1000, 2000, false, false);
+ builder = new CubeBuilder(cube).setToAddSegs(seg2);
+ mgr.updateCube(builder);
+ assertEquals(3, cube.getSegments().size());
+
+ }
+
+
+
private void discard(Object o) {
// throw away input parameter
}