You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/04/22 06:12:04 UTC
[35/50] [abbrv] kylin git commit: KYLIN-1560 Make BatchCubingJobBuilder2 easier to add additional step
KYLIN-1560 Make BatchCubingJobBuilder2 easier to add additional step
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/be6e2065
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/be6e2065
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/be6e2065
Branch: refs/heads/1.5.x-HBase1.1.3
Commit: be6e2065a853f71a2e898ac9cf3c4144ed6fb8fd
Parents: 9a8153a
Author: shaofengshi <sh...@apache.org>
Authored: Wed Apr 6 14:46:55 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Apr 6 14:55:55 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/KylinConfigBase.java | 9 +++++++++
.../org/apache/kylin/engine/EngineFactory.java | 5 +++--
.../kylin/engine/mr/BatchCubingJobBuilder.java | 13 -------------
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 19 +++++--------------
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 5 +++++
.../kylin/engine/mr/JobBuilderSupport.java | 14 ++++++++++++++
.../engine/mr/steps/BaseCuboidMapperBase.java | 19 ++++++++++++++-----
.../engine/mr/steps/HiveToBaseCuboidMapper.java | 8 --------
8 files changed, 50 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 9db3081..2c0b353 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -631,4 +631,13 @@ abstract public class KylinConfigBase implements Serializable {
return getOptional("kylin.init.tasks");
}
+ public String getMRBatchEngineV1Class() {
+ return getOptional("kylin.cube.mr.engine.v1.class", "org.apache.kylin.engine.mr.MRBatchCubingEngine");
+ }
+
+ public String getMRBatchEngineV2Class() {
+ return getOptional("kylin.cube.mr.engine.v2.class", "org.apache.kylin.engine.mr.MRBatchCubingEngine2");
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
index 919ede6..bba9060 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -24,6 +24,7 @@ import static org.apache.kylin.metadata.model.IEngineAware.ID_MR_V2;
import java.util.HashMap;
import java.util.Map;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ImplementationSwitch;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@ -35,8 +36,8 @@ public class EngineFactory {
private static ImplementationSwitch<IStreamingCubingEngine> streamingEngines;
static {
Map<Integer, String> impls = new HashMap<>();
- impls.put(ID_MR_V1, "org.apache.kylin.engine.mr.MRBatchCubingEngine");
- impls.put(ID_MR_V2, "org.apache.kylin.engine.mr.MRBatchCubingEngine2");
+ impls.put(ID_MR_V1, KylinConfig.getInstanceFromEnv().getMRBatchEngineV1Class());
+ impls.put(ID_MR_V2, KylinConfig.getInstanceFromEnv().getMRBatchEngineV2Class());
batchEngines = new ImplementationSwitch<IBatchCubingEngine>(impls, IBatchCubingEngine.class);
impls.clear();
http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 45d03d1..7f729a6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -126,17 +126,4 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
return ndCuboidStep;
}
- private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
- String[] paths = new String[groupRowkeyColumnsCount + 1];
- for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
- int dimNum = totalRowkeyColumnCount - i;
- if (dimNum == totalRowkeyColumnCount) {
- paths[i] = cuboidRootPath + "base_cuboid";
- } else {
- paths[i] = cuboidRootPath + dimNum + "d_cuboid";
- }
- }
- return paths;
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 5f4a3ed..0b1bd90 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -59,6 +59,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
result.addTask(createFactDistinctColumnsStepWithStats(jobId));
result.addTask(createBuildDictionaryStep(jobId));
result.addTask(createSaveStatisticsStep(jobId));
+ addOtherStepBeforeCubing(result);
outputSide.addStepPhase2_BuildDictionary(result);
// Phase 3: Build Cube
@@ -98,6 +99,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
return result;
}
+ protected void addOtherStepBeforeCubing(CubingJob result) {
+
+ }
+
private MapReduceExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
// base cuboid job
MapReduceExecutable cubeStep = new MapReduceExecutable();
@@ -162,18 +167,4 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);
return ndCuboidStep;
}
-
- private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
- String[] paths = new String[groupRowkeyColumnsCount + 1];
- for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
- int dimNum = totalRowkeyColumnCount - i;
- if (dimNum == totalRowkeyColumnCount) {
- paths[i] = cuboidRootPath + "base_cuboid";
- } else {
- paths[i] = cuboidRootPath + dimNum + "d_cuboid";
- }
- }
- return paths;
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index e151674..08ddaf8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -64,6 +64,7 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
// Phase 1: Merge Dictionary
result.addTask(createMergeDictionaryStep(mergingSegmentIds));
result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
+ addOtherStepBeforeMerge(result);
outputSide.addStepPhase1_MergeDictionary(result);
// Phase 2: Merge Cube Files
@@ -107,4 +108,8 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
return mergeCuboidDataStep;
}
+ protected void addOtherStepBeforeMerge(CubingJob result) {
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index a1a0cf3..7463fe0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -187,4 +187,18 @@ public class JobBuilderSupport {
return buf.append(" -").append(paraName).append(" ").append(paraValue);
}
+ public String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
+ String[] paths = new String[groupRowkeyColumnsCount + 1];
+ for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
+ int dimNum = totalRowkeyColumnCount - i;
+ if (dimNum == totalRowkeyColumnCount) {
+ paths[i] = cuboidRootPath + "base_cuboid";
+ } else {
+ paths[i] = cuboidRootPath + dimNum + "d_cuboid";
+ }
+ }
+ return paths;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index bc664aa..a1eeb1b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -19,6 +19,7 @@
package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
@@ -56,7 +57,7 @@ import com.google.common.collect.Lists;
/**
*/
public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
- protected static final Logger logger = LoggerFactory.getLogger(HiveToBaseCuboidMapper.class);
+ protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapperBase.class);
public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
public static final byte[] ONE = Bytes.toBytes("1");
protected String cubeName;
@@ -78,8 +79,8 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
protected AbstractRowKeyEncoder rowKeyEncoder;
protected MeasureCodec measureCodec;
private int errorRecordCounter;
- private Text outputKey = new Text();
- private Text outputValue = new Text();
+ protected Text outputKey = new Text();
+ protected Text outputValue = new Text();
private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
@Override
@@ -132,7 +133,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
}
}
- private boolean isNull(byte[] v) {
+ protected boolean isNull(byte[] v) {
for (byte[] nullByte : nullBytes) {
if (Bytes.equals(v, nullByte))
return true;
@@ -140,7 +141,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
return false;
}
- private byte[] buildKey(SplittedBytes[] splitBuffers) {
+ protected byte[] buildKey(SplittedBytes[] splitBuffers) {
int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
for (int i = 0; i < baseCuboid.getColumns().size(); i++) {
int index = rowKeyColumnIndexes[i];
@@ -207,6 +208,14 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
context.write(outputKey, outputValue);
}
+ protected byte[][] convertUTF8Bytes(String[] row) throws UnsupportedEncodingException {
+ byte[][] result = new byte[row.length][];
+ for (int i = 0; i < row.length; i++) {
+ result[i] = row[i] == null ? HIVE_NULL : row[i].getBytes("UTF-8");
+ }
+ return result;
+ }
+
protected void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException {
logger.error("Insane record: " + bytesSplitter, ex);
http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
index 8f5557d..96e8030 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
@@ -57,12 +57,4 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O
}
}
- private byte[][] convertUTF8Bytes(String[] row) throws UnsupportedEncodingException {
- byte[][] result = new byte[row.length][];
- for (int i = 0; i < row.length; i++) {
- result[i] = row[i] == null ? HIVE_NULL : row[i].getBytes("UTF-8");
- }
- return result;
- }
-
}