You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/29 14:59:17 UTC

kylin git commit: KYLIN-2059 Concurrent build issue in CubeManager.calculateToBeSegments()

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2059 [created] 61c5aa9c5


KYLIN-2059 Concurrent build issue in CubeManager.calculateToBeSegments()

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/61c5aa9c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/61c5aa9c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/61c5aa9c

Branch: refs/heads/KYLIN-2059
Commit: 61c5aa9c551e7da690276af71cfd5c43077d058c
Parents: 03424a0
Author: shaofengshi <sh...@apache.org>
Authored: Thu Sep 29 22:56:00 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 29 22:56:00 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  4 +
 .../java/org/apache/kylin/cube/CubeManager.java | 77 ++++++++-------
 .../org/apache/kylin/cube/CubeManagerTest.java  | 99 ++++++++++++++++++++
 3 files changed, 145 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/61c5aa9c/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 838ef97..4d1639b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -809,4 +809,8 @@ abstract public class KylinConfigBase implements Serializable {
     public int getMaxBuildingSegments() {
         return Integer.parseInt(getOptional("kylin.cube.building.segment.max", "2"));
     }
+
+    public void setMaxBuildingSegments(int maxBuildingSegments) { // setter counterpart of getMaxBuildingSegments(); both use the key "kylin.cube.building.segment.max"
+        setProperty("kylin.cube.building.segment.max", String.valueOf(maxBuildingSegments)); // stored as a string; the getter parses it with Integer.parseInt
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/61c5aa9c/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 962568c..d243f4d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -135,7 +135,7 @@ public class CubeManager implements IRealizationProvider {
         logger.info("Initializing CubeManager with config " + config);
         this.config = config;
         this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, "cube");
-        
+
         // touch lower level metadata before registering my listener
         loadAllCubeInstance();
         Broadcaster.getInstance(config).registerListener(new CubeSyncListener(), "cube");
@@ -159,12 +159,12 @@ public class CubeManager implements IRealizationProvider {
         @Override
         public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
             String cubeName = cacheKey;
-            
+
             if (event == Event.DROP)
                 removeCubeLocal(cubeName);
             else
                 reloadCubeLocal(cubeName);
-            
+
             for (ProjectInstance prj : ProjectManager.getInstance(config).findProjects(RealizationType.CUBE, cubeName)) {
                 broadcaster.notifyProjectDataUpdate(prj.getName());
             }
@@ -615,7 +615,6 @@ public class CubeManager implements IRealizationProvider {
         return max;
     }
 
-
     private long calculateStartOffsetForAppendSegment(CubeInstance cube) {
         List<CubeSegment> existing = cube.getSegments();
         if (existing.isEmpty()) {
@@ -625,7 +624,6 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
-
     private long calculateStartDateForAppendSegment(CubeInstance cube) {
         List<CubeSegment> existing = cube.getSegments();
         if (existing.isEmpty()) {
@@ -728,7 +726,7 @@ public class CubeManager implements IRealizationProvider {
 
         List<CubeSegment> mergingSegs = Lists.newArrayList();
         if (buildingSegs.size() > 0) {
-            
+
             for (CubeSegment building : buildingSegs) {
                 // exclude those under-merging segs
                 for (CubeSegment ready : readySegs) {
@@ -760,27 +758,22 @@ public class CubeManager implements IRealizationProvider {
         return null;
     }
 
-    public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment... newSegments) throws IOException {
-        List<CubeSegment> tobe = calculateToBeSegments(cube);
+    public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
+        if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier()))
+            throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " missing StorageLocationIdentifier");
 
-        for (CubeSegment seg : newSegments) {
-            if (tobe.contains(seg) == false)
-                throw new IllegalStateException("For cube " + cube + ", segment " + seg + " is expected but not in the tobe " + tobe);
+        if (StringUtils.isBlank(newSegment.getLastBuildJobID()))
+            throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " missing LastBuildJobID");
 
-            if (StringUtils.isBlank(seg.getStorageLocationIdentifier()))
-                throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing StorageLocationIdentifier");
+        if (isReady(newSegment) == true)
+            throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY");
 
-            if (StringUtils.isBlank(seg.getLastBuildJobID()))
-                throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing LastBuildJobID");
+        List<CubeSegment> tobe = calculateToBeSegments(cube, newSegment);
 
-            seg.setStatus(SegmentStatusEnum.READY);
-        }
+        if (tobe.contains(newSegment) == false)
+            throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe);
 
-        for (CubeSegment seg : tobe) {
-            if (isReady(seg) == false) {
-                logger.warn("For cube " + cube + ", segment " + seg + " isn't READY yet.");
-            }
-        }
+        newSegment.setStatus(SegmentStatusEnum.READY);
 
         List<CubeSegment> toRemoveSegs = Lists.newArrayList();
         for (CubeSegment segment : cube.getSegments()) {
@@ -788,14 +781,14 @@ public class CubeManager implements IRealizationProvider {
                 toRemoveSegs.add(segment);
         }
 
-        logger.info("Promoting cube " + cube + ", new segments " + Arrays.toString(newSegments) + ", to remove segments " + toRemoveSegs);
+        logger.info("Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs);
 
         CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])).setToUpdateSegs(newSegments).setStatus(RealizationStatusEnum.READY);
+        cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])).setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY);
         updateCube(cubeBuilder);
     }
 
-    public void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) {
+    public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
         List<CubeSegment> tobe = calculateToBeSegments(cube, newSegments);
         List<CubeSegment> newList = Arrays.asList(newSegments);
         if (tobe.containsAll(newList) == false) {
@@ -809,11 +802,12 @@ public class CubeManager implements IRealizationProvider {
      * - Favors new segments over the old
      * - Favors big segments over the small
      */
-    private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment... newSegments) {
+    private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment newSegments) {
 
         List<CubeSegment> tobe = Lists.newArrayList(cube.getSegments());
-        if (newSegments != null)
-            tobe.addAll(Arrays.asList(newSegments));
+        if (newSegments != null && !tobe.contains(newSegments)) {
+            tobe.add(newSegments);
+        }
         if (tobe.size() == 0)
             return tobe;
 
@@ -849,13 +843,17 @@ public class CubeManager implements IRealizationProvider {
                     } else {
                         tobe.remove(j);
                     }
-                } else if (isNew(is)) {
-                    // otherwise, favor the new segment
-                    tobe.remove(j);
+                    continue;
                 } else {
-                    tobe.remove(i);
+                    // otherwise, favor the new segment
+                    if (isNew(is) && is.equals(newSegments)) {
+                        tobe.remove(j);
+                        continue;
+                    } else if (js.equals(newSegments)) {
+                        tobe.remove(i);
+                        continue;
+                    }
                 }
-                continue;
             }
 
             // if i, j in sequence
@@ -865,8 +863,17 @@ public class CubeManager implements IRealizationProvider {
                 continue;
             }
 
-            // seems j not fitting
-            tobe.remove(j);
+            // js can be covered by is
+            if (is.equals(newSegments)) {
+                // seems j not fitting
+                tobe.remove(j);
+                continue;
+            } else {
+                i++;
+                j++;
+                continue;
+            }
+
         }
 
         return tobe;

http://git-wip-us.apache.org/repos/asf/kylin/blob/61c5aa9c/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index 49bb128..e63fe99 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.cube;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -123,6 +124,104 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
 
     }
 
+
+    @Test
+    public void testConcurrentBuildAndMerge() throws Exception { // scenario: promote a newly built segment (seg4) while a merge segment (seg3) is still NEW
+        CubeManager mgr = CubeManager.getInstance(getTestConfig());
+        CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+        getTestConfig().setMaxBuildingSegments(10); // sets "kylin.cube.building.segment.max" to 10; presumably so the NEW segments below are not rejected - confirm where the limit is enforced
+        // no segment at first
+        assertEquals(0, cube.getSegments().size());
+
+        // append two segments and mark both READY by hand
+        CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000); // NOTE(review): the last two numeric args look like the segment's start/end range - confirm against appendSegment's signature
+        seg1.setStatus(SegmentStatusEnum.READY);
+
+        CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000);
+        seg2.setStatus(SegmentStatusEnum.READY);
+
+
+        CubeSegment seg3 = mgr.mergeSegments(cube, 0, 0, 0000, 2000, true); // merge requested over 0..2000, the numbers given to seg1 and seg2 (0000 is simply the literal 0)
+        seg3.setStatus(SegmentStatusEnum.NEW); // left NEW: the merge is still unfinished when seg4 is promoted below
+
+
+        CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 2000, 3000); // the segment that will be promoted
+        seg4.setStatus(SegmentStatusEnum.NEW); // promoteNewlyBuiltSegments throws IllegalStateException if the segment is already READY
+        seg4.setLastBuildJobID("test"); // promoteNewlyBuiltSegments rejects a blank LastBuildJobID
+        seg4.setStorageLocationIdentifier("test"); // ...and a blank StorageLocationIdentifier
+
+        CubeSegment seg5 = mgr.appendSegment(cube, 0, 0, 3000, 4000);
+        seg5.setStatus(SegmentStatusEnum.READY);
+
+        CubeUpdate cubeBuilder = new CubeUpdate(cube); // no changes are set on this CubeUpdate before it is handed to updateCube
+
+        mgr.updateCube(cubeBuilder); // presumably persists the cube with the statuses set above - confirm
+
+
+        mgr.promoteNewlyBuiltSegments(cube, seg4); // promote seg4 only; seg3 is not passed in
+
+        assertTrue(cube.getSegments().size() == 5); // nothing may be removed: all five segments are expected to remain
+
+        assertTrue(cube.getSegmentById(seg1.getUuid()) != null && cube.getSegmentById(seg1.getUuid()).getStatus() == SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(seg2.getUuid()) != null && cube.getSegmentById(seg2.getUuid()).getStatus() == SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(seg3.getUuid()) != null && cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.NEW); // the unfinished merge must still be NEW
+        assertTrue(cube.getSegmentById(seg4.getUuid()) != null && cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY); // the promoted segment
+        assertTrue(cube.getSegmentById(seg5.getUuid()) != null && cube.getSegmentById(seg5.getUuid()).getStatus() == SegmentStatusEnum.READY);
+
+    }
+
+
+    @Test
+    public void testConcurrentMergeAndMerge() throws Exception { // scenario: two merge segments exist at once and only the first one is promoted
+        CubeManager mgr = CubeManager.getInstance(getTestConfig());
+        getTestConfig().setMaxBuildingSegments(10); // sets "kylin.cube.building.segment.max" to 10; presumably so both NEW merge segments are allowed - confirm where the limit is enforced
+        CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+
+        // no segment at first
+        assertEquals(0, cube.getSegments().size());
+
+        // append four segments and mark each READY by hand
+        CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000); // NOTE(review): the last two numeric args look like the segment's start/end range - confirm against appendSegment's signature
+        seg1.setStatus(SegmentStatusEnum.READY);
+
+        CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000);
+        seg2.setStatus(SegmentStatusEnum.READY);
+
+        CubeSegment seg3 = mgr.appendSegment(cube, 0, 0, 2000, 3000);
+        seg3.setStatus(SegmentStatusEnum.READY);
+
+        CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 3000, 4000);
+        seg4.setStatus(SegmentStatusEnum.READY);
+
+
+
+        CubeSegment merge1 = mgr.mergeSegments(cube, 0, 0, 0, 2000, true); // first merge, requested over 0..2000 (the numbers given to seg1 and seg2)
+        merge1.setStatus(SegmentStatusEnum.NEW);
+        merge1.setLastBuildJobID("test"); // promoteNewlyBuiltSegments rejects a blank LastBuildJobID
+        merge1.setStorageLocationIdentifier("test"); // ...and a blank StorageLocationIdentifier
+
+        CubeSegment merge2 = mgr.mergeSegments(cube, 0, 0, 2000, 4000, true); // second merge, requested over 2000..4000 (the numbers given to seg3 and seg4)
+        merge2.setStatus(SegmentStatusEnum.NEW); // never promoted in this test
+        merge2.setLastBuildJobID("test");
+        merge2.setStorageLocationIdentifier("test");
+
+        CubeUpdate cubeBuilder = new CubeUpdate(cube); // no changes are set on this CubeUpdate before it is handed to updateCube
+        mgr.updateCube(cubeBuilder); // presumably persists the cube with the statuses set above - confirm
+
+
+        mgr.promoteNewlyBuiltSegments(cube, merge1); // promote merge1 only
+
+        assertTrue(cube.getSegments().size() == 4); // expected survivors: merge1, seg3, seg4, merge2
+
+        assertTrue(cube.getSegmentById(seg1.getUuid()) == null); // seg1 and seg2 are expected to be gone once merge1 is promoted
+        assertTrue(cube.getSegmentById(seg2.getUuid()) == null);
+        assertTrue(cube.getSegmentById(merge1.getUuid()) != null && cube.getSegmentById(merge1.getUuid()).getStatus() == SegmentStatusEnum.READY); // the promoted merge
+        assertTrue(cube.getSegmentById(seg3.getUuid()) != null && cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.READY); // inputs of the unfinished merge2 must survive
+        assertTrue(cube.getSegmentById(seg4.getUuid()) != null && cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(merge2.getUuid()) != null && cube.getSegmentById(merge2.getUuid()).getStatus() == SegmentStatusEnum.NEW); // the other merge must still be NEW
+
+    }
+
     @Test
     public void testGetAllCubes() throws Exception {
         final ResourceStore store = ResourceStore.getStore(getTestConfig());