You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/29 14:59:17 UTC
kylin git commit: KYLIN-2059 Concurrent build issue in
CubeManager.calculateToBeSegments()
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2059 [created] 61c5aa9c5
KYLIN-2059 Concurrent build issue in CubeManager.calculateToBeSegments()
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/61c5aa9c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/61c5aa9c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/61c5aa9c
Branch: refs/heads/KYLIN-2059
Commit: 61c5aa9c551e7da690276af71cfd5c43077d058c
Parents: 03424a0
Author: shaofengshi <sh...@apache.org>
Authored: Thu Sep 29 22:56:00 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 29 22:56:00 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 4 +
.../java/org/apache/kylin/cube/CubeManager.java | 77 ++++++++-------
.../org/apache/kylin/cube/CubeManagerTest.java | 99 ++++++++++++++++++++
3 files changed, 145 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/61c5aa9c/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 838ef97..4d1639b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -809,4 +809,8 @@ abstract public class KylinConfigBase implements Serializable {
public int getMaxBuildingSegments() {
return Integer.parseInt(getOptional("kylin.cube.building.segment.max", "2"));
}
+
+ public void setMaxBuildingSegments(int maxBuildingSegments) {
+ setProperty("kylin.cube.building.segment.max", String.valueOf(maxBuildingSegments));
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/61c5aa9c/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 962568c..d243f4d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -135,7 +135,7 @@ public class CubeManager implements IRealizationProvider {
logger.info("Initializing CubeManager with config " + config);
this.config = config;
this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, "cube");
-
+
// touch lower level metadata before registering my listener
loadAllCubeInstance();
Broadcaster.getInstance(config).registerListener(new CubeSyncListener(), "cube");
@@ -159,12 +159,12 @@ public class CubeManager implements IRealizationProvider {
@Override
public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
String cubeName = cacheKey;
-
+
if (event == Event.DROP)
removeCubeLocal(cubeName);
else
reloadCubeLocal(cubeName);
-
+
for (ProjectInstance prj : ProjectManager.getInstance(config).findProjects(RealizationType.CUBE, cubeName)) {
broadcaster.notifyProjectDataUpdate(prj.getName());
}
@@ -615,7 +615,6 @@ public class CubeManager implements IRealizationProvider {
return max;
}
-
private long calculateStartOffsetForAppendSegment(CubeInstance cube) {
List<CubeSegment> existing = cube.getSegments();
if (existing.isEmpty()) {
@@ -625,7 +624,6 @@ public class CubeManager implements IRealizationProvider {
}
}
-
private long calculateStartDateForAppendSegment(CubeInstance cube) {
List<CubeSegment> existing = cube.getSegments();
if (existing.isEmpty()) {
@@ -728,7 +726,7 @@ public class CubeManager implements IRealizationProvider {
List<CubeSegment> mergingSegs = Lists.newArrayList();
if (buildingSegs.size() > 0) {
-
+
for (CubeSegment building : buildingSegs) {
// exclude those under-merging segs
for (CubeSegment ready : readySegs) {
@@ -760,27 +758,22 @@ public class CubeManager implements IRealizationProvider {
return null;
}
- public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment... newSegments) throws IOException {
- List<CubeSegment> tobe = calculateToBeSegments(cube);
+ public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
+ if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier()))
+ throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " missing StorageLocationIdentifier");
- for (CubeSegment seg : newSegments) {
- if (tobe.contains(seg) == false)
- throw new IllegalStateException("For cube " + cube + ", segment " + seg + " is expected but not in the tobe " + tobe);
+ if (StringUtils.isBlank(newSegment.getLastBuildJobID()))
+ throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " missing LastBuildJobID");
- if (StringUtils.isBlank(seg.getStorageLocationIdentifier()))
- throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing StorageLocationIdentifier");
+ if (isReady(newSegment) == true)
+ throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY");
- if (StringUtils.isBlank(seg.getLastBuildJobID()))
- throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing LastBuildJobID");
+ List<CubeSegment> tobe = calculateToBeSegments(cube, newSegment);
- seg.setStatus(SegmentStatusEnum.READY);
- }
+ if (tobe.contains(newSegment) == false)
+ throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe);
- for (CubeSegment seg : tobe) {
- if (isReady(seg) == false) {
- logger.warn("For cube " + cube + ", segment " + seg + " isn't READY yet.");
- }
- }
+ newSegment.setStatus(SegmentStatusEnum.READY);
List<CubeSegment> toRemoveSegs = Lists.newArrayList();
for (CubeSegment segment : cube.getSegments()) {
@@ -788,14 +781,14 @@ public class CubeManager implements IRealizationProvider {
toRemoveSegs.add(segment);
}
- logger.info("Promoting cube " + cube + ", new segments " + Arrays.toString(newSegments) + ", to remove segments " + toRemoveSegs);
+ logger.info("Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs);
CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])).setToUpdateSegs(newSegments).setStatus(RealizationStatusEnum.READY);
+ cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])).setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY);
updateCube(cubeBuilder);
}
- public void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) {
+ public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
List<CubeSegment> tobe = calculateToBeSegments(cube, newSegments);
List<CubeSegment> newList = Arrays.asList(newSegments);
if (tobe.containsAll(newList) == false) {
@@ -809,11 +802,12 @@ public class CubeManager implements IRealizationProvider {
* - Favors new segments over the old
* - Favors big segments over the small
*/
- private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment... newSegments) {
+ private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment newSegments) {
List<CubeSegment> tobe = Lists.newArrayList(cube.getSegments());
- if (newSegments != null)
- tobe.addAll(Arrays.asList(newSegments));
+ if (newSegments != null && !tobe.contains(newSegments)) {
+ tobe.add(newSegments);
+ }
if (tobe.size() == 0)
return tobe;
@@ -849,13 +843,17 @@ public class CubeManager implements IRealizationProvider {
} else {
tobe.remove(j);
}
- } else if (isNew(is)) {
- // otherwise, favor the new segment
- tobe.remove(j);
+ continue;
} else {
- tobe.remove(i);
+ // otherwise, favor the new segment
+ if (isNew(is) && is.equals(newSegments)) {
+ tobe.remove(j);
+ continue;
+ } else if (js.equals(newSegments)) {
+ tobe.remove(i);
+ continue;
+ }
}
- continue;
}
// if i, j in sequence
@@ -865,8 +863,17 @@ public class CubeManager implements IRealizationProvider {
continue;
}
- // seems j not fitting
- tobe.remove(j);
+ // js can be covered by is
+ if (is.equals(newSegments)) {
+ // seems j not fitting
+ tobe.remove(j);
+ continue;
+ } else {
+ i++;
+ j++;
+ continue;
+ }
+
}
return tobe;
http://git-wip-us.apache.org/repos/asf/kylin/blob/61c5aa9c/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index 49bb128..e63fe99 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.kylin.cube;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -123,6 +124,104 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
}
+
+ @Test
+ public void testConcurrentBuildAndMerge() throws Exception {
+ CubeManager mgr = CubeManager.getInstance(getTestConfig());
+ CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+ getTestConfig().setMaxBuildingSegments(10);
+ // no segment at first
+ assertEquals(0, cube.getSegments().size());
+
+ // append first
+ CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000);
+ seg1.setStatus(SegmentStatusEnum.READY);
+
+ CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000);
+ seg2.setStatus(SegmentStatusEnum.READY);
+
+
+ CubeSegment seg3 = mgr.mergeSegments(cube, 0, 0, 0000, 2000, true);
+ seg3.setStatus(SegmentStatusEnum.NEW);
+
+
+ CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 2000, 3000);
+ seg4.setStatus(SegmentStatusEnum.NEW);
+ seg4.setLastBuildJobID("test");
+ seg4.setStorageLocationIdentifier("test");
+
+ CubeSegment seg5 = mgr.appendSegment(cube, 0, 0, 3000, 4000);
+ seg5.setStatus(SegmentStatusEnum.READY);
+
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+
+ mgr.updateCube(cubeBuilder);
+
+
+ mgr.promoteNewlyBuiltSegments(cube, seg4);
+
+ assertTrue(cube.getSegments().size() == 5);
+
+ assertTrue(cube.getSegmentById(seg1.getUuid()) != null && cube.getSegmentById(seg1.getUuid()).getStatus() == SegmentStatusEnum.READY);
+ assertTrue(cube.getSegmentById(seg2.getUuid()) != null && cube.getSegmentById(seg2.getUuid()).getStatus() == SegmentStatusEnum.READY);
+ assertTrue(cube.getSegmentById(seg3.getUuid()) != null && cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.NEW);
+ assertTrue(cube.getSegmentById(seg4.getUuid()) != null && cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY);
+ assertTrue(cube.getSegmentById(seg5.getUuid()) != null && cube.getSegmentById(seg5.getUuid()).getStatus() == SegmentStatusEnum.READY);
+
+ }
+
+
+ @Test
+ public void testConcurrentMergeAndMerge() throws Exception {
+ CubeManager mgr = CubeManager.getInstance(getTestConfig());
+ getTestConfig().setMaxBuildingSegments(10);
+ CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+
+ // no segment at first
+ assertEquals(0, cube.getSegments().size());
+
+ // append first
+ CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000);
+ seg1.setStatus(SegmentStatusEnum.READY);
+
+ CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000);
+ seg2.setStatus(SegmentStatusEnum.READY);
+
+ CubeSegment seg3 = mgr.appendSegment(cube, 0, 0, 2000, 3000);
+ seg3.setStatus(SegmentStatusEnum.READY);
+
+ CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 3000, 4000);
+ seg4.setStatus(SegmentStatusEnum.READY);
+
+
+
+ CubeSegment merge1 = mgr.mergeSegments(cube, 0, 0, 0, 2000, true);
+ merge1.setStatus(SegmentStatusEnum.NEW);
+ merge1.setLastBuildJobID("test");
+ merge1.setStorageLocationIdentifier("test");
+
+ CubeSegment merge2 = mgr.mergeSegments(cube, 0, 0, 2000, 4000, true);
+ merge2.setStatus(SegmentStatusEnum.NEW);
+ merge2.setLastBuildJobID("test");
+ merge2.setStorageLocationIdentifier("test");
+
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ mgr.updateCube(cubeBuilder);
+
+
+ mgr.promoteNewlyBuiltSegments(cube, merge1);
+
+ assertTrue(cube.getSegments().size() == 4);
+
+ assertTrue(cube.getSegmentById(seg1.getUuid()) == null);
+ assertTrue(cube.getSegmentById(seg2.getUuid()) == null);
+ assertTrue(cube.getSegmentById(merge1.getUuid()) != null && cube.getSegmentById(merge1.getUuid()).getStatus() == SegmentStatusEnum.READY);
+ assertTrue(cube.getSegmentById(seg3.getUuid()) != null && cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.READY);
+ assertTrue(cube.getSegmentById(seg4.getUuid()) != null && cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY);
+ assertTrue(cube.getSegmentById(merge2.getUuid()) != null && cube.getSegmentById(merge2.getUuid()).getStatus() == SegmentStatusEnum.NEW);
+
+ }
+
@Test
public void testGetAllCubes() throws Exception {
final ResourceStore store = ResourceStore.getStore(getTestConfig());