You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/11/23 05:53:13 UTC

[11/18] kylin git commit: APACHE-KYLIN-3018: Add getLongestDepth() in CuboidUtil for getting a reasonable maxLevel for layered cubing

APACHE-KYLIN-3018: Add getLongestDepth() in CuboidUtil for getting a reasonable maxLevel for layered cubing

Signed-off-by: Zhong <nj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1b6d8fe9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1b6d8fe9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1b6d8fe9

Branch: refs/heads/ci-dong
Commit: 1b6d8fe91f8cfc25b8684ab66007358ef8c535f9
Parents: 5ecc48a
Author: Wang Ken <mi...@ebay.com>
Authored: Tue Nov 7 19:43:00 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Nov 23 13:31:34 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/cube/cuboid/CuboidUtil.java    |  37 +++++++
 .../cube/cuboid/TreeCuboidSchedulerManager.java | 102 -------------------
 .../kylin/cube/cuboid/CuboidUtilTest.java       |  57 +++++++++++
 .../kylin/engine/mr/BatchCubingJobBuilder.java  |   3 +-
 .../kylin/engine/mr/BatchCubingJobBuilder2.java |   4 +-
 .../engine/mr/BatchOptimizeJobBuilder2.java     |   7 +-
 .../engine/mr/common/CuboidSchedulerUtil.java   |  26 ++---
 .../kylin/engine/mr/steps/NDCuboidMapper.java   |   1 +
 .../kylin/engine/spark/SparkCubingByLayer.java  |   3 +-
 9 files changed, 119 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
index a84f153..e5404c8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
@@ -18,7 +18,17 @@
 
 package org.apache.kylin.cube.cuboid;
 
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.cube.cuboid.algorithm.CuboidStatsUtil;
+
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 public class CuboidUtil {
 
@@ -45,4 +55,31 @@ public class CuboidUtil {
         }
         return allCuboidsBitSet;
     }
+
+    public static int getLongestDepth(Set<Long> cuboidSet) {
+        Map<Long, List<Long>> directChildrenCache = CuboidStatsUtil.createDirectChildrenCache(cuboidSet);
+        List<Long> cuboids = Lists.newArrayList(cuboidSet);
+        Collections.sort(cuboids, new Comparator<Long>() {
+            @Override
+            public int compare(Long o1, Long o2) {
+                return -Long.compare(o1, o2);
+            }
+        });
+
+        int longestDepth = 0;
+        Map<Long, Integer> cuboidDepthMap = Maps.newHashMap();
+        for (Long cuboid : cuboids) {
+            int parentDepth = cuboidDepthMap.get(cuboid) == null ? 0 : cuboidDepthMap.get(cuboid);
+            for (Long childCuboid : directChildrenCache.get(cuboid)) {
+                if (cuboidDepthMap.get(childCuboid) == null || cuboidDepthMap.get(childCuboid) < parentDepth + 1) {
+                    cuboidDepthMap.put(childCuboid, parentDepth + 1);
+                    if (longestDepth < parentDepth + 1) {
+                        longestDepth = parentDepth + 1;
+                    }
+                }
+            }
+        }
+
+        return longestDepth;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
deleted file mode 100644
index 22e636b..0000000
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube.cuboid;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.cachesync.Broadcaster;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class TreeCuboidSchedulerManager {
-    private static ConcurrentMap<String, TreeCuboidScheduler> cache = Maps.newConcurrentMap();
-
-    private class TreeCuboidSchedulerSyncListener extends Broadcaster.Listener {
-        @Override
-        public void onClearAll(Broadcaster broadcaster) throws IOException {
-            cache.clear();
-        }
-
-        @Override
-        public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey)
-                throws IOException {
-            cache.remove(cacheKey);
-        }
-    }
-
-    public TreeCuboidSchedulerManager() {
-        Broadcaster.getInstance(KylinConfig.getInstanceFromEnv())
-                .registerListener(new TreeCuboidSchedulerSyncListener(), "cube");
-    }
-
-    private static TreeCuboidSchedulerManager instance = new TreeCuboidSchedulerManager();
-
-    public static TreeCuboidSchedulerManager getInstance() {
-        return instance;
-    }
-
-    /**
-     *
-     * @param cubeName
-     * @return null if the cube has no pre-built cuboids
-     */
-    public TreeCuboidScheduler getTreeCuboidScheduler(String cubeName) {
-        TreeCuboidScheduler result = cache.get(cubeName);
-        if (result == null) {
-            CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-            CubeInstance cubeInstance = cubeManager.getCube(cubeName);
-            if (cubeInstance == null) {
-                return null;
-            }
-            TreeCuboidScheduler treeCuboidScheduler = getTreeCuboidScheduler(cubeInstance.getDescriptor(),
-                    cubeManager.getCube(cubeName).getCuboids());
-            if (treeCuboidScheduler == null) {
-                return null;
-            }
-            cache.put(cubeName, treeCuboidScheduler);
-            result = treeCuboidScheduler;
-        }
-        return result;
-    }
-
-    public TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, Map<Long, Long> cuboidsWithRowCnt) {
-        if (cuboidsWithRowCnt == null || cuboidsWithRowCnt.isEmpty()) {
-            return null;
-        }
-        return getTreeCuboidScheduler(cubeDesc, Lists.newArrayList(cuboidsWithRowCnt.keySet()), cuboidsWithRowCnt);
-    }
-
-    public TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, List<Long> cuboidIds,
-            Map<Long, Long> cuboidsWithRowCnt) {
-        if (cuboidIds == null || cuboidsWithRowCnt == null) {
-            return null;
-        }
-        TreeCuboidScheduler treeCuboidScheduler = new TreeCuboidScheduler(cubeDesc, cuboidIds,
-                new TreeCuboidScheduler.CuboidCostComparator(cuboidsWithRowCnt));
-        return treeCuboidScheduler;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidUtilTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidUtilTest.java b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidUtilTest.java
new file mode 100644
index 0000000..18a9312
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidUtilTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.cuboid;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+
+public class CuboidUtilTest {
+    @Test
+    public void testGetLongestDepth() {
+        Stopwatch sw = new Stopwatch();
+
+        Set<Long> cuboidSet1 = Sets.newHashSet(7L, 6L, 5L, 4L, 3L, 2L, 1L);
+        sw.start();
+        assertEquals(2, CuboidUtil.getLongestDepth(cuboidSet1));
+        System.out.println("Time cost for GetLongestDepth: " + sw.elapsed(TimeUnit.MILLISECONDS) + "ms");
+
+        Set<Long> cuboidSet2 = Sets.newHashSet(1024L, 1666L, 1667L, 1728L, 1730L, 1731L, 1760L, 1762L, 1763L, 1776L,
+                1778L, 1779L, 1784L, 1788L, 1790L, 1791L, 1920L, 1922L, 1923L, 1984L, 1986L, 1987L, 2016L, 2018L, 2019L,
+                2032L, 2034L, 2035L, 2040L, 2044L, 2046L, 2047L);
+        sw.reset();
+        sw.start();
+        assertEquals(8, CuboidUtil.getLongestDepth(cuboidSet2));
+        System.out.println("Time cost for GetLongestDepth: " + sw.elapsed(TimeUnit.MILLISECONDS) + "ms");
+
+        Set<Long> cuboidSet3 = Sets.newHashSet(31L, 11L, 5L, 3L, 1L);
+        sw.reset();
+        sw.start();
+        assertEquals(3, CuboidUtil.getLongestDepth(cuboidSet3));
+        System.out.println("Time cost for GetLongestDepth: " + sw.elapsed(TimeUnit.MILLISECONDS) + "ms");
+
+        sw.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index f64365a..432e1ab 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.mr;
 
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
 import org.apache.kylin.cube.model.RowKeyDesc;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
@@ -64,7 +65,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
 
         // Phase 3: Build Cube
         RowKeyDesc rowKeyDesc = seg.getCubeDesc().getRowkey();
-        final int groupRowkeyColumnsCount = seg.getCuboidScheduler().getBuildLevel();
+        final int groupRowkeyColumnsCount = CuboidUtil.getLongestDepth(seg.getCuboidScheduler().getAllCuboidIds());
         // base cuboid step
         result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
         // n dim cuboid steps

http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 4b808d1..8fbc0c9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr;
 
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@ -98,7 +99,8 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
     }
 
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
-        final int maxLevel = seg.getCuboidScheduler().getBuildLevel();
+        // Statistics are not known at this point, so the tree cuboid scheduler cannot be determined yet. Determine the maxLevel at runtime
+        final int maxLevel = CuboidUtil.getLongestDepth(seg.getCuboidScheduler().getAllCuboidIds());
         // base cuboid step
         result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
         // n dim cuboid steps

http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
index a8127cc..af92368 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr;
 
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
@@ -151,14 +152,14 @@ public class BatchOptimizeJobBuilder2 extends JobBuilderSupport {
     private void addLayerCubingSteps(final CubingJob result, final String jobId, final CuboidModeEnum cuboidMode,
             final String cuboidRootPath) {
         // Don't know statistics so that tree cuboid scheduler is not determined. Determine the maxLevel at runtime
-        final int maxLevel = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
+        final int maxLevel = CuboidUtil.getLongestDepth(seg.getCubeInstance().getCuboidsByMode(cuboidMode));
         // Don't need to build base cuboid
         // n dim cuboid steps
         for (int i = 1; i <= maxLevel; i++) {
             String parentCuboidPath = i == 1 ? getBaseCuboidPath(cuboidRootPath)
                     : getCuboidOutputPathsByLevel(cuboidRootPath, i - 1);
-            result.addTask(createNDimensionCuboidStep(parentCuboidPath,
-                    getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId, cuboidMode));
+            result.addTask(createNDimensionCuboidStep(parentCuboidPath, getCuboidOutputPathsByLevel(cuboidRootPath, i),
+                    i, jobId, cuboidMode));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
index d684c04..1809ff0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
@@ -19,36 +19,36 @@
 package org.apache.kylin.engine.mr.common;
 
 import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.cuboid.DefaultCuboidScheduler;
-import org.apache.kylin.cube.cuboid.TreeCuboidSchedulerManager;
+import org.apache.kylin.cube.cuboid.TreeCuboidScheduler;
+
+import com.google.common.collect.Lists;
 
 public class CuboidSchedulerUtil {
 
     public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, String cuboidModeName) {
-        return getCuboidSchedulerByMode(segment, segment.getCubeInstance().getCuboidsByMode(cuboidModeName));
+        return getCuboidSchedulerByMode(segment, CuboidModeEnum.getByModeName(cuboidModeName));
     }
 
     public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, CuboidModeEnum cuboidMode) {
-        return getCuboidSchedulerByMode(segment, segment.getCubeInstance().getCuboidsByMode(cuboidMode));
+        return getCuboidScheduler(segment, segment.getCubeInstance().getCuboidsByMode(cuboidMode));
     }
 
-    public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, Set<Long> cuboidSet) {
-        CuboidScheduler cuboidScheduler;
+    public static CuboidScheduler getCuboidScheduler(CubeSegment segment, Set<Long> cuboidSet) {
         try {
-            cuboidScheduler = TreeCuboidSchedulerManager.getInstance().getTreeCuboidScheduler(segment.getCubeDesc(), //
-                    CuboidStatsReaderUtil.readCuboidStatsFromSegment(cuboidSet, segment));
+            Map<Long, Long> cuboidsWithRowCnt = CuboidStatsReaderUtil.readCuboidStatsFromSegment(cuboidSet, segment);
+            Comparator<Long> comparator = cuboidsWithRowCnt == null ? Cuboid.cuboidSelectComparator
+                    : new TreeCuboidScheduler.CuboidCostComparator(cuboidsWithRowCnt);
+            return new TreeCuboidScheduler(segment.getCubeDesc(), Lists.newArrayList(cuboidSet), comparator);
         } catch (IOException e) {
             throw new RuntimeException("Fail to cube stats for segment" + segment + " due to " + e);
         }
-
-        if (cuboidScheduler == null) {
-            cuboidScheduler = new DefaultCuboidScheduler(segment.getCubeDesc());
-        }
-        return cuboidScheduler;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index f936393..a58415a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -103,6 +103,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
         if (handleCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
             logger.info("Handling record with ordinal: " + handleCounter);
+            logger.info("Parent cuboid: " + parentCuboid.getId() + "; Children: " + myChildren);
         }
 
         for (Long child : myChildren) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 8d75070..f7c5fee 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -45,6 +45,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
 import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -181,7 +182,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             reducerFunction2 = new CuboidReducerFunction2(cubeName, metaUrl, needAggr);
         }
 
-        final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
+        final int totalLevels = CuboidUtil.getLongestDepth(cubeSegment.getCuboidScheduler().getAllCuboidIds());
         JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
         int level = 0;
         int partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);