You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@kylin.apache.org by li...@apache.org on 2017/01/12 04:05:32 UTC
[29/50] [abbrv] kylin git commit: fix NPE in CacheDictionary in Spark cubing
fix NPE in CacheDictionary in Spark cubing
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/66bca9a6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/66bca9a6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/66bca9a6
Branch: refs/heads/master-hbase1.x
Commit: 66bca9a676fda02aa88cc25bad2e545f779f6dde
Parents: 2cf52b4
Author: shaofengshi <sh...@apache.org>
Authored: Sat Jan 7 10:17:33 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jan 9 16:58:11 2017 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/dict/CacheDictionary.java | 2 +-
.../apache/kylin/engine/mr/BatchCubingJobBuilder.java | 5 ++---
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 7 +++----
.../org/apache/kylin/engine/mr/JobBuilderSupport.java | 14 +-------------
.../engine/spark/SparkBatchCubingJobBuilder2.java | 1 +
.../apache/kylin/engine/spark/SparkCubingByLayer.java | 2 +-
.../kylin/engine/spark/SparkCubingJobBuilder.java | 1 -
7 files changed, 9 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/core-dictionary/src/main/java/org/apache/kylin/dict/CacheDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/CacheDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/CacheDictionary.java
index d7ed6bd..b2bad53 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/CacheDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/CacheDictionary.java
@@ -35,7 +35,7 @@ public abstract class CacheDictionary<T> extends Dictionary<T> {
protected transient int baseId;
- protected transient BytesConverter<T> bytesConvert;
+ protected BytesConverter<T> bytesConvert;
public CacheDictionary() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 456f615..36c12a1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -65,12 +65,11 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
// Phase 3: Build Cube
RowKeyDesc rowKeyDesc = seg.getCubeDesc().getRowkey();
final int groupRowkeyColumnsCount = seg.getCubeDesc().getBuildLevel();
- final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, groupRowkeyColumnsCount);
// base cuboid step
- result.addTask(createBaseCuboidStep(cuboidOutputTempPath[0], jobId));
+ result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
// n dim cuboid steps
for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
- result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath[i - 1], cuboidOutputTempPath[i], i));
+ result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i - 1), getCuboidOutputPathsByLevel(cuboidRootPath, i), i));
}
outputSide.addStepPhase3_BuildCube(result, cuboidRootPath);
http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 700f821..dd866bd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -76,14 +76,13 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
return result;
}
- private void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
+ protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
final int maxLevel = seg.getCubeDesc().getBuildLevel();
- final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, maxLevel);
// base cuboid step
- result.addTask(createBaseCuboidStep(cuboidOutputTempPath[0], jobId));
+ result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
// n dim cuboid steps
for (int i = 1; i <= maxLevel; i++) {
- result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath[i - 1], cuboidOutputTempPath[i], i, jobId));
+ result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i-1), getCuboidOutputPathsByLevel(cuboidRootPath, i-1), i, jobId));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 14252ee..696b22a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -193,23 +193,11 @@ public class JobBuilderSupport {
return buf.append(" -").append(paraName).append(" ").append(paraValue);
}
- public String[] getCuboidOutputPaths(String cuboidRootPath, int levels) {
- String[] paths = new String[levels + 1];
- for (int i = 0; i <= levels; i++) {
- if (i == 0) {
- paths[i] = cuboidRootPath + "base_cuboid";
- } else {
- paths[i] = cuboidRootPath + "level_" + i + "_cuboid";
- }
- }
- return paths;
- }
-
public static String getCuboidOutputPathsByLevel(String cuboidRootPath, int level) {
if (level == 0) {
return cuboidRootPath + "base_cuboid";
} else {
- return cuboidRootPath + level + "level_cuboid";
+ return cuboidRootPath + "level_" + level + "_cuboid";
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 9431468..55e11c4 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -41,6 +41,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
super(newSegment, submitter);
}
+ @Override
protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 53c1f96..93cce81 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -260,7 +260,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
// aggregate to ND cuboids
PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
- for (level = 1; level < totalLevels; level++) {
+ for (level = 1; level <= totalLevels; level++) {
partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
logger.info("Level " + level + " partition number: " + partition);
allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition).persist(storageLevel);
http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
index edd9460..76e4521 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
@@ -50,7 +50,6 @@ public class SparkCubingJobBuilder extends JobBuilderSupport {
public DefaultChainedExecutable build() {
final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
- final String jobId = result.getId();
inputSide.addStepPhase1_CreateFlatTable(result);
final IJoinedFlatTableDesc joinedFlatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);