You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/01/23 09:32:25 UTC

[38/50] [abbrv] incubator-kylin git commit: KYLIN-567, coding done, ready for testing

KYLIN-567, coding done, ready for testing


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8072fb77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8072fb77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8072fb77

Branch: refs/heads/inverted-index
Commit: 8072fb7729d78017a062d249451f7c61c27ef3ca
Parents: 8da37ea
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Jan 23 14:13:32 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Jan 23 14:13:32 2015 +0800

----------------------------------------------------------------------
 .../java/com/kylinolap/cube/CubeManager.java    | 151 ++++++++++++++---
 .../kylinolap/cube/CubeSegmentValidator.java    | 161 -------------------
 .../java/com/kylinolap/cube/model/CubeDesc.java |  13 +-
 .../kylinolap/job/cube/MergeDictionaryStep.java |  14 +-
 .../job/cube/UpdateCubeInfoAfterBuildStep.java  |   7 +-
 .../job/cube/UpdateCubeInfoAfterMergeStep.java  |  40 ++---
 6 files changed, 164 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8072fb77/cube/src/main/java/com/kylinolap/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/CubeManager.java b/cube/src/main/java/com/kylinolap/cube/CubeManager.java
index be22ea0..70fb8b6 100644
--- a/cube/src/main/java/com/kylinolap/cube/CubeManager.java
+++ b/cube/src/main/java/com/kylinolap/cube/CubeManager.java
@@ -18,6 +18,7 @@ package com.kylinolap.cube;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.Iterator;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.kylinolap.common.KylinConfig;
 import com.kylinolap.common.persistence.JsonSerializer;
@@ -41,6 +43,7 @@ import com.kylinolap.common.restclient.Broadcaster;
 import com.kylinolap.common.restclient.CaseInsensitiveStringCache;
 import com.kylinolap.cube.model.CubeBuildTypeEnum;
 import com.kylinolap.cube.model.CubeDesc;
+import com.kylinolap.cube.model.CubePartitionDesc;
 import com.kylinolap.cube.model.DimensionDesc;
 import com.kylinolap.dict.DateStrDictionary;
 import com.kylinolap.dict.Dictionary;
@@ -244,12 +247,7 @@ public class CubeManager implements IRealizationProvider {
 
     public CubeInstance updateCube(CubeInstance cube) throws IOException {
         logger.info("Updating cube instance '" + cube.getName());
-
-        // save resource
         saveResource(cube);
-
-        logger.info("Cube with " + cube.getSegments().size() + " segments is saved");
-
         return cube;
     }
 
@@ -294,9 +292,9 @@ public class CubeManager implements IRealizationProvider {
                 }
             }
         }
-        CubeSegment newSegment = buildSegment(cubeInstance, start, end);
 
-        validateNewSegments(cubeInstance, CubeBuildTypeEnum.MERGE, newSegment);
+        CubeSegment newSegment = newSegment(cubeInstance, start, end);
+        validateNewSegments(cubeInstance, newSegment);
 
         cubeInstance.getSegments().add(newSegment);
         Collections.sort(cubeInstance.getSegments());
@@ -315,15 +313,16 @@ public class CubeManager implements IRealizationProvider {
         if (cubeInstance.getDescriptor().getCubePartitionDesc().isPartitioned()) {
             if (readySegments.isEmpty()) {
                 startDate = cubeInstance.getDescriptor().getCubePartitionDesc().getPartitionDateStart();
-                newSegment = buildSegment(cubeInstance, startDate, endDate);
+                newSegment = newSegment(cubeInstance, startDate, endDate);
             } else {
                 startDate = readySegments.get(readySegments.size() - 1).getDateRangeEnd();
-                newSegment = buildSegment(cubeInstance, startDate, endDate);
+                newSegment = newSegment(cubeInstance, startDate, endDate);
             }
         } else {
-            newSegment = buildSegment(cubeInstance, 0, Long.MAX_VALUE);
+            newSegment = newSegment(cubeInstance, 0, Long.MAX_VALUE);
         }
-        validateNewSegments(cubeInstance, CubeBuildTypeEnum.BUILD, newSegment);
+
+        validateNewSegments(cubeInstance, newSegment);
 
         cubeInstance.getSegments().add(newSegment);
         Collections.sort(cubeInstance.getSegments());
@@ -444,13 +443,7 @@ public class CubeManager implements IRealizationProvider {
         removeCubeCache(droppedCube);
     }
 
-    /**
-     * @param cubeInstance
-     * @param startDate    (pass 0 if full build)
-     * @param endDate      (pass 0 if full build)
-     * @return
-     */
-    private CubeSegment buildSegment(CubeInstance cubeInstance, long startDate, long endDate) {
+    private CubeSegment newSegment(CubeInstance cubeInstance, long startDate, long endDate) {
         CubeSegment segment = new CubeSegment();
         String incrementalSegName = CubeSegment.getSegmentName(startDate, endDate);
         segment.setUuid(UUID.randomUUID().toString());
@@ -482,19 +475,125 @@ public class CubeManager implements IRealizationProvider {
         return tableName;
     }
 
+    public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment... newSegments) throws IOException {
+        List<CubeSegment> tobe = calculateToBeSegments(cube);
+
+        for (CubeSegment seg : newSegments) {
+            if (tobe.contains(seg) == false)
+                throw new IllegalStateException("For cube " + cube + ", segment " + seg + " is expected but not in the tobe " + tobe);
+
+            if (StringUtils.isBlank(seg.getStorageLocationIdentifier()))
+                throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing StorageLocationIdentifier");
+
+            if (StringUtils.isBlank(seg.getLastBuildJobID()))
+                throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing LastBuildJobID");
+
+            seg.setStatus(SegmentStatusEnum.READY);
+        }
+
+        for (CubeSegment seg : tobe) {
+            if (isReady(seg) == false)
+                throw new IllegalStateException("For cube " + cube + ", segment " + seg + " should be READY but is not");
+        }
+
+        cube.setSegments(tobe);
+        cube.setStatus(RealizationStatusEnum.READY);
+
+        logger.info("Promoting cube " + cube + ", new segments " + newSegments);
+        saveResource(cube);
+    }
+
+    private void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) {
+        List<CubeSegment> tobe = calculateToBeSegments(cube, newSegments);
+        List<CubeSegment> newList = Arrays.asList(newSegments);
+        if (tobe.containsAll(newList) == false) {
+            throw new IllegalStateException("For cube " + cube + ", the new segments " + newList + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe);
+        }
+    }
+
     /**
+     * Smartly figure out the TOBE segments once all new segments are built.
+     * - Ensures no gap, no overlap 
+     * - Favors new segments over the old
+     * - Favors big segments over the small
      */
-    private void validateNewSegments(CubeInstance cubeInstance, CubeBuildTypeEnum buildType, CubeSegment newSegment) {
-        if (cubeInstance.getDescriptor().getCubePartitionDesc().isPartitioned() == false) {
-            // do nothing for non-incremental build
-            return;
+    private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment... extraSegments) {
+        CubeDesc cubeDesc = cube.getDescriptor();
+        CubePartitionDesc partDesc = cubeDesc.getCubePartitionDesc();
+        if (partDesc.isPartitioned() == false) {
+            return cube.getSegments(); // do nothing for non-incremental build
         }
-        if (newSegment.getDateRangeEnd() <= newSegment.getDateRangeStart()) {
-            throw new IllegalStateException(" end date.");
+
+        List<CubeSegment> tobe = Lists.newArrayList(cube.getSegments());
+        if (extraSegments != null)
+            tobe.addAll(Arrays.asList(extraSegments));
+        if (tobe.size() == 0)
+            return tobe;
+
+        // sort by start time, then end time
+        Collections.sort(tobe);
+
+        // check first segment start time
+        CubeSegment firstSeg = tobe.get(0);
+        if (firstSeg.getDateRangeStart() != partDesc.getPartitionDateStart()) {
+            throw new IllegalStateException("For " + cube + ", the first segment, " + firstSeg + ", must start at " + partDesc.getPartitionDateStart());
         }
 
-        CubeSegmentValidator cubeSegmentValidator = CubeSegmentValidator.getCubeSegmentValidator(buildType);
-        cubeSegmentValidator.validate(cubeInstance, newSegment);
+        for (int i = 0, j = 1; j < tobe.size();) {
+            CubeSegment is = tobe.get(i);
+            CubeSegment js = tobe.get(j);
+
+            // check i is either ready or new
+            if (!isNew(is) && !isReady(is)) {
+                tobe.remove(i);
+                continue;
+            }
+
+            // check j is either ready or new
+            if (!isNew(js) && !isReady(js)) {
+                tobe.remove(j);
+                continue;
+            }
+
+            // if i, j competes
+            if (is.getDateRangeStart() == js.getDateRangeStart()) {
+                // if both new or ready, favor the bigger segment
+                if (isReady(is) && isReady(js) || isNew(is) && isNew(js)) {
+                    if (is.getDateRangeEnd() <= js.getDateRangeEnd()) {
+                        tobe.remove(i);
+                    } else {
+                        tobe.remove(j);
+                    }
+                }
+                // otherwise, favor the new segment
+                else if (isNew(is)) {
+                    tobe.remove(j);
+                } else {
+                    tobe.remove(i);
+                }
+                continue;
+            }
+
+            // if i, j in sequence
+            if (is.getDateRangeEnd() == js.getDateRangeStart()) {
+                i++;
+                j++;
+                continue;
+            }
+
+            // seems j not fitting
+            tobe.remove(j);
+        }
+
+        return tobe;
+    }
+
+    private boolean isReady(CubeSegment seg) {
+        return seg.getStatus() == SegmentStatusEnum.READY;
+    }
+
+    private boolean isNew(CubeSegment seg) {
+        return seg.getStatus() == SegmentStatusEnum.NEW || seg.getStatus() == SegmentStatusEnum.READY_PENDING;
     }
 
     private void loadAllCubeInstance() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8072fb77/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java b/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java
deleted file mode 100644
index c2c6495..0000000
--- a/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.cube;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
-import com.kylinolap.cube.model.CubeBuildTypeEnum;
-import com.kylinolap.cube.model.CubeDesc;
-import com.kylinolap.cube.model.CubePartitionDesc;
-import com.kylinolap.cube.model.DimensionDesc;
-import com.kylinolap.dict.DictionaryManager;
-import com.kylinolap.metadata.model.SegmentStatusEnum;
-import com.kylinolap.metadata.model.TblColRef;
-
-/**
- * @author xduo
- */
-public abstract class CubeSegmentValidator {
-
-    private CubeSegmentValidator() {
-    }
-
-    public static CubeSegmentValidator getCubeSegmentValidator(CubeBuildTypeEnum buildType) {
-        switch (buildType) {
-        case MERGE:
-            return new MergeOperationValidator();
-        case BUILD:
-            return new BuildOperationValidator();
-        default:
-            throw new RuntimeException("invalid build type:" + buildType);
-        }
-    }
-
-    abstract void validate(CubeInstance cubeInstance, CubeSegment newSegment);
-
-    private static class MergeOperationValidator extends CubeSegmentValidator {
-
-        private void checkContingency(CubeInstance cubeInstance, CubeSegment newSegment) {
-            if (cubeInstance.getSegments().size() < 2) {
-                throw new IllegalStateException("No segments to merge.");
-            }
-            CubeSegment startSeg = null;
-            CubeSegment endSeg = null;
-            for (CubeSegment segment : cubeInstance.getSegments()) {
-                if (segment.getDateRangeStart() == newSegment.getDateRangeStart()) {
-                    startSeg = segment;
-                }
-                if (segment.getDateRangeEnd() == newSegment.getDateRangeEnd()) {
-                    endSeg = segment;
-                }
-            }
-
-            if (null == startSeg || null == endSeg || startSeg.getDateRangeStart() >= endSeg.getDateRangeStart()) {
-                throw new IllegalStateException("Invalid date range.");
-            }
-        }
-
-        private void checkLoopTableConsistency(CubeInstance cube, CubeSegment newSegment) {
-
-            DictionaryManager dictMgr = DictionaryManager.getInstance(cube.getConfig());
-            List<CubeSegment> segmentList = cube.getMergingSegments(newSegment);
-
-            HashSet<TblColRef> cols = new HashSet<TblColRef>();
-            CubeDesc cubeDesc = cube.getDescriptor();
-            for (DimensionDesc dim : cubeDesc.getDimensions()) {
-                for (TblColRef col : dim.getColumnRefs()) {
-                    // include those dictionaries that do not need mergning
-                    try {
-                        if (newSegment.getCubeDesc().getRowkey().isUseDictionary(col)) {
-                            String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
-                            if (!cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
-                                cols.add(col);
-                            }
-                        }
-                    } catch (IOException e) {
-                        throw new IllegalStateException("checkLoopTableConsistency not passed when allocating a new segment.");
-                    }
-                }
-            }
-
-            // check if all dictionaries on lookup table columns are identical
-            for (TblColRef col : cols) {
-                String dictOfFirstSegment = null;
-                for (CubeSegment segment : segmentList) {
-                    String temp = segment.getDictResPath(col);
-                    if (temp == null) {
-                        throw new IllegalStateException("Dictionary is null on column: " + col + " Segment: " + segment);
-                    }
-
-                    if (dictOfFirstSegment == null) {
-                        dictOfFirstSegment = temp;
-                    } else {
-                        if (!dictOfFirstSegment.equalsIgnoreCase(temp)) {
-                            throw new IllegalStateException("Segments with different dictionaries(on lookup table) cannot be merged");
-                        }
-                    }
-                }
-            }
-
-            // check if all segments' snapshot are identical
-            CubeSegment firstSegment = null;
-            for (CubeSegment segment : segmentList) {
-                if (firstSegment == null) {
-                    firstSegment = segment;
-                } else {
-                    Collection<String> a = firstSegment.getSnapshots().values();
-                    Collection<String> b = segment.getSnapshots().values();
-                    if (!((a.size() == b.size()) && a.containsAll(b)))
-                        throw new IllegalStateException("Segments with different snapshots cannot be merged");
-                }
-            }
-
-        }
-
-        @Override
-        public void validate(CubeInstance cubeInstance, CubeSegment newSegment) {
-            this.checkContingency(cubeInstance, newSegment);
-            this.checkLoopTableConsistency(cubeInstance, newSegment);
-        }
-    }
-
-    private static class BuildOperationValidator extends CubeSegmentValidator {
-
-        @Override
-        void validate(CubeInstance cubeInstance, CubeSegment newSegment) {
-            List<CubeSegment> readySegments = cubeInstance.getSegments(SegmentStatusEnum.READY);
-            CubePartitionDesc cubePartitionDesc = cubeInstance.getDescriptor().getCubePartitionDesc();
-            final long initStartDate = cubePartitionDesc.isPartitioned() ? cubePartitionDesc.getPartitionDateStart() : 0;
-            long startDate = initStartDate;
-            for (CubeSegment readySegment: readySegments) {
-                if (startDate == readySegment.getDateRangeStart() && startDate < readySegment.getDateRangeEnd()) {
-                    startDate = readySegment.getDateRangeEnd();
-                } else {
-                    throw new IllegalStateException("there is gap in cube segments");
-                }
-            }
-            if (newSegment.getDateRangeStart() == startDate && startDate < newSegment.getDateRangeEnd()) {
-                return;
-            }
-            throw new IllegalStateException("invalid segment date range from " + newSegment.getDateRangeStart() + " to " + newSegment.getDateRangeEnd());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8072fb77/cube/src/main/java/com/kylinolap/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/model/CubeDesc.java b/cube/src/main/java/com/kylinolap/cube/model/CubeDesc.java
index 999a964..5301081 100644
--- a/cube/src/main/java/com/kylinolap/cube/model/CubeDesc.java
+++ b/cube/src/main/java/com/kylinolap/cube/model/CubeDesc.java
@@ -31,11 +31,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import com.kylinolap.common.util.CaseInsensitiveStringMap;
-import com.kylinolap.metadata.model.*;
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.net.util.Base64;
-import org.apache.hadoop.hbase.util.Strings;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
@@ -47,11 +45,18 @@ import com.kylinolap.common.KylinConfig;
 import com.kylinolap.common.persistence.ResourceStore;
 import com.kylinolap.common.persistence.RootPersistentEntity;
 import com.kylinolap.common.util.Array;
+import com.kylinolap.common.util.CaseInsensitiveStringMap;
 import com.kylinolap.common.util.JsonUtil;
 import com.kylinolap.metadata.MetadataConstances;
 import com.kylinolap.metadata.MetadataManager;
+import com.kylinolap.metadata.model.ColumnDesc;
+import com.kylinolap.metadata.model.DataModelDesc;
+import com.kylinolap.metadata.model.DataType;
 import com.kylinolap.metadata.model.FunctionDesc;
+import com.kylinolap.metadata.model.JoinDesc;
+import com.kylinolap.metadata.model.MeasureDesc;
 import com.kylinolap.metadata.model.ParameterDesc;
+import com.kylinolap.metadata.model.TableDesc;
 import com.kylinolap.metadata.model.TblColRef;
 
 /**
@@ -705,7 +710,7 @@ public class CubeDesc extends RootPersistentEntity {
             }
             
             // verify holistic count distinct as a dependent measure
-            if (m.isHolisticCountDistinct() && Strings.isEmpty(m.getDependentMeasureRef())) {
+            if (m.isHolisticCountDistinct() && StringUtils.isBlank(m.getDependentMeasureRef())) {
                 throw new IllegalStateException(m + " is a holistic count distinct but it has no DependentMeasureRef defined!");
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8072fb77/job/src/main/java/com/kylinolap/job/cube/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/cube/MergeDictionaryStep.java b/job/src/main/java/com/kylinolap/job/cube/MergeDictionaryStep.java
index c13bdc0..a243364 100644
--- a/job/src/main/java/com/kylinolap/job/cube/MergeDictionaryStep.java
+++ b/job/src/main/java/com/kylinolap/job/cube/MergeDictionaryStep.java
@@ -46,10 +46,14 @@ public class MergeDictionaryStep extends AbstractExecutable {
         final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
         final List<CubeSegment> mergingSegments = getMergingSegments(cube);
         
+        Collections.sort(mergingSegments);
+        
         try {
+            checkLookupSnapshotsMustIncremental(mergingSegments);
+            
             makeDictForNewSegment(conf, cube, newSegment, mergingSegments);
             makeSnapshotForNewSegment(cube, newSegment, mergingSegments);
-
+            
             mgr.updateCube(cube);
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
@@ -67,6 +71,11 @@ public class MergeDictionaryStep extends AbstractExecutable {
         return result;
     }
 
+    private void checkLookupSnapshotsMustIncremental(List<CubeSegment> mergingSegments) {
+
+        // FIXME check each newer snapshot has only NEW rows but no MODIFIED rows
+    }
+
     /**
      * For the new segment, we need to create dictionaries for it, too. For
      * those dictionaries on fact table, create it by merging underlying
@@ -130,7 +139,8 @@ public class MergeDictionaryStep extends AbstractExecutable {
      * @param newSeg
      */
     private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) {
-        for (Map.Entry<String, String> entry : mergingSegments.get(0).getSnapshots().entrySet()) {
+        CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
+        for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
             newSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8072fb77/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterBuildStep.java b/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterBuildStep.java
index 5e9acac..1ac2257 100644
--- a/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterBuildStep.java
+++ b/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterBuildStep.java
@@ -14,8 +14,6 @@ import com.kylinolap.job.exception.ExecuteException;
 import com.kylinolap.job.execution.ExecutableContext;
 import com.kylinolap.job.execution.ExecuteResult;
 import com.kylinolap.job.impl.threadpool.AbstractExecutable;
-import com.kylinolap.metadata.model.SegmentStatusEnum;
-import com.kylinolap.metadata.realization.RealizationStatusEnum;
 
 /**
  * Created by qianzhou on 1/4/15.
@@ -102,17 +100,14 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         Preconditions.checkState(StringUtils.isNotEmpty(cubeSizeString), "Can't get cube segment size.");
         long size = Long.parseLong(cubeSizeString) / 1024;
 
-
         segment.setLastBuildJobID(getCubingJobId());
         segment.setLastBuildTime(System.currentTimeMillis());
         segment.setSizeKB(size);
         segment.setSourceRecords(sourceCount);
         segment.setSourceRecordsSize(sourceSize);
-        segment.setStatus(SegmentStatusEnum.READY);
-        cube.setStatus(RealizationStatusEnum.READY);
 
         try {
-            cubeManager.updateCube(cube);
+            cubeManager.promoteNewlyBuiltSegments(cube, segment);
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
             logger.error("fail to update cube after build", e);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8072fb77/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterMergeStep.java b/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterMergeStep.java
index 90c22ab..9e0fd63 100644
--- a/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterMergeStep.java
+++ b/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterMergeStep.java
@@ -1,5 +1,12 @@
 package com.kylinolap.job.cube;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.kylinolap.common.KylinConfig;
@@ -12,13 +19,6 @@ import com.kylinolap.job.exception.ExecuteException;
 import com.kylinolap.job.execution.ExecutableContext;
 import com.kylinolap.job.execution.ExecuteResult;
 import com.kylinolap.job.impl.threadpool.AbstractExecutable;
-import com.kylinolap.metadata.model.SegmentStatusEnum;
-import org.apache.commons.lang.StringUtils;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 
 /**
  * Created by qianzhou on 1/7/15.
@@ -43,10 +43,6 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         final CubeInstance cube = cubeManager.getCube(getCubeName());
-        List<String> mergingSegmentIds = getMergingSegmentIds();
-        if (mergingSegmentIds.isEmpty()) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
-        }
         CubeSegment mergedSegment = cube.getSegmentById(getSegmentId());
         if (mergedSegment == null) {
             return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
@@ -55,30 +51,28 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         Preconditions.checkState(StringUtils.isNotEmpty(cubeSizeString), "Can't get cube segment size.");
         long cubeSize = Long.parseLong(cubeSizeString) / 1024;
 
-        List<CubeSegment> toBeRemoved = Lists.newArrayListWithExpectedSize(mergingSegmentIds.size());
-        for (CubeSegment segment : cube.getSegments()) {
-            if (mergingSegmentIds.contains(segment.getUuid())) {
-                toBeRemoved.add(segment);
-            }
+        List<String> mergingSegmentIds = getMergingSegmentIds();
+        if (mergingSegmentIds.isEmpty()) {
+            return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
         }
-
+        
         long sourceCount = 0L;
         long sourceSize = 0L;
-        for (CubeSegment segment : toBeRemoved) {
+        for (String id : mergingSegmentIds) {
+            CubeSegment segment = cube.getSegmentById(id);
             sourceCount += segment.getSourceRecords();
             sourceSize += segment.getSourceRecordsSize();
         }
-        //update segment info
+        
+        // update segment info
         mergedSegment.setSizeKB(cubeSize);
         mergedSegment.setSourceRecords(sourceCount);
         mergedSegment.setSourceRecordsSize(sourceSize);
         mergedSegment.setLastBuildJobID(getCubingJobId());
-        mergedSegment.setStatus(SegmentStatusEnum.READY);
         mergedSegment.setLastBuildTime(System.currentTimeMillis());
-        //remove old segment
-        cube.getSegments().removeAll(toBeRemoved);
+        
         try {
-            cubeManager.updateCube(cube);
+            cubeManager.promoteNewlyBuiltSegments(cube, mergedSegment);
             return new ExecuteResult(ExecuteResult.State.SUCCEED);
         } catch (IOException e) {
             logger.error("fail to update cube after merge", e);