You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@kylin.apache.org by li...@apache.org on 2016/04/22 06:12:04 UTC

[35/50] [abbrv] kylin git commit: KYLIN-1560 Make it easier to add additional steps to BatchCubingJobBuilder2

KYLIN-1560 Make it easier to add additional steps to BatchCubingJobBuilder2

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/be6e2065
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/be6e2065
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/be6e2065

Branch: refs/heads/1.5.x-HBase1.1.3
Commit: be6e2065a853f71a2e898ac9cf3c4144ed6fb8fd
Parents: 9a8153a
Author: shaofengshi <sh...@apache.org>
Authored: Wed Apr 6 14:46:55 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Apr 6 14:55:55 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfigBase.java |  9 +++++++++
 .../org/apache/kylin/engine/EngineFactory.java   |  5 +++--
 .../kylin/engine/mr/BatchCubingJobBuilder.java   | 13 -------------
 .../kylin/engine/mr/BatchCubingJobBuilder2.java  | 19 +++++--------------
 .../kylin/engine/mr/BatchMergeJobBuilder2.java   |  5 +++++
 .../kylin/engine/mr/JobBuilderSupport.java       | 14 ++++++++++++++
 .../engine/mr/steps/BaseCuboidMapperBase.java    | 19 ++++++++++++++-----
 .../engine/mr/steps/HiveToBaseCuboidMapper.java  |  8 --------
 8 files changed, 50 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 9db3081..2c0b353 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -631,4 +631,13 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.init.tasks");
     }
 
+    public String getMRBatchEngineV1Class() {
+        return getOptional("kylin.cube.mr.engine.v1.class", "org.apache.kylin.engine.mr.MRBatchCubingEngine");
+    }
+
+    public String getMRBatchEngineV2Class() {
+        return getOptional("kylin.cube.mr.engine.v2.class", "org.apache.kylin.engine.mr.MRBatchCubingEngine2");
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
index 919ede6..bba9060 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -24,6 +24,7 @@ import static org.apache.kylin.metadata.model.IEngineAware.ID_MR_V2;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ImplementationSwitch;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@ -35,8 +36,8 @@ public class EngineFactory {
     private static ImplementationSwitch<IStreamingCubingEngine> streamingEngines;
     static {
         Map<Integer, String> impls = new HashMap<>();
-        impls.put(ID_MR_V1, "org.apache.kylin.engine.mr.MRBatchCubingEngine");
-        impls.put(ID_MR_V2, "org.apache.kylin.engine.mr.MRBatchCubingEngine2");
+        impls.put(ID_MR_V1, KylinConfig.getInstanceFromEnv().getMRBatchEngineV1Class());
+        impls.put(ID_MR_V2, KylinConfig.getInstanceFromEnv().getMRBatchEngineV2Class());
         batchEngines = new ImplementationSwitch<IBatchCubingEngine>(impls, IBatchCubingEngine.class);
 
         impls.clear();

http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 45d03d1..7f729a6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -126,17 +126,4 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         return ndCuboidStep;
     }
 
-    private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
-        String[] paths = new String[groupRowkeyColumnsCount + 1];
-        for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
-            int dimNum = totalRowkeyColumnCount - i;
-            if (dimNum == totalRowkeyColumnCount) {
-                paths[i] = cuboidRootPath + "base_cuboid";
-            } else {
-                paths[i] = cuboidRootPath + dimNum + "d_cuboid";
-            }
-        }
-        return paths;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 5f4a3ed..0b1bd90 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -59,6 +59,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         result.addTask(createFactDistinctColumnsStepWithStats(jobId));
         result.addTask(createBuildDictionaryStep(jobId));
         result.addTask(createSaveStatisticsStep(jobId));
+        addOtherStepBeforeCubing(result);
         outputSide.addStepPhase2_BuildDictionary(result);
 
         // Phase 3: Build Cube
@@ -98,6 +99,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
+    protected void addOtherStepBeforeCubing(CubingJob result) {
+
+    }
+
     private MapReduceExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
         // base cuboid job
         MapReduceExecutable cubeStep = new MapReduceExecutable();
@@ -162,18 +167,4 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);
         return ndCuboidStep;
     }
-
-    private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
-        String[] paths = new String[groupRowkeyColumnsCount + 1];
-        for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
-            int dimNum = totalRowkeyColumnCount - i;
-            if (dimNum == totalRowkeyColumnCount) {
-                paths[i] = cuboidRootPath + "base_cuboid";
-            } else {
-                paths[i] = cuboidRootPath + dimNum + "d_cuboid";
-            }
-        }
-        return paths;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index e151674..08ddaf8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -64,6 +64,7 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         // Phase 1: Merge Dictionary
         result.addTask(createMergeDictionaryStep(mergingSegmentIds));
         result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
+        addOtherStepBeforeMerge(result);
         outputSide.addStepPhase1_MergeDictionary(result);
 
         // Phase 2: Merge Cube Files
@@ -107,4 +108,8 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         return mergeCuboidDataStep;
     }
 
+    protected void addOtherStepBeforeMerge(CubingJob result) {
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index a1a0cf3..7463fe0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -187,4 +187,18 @@ public class JobBuilderSupport {
         return buf.append(" -").append(paraName).append(" ").append(paraValue);
     }
 
+    public String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
+        String[] paths = new String[groupRowkeyColumnsCount + 1];
+        for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
+            int dimNum = totalRowkeyColumnCount - i;
+            if (dimNum == totalRowkeyColumnCount) {
+                paths[i] = cuboidRootPath + "base_cuboid";
+            } else {
+                paths[i] = cuboidRootPath + dimNum + "d_cuboid";
+            }
+        }
+        return paths;
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index bc664aa..a1eeb1b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
@@ -56,7 +57,7 @@ import com.google.common.collect.Lists;
 /**
  */
 public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
-    protected static final Logger logger = LoggerFactory.getLogger(HiveToBaseCuboidMapper.class);
+    protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapperBase.class);
     public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
     public static final byte[] ONE = Bytes.toBytes("1");
     protected String cubeName;
@@ -78,8 +79,8 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
     protected AbstractRowKeyEncoder rowKeyEncoder;
     protected MeasureCodec measureCodec;
     private int errorRecordCounter;
-    private Text outputKey = new Text();
-    private Text outputValue = new Text();
+    protected Text outputKey = new Text();
+    protected Text outputValue = new Text();
     private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
 
     @Override
@@ -132,7 +133,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
         }
     }
 
-    private boolean isNull(byte[] v) {
+    protected boolean isNull(byte[] v) {
         for (byte[] nullByte : nullBytes) {
             if (Bytes.equals(v, nullByte))
                 return true;
@@ -140,7 +141,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
         return false;
     }
 
-    private byte[] buildKey(SplittedBytes[] splitBuffers) {
+    protected byte[] buildKey(SplittedBytes[] splitBuffers) {
         int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
         for (int i = 0; i < baseCuboid.getColumns().size(); i++) {
             int index = rowKeyColumnIndexes[i];
@@ -207,6 +208,14 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
         context.write(outputKey, outputValue);
     }
 
+    protected byte[][] convertUTF8Bytes(String[] row) throws UnsupportedEncodingException {
+        byte[][] result = new byte[row.length][];
+        for (int i = 0; i < row.length; i++) {
+            result[i] = row[i] == null ? HIVE_NULL : row[i].getBytes("UTF-8");
+        }
+        return result;
+    }
+
     protected void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException {
 
         logger.error("Insane record: " + bytesSplitter, ex);

http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
index 8f5557d..96e8030 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
@@ -57,12 +57,4 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O
         }
     }
 
-    private byte[][] convertUTF8Bytes(String[] row) throws UnsupportedEncodingException {
-        byte[][] result = new byte[row.length][];
-        for (int i = 0; i < row.length; i++) {
-            result[i] = row[i] == null ? HIVE_NULL : row[i].getBytes("UTF-8");
-        }
-        return result;
-    }
-
 }