You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/07/12 03:52:10 UTC
[2/2] kylin git commit: minor refactor to make builder more extendable
minor refactor to make builder more extendable
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/23d73ef3
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/23d73ef3
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/23d73ef3
Branch: refs/heads/master
Commit: 23d73ef3bee464dec5abaf2188d86598f8493178
Parents: 80bb6f7
Author: Hongbin Ma <ma...@apache.org>
Authored: Tue Jul 12 11:38:58 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Tue Jul 12 11:39:34 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeInstance.java | 1 +
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 19 ++++++++++++++++---
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 8 +++++++-
.../kylin/engine/mr/steps/MergeCuboidJob.java | 1 -
.../kylin/engine/mr/steps/MergeCuboidMapper.java | 2 +-
5 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/23d73ef3/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 6820a60..ce12ac8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -305,6 +305,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
return result;
}
+
public CubeSegment getSegment(String name, SegmentStatusEnum status) {
for (CubeSegment segment : segments) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/23d73ef3/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 9c4ddb2..afa601c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -22,6 +22,7 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.RowKeyDesc;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
@@ -115,11 +116,15 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
cubeStep.setMapReduceParams(cmd.toString());
- cubeStep.setMapReduceJobClass(InMemCuboidJob.class);
+ cubeStep.setMapReduceJobClass(getInMemCuboidJob());
cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
return cubeStep;
}
+ protected Class<? extends AbstractHadoopJob> getInMemCuboidJob() {
+ return InMemCuboidJob.class;
+ }
+
private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) {
// base cuboid job
MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
@@ -138,11 +143,15 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
baseCuboidStep.setMapReduceParams(cmd.toString());
- baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
+ baseCuboidStep.setMapReduceJobClass(getBaseCuboidJob());
baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
return baseCuboidStep;
}
+ protected Class<? extends AbstractHadoopJob> getBaseCuboidJob() {
+ return BaseCuboidJob.class;
+ }
+
private MapReduceExecutable createNDimensionCuboidStep(String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount, String jobId) {
// ND cuboid job
MapReduceExecutable ndCuboidStep = new MapReduceExecutable();
@@ -160,7 +169,11 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
ndCuboidStep.setMapReduceParams(cmd.toString());
- ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);
+ ndCuboidStep.setMapReduceJobClass(getNDCuboidJob());
return ndCuboidStep;
}
+
+ protected Class<? extends AbstractHadoopJob> getNDCuboidJob() {
+ return NDCuboidJob.class;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/23d73ef3/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index f1d7281..06b7528 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -103,8 +104,13 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
mergeCuboidDataStep.setMapReduceParams(cmd.toString());
- mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
+ mergeCuboidDataStep.setMapReduceJobClass(getMergeCuboidJob());
return mergeCuboidDataStep;
}
+ protected Class<? extends AbstractHadoopJob> getMergeCuboidJob() {
+ return MergeCuboidJob.class;
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/23d73ef3/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index be1a1c1..5546bce 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -69,7 +69,6 @@ public class MergeCuboidJob extends CuboidJob {
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
- // Reducer - only one
job.setReducerClass(CuboidReducer.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
http://git-wip-us.apache.org/repos/asf/kylin/blob/23d73ef3/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index bacc77b..5fd321c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -143,7 +143,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
- public static CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
+ public CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
String filePath = fileSplit.getPath().toString();
String jobID = extractJobIDFromPath(filePath);
return findSegmentWithUuid(jobID, cube);