You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/01/09 09:14:58 UTC

[7/8] kylin git commit: fix NPE in CacheDictionary in Spark cubing

fix NPE in CacheDictionary in Spark cubing


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/66bca9a6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/66bca9a6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/66bca9a6

Branch: refs/heads/master
Commit: 66bca9a676fda02aa88cc25bad2e545f779f6dde
Parents: 2cf52b4
Author: shaofengshi <sh...@apache.org>
Authored: Sat Jan 7 10:17:33 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jan 9 16:58:11 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/dict/CacheDictionary.java   |  2 +-
 .../apache/kylin/engine/mr/BatchCubingJobBuilder.java |  5 ++---
 .../kylin/engine/mr/BatchCubingJobBuilder2.java       |  7 +++----
 .../org/apache/kylin/engine/mr/JobBuilderSupport.java | 14 +-------------
 .../engine/spark/SparkBatchCubingJobBuilder2.java     |  1 +
 .../apache/kylin/engine/spark/SparkCubingByLayer.java |  2 +-
 .../kylin/engine/spark/SparkCubingJobBuilder.java     |  1 -
 7 files changed, 9 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/core-dictionary/src/main/java/org/apache/kylin/dict/CacheDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/CacheDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/CacheDictionary.java
index d7ed6bd..b2bad53 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/CacheDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/CacheDictionary.java
@@ -35,7 +35,7 @@ public abstract class CacheDictionary<T> extends Dictionary<T> {
 
     protected transient int baseId;
 
-    protected transient BytesConverter<T> bytesConvert;
+    protected BytesConverter<T> bytesConvert;
 
     public CacheDictionary() {
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 456f615..36c12a1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -65,12 +65,11 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         // Phase 3: Build Cube
         RowKeyDesc rowKeyDesc = seg.getCubeDesc().getRowkey();
         final int groupRowkeyColumnsCount = seg.getCubeDesc().getBuildLevel();
-        final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, groupRowkeyColumnsCount);
         // base cuboid step
-        result.addTask(createBaseCuboidStep(cuboidOutputTempPath[0], jobId));
+        result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
         // n dim cuboid steps
         for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
-            result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath[i - 1], cuboidOutputTempPath[i], i));
+            result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i - 1), getCuboidOutputPathsByLevel(cuboidRootPath, i), i));
         }
         outputSide.addStepPhase3_BuildCube(result, cuboidRootPath);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 700f821..dd866bd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -76,14 +76,13 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
-    private void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
+    protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         final int maxLevel = seg.getCubeDesc().getBuildLevel();
-        final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, maxLevel);
         // base cuboid step
-        result.addTask(createBaseCuboidStep(cuboidOutputTempPath[0], jobId));
+        result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
         // n dim cuboid steps
         for (int i = 1; i <= maxLevel; i++) {
-            result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath[i - 1], cuboidOutputTempPath[i], i, jobId));
+            result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i-1), getCuboidOutputPathsByLevel(cuboidRootPath, i-1), i, jobId));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 14252ee..696b22a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -193,23 +193,11 @@ public class JobBuilderSupport {
         return buf.append(" -").append(paraName).append(" ").append(paraValue);
     }
 
-    public String[] getCuboidOutputPaths(String cuboidRootPath, int levels) {
-        String[] paths = new String[levels + 1];
-        for (int i = 0; i <= levels; i++) {
-            if (i == 0) {
-                paths[i] = cuboidRootPath + "base_cuboid";
-            } else {
-                paths[i] = cuboidRootPath + "level_" + i + "_cuboid";
-            }
-        }
-        return paths;
-    }
-
     public static String getCuboidOutputPathsByLevel(String cuboidRootPath, int level) {
         if (level == 0) {
             return cuboidRootPath + "base_cuboid";
         } else {
-            return cuboidRootPath + level + "level_cuboid";
+            return cuboidRootPath + "level_" + level + "_cuboid";
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 9431468..55e11c4 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -41,6 +41,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         super(newSegment, submitter);
     }
 
+    @Override
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 53c1f96..93cce81 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -260,7 +260,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         // aggregate to ND cuboids
         PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
 
-        for (level = 1; level < totalLevels; level++) {
+        for (level = 1; level <= totalLevels; level++) {
             partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
             logger.info("Level " + level + " partition number: " + partition);
             allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition).persist(storageLevel);

http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
index edd9460..76e4521 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
@@ -50,7 +50,6 @@ public class SparkCubingJobBuilder extends JobBuilderSupport {
 
     public DefaultChainedExecutable build() {
         final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
-        final String jobId = result.getId();
 
         inputSide.addStepPhase1_CreateFlatTable(result);
         final IJoinedFlatTableDesc joinedFlatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);