You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/11/23 05:53:13 UTC
[11/18] kylin git commit: APACHE-KYLIN-3018: Add getLongestDepth() in CuboidUtil for getting a reasonable maxLevel for layered cubing
APACHE-KYLIN-3018: Add getLongestDepth() in CuboidUtil for getting a reasonable maxLevel for layered cubing
Signed-off-by: Zhong <nj...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1b6d8fe9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1b6d8fe9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1b6d8fe9
Branch: refs/heads/ci-dong
Commit: 1b6d8fe91f8cfc25b8684ab66007358ef8c535f9
Parents: 5ecc48a
Author: Wang Ken <mi...@ebay.com>
Authored: Tue Nov 7 19:43:00 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Nov 23 13:31:34 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/cube/cuboid/CuboidUtil.java | 37 +++++++
.../cube/cuboid/TreeCuboidSchedulerManager.java | 102 -------------------
.../kylin/cube/cuboid/CuboidUtilTest.java | 57 +++++++++++
.../kylin/engine/mr/BatchCubingJobBuilder.java | 3 +-
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 4 +-
.../engine/mr/BatchOptimizeJobBuilder2.java | 7 +-
.../engine/mr/common/CuboidSchedulerUtil.java | 26 ++---
.../kylin/engine/mr/steps/NDCuboidMapper.java | 1 +
.../kylin/engine/spark/SparkCubingByLayer.java | 3 +-
9 files changed, 119 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
index a84f153..e5404c8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
@@ -18,7 +18,17 @@
package org.apache.kylin.cube.cuboid;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.cube.cuboid.algorithm.CuboidStatsUtil;
+
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
public class CuboidUtil {
@@ -45,4 +55,31 @@ public class CuboidUtil {
}
return allCuboidsBitSet;
}
+
+ public static int getLongestDepth(Set<Long> cuboidSet) {
+ Map<Long, List<Long>> directChildrenCache = CuboidStatsUtil.createDirectChildrenCache(cuboidSet);
+ List<Long> cuboids = Lists.newArrayList(cuboidSet);
+ Collections.sort(cuboids, new Comparator<Long>() {
+ @Override
+ public int compare(Long o1, Long o2) {
+ return -Long.compare(o1, o2);
+ }
+ });
+
+ int longestDepth = 0;
+ Map<Long, Integer> cuboidDepthMap = Maps.newHashMap();
+ for (Long cuboid : cuboids) {
+ int parentDepth = cuboidDepthMap.get(cuboid) == null ? 0 : cuboidDepthMap.get(cuboid);
+ for (Long childCuboid : directChildrenCache.get(cuboid)) {
+ if (cuboidDepthMap.get(childCuboid) == null || cuboidDepthMap.get(childCuboid) < parentDepth + 1) {
+ cuboidDepthMap.put(childCuboid, parentDepth + 1);
+ if (longestDepth < parentDepth + 1) {
+ longestDepth = parentDepth + 1;
+ }
+ }
+ }
+ }
+
+ return longestDepth;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
deleted file mode 100644
index 22e636b..0000000
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube.cuboid;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.cachesync.Broadcaster;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class TreeCuboidSchedulerManager {
- private static ConcurrentMap<String, TreeCuboidScheduler> cache = Maps.newConcurrentMap();
-
- private class TreeCuboidSchedulerSyncListener extends Broadcaster.Listener {
- @Override
- public void onClearAll(Broadcaster broadcaster) throws IOException {
- cache.clear();
- }
-
- @Override
- public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey)
- throws IOException {
- cache.remove(cacheKey);
- }
- }
-
- public TreeCuboidSchedulerManager() {
- Broadcaster.getInstance(KylinConfig.getInstanceFromEnv())
- .registerListener(new TreeCuboidSchedulerSyncListener(), "cube");
- }
-
- private static TreeCuboidSchedulerManager instance = new TreeCuboidSchedulerManager();
-
- public static TreeCuboidSchedulerManager getInstance() {
- return instance;
- }
-
- /**
- *
- * @param cubeName
- * @return null if the cube has no pre-built cuboids
- */
- public TreeCuboidScheduler getTreeCuboidScheduler(String cubeName) {
- TreeCuboidScheduler result = cache.get(cubeName);
- if (result == null) {
- CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cubeInstance = cubeManager.getCube(cubeName);
- if (cubeInstance == null) {
- return null;
- }
- TreeCuboidScheduler treeCuboidScheduler = getTreeCuboidScheduler(cubeInstance.getDescriptor(),
- cubeManager.getCube(cubeName).getCuboids());
- if (treeCuboidScheduler == null) {
- return null;
- }
- cache.put(cubeName, treeCuboidScheduler);
- result = treeCuboidScheduler;
- }
- return result;
- }
-
- public TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, Map<Long, Long> cuboidsWithRowCnt) {
- if (cuboidsWithRowCnt == null || cuboidsWithRowCnt.isEmpty()) {
- return null;
- }
- return getTreeCuboidScheduler(cubeDesc, Lists.newArrayList(cuboidsWithRowCnt.keySet()), cuboidsWithRowCnt);
- }
-
- public TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, List<Long> cuboidIds,
- Map<Long, Long> cuboidsWithRowCnt) {
- if (cuboidIds == null || cuboidsWithRowCnt == null) {
- return null;
- }
- TreeCuboidScheduler treeCuboidScheduler = new TreeCuboidScheduler(cubeDesc, cuboidIds,
- new TreeCuboidScheduler.CuboidCostComparator(cuboidsWithRowCnt));
- return treeCuboidScheduler;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidUtilTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidUtilTest.java b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidUtilTest.java
new file mode 100644
index 0000000..18a9312
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidUtilTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.cuboid;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+
+public class CuboidUtilTest {
+ @Test
+ public void testGetLongestDepth() {
+ Stopwatch sw = new Stopwatch();
+
+ Set<Long> cuboidSet1 = Sets.newHashSet(7L, 6L, 5L, 4L, 3L, 2L, 1L);
+ sw.start();
+ assertEquals(2, CuboidUtil.getLongestDepth(cuboidSet1));
+ System.out.println("Time cost for GetLongestDepth: " + sw.elapsed(TimeUnit.MILLISECONDS) + "ms");
+
+ Set<Long> cuboidSet2 = Sets.newHashSet(1024L, 1666L, 1667L, 1728L, 1730L, 1731L, 1760L, 1762L, 1763L, 1776L,
+ 1778L, 1779L, 1784L, 1788L, 1790L, 1791L, 1920L, 1922L, 1923L, 1984L, 1986L, 1987L, 2016L, 2018L, 2019L,
+ 2032L, 2034L, 2035L, 2040L, 2044L, 2046L, 2047L);
+ sw.reset();
+ sw.start();
+ assertEquals(8, CuboidUtil.getLongestDepth(cuboidSet2));
+ System.out.println("Time cost for GetLongestDepth: " + sw.elapsed(TimeUnit.MILLISECONDS) + "ms");
+
+ Set<Long> cuboidSet3 = Sets.newHashSet(31L, 11L, 5L, 3L, 1L);
+ sw.reset();
+ sw.start();
+ assertEquals(3, CuboidUtil.getLongestDepth(cuboidSet3));
+ System.out.println("Time cost for GetLongestDepth: " + sw.elapsed(TimeUnit.MILLISECONDS) + "ms");
+
+ sw.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index f64365a..432e1ab 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -19,6 +19,7 @@
package org.apache.kylin.engine.mr;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
import org.apache.kylin.cube.model.RowKeyDesc;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
@@ -64,7 +65,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
// Phase 3: Build Cube
RowKeyDesc rowKeyDesc = seg.getCubeDesc().getRowkey();
- final int groupRowkeyColumnsCount = seg.getCuboidScheduler().getBuildLevel();
+ final int groupRowkeyColumnsCount = CuboidUtil.getLongestDepth(seg.getCuboidScheduler().getAllCuboidIds());
// base cuboid step
result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
// n dim cuboid steps
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 4b808d1..8fbc0c9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@ -98,7 +99,8 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
}
protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
- final int maxLevel = seg.getCuboidScheduler().getBuildLevel();
+ // Don't know statistics so that tree cuboid scheduler is not determined. Determine the maxLevel at runtime
+ final int maxLevel = CuboidUtil.getLongestDepth(seg.getCuboidScheduler().getAllCuboidIds());
// base cuboid step
result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
// n dim cuboid steps
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
index a8127cc..af92368 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
@@ -151,14 +152,14 @@ public class BatchOptimizeJobBuilder2 extends JobBuilderSupport {
private void addLayerCubingSteps(final CubingJob result, final String jobId, final CuboidModeEnum cuboidMode,
final String cuboidRootPath) {
// Don't know statistics so that tree cuboid scheduler is not determined. Determine the maxLevel at runtime
- final int maxLevel = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
+ final int maxLevel = CuboidUtil.getLongestDepth(seg.getCubeInstance().getCuboidsByMode(cuboidMode));
// Don't need to build base cuboid
// n dim cuboid steps
for (int i = 1; i <= maxLevel; i++) {
String parentCuboidPath = i == 1 ? getBaseCuboidPath(cuboidRootPath)
: getCuboidOutputPathsByLevel(cuboidRootPath, i - 1);
- result.addTask(createNDimensionCuboidStep(parentCuboidPath,
- getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId, cuboidMode));
+ result.addTask(createNDimensionCuboidStep(parentCuboidPath, getCuboidOutputPathsByLevel(cuboidRootPath, i),
+ i, jobId, cuboidMode));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
index d684c04..1809ff0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
@@ -19,36 +19,36 @@
package org.apache.kylin.engine.mr.common;
import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map;
import java.util.Set;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidModeEnum;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.cuboid.DefaultCuboidScheduler;
-import org.apache.kylin.cube.cuboid.TreeCuboidSchedulerManager;
+import org.apache.kylin.cube.cuboid.TreeCuboidScheduler;
+
+import com.google.common.collect.Lists;
public class CuboidSchedulerUtil {
public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, String cuboidModeName) {
- return getCuboidSchedulerByMode(segment, segment.getCubeInstance().getCuboidsByMode(cuboidModeName));
+ return getCuboidSchedulerByMode(segment, CuboidModeEnum.getByModeName(cuboidModeName));
}
public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, CuboidModeEnum cuboidMode) {
- return getCuboidSchedulerByMode(segment, segment.getCubeInstance().getCuboidsByMode(cuboidMode));
+ return getCuboidScheduler(segment, segment.getCubeInstance().getCuboidsByMode(cuboidMode));
}
- public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, Set<Long> cuboidSet) {
- CuboidScheduler cuboidScheduler;
+ public static CuboidScheduler getCuboidScheduler(CubeSegment segment, Set<Long> cuboidSet) {
try {
- cuboidScheduler = TreeCuboidSchedulerManager.getInstance().getTreeCuboidScheduler(segment.getCubeDesc(), //
- CuboidStatsReaderUtil.readCuboidStatsFromSegment(cuboidSet, segment));
+ Map<Long, Long> cuboidsWithRowCnt = CuboidStatsReaderUtil.readCuboidStatsFromSegment(cuboidSet, segment);
+ Comparator<Long> comparator = cuboidsWithRowCnt == null ? Cuboid.cuboidSelectComparator
+ : new TreeCuboidScheduler.CuboidCostComparator(cuboidsWithRowCnt);
+ return new TreeCuboidScheduler(segment.getCubeDesc(), Lists.newArrayList(cuboidSet), comparator);
} catch (IOException e) {
throw new RuntimeException("Fail to cube stats for segment" + segment + " due to " + e);
}
-
- if (cuboidScheduler == null) {
- cuboidScheduler = new DefaultCuboidScheduler(segment.getCubeDesc());
- }
- return cuboidScheduler;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index f936393..a58415a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -103,6 +103,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
if (handleCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
logger.info("Handling record with ordinal: " + handleCounter);
+ logger.info("Parent cuboid: " + parentCuboid.getId() + "; Children: " + myChildren);
}
for (Long child : myChildren) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6d8fe9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 8d75070..f7c5fee 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -45,6 +45,7 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
import org.apache.kylin.cube.model.CubeDesc;
@@ -181,7 +182,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
reducerFunction2 = new CuboidReducerFunction2(cubeName, metaUrl, needAggr);
}
- final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
+ final int totalLevels = CuboidUtil.getLongestDepth(cubeSegment.getCuboidScheduler().getAllCuboidIds());
JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
int level = 0;
int partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);