You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/06/02 10:08:26 UTC

[2/2] incubator-kylin git commit: KYLIN-807 Avoid write conflict between job engine and stream cube builder

KYLIN-807 Avoid write conflict between job engine and stream cube builder


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/36af047a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/36af047a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/36af047a

Branch: refs/heads/0.8.0
Commit: 36af047a7c3ff7a295a59dbdffc05b55bbf4dd7d
Parents: 8ad0eab
Author: shaofengshi <sh...@apache.org>
Authored: Tue Jun 2 15:15:30 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Jun 2 16:04:03 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java | 82 ++++++++++++++------
 .../apache/kylin/cube/CubeManagerCacheTest.java |  2 +-
 .../kylin/job/cube/MergeDictionaryStep.java     |  4 +-
 .../job/cube/UpdateCubeInfoAfterBuildStep.java  |  3 +-
 .../kylin/job/streaming/CubeStreamBuilder.java  |  3 +-
 .../kylin/job/BuildCubeWithEngineTest.java      |  4 +-
 .../java/org/apache/kylin/job/DeployUtil.java   |  2 +-
 .../job/hadoop/cube/MergeCuboidMapperTest.java  |  5 +-
 .../job/streaming/CubeStreamBuilderTest.java    |  4 +-
 .../kylin/rest/controller/CubeController.java   |  5 +-
 .../apache/kylin/rest/service/CubeService.java  | 14 ++--
 .../apache/kylin/rest/service/JobService.java   |  4 +-
 .../kylin/rest/service/CacheServiceTest.java    |  4 +-
 13 files changed, 85 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 5c651fc..585e503 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -153,7 +153,7 @@ public class CubeManager implements IRealizationProvider {
         DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, factColumnsPath);
         cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
 
-        updateCube(cubeSeg.getCubeInstance(), false);
+        updateCube(cubeSeg.getCubeInstance(), null, null, Lists.newArrayList(cubeSeg), null, false);
 
         return dictInfo;
     }
@@ -191,7 +191,7 @@ public class CubeManager implements IRealizationProvider {
 
         cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
 
-        updateCube(cubeSeg.getCubeInstance(), false);
+        updateCube(cubeSeg.getCubeInstance(), null, null, Lists.newArrayList(cubeSeg), null, false);
 
         return snapshot;
     }
@@ -226,7 +226,7 @@ public class CubeManager implements IRealizationProvider {
         CubeInstance cube = CubeInstance.create(cubeName, projectName, desc);
         cube.setOwner(owner);
 
-        updateCube(cube, false);
+        updateCube(cube, null, null, null, null, false);
         ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cubeName, projectName, owner);
 
         return cube;
@@ -243,10 +243,53 @@ public class CubeManager implements IRealizationProvider {
      * @return
      * @throws IOException
      */
-    public CubeInstance updateCube(CubeInstance cube, boolean updateProject) throws IOException {
+    public CubeInstance updateCube(CubeInstance cube, final List<CubeSegment> toAddSegs, final List<CubeSegment> toRemoveSegs, final List<CubeSegment> toUpdateSegs, RealizationStatusEnum newStatus, boolean updateProject) throws IOException {
+       return updateCube(cube, toAddSegs, toRemoveSegs, toUpdateSegs, newStatus, updateProject, 0);
+    }
+
+    private CubeInstance updateCube(CubeInstance cube, final List<CubeSegment> toAddSegs, final List<CubeSegment> toRemoveSegs, final List<CubeSegment> toUpdateSegs, RealizationStatusEnum newStatus, boolean updateProject, int retry) throws IOException {
+        if (cube == null)
+            throw new IllegalStateException();
 
         logger.info("Updating cube instance '" + cube.getName());
-        getStore().putResource(cube.getResourcePath(), cube, CUBE_SERIALIZER);
+
+        if (toAddSegs != null && toAddSegs.size() > 0)
+            cube.getSegments().addAll(toAddSegs);
+
+        if (toRemoveSegs != null && toRemoveSegs.size() > 0)
+            cube.getSegments().removeAll(toRemoveSegs);
+
+        if (toUpdateSegs != null && toUpdateSegs.size() > 0) {
+            for (CubeSegment segment : toUpdateSegs) {
+                for (int i = 0; i < cube.getSegments().size(); i++) {
+                    if (cube.getSegments().get(i).getName().equals(segment.getName())) {
+                        cube.getSegments().set(i, segment);
+                    }
+                }
+
+            }
+        }
+
+        Collections.sort(cube.getSegments());
+
+        if (newStatus != null) {
+            cube.setStatus(newStatus);
+        }
+
+        try {
+            getStore().putResource(cube.getResourcePath(), cube, CUBE_SERIALIZER);
+        } catch (IllegalStateException ise) {
+            logger.error("Get error when update cube " + cube.getName(), ise);
+
+            if (retry >= 3) {
+                logger.error("Retried 3 times till got error, abandoning...");
+                throw ise;
+            }
+
+            retry++;
+            cube = updateCube(reloadCubeLocal(cube.getName()), toAddSegs, toRemoveSegs, toUpdateSegs, newStatus, updateProject, retry);
+        }
+
         cubeMap.put(cube.getName(), cube);
 
         if (updateProject) {
@@ -271,7 +314,7 @@ public class CubeManager implements IRealizationProvider {
         CubeSegment mergeSegment = newSegment(cube, startDate, endDate);
 
         validateNewSegments(cube, mergeSegment);
-        saveCubeSegmentChange(cube, Lists.newArrayList(appendSegment, mergeSegment), null);
+        updateCube(cube, Lists.newArrayList(appendSegment, mergeSegment), null, null, null, false);
 
         return new Pair<CubeSegment, CubeSegment>(appendSegment, mergeSegment);
     }
@@ -296,7 +339,7 @@ public class CubeManager implements IRealizationProvider {
         validateNewSegments(cube, newSegment);
 
         if (saveChange)
-            saveCubeSegmentChange(cube, Lists.newArrayList(newSegment), null);
+            updateCube(cube, Lists.newArrayList(newSegment), null, null, null, false);
 
         return newSegment;
     }
@@ -305,7 +348,7 @@ public class CubeManager implements IRealizationProvider {
         checkNoBuildingSegment(cube);
 
         CubeSegment newSegment = newSegment(cube, startDate, endDate);
-        saveCubeSegmentChange(cube, Lists.newArrayList(newSegment), null);
+        updateCube(cube, Lists.newArrayList(newSegment), null, null, null, false);
 
         return newSegment;
     }
@@ -318,23 +361,11 @@ public class CubeManager implements IRealizationProvider {
         CubeSegment newSegment = newSegment(cube, range.getFirst(), range.getSecond());
 
         validateNewSegments(cube, newSegment);
-        saveCubeSegmentChange(cube, Lists.newArrayList(newSegment), null);
+        updateCube(cube, Lists.newArrayList(newSegment), null, null, null, false);
 
         return newSegment;
     }
 
-    protected void saveCubeSegmentChange(CubeInstance cube, List<CubeSegment> toAdd, List<CubeSegment> toRemove) throws IOException {
-        if (toAdd != null && toAdd.size() > 0)
-            cube.getSegments().addAll(toAdd);
-
-        if (toRemove != null && toRemove.size() > 0)
-            cube.getSegments().removeAll(toRemove);
-
-        Collections.sort(cube.getSegments());
-        updateCube(cube, false);
-
-    }
-
     private Pair<Long, Long> alignMergeRange(CubeInstance cube, long startDate, long endDate) {
         List<CubeSegment> readySegments = cube.getSegment(SegmentStatusEnum.READY);
         if (readySegments.isEmpty()) {
@@ -539,11 +570,14 @@ public class CubeManager implements IRealizationProvider {
                 throw new IllegalStateException("For cube " + cube + ", segment " + seg + " should be READY but is not");
         }
 
-        cube.setSegments(tobe);
-        cube.setStatus(RealizationStatusEnum.READY);
+        List<CubeSegment> toRemoveSegs = Lists.newArrayList();
+        for (CubeSegment segment : cube.getSegments()) {
+            if (!tobe.contains(segment))
+                toRemoveSegs.add(segment);
+        }
 
         logger.info("Promoting cube " + cube + ", new segments " + newSegments);
-        updateCube(cube, updateProj);
+        updateCube(cube, Lists.newArrayList(newSegments), toRemoveSegs, null, RealizationStatusEnum.READY, updateProj);
     }
 
     public void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
----------------------------------------------------------------------
diff --git a/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java b/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
index 7c74993..6b96bd4 100644
--- a/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
+++ b/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
@@ -67,7 +67,7 @@ public class CubeManagerCacheTest extends LocalFileMetadataTestCase {
         assertEquals(RealizationStatusEnum.DISABLED, createdCube.getStatus());
         createdCube.setStatus(RealizationStatusEnum.DESCBROKEN);
 
-        cubeManager.updateCube(createdCube,true);
+        cubeManager.updateCube(createdCube, null, null, null, null, true);
         assertEquals(RealizationStatusEnum.DESCBROKEN, cubeManager.getCube("a_whole_new_cube").getStatus());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java b/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
index d0a7db3..82a8984 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
@@ -62,8 +62,8 @@ public class MergeDictionaryStep extends AbstractExecutable {
             
             makeDictForNewSegment(conf, cube, newSegment, mergingSegments);
             makeSnapshotForNewSegment(cube, newSegment, mergingSegments);
-            
-            mgr.updateCube(cube,false);
+
+            mgr.updateCube(cube, null, null, Lists.newArrayList(newSegment), null, false);
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
             logger.error("fail to merge dictionary or lookup snapshots", e);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
index 1472424..e00b36d 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.job.cube;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -127,7 +128,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
             if (segmentReady) {
                 cubeManager.promoteNewlyBuiltSegments(cube, cube.getSegments().size() == 1, segment);
             } else {
-                cubeManager.updateCube(cube, false);
+                cubeManager.updateCube(cube, null, null, Lists.newArrayList(segment), null, false);
             }
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 3986db6..34d4eaf 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -341,8 +341,7 @@ public class CubeStreamBuilder extends StreamBuilder {
         CubeInstance cube = CubeManager.getInstance(kylinConfig).reloadCubeLocal(cubeSegment.getCubeInstance().getName());
         cube.getSegments().add(cubeSegment);
         Collections.sort(cube.getSegments());
-
-        CubeManager.getInstance(kylinConfig).updateCube(cube, false);
+        CubeManager.getInstance(kylinConfig).updateCube(cube, Lists.newArrayList(cubeSegment), null, null, null, false);
     }
 
     private List<Long> getAllCuboidIds(CubeDesc cubeDesc) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 74c824c..0888994 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -258,8 +258,8 @@ public class BuildCubeWithEngineTest {
 
     private void clearSegment(String cubeName) throws Exception {
         CubeInstance cube = cubeManager.getCube(cubeName);
-        cube.getSegments().clear();
-        cubeManager.updateCube(cube, true);
+        // remove all existing segments
+        cubeManager.updateCube(cube, null, cube.getSegments(), null, null, true);
     }
 
     private String buildSegment(String cubeName, long startDate, long endDate) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/job/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DeployUtil.java b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
index 6d348bd..e238e12 100644
--- a/job/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -60,7 +60,7 @@ public class DeployUtil {
         // update cube desc signature.
         for (CubeInstance cube : CubeManager.getInstance(config()).listAllCubes()) {
             cube.getDescriptor().setSignature(cube.getDescriptor().calculateSignature());
-            CubeManager.getInstance(config()).updateCube(cube,true);
+            CubeManager.getInstance(config()).updateCube(cube, null, null, null, null, true);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
index 9f7e5f9..e9cdb96 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mrunit.mapreduce.MapDriver;
@@ -134,11 +135,13 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
 
             // cubeManager.saveResource(segment.getCubeInstance());
             // cubeManager.afterCubeUpdated(segment.getCubeInstance());
-            cubeManager.updateCube(cube,true);
 
             isFirstSegment = false;
         }
 
+
+        cube = cubeManager.updateCube(cube, null, null, cube.getSegments(), null, true);
+
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
index f0ef5e1..96b7cb4 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
@@ -50,8 +50,8 @@ public class CubeStreamBuilderTest {
         DeployUtil.deployMetadata();
         DeployUtil.overrideJobJarLocations();
         final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
-        cube.getSegments().clear();
-        CubeManager.getInstance(kylinConfig).updateCube(cube, true);
+        // remove all existing segments
+        CubeManager.getInstance(kylinConfig).updateCube(cube, null, cube.getSegments(), null, null, true);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index a7df432..33a2886 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.*;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.cube.CubeInstance;
@@ -419,10 +420,8 @@ public class CubeController extends BasicController {
         CubeSegment segment = deserializeCubeSegment(cubeSegmentRequest);
 
         cubeService.getCubeManager().validateNewSegments(cube, segment);
-        cube.getSegments().add(segment);
-        Collections.sort(cube.getSegments());
         try {
-            cubeService.getCubeManager().updateCube(cube, true);
+            cubeService.getCubeManager().updateCube(cube, Lists.newArrayList(segment), null, null, null, true);
         } catch (IOException e) {
             logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
             throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 08e4112..7e38d73 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -150,7 +150,7 @@ public class CubeService extends BasicService {
         String owner = SecurityContextHolder.getContext().getAuthentication().getName();
         cube.setOwner(owner);
 
-        return getCubeManager().updateCube(cube, true);
+        return getCubeManager().updateCube(cube, null, null, null, null, true);
     }
 
     public CubeInstance createCubeAndDesc(String cubeName, String projectName, CubeDesc desc) throws IOException {
@@ -348,7 +348,7 @@ public class CubeService extends BasicService {
         cube.setStatus(RealizationStatusEnum.DISABLED);
 
         try {
-            return getCubeManager().updateCube(cube, true);
+            return getCubeManager().updateCube(cube, null, null, null, RealizationStatusEnum.DISABLED, true);
         } catch (IOException e) {
             cube.setStatus(ostatus);
             throw e;
@@ -380,12 +380,11 @@ public class CubeService extends BasicService {
             throw new JobException("Enable is not allowed with a running job.");
         }
         if (!cube.getDescriptor().calculateSignature().equals(cube.getDescriptor().getSignature())) {
-            this.releaseAllSegments(cube);
+            cube = this.releaseAllSegments(cube);
         }
 
-        cube.setStatus(RealizationStatusEnum.READY);
         try {
-            return getCubeManager().updateCube(cube, true);
+            return getCubeManager().updateCube(cube, null, null, null, RealizationStatusEnum.READY, true);
         } catch (IOException e) {
             cube.setStatus(ostatus);
             throw e;
@@ -520,7 +519,7 @@ public class CubeService extends BasicService {
      * @throws IOException
      * @throws JobException
      */
-    private void releaseAllSegments(CubeInstance cube) throws IOException, JobException {
+    private CubeInstance releaseAllSegments(CubeInstance cube) throws IOException, JobException {
         final List<CubingJob> cubingJobs = listAllCubingJobs(cube.getName(), null);
         for (CubingJob cubingJob : cubingJobs) {
             final ExecutableState status = cubingJob.getStatus();
@@ -528,8 +527,7 @@ public class CubeService extends BasicService {
                 getExecutableManager().discardJob(cubingJob.getId());
             }
         }
-        cube.getSegments().clear();
-        CubeManager.getInstance(getConfig()).updateCube(cube, true);
+        return CubeManager.getInstance(getConfig()).updateCube(cube, null, null, cube.getSegments(), null, true);
     }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index 3c150ef..3b3cbec 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -263,8 +263,8 @@ public class JobService extends BasicService {
         CubeInstance cubeInstance = getCubeManager().getCube(jobInstance.getRelatedCube());
         final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
         if (segment != null && segment.getStatus() == SegmentStatusEnum.NEW) {
-            cubeInstance.getSegments().remove(segment);
-            getCubeManager().updateCube(cubeInstance, false);
+            // Remove all existing segments
+            getCubeManager().updateCube(cubeInstance, null, cubeInstance.getSegments(), null, null, false);
         }
         getExecutableManager().discardJob(jobId);
         return jobInstance;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 788ce19..e26f387 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.rest.service;
 
+import com.google.common.collect.Lists;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.cache.RemoteCacheUpdater;
 import org.apache.kylin.common.restclient.Broadcaster;
@@ -228,8 +229,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
         assertEquals(0, cubeManagerB.getCube(cubeName).getSegments().size());
         CubeSegment segment = new CubeSegment();
         segment.setName("test_segment");
-        cube.getSegments().add(segment);
-        cubeManager.updateCube(cube, true);
+        cube = cubeManager.updateCube(cube, Lists.newArrayList(segment), null, null, null, true);
         //one for cube update, one for project update
         assertEquals(2, broadcaster.getCounterAndClear());
         waitForCounterAndClear(2);