You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:28 UTC
[20/28] incubator-kylin git commit: KYLIN-875 Split job module into
'core-job', 'engine-mr', 'source-hive',
'storage-hbase'. The old job remains as an assembly project.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory2.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory2.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory2.java
new file mode 100644
index 0000000..9e26378
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory2.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage;
+
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.metadata.model.IBuildable;
+
+/**
+ * Static entry point that hands out build-engine adapters obtained from a
+ * single default storage instance, which is created reflectively by class name.
+ */
+public class StorageFactory2 {
+
+    /** Fully qualified name of the storage class instantiated when this class is initialized. */
+    private static final String DEFAULT_STORAGE_CLASS = "org.apache.kylin.storage.hbase.HBaseStorage";
+
+    private static final IStorage dft = (IStorage) ClassUtil.newInstance(DEFAULT_STORAGE_CLASS);
+
+    /**
+     * Asks the default storage to adapt itself to the requested engine interface.
+     * The {@code buildable} argument is not consulted by this implementation.
+     */
+    public static <T> T createEngineAdapter(IBuildable buildable, Class<T> engineInterface) {
+        final T adapter = dft.adaptToBuildEngine(engineInterface);
+        return adapter;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
new file mode 100644
index 0000000..c6b1a18
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.tuple;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.List;
+
+import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf;
+
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.measure.DoubleMutable;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
+
+/**
+ * A single row of values laid out according to a shared {@link TupleInfo}.
+ * Values are stored positionally; the {@code TupleInfo} resolves field names
+ * and columns to slot indexes and supplies the data type name of each slot.
+ *
+ * @author xjiang
+ */
+public class Tuple implements ITuple {
+
+    /** Milliseconds in one day, shared by the epoch-day conversions below. */
+    private static final long MILLIS_PER_DAY = 1000L * 3600 * 24;
+
+    @IgnoreSizeOf
+    private final TupleInfo info;
+    private final Object[] values;
+
+    /** Creates an empty tuple with one slot per field currently registered in {@code info}. */
+    public Tuple(TupleInfo info) {
+        this.info = info;
+        this.values = new Object[info.size()];
+    }
+
+    public List<String> getAllFields() {
+        return info.getAllFields();
+    }
+
+    public List<TblColRef> getAllColumns() {
+        return info.getAllColumns();
+    }
+
+    /** Returns the backing value array itself, not a copy. */
+    public Object[] getAllValues() {
+        return values;
+    }
+
+    /** Returns a new tuple that shares this tuple's {@link TupleInfo} and holds a shallow copy of its values. */
+    @Override
+    public ITuple makeCopy() {
+        Tuple ret = new Tuple(this.info);
+        System.arraycopy(this.values, 0, ret.values, 0, this.values.length);
+        return ret;
+    }
+
+    public TupleInfo getInfo() {
+        return info;
+    }
+
+    public String getFieldName(TblColRef col) {
+        return info.getFieldName(col);
+    }
+
+    public TblColRef getFieldColumn(String fieldName) {
+        return info.getColumn(fieldName);
+    }
+
+    public Object getValue(String fieldName) {
+        int index = info.getFieldIndex(fieldName);
+        return values[index];
+    }
+
+    public Object getValue(TblColRef col) {
+        int index = info.getColumnIndex(col);
+        return values[index];
+    }
+
+    public String getDataTypeName(int idx) {
+        return info.getDataTypeName(idx);
+    }
+
+    public void setDimensionValue(String fieldName, String fieldValue) {
+        setDimensionValue(info.getFieldIndex(fieldName), fieldValue);
+    }
+
+    /** Parses {@code fieldValue} according to the slot's data type name and stores the result. */
+    public void setDimensionValue(int idx, String fieldValue) {
+        Object objectValue = convertOptiqCellValue(fieldValue, getDataTypeName(idx));
+        values[idx] = objectValue;
+    }
+
+    public void setMeasureValue(String fieldName, Object fieldValue) {
+        setMeasureValue(info.getFieldIndex(fieldName), fieldValue);
+    }
+
+    /**
+     * Stores a measure value, first unwrapping mutable holders and then
+     * narrowing the value to the slot's declared type where needed.
+     * A {@code null} value is stored as {@code null}.
+     */
+    public void setMeasureValue(int idx, Object fieldValue) {
+        fieldValue = convertWritableToJava(fieldValue);
+
+        String dataType = getDataTypeName(idx);
+        // special handling for BigDecimal, allow double be aggregated as
+        // BigDecimal during cube build for best precision
+        if ("double".equals(dataType) && fieldValue instanceof BigDecimal) {
+            fieldValue = ((BigDecimal) fieldValue).doubleValue();
+        } else if ("integer".equals(dataType) && fieldValue != null && !(fieldValue instanceof Integer)) {
+            // the null check matters: "null instanceof Integer" is false, so without it
+            // a null measure would be dereferenced here
+            fieldValue = ((Number) fieldValue).intValue();
+        } else if ("float".equals(dataType) && fieldValue instanceof BigDecimal) {
+            fieldValue = ((BigDecimal) fieldValue).floatValue();
+        }
+        values[idx] = fieldValue;
+    }
+
+    /** Unwraps {@link LongMutable} / {@link DoubleMutable} holders; any other object is returned as is. */
+    private Object convertWritableToJava(Object o) {
+        if (o instanceof LongMutable)
+            o = ((LongMutable) o).get();
+        else if (o instanceof DoubleMutable)
+            o = ((DoubleMutable) o).get();
+        return o;
+    }
+
+    public boolean hasColumn(TblColRef column) {
+        return info.hasColumn(column);
+    }
+
+    /** Renders every field as {@code name=value,} in field order (a trailing comma is kept). */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        for (String field : info.getAllFields()) {
+            sb.append(field);
+            sb.append("=");
+            sb.append(getValue(field));
+            sb.append(",");
+        }
+        return sb.toString();
+    }
+
+    /** Converts a count of days since the epoch into milliseconds since the epoch. */
+    public static long epicDaysToMillis(int days) {
+        return days * MILLIS_PER_DAY;
+    }
+
+    /** Parses a date string and returns the number of whole days between the epoch and that date. */
+    public static int dateToEpicDays(String strValue) {
+        Date dateValue = DateFormat.stringToDate(strValue); // NOTE: forces GMT timezone
+        long millis = dateValue.getTime();
+        return (int) (millis / MILLIS_PER_DAY);
+    }
+
+    /**
+     * Reads the partition column of {@code row} as a millisecond timestamp.
+     * A "date" column is taken to hold epoch days; any other type is taken
+     * to hold milliseconds already.
+     */
+    public static long getTs(ITuple row, TblColRef partitionCol) {
+        // the stored representation depends on the column type
+        if (partitionCol.getDatatype().equals("date")) {
+            return Tuple.epicDaysToMillis(Integer.parseInt(row.getValue(partitionCol).toString()));
+        } else {
+            return Long.parseLong(row.getValue(partitionCol).toString());
+        }
+    }
+
+    /**
+     * Converts the string form of a cell into the Java object matching
+     * {@code dataTypeName}.
+     *
+     * @return {@code null} for a null input, or for an empty / {@code \N} input
+     *         of any type other than "string"; the input itself for
+     *         unrecognized type names
+     */
+    public static Object convertOptiqCellValue(String strValue, String dataTypeName) {
+        if (strValue == null)
+            return null;
+
+        // constant-first comparison, like the rest of this method, so that a
+        // missing type name does not throw
+        if ((strValue.equals("") || strValue.equals("\\N")) && !"string".equals(dataTypeName))
+            return null;
+
+        // TODO use data type enum instead of string comparison
+        if ("date".equals(dataTypeName)) {
+            // convert epoch time
+            return dateToEpicDays(strValue);// Optiq expects Integer instead of Long. by honma
+        } else if ("timestamp".equals(dataTypeName) || "datetime".equals(dataTypeName)) {
+            return Long.valueOf(DateFormat.stringToMillis(strValue));
+        } else if ("tinyint".equals(dataTypeName)) {
+            return Byte.valueOf(strValue);
+        } else if ("short".equals(dataTypeName) || "smallint".equals(dataTypeName)) {
+            return Short.valueOf(strValue);
+        } else if ("integer".equals(dataTypeName)) {
+            return Integer.valueOf(strValue);
+        } else if ("long".equals(dataTypeName) || "bigint".equals(dataTypeName)) {
+            return Long.valueOf(strValue);
+        } else if ("double".equals(dataTypeName)) {
+            return Double.valueOf(strValue);
+        } else if ("decimal".equals(dataTypeName)) {
+            return new BigDecimal(strValue);
+        } else if ("float".equals(dataTypeName)) {
+            return Float.valueOf(strValue);
+        } else {
+            return strValue;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-storage/src/main/java/org/apache/kylin/storage/tuple/TupleInfo.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/tuple/TupleInfo.java b/core-storage/src/main/java/org/apache/kylin/storage/tuple/TupleInfo.java
new file mode 100644
index 0000000..735cc64
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/tuple/TupleInfo.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.tuple;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ *
+ * @author xjiang
+ *
+ */
+public class TupleInfo {
+
+ private final Map<String, Integer> fieldMap;
+ private final Map<TblColRef, Integer> columnMap;
+
+ private final List<String> fields;
+ private final List<TblColRef> columns;
+ private final List<String> dataTypeNames;
+
+ public TupleInfo() {
+ fieldMap = new HashMap<String, Integer>();
+ columnMap = new HashMap<TblColRef, Integer>();
+ fields = new ArrayList<String>();
+ columns = new ArrayList<TblColRef>();
+ dataTypeNames = new ArrayList<String>();
+ }
+
+ public TblColRef getColumn(String fieldName) {
+ int idx = getFieldIndex(fieldName);
+ return columns.get(idx);
+ }
+
+ public int getColumnIndex(TblColRef col) {
+ return columnMap.get(col);
+ }
+
+ public String getDataTypeName(int index) {
+ return dataTypeNames.get(index);
+ }
+
+ public int getFieldIndex(String fieldName) {
+ return fieldMap.get(fieldName);
+ }
+
+ public boolean hasField(String fieldName) {
+ return fieldMap.containsKey(fieldName);
+ }
+
+ public String getFieldName(TblColRef col) {
+ int idx = columnMap.get(col);
+ return fields.get(idx);
+ }
+
+ public boolean hasColumn(TblColRef col) {
+ return columnMap.containsKey(col);
+ }
+
+ public void setField(String fieldName, TblColRef col, int index) {
+ fieldMap.put(fieldName, index);
+
+ if (col != null)
+ columnMap.put(col, index);
+
+ if (fields.size() > index)
+ fields.set(index, fieldName);
+ else
+ fields.add(index, fieldName);
+
+ if (columns.size() > index)
+ columns.set(index, col);
+ else
+ columns.add(index, col);
+
+ if (dataTypeNames.size() > index)
+ dataTypeNames.set(index, col.getType().getName());
+ else
+ dataTypeNames.add(index, col.getType().getName());
+ }
+
+ public List<String> getAllFields() {
+ return fields;
+ }
+
+ public List<TblColRef> getAllColumns() {
+ return columns;
+ }
+
+ public int size() {
+ return fields.size();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/pom.xml
----------------------------------------------------------------------
diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml
index a226e80..2d6f157 100644
--- a/engine-mr/pom.xml
+++ b/engine-mr/pom.xml
@@ -79,18 +79,33 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.mrunit</groupId>
+ <artifactId>mrunit</artifactId>
+ <classifier>hadoop2</classifier>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
new file mode 100644
index 0000000..6b5cfa4
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
+import org.apache.kylin.engine.mr.steps.NDCuboidJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
+
+/**
+ * Assembles the step sequence of a layered batch cubing job: flat table,
+ * dictionaries, base cuboid, one N-dimension cuboid step per build level,
+ * then metadata update and cleanup.
+ */
+public class BatchCubingJobBuilder extends JobBuilderSupport {
+
+    private final IMRBatchCubingInputSide inputSide;
+    private final IMRBatchCubingOutputSide outputSide;
+
+    public BatchCubingJobBuilder(CubeSegment newSegment, String submitter) {
+        super(newSegment, submitter);
+        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+        this.outputSide = MRUtil.getBatchCubingOutputSide(seg);
+    }
+
+    /** Creates the build job and appends all of its steps in execution order. */
+    public CubingJob build() {
+        final CubingJob job = CubingJob.createBuildJob(seg, submitter, config);
+        final String jobId = job.getId();
+        final String cuboidRootPath = getCuboidRootPath(jobId);
+
+        // Phase 1: Create Flat Table
+        inputSide.addStepPhase1_CreateFlatTable(job);
+
+        // Phase 2: Build Dictionary
+        job.addTask(createFactDistinctColumnsStep(jobId));
+        job.addTask(createBuildDictionaryStep(jobId));
+
+        // Phase 3: Build Cube
+        final int buildLevels = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
+        final int rowkeyColumnCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
+        final String[] levelPaths = getCuboidOutputPaths(cuboidRootPath, rowkeyColumnCount, buildLevels);
+        // level 0 is the base cuboid
+        job.addTask(createBaseCuboidStep(levelPaths, jobId));
+        // each further level drops one more dimension
+        int level = 1;
+        while (level <= buildLevels) {
+            job.addTask(createNDimensionCuboidStep(levelPaths, rowkeyColumnCount - level, rowkeyColumnCount));
+            level++;
+        }
+        outputSide.addStepPhase3_BuildCube(job, cuboidRootPath);
+
+        // Phase 4: Update Metadata & Cleanup
+        job.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
+        inputSide.addStepPhase4_Cleanup(job);
+        outputSide.addStepPhase4_Cleanup(job);
+
+        return job;
+    }
+
+    /** Builds the MapReduce step that writes the base cuboid to the first output path. */
+    private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) {
+        final String cubeName = seg.getCubeInstance().getName();
+
+        final StringBuilder params = new StringBuilder();
+        appendMapReduceParameters(params, seg);
+        appendExecCmdParameters(params, "cubename", cubeName);
+        appendExecCmdParameters(params, "segmentname", seg.getName());
+        appendExecCmdParameters(params, "input", ""); // marks flat table input
+        appendExecCmdParameters(params, "output", cuboidOutputTempPath[0]);
+        appendExecCmdParameters(params, "jobname", "Kylin_Base_Cuboid_Builder_" + cubeName);
+        appendExecCmdParameters(params, "level", "0");
+
+        final MapReduceExecutable step = new MapReduceExecutable();
+        step.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
+        step.setMapReduceParams(params.toString());
+        step.setMapReduceJobClass(BaseCuboidJob.class);
+        step.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+        return step;
+    }
+
+    /** Builds the MapReduce step for one level; it reads the previous level's path and writes its own. */
+    private MapReduceExecutable createNDimensionCuboidStep(String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount) {
+        final int level = totalRowkeyColumnCount - dimNum;
+        final String cubeName = seg.getCubeInstance().getName();
+
+        final StringBuilder params = new StringBuilder();
+        appendMapReduceParameters(params, seg);
+        appendExecCmdParameters(params, "cubename", cubeName);
+        appendExecCmdParameters(params, "segmentname", seg.getName());
+        appendExecCmdParameters(params, "input", cuboidOutputTempPath[level - 1]);
+        appendExecCmdParameters(params, "output", cuboidOutputTempPath[level]);
+        appendExecCmdParameters(params, "jobname", "Kylin_ND-Cuboid_Builder_" + cubeName + "_Step");
+        appendExecCmdParameters(params, "level", String.valueOf(level));
+
+        final MapReduceExecutable step = new MapReduceExecutable();
+        step.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
+        step.setMapReduceParams(params.toString());
+        step.setMapReduceJobClass(NDCuboidJob.class);
+        return step;
+    }
+
+    /** Returns one output path per level: "base_cuboid" for level 0, "<n>d_cuboid" for the others. */
+    private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
+        final String[] paths = new String[groupRowkeyColumnsCount + 1];
+        for (int level = 0; level < paths.length; level++) {
+            paths[level] = (level == 0) //
+                    ? cuboidRootPath + "base_cuboid" //
+                    : cuboidRootPath + (totalRowkeyColumnCount - level) + "d_cuboid";
+        }
+        return paths;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
new file mode 100644
index 0000000..abdabd8
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
+import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
+import org.apache.kylin.job.constant.ExecutableConstants;
+
+/**
+ * Assembles the step sequence of an in-memory batch cubing job: flat table,
+ * dictionaries and statistics, a single in-memory cubing step, then metadata
+ * update and cleanup.
+ */
+public class BatchCubingJobBuilder2 extends JobBuilderSupport {
+
+    private final IMRBatchCubingInputSide inputSide;
+    private final IMRBatchCubingOutputSide2 outputSide;
+
+    public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
+        super(newSegment, submitter);
+        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+        this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
+    }
+
+    /** Creates the build job and appends all of its steps in execution order. */
+    public CubingJob build() {
+        final CubingJob job = CubingJob.createBuildJob(seg, submitter, config);
+        final String jobId = job.getId();
+
+        // Phase 1: Create Flat Table
+        inputSide.addStepPhase1_CreateFlatTable(job);
+
+        // Phase 2: Build Dictionary
+        job.addTask(createFactDistinctColumnsStepWithStats(jobId));
+        job.addTask(createBuildDictionaryStep(jobId));
+        job.addTask(createSaveStatisticsStep(jobId));
+        outputSide.addStepPhase2_BuildDictionary(job);
+
+        // Phase 3: Build Cube
+        job.addTask(createInMemCubingStep(jobId));
+        outputSide.addStepPhase3_BuildCube(job);
+
+        // Phase 4: Update Metadata & Cleanup
+        job.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
+        inputSide.addStepPhase4_Cleanup(job);
+        outputSide.addStepPhase4_Cleanup(job);
+
+        return job;
+    }
+
+    /** Builds the step that saves the statistics found under this job's statistics path. */
+    private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
+        final SaveStatisticsStep step = new SaveStatisticsStep();
+        step.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
+        step.setCubeName(seg.getCubeInstance().getName());
+        step.setSegmentId(seg.getUuid());
+        step.setStatisticsPath(getStatisticsPath(jobId));
+        return step;
+    }
+
+    /** Builds the MapReduce step that runs {@link InMemCuboidJob} for this segment. */
+    private MapReduceExecutable createInMemCubingStep(String jobId) {
+        final String cubeName = seg.getCubeInstance().getName();
+
+        final StringBuilder params = new StringBuilder();
+        appendMapReduceParameters(params, seg);
+        appendExecCmdParameters(params, "cubename", cubeName);
+        appendExecCmdParameters(params, "segmentname", seg.getName());
+        appendExecCmdParameters(params, "jobname", "Kylin_Cube_Builder_" + cubeName);
+        appendExecCmdParameters(params, "jobflowid", jobId);
+
+        final MapReduceExecutable step = new MapReduceExecutable();
+        step.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
+        step.setMapReduceParams(params.toString());
+        step.setMapReduceJobClass(InMemCuboidJob.class);
+        step.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+        return step;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
new file mode 100644
index 0000000..3f6201c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Assembles the step sequence of a batch merge job that merges the cuboid
+ * files of several segments: dictionary merge, cuboid merge, then metadata
+ * update and cleanup.
+ */
+public class BatchMergeJobBuilder extends JobBuilderSupport {
+
+    private final IMRBatchMergeOutputSide outputSide;
+
+    public BatchMergeJobBuilder(CubeSegment mergeSegment, String submitter) {
+        super(mergeSegment, submitter);
+        this.outputSide = MRUtil.getBatchMergeOutputSide(seg);
+    }
+
+    /**
+     * Creates the merge job and appends all of its steps in execution order.
+     *
+     * @throws IllegalStateException if fewer than two segments are to be merged
+     */
+    public CubingJob build() {
+        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+        final String jobId = result.getId();
+        final String cuboidRootPath = getCuboidRootPath(jobId);
+
+        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        // the condition accepts exactly two segments, so the message must say "at least 2"
+        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
+        final List<String> mergingSegmentIds = Lists.newArrayList();
+        final List<String> mergingCuboidPaths = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingSegmentIds.add(merging.getUuid());
+            // trailing wildcard: every path below the segment's cuboid root is an input
+            mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
+        }
+
+        // Phase 1: Merge Dictionary
+        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
+
+        // Phase 2: Merge Cube Files
+        String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
+        result.addTask(createMergeCuboidDataStep(seg, formattedPath, cuboidRootPath));
+        outputSide.addStepPhase2_BuildCube(result, cuboidRootPath);
+
+        // Phase 3: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+        outputSide.addStepPhase3_Cleanup(result);
+
+        return result;
+    }
+
+    /** Builds the MapReduce step that runs {@link MergeCuboidJob} from {@code inputPath} into {@code outputPath}. */
+    private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
+        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, seg);
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", inputPath);
+        appendExecCmdParameters(cmd, "output", outputPath);
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+
+        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
+        return mergeCuboidDataStep;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
new file mode 100644
index 0000000..c0fe759
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.MergeCuboidFromStorageJob;
+import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
+import org.apache.kylin.job.constant.ExecutableConstants;
+
+import java.util.List;
+
+/**
+ * Assembles the step sequence of a batch merge job that merges segments from
+ * their storage locations: dictionary and statistics merge, cuboid merge,
+ * then metadata update and cleanup.
+ */
+public class BatchMergeJobBuilder2 extends JobBuilderSupport {
+
+    private final IMROutput2.IMRBatchMergeOutputSide2 outputSide;
+
+    public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
+        super(mergeSegment, submitter);
+        this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
+    }
+
+    /**
+     * Creates the merge job and appends all of its steps in execution order.
+     *
+     * @throws IllegalStateException if fewer than two segments are to be merged
+     */
+    public CubingJob build() {
+        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+        final String jobId = result.getId();
+
+        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        // the condition accepts exactly two segments, so the message must say "at least 2"
+        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
+        final List<String> mergingSegmentIds = Lists.newArrayList();
+        final List<String> mergingHTables = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingSegmentIds.add(merging.getUuid());
+            mergingHTables.add(merging.getStorageLocationIdentifier());
+        }
+
+        // Phase 1: Merge Dictionary
+        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
+        result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, getStatisticsPath(jobId)));
+        outputSide.addStepPhase1_MergeDictionary(result);
+
+        // Phase 2: Merge Cube
+        String formattedTables = StringUtil.join(mergingHTables, ",");
+        result.addTask(createMergeCuboidDataFromStorageStep(formattedTables, jobId));
+        outputSide.addStepPhase2_BuildCube(result);
+
+        // Phase 3: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+        outputSide.addStepPhase3_Cleanup(result);
+
+        return result;
+    }
+
+    /** Builds the step that merges the statistics of the given segments into {@code mergedStatisticsFolder}. */
+    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
+        MergeStatisticsStep result = new MergeStatisticsStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setMergingSegmentIds(mergingSegmentIds);
+        result.setMergedStatisticsPath(mergedStatisticsFolder);
+        return result;
+    }
+
+    /**
+     * Builds the MapReduce step that runs {@link MergeCuboidFromStorageJob}.
+     * NOTE(review): {@code inputTableNames} is accepted but not added to the
+     * job parameters here - confirm the job resolves its inputs by other means.
+     */
+    private MapReduceExecutable createMergeCuboidDataFromStorageStep(String inputTableNames, String jobId) {
+        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, seg);
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, "jobflowid", jobId);
+
+        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromStorageJob.class);
+        // the first two counter names are left empty; only the third is saved, as the cube size
+        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+        return mergeCuboidDataStep;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
new file mode 100644
index 0000000..37a8841
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
@@ -0,0 +1,166 @@
+package org.apache.kylin.engine.mr;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.kylin.common.util.Bytes;
+
+public class ByteArrayWritable implements WritableComparable<ByteArrayWritable> {
+
+ private byte[] data;
+ private int offset;
+ private int length;
+
+ public ByteArrayWritable() {
+ this(null, 0, 0);
+ }
+
+ public ByteArrayWritable(int capacity) {
+ this(new byte[capacity], 0, capacity);
+ }
+
+ public ByteArrayWritable(byte[] data) {
+ this(data, 0, data == null ? 0 : data.length);
+ }
+
+ public ByteArrayWritable(byte[] data, int offset, int length) {
+ this.data = data;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ public byte[] array() {
+ return data;
+ }
+
+ public int offset() {
+ return offset;
+ }
+
+ public int length() {
+ return length;
+ }
+
+ public void set(byte[] array) {
+ set(array, 0, array.length);
+ }
+
+ public void set(byte[] array, int offset, int length) {
+ this.data = array;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ public ByteBuffer asBuffer() {
+ if (data == null)
+ return null;
+ else if (offset == 0 && length == data.length)
+ return ByteBuffer.wrap(data);
+ else
+ return ByteBuffer.wrap(data, offset, length).slice();
+ }
+
+ @Override
+ public int hashCode() {
+ if (data == null)
+ return 0;
+ else
+ return Bytes.hashCode(data, offset, length);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(this.length);
+ out.write(this.data, this.offset, this.length);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.length = in.readInt();
+ this.data = new byte[this.length];
+ in.readFully(this.data, 0, this.length);
+ this.offset = 0;
+ }
+
+ // Below methods copied from BytesWritable
+ /**
+ * Define the sort order of the BytesWritable.
+ * @param that The other bytes writable
+ * @return Positive if left is bigger than right, 0 if they are equal, and
+ * negative if left is smaller than right.
+ */
+ public int compareTo(ByteArrayWritable that) {
+ return WritableComparator.compareBytes(this.data, this.offset, this.length, that.data, that.offset, that.length);
+ }
+
+ /**
+ * Compares the bytes in this object to the specified byte array
+ * @param that
+ * @return Positive if left is bigger than right, 0 if they are equal, and
+ * negative if left is smaller than right.
+ */
+ public int compareTo(final byte[] that) {
+ return WritableComparator.compareBytes(this.data, this.offset, this.length, that, 0, that.length);
+ }
+
+ /**
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object right_obj) {
+ if (right_obj instanceof byte[]) {
+ return compareTo((byte[]) right_obj) == 0;
+ }
+ if (right_obj instanceof ByteArrayWritable) {
+ return compareTo((ByteArrayWritable) right_obj) == 0;
+ }
+ return false;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(3 * this.length);
+ final int endIdx = this.offset + this.length;
+ for (int idx = this.offset; idx < endIdx; idx++) {
+ sb.append(' ');
+ String num = Integer.toHexString(0xff & this.data[idx]);
+ // if it is only one digit, add a leading 0.
+ if (num.length() < 2) {
+ sb.append('0');
+ }
+ sb.append(num);
+ }
+ return sb.length() > 0 ? sb.substring(1) : "";
+ }
+
+ /** A Comparator optimized for ByteArrayWritable.
+ */
+ public static class Comparator extends WritableComparator {
+ private BytesWritable.Comparator comparator = new BytesWritable.Comparator();
+
+ /** constructor */
+ public Comparator() {
+ super(ByteArrayWritable.class);
+ }
+
+ /**
+ * @see org.apache.hadoop.io.WritableComparator#compare(byte[], int, int, byte[], int, int)
+ */
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return comparator.compare(b1, s1, l1, b2, s2, l2);
+ }
+ }
+
+ static { // register this comparator
+ WritableComparator.define(ByteArrayWritable.class, new Comparator());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
new file mode 100644
index 0000000..7251730
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.execution.Output;
+
+/**
+ * A chained executable that represents one cube build or merge job.
+ *
+ * Besides chaining its steps, it remembers the cube name and segment id as job parameters,
+ * composes the notification mail for the job and exposes figures that the steps published
+ * in their output extra info.
+ */
+public class CubingJob extends DefaultChainedExecutable {
+
+    // KEYS of Output.extraInfo map, info passed across job steps
+    public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
+    public static final String SOURCE_SIZE_BYTES = "sourceSizeBytes";
+    // NOTE(review): the key reads "byteSizeBytes", not "cubeSizeBytes"; left untouched because
+    // it is presumably stored with existing job outputs -- confirm before renaming
+    public static final String CUBE_SIZE_BYTES = "byteSizeBytes";
+    public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
+
+    private static final String CUBE_INSTANCE_NAME = "cubeName";
+    private static final String SEGMENT_ID = "segmentId";
+
+    /** Creates an empty job (no steps yet) of type "BUILD" for the given segment. */
+    public static CubingJob createBuildJob(CubeSegment seg, String submitter, JobEngineConfig config) {
+        return initCubingJob(seg, "BUILD", submitter, config);
+    }
+
+    /** Creates an empty job (no steps yet) of type "MERGE" for the given segment. */
+    public static CubingJob createMergeJob(CubeSegment seg, String submitter, JobEngineConfig config) {
+        return initCubingJob(seg, "MERGE", submitter, config);
+    }
+
+    /**
+     * Creates the job and fills in cube name, segment id, submitter, notify list and a display
+     * name of the form "cube - segment - jobType - timestamp", where the timestamp is rendered
+     * in the time zone taken from {@code config}.
+     */
+    private static CubingJob initCubingJob(CubeSegment seg, String jobType, String submitter, JobEngineConfig config) {
+        CubingJob result = new CubingJob();
+        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
+        format.setTimeZone(TimeZone.getTimeZone(config.getTimeZone()));
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setName(seg.getCubeInstance().getName() + " - " + seg.getName() + " - " + jobType + " - " + format.format(new Date(System.currentTimeMillis())));
+        result.setSubmitter(submitter);
+        result.setNotifyList(seg.getCubeInstance().getDescriptor().getNotifyList());
+        return result;
+    }
+
+    public CubingJob() {
+        super();
+    }
+
+    void setCubeName(String name) {
+        setParam(CUBE_INSTANCE_NAME, name);
+    }
+
+    public String getCubeName() {
+        return getParam(CUBE_INSTANCE_NAME);
+    }
+
+    void setSegmentId(String segmentId) {
+        setParam(SEGMENT_ID, segmentId);
+    }
+
+    public String getSegmentId() {
+        return getParam(SEGMENT_ID);
+    }
+
+    /**
+     * Composes title and content of the notification mail for this job.
+     *
+     * The state passed in is replaced by the state read from the job output. Mails are only
+     * composed for ERROR, DISCARDED and SUCCEED; any state other than ERROR must in addition be
+     * listed in the cube descriptor's "status need notify" setting.
+     *
+     * @return pair of (title, content), or null if no notification is to be sent
+     */
+    @Override
+    protected Pair<String, String> formatNotifications(ExecutableContext context, ExecutableState state) {
+        CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(getCubeName());
+        final Output output = jobService.getOutput(getId());
+        String logMsg;
+        state = output.getState();
+        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
+        if (state != ExecutableState.ERROR &&
+                !cubeInstance.getDescriptor().getStatusNeedNotify().contains(state.toString().toLowerCase(java.util.Locale.ROOT))) {
+            logger.info("state:" + state + " no need to notify users");
+            return null;
+        }
+        switch (state) {
+            case ERROR:
+                logMsg = output.getVerboseMsg();
+                break;
+            case DISCARDED:
+                logMsg = "job has been discarded";
+                break;
+            case SUCCEED:
+                logMsg = "job has succeeded";
+                break;
+            default:
+                return null;
+        }
+        if (logMsg == null) {
+            logMsg = "no error message";
+        }
+
+        // Placeholders are substituted literally with String.replace(). A regex based
+        // replaceAll() would interpret '$' and '\' inside the values (e.g. "Outer$Inner" in a
+        // stack trace), which either throws or garbles the mail.
+        String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE;
+        content = content.replace("${job_name}", getName());
+        content = content.replace("${result}", state.toString());
+        content = content.replace("${cube_name}", getCubeName());
+        content = content.replace("${start_time}", new Date(getStartTime()).toString());
+        content = content.replace("${duration}", getDuration() / 60000 + "mins");
+        content = content.replace("${mr_waiting}", getMapReduceWaitTime() / 60000 + "mins");
+        content = content.replace("${last_update_time}", new Date(getLastModified()).toString());
+        content = content.replace("${submitter}", getSubmitter());
+        content = content.replace("${error_log}", logMsg);
+
+        try {
+            InetAddress inetAddress = InetAddress.getLocalHost();
+            content = content.replace("${job_engine}", inetAddress.getCanonicalHostName());
+        } catch (UnknownHostException e) {
+            // best effort: the placeholder stays in the mail if the host name cannot be resolved
+            logger.warn(e.getLocalizedMessage(), e);
+        }
+
+        String title = "["+ state.toString() + "] - [Kylin Cube Build Job]-" + getCubeName();
+        return Pair.of(title, content);
+    }
+
+    /**
+     * Records the accumulated MapReduce wait time before delegating to the super class.
+     * The wait time of MapReduce tasks is summed up to (not including) the first task
+     * that did not succeed.
+     */
+    @Override
+    protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
+        long time = 0L;
+        for (AbstractExecutable task: getTasks()) {
+            final ExecutableState status = task.getStatus();
+            if (status != ExecutableState.SUCCEED) {
+                break;
+            }
+            if (task instanceof MapReduceExecutable) {
+                time += ((MapReduceExecutable) task).getMapReduceWaitTime();
+            }
+        }
+        setMapReduceWaitTime(time);
+        super.onExecuteFinished(result, executableContext);
+    }
+
+    public long getMapReduceWaitTime() {
+        return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
+    }
+
+    public void setMapReduceWaitTime(long t) {
+        addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
+    }
+
+    /** @return value published under {@link #SOURCE_RECORD_COUNT} by a step, or 0 if none did */
+    public long findSourceRecordCount() {
+        return Long.parseLong(findExtraInfo(SOURCE_RECORD_COUNT, "0"));
+    }
+
+    /** @return value published under {@link #SOURCE_SIZE_BYTES} by a step, or 0 if none did */
+    public long findSourceSizeBytes() {
+        return Long.parseLong(findExtraInfo(SOURCE_SIZE_BYTES, "0"));
+    }
+
+    /** @return value published under {@link #CUBE_SIZE_BYTES} by a step, or 0 if none did */
+    public long findCubeSizeBytes() {
+        return Long.parseLong(findExtraInfo(CUBE_SIZE_BYTES, "0"));
+    }
+
+    /**
+     * Looks through the outputs of the child tasks, in task order, and returns the first
+     * non-null extra info value stored under {@code key}, or {@code dft} if there is none.
+     */
+    private String findExtraInfo(String key, String dft) {
+        for (AbstractExecutable child : getTasks()) {
+            Output output = executableManager.getOutput(child.getId());
+            String value = output.getExtra().get(key);
+            if (value != null)
+                return value;
+        }
+        return dft;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
new file mode 100644
index 0000000..0c39398
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.TableDesc;
+
+/**
+ * Contract between a table source and the MapReduce build engine. A source that wants to
+ * feed the MapReduce build engine has to be adaptable to this interface.
+ */
+public interface IMRInput {
+
+    /** Returns the helper through which the source takes part in the batch cubing job flow of a segment. */
+    IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg);
+
+    /** Returns the helper that lets a MapReduce job read from the given table. */
+    IMRTableInputFormat getTableInputFormat(TableDesc table);
+
+    /**
+     * Helper that sets up a mapper to read from a table and decodes what the mapper receives.
+     */
+    interface IMRTableInputFormat {
+
+        /** Sets up the InputFormat on the given job. */
+        void configureJob(Job job);
+
+        /** Turns one mapper input object into its column values. */
+        String[] parseMapperInput(Object mapperInput);
+    }
+
+    /**
+     * The input side of the batch cubing flow. It contributes the creation of the
+     * intermediate flat table (Phase 1) and the removal of leftovers (Phase 4).
+     *
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary (with FlatTableInputFormat)
+     * - Phase 3: Build Cube (with FlatTableInputFormat)
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    interface IMRBatchCubingInputSide {
+
+        /** Returns the helper that reads from the intermediate flat table. */
+        IMRTableInputFormat getFlatTableInputFormat();
+
+        /** Appends the step that creates the intermediate flat table, as defined by CubeJoinedFlatTableDesc. */
+        void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
+
+        /** Appends the step that cleans up, e.g. deletes the intermediate flat table. */
+        void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
new file mode 100644
index 0000000..bc6ee1f
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+/**
+ * Contract between a storage and the MapReduce build engine: the storage takes part
+ * in the batch cubing and batch merge job flows as the output side.
+ */
+public interface IMROutput {
+
+    /** Returns the helper through which the storage takes part in the batch cubing job flow of a segment. */
+    IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg);
+
+    /**
+     * The output side of the batch cubing flow. It contributes saving the cuboid
+     * output to storage (Phase 3).
+     *
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary
+     * - Phase 3: Build Cube
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    interface IMRBatchCubingOutputSide {
+
+        /**
+         * Appends the step that saves the cuboid output from HDFS to storage.
+         *
+         * The cuboid output is a directory of sequence files. Keys have the format
+         * "CUBOID,D1,D2,..,Dn" and values the format "M1,M2,..,Mm", where CUBOID is the
+         * 8 bytes cuboid ID, Dx a dictionary encoded dimension value and Mx the serialized
+         * form of a measure value.
+         */
+        void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+
+        /** Appends the step that does any necessary clean up. */
+        void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+
+    /** Returns the helper through which the storage takes part in the batch merge job flow of a segment. */
+    IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg);
+
+    /**
+     * The output side of the batch merge flow. It contributes saving the cuboid
+     * output to storage (Phase 2).
+     *
+     * - Phase 1: Merge Dictionary
+     * - Phase 2: Merge Cube
+     * - Phase 3: Update Metadata & Cleanup
+     */
+    interface IMRBatchMergeOutputSide {
+
+        /**
+         * Appends the step that saves the cuboid output from HDFS to storage.
+         *
+         * The cuboid output is a directory of sequence files. Keys have the format
+         * "CUBOID,D1,D2,..,Dn" and values the format "M1,M2,..,Mm", where CUBOID is the
+         * 8 bytes cuboid ID, Dx a dictionary encoded dimension value and Mx the serialized
+         * form of a measure value.
+         */
+        void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+
+        /** Appends the step that does any necessary clean up. */
+        void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
new file mode 100644
index 0000000..974e2fc
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -0,0 +1,88 @@
+package org.apache.kylin.engine.mr;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+/**
+ * Contract between a storage and the MapReduce build engine in which the storage supplies
+ * the input and output formats used by the cubing and merge jobs.
+ */
+public interface IMROutput2 {
+
+    /** Returns the helper through which the storage takes part in the batch cubing job flow of a segment. */
+    IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg);
+
+    /**
+     * The output side of the batch cubing flow.
+     *
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary
+     * - Phase 3: Build Cube (with StorageOutputFormat)
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    interface IMRBatchCubingOutputSide2 {
+
+        /** Returns the helper that writes reducer output to the storage. */
+        IMRStorageOutputFormat getStorageOutputFormat();
+
+        /** Appends the step that runs after build dictionary and before build cube. */
+        void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
+
+        /** Appends the step that runs after build cube. */
+        void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+        /** Appends the step that does any necessary clean up. */
+        void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+
+    /** Returns the helper through which the storage serves as input of the batch merge job flow of a segment. */
+    IMRBatchMergeInputSide2 getBatchMergeInputSide(CubeSegment seg);
+
+    /** The input side of the batch merge flow. */
+    interface IMRBatchMergeInputSide2 {
+
+        /** Returns the helper that reads mapper input from the storage. */
+        IMRStorageInputFormat getStorageInputFormat();
+    }
+
+    /** Helper that sets up a job to read from the storage and decodes the mapper input. */
+    @SuppressWarnings("rawtypes")
+    interface IMRStorageInputFormat {
+
+        /** Sets up the input of the given job for the given mapper and its output key/value classes. */
+        void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException;
+
+        /** Determines, from the mapper context, the segment of the given cube that the mapper reads. */
+        CubeSegment findSourceSegment(Mapper.Context context, CubeInstance cubeInstance) throws IOException;
+
+        /** Turns one mapper input key/value into a pair of key bytes and value objects. */
+        Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue);
+    }
+
+    /** Returns the helper through which the storage takes part in the batch merge job flow of a segment. */
+    IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg);
+
+    /**
+     * The output side of the batch merge flow.
+     *
+     * - Phase 1: Merge Dictionary
+     * - Phase 2: Merge Cube (with StorageInputFormat & StorageOutputFormat)
+     * - Phase 3: Update Metadata & Cleanup
+     */
+    interface IMRBatchMergeOutputSide2 {
+
+        /** Returns the helper that writes reducer output to the storage. */
+        IMRStorageOutputFormat getStorageOutputFormat();
+
+        /** Appends the step that runs after merge dictionary and before merge cube. */
+        void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+        /** Appends the step that runs after merge cube. */
+        void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow);
+
+        /** Appends the step that does any necessary clean up. */
+        void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+
+    /** Helper that sets up a job to write to the storage and emits the reducer output. */
+    @SuppressWarnings("rawtypes")
+    interface IMRStorageOutputFormat {
+
+        /** Sets up the output of the given job for the given reducer and job flow. */
+        void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException;
+
+        /** Emits one key/value record from within a reducer. */
+        void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
new file mode 100644
index 0000000..5e23531
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
+import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
+import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
+import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base of the job builders: keeps the segment being processed and offers factory methods
+ * for steps, and helpers for working paths and command lines, that the builders share.
+ */
+public class JobBuilderSupport {
+
+    protected final JobEngineConfig config;
+    protected final CubeSegment seg;
+    protected final String submitter;
+
+    /**
+     * @param seg       the segment to build steps for, must not be null
+     * @param submitter name of who submits the job
+     */
+    public JobBuilderSupport(CubeSegment seg, String submitter) {
+        Preconditions.checkNotNull(seg, "segment cannot be null");
+        this.config = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+        this.seg = seg;
+        this.submitter = submitter;
+    }
+
+    /** Creates the fact distinct columns step with statistics disabled. */
+    public MapReduceExecutable createFactDistinctColumnsStep(String jobId) {
+        return createFactDistinctColumnsStep(jobId, false);
+    }
+
+    /** Creates the fact distinct columns step with statistics enabled. */
+    public MapReduceExecutable createFactDistinctColumnsStepWithStats(String jobId) {
+        return createFactDistinctColumnsStep(jobId, true);
+    }
+
+    /** Creates the MapReduce step running {@link FactDistinctColumnsJob}; {@code withStats} is passed as "statisticsenabled". */
+    private MapReduceExecutable createFactDistinctColumnsStep(String jobId, boolean withStats) {
+        final String cubeName = seg.getCubeInstance().getName();
+
+        // assemble the command line handed to the MR job
+        StringBuilder params = new StringBuilder();
+        appendMapReduceParameters(params, seg);
+        appendExecCmdParameters(params, "cubename", cubeName);
+        appendExecCmdParameters(params, "output", getFactDistinctColumnsPath(jobId));
+        appendExecCmdParameters(params, "segmentname", seg.getName());
+        appendExecCmdParameters(params, "statisticsenabled", String.valueOf(withStats));
+        appendExecCmdParameters(params, "statisticsoutput", getStatisticsPath(jobId));
+        appendExecCmdParameters(params, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+        appendExecCmdParameters(params, "jobname", "Kylin_Fact_Distinct_Columns_" + cubeName + "_Step");
+
+        MapReduceExecutable step = new MapReduceExecutable();
+        step.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+        step.setMapReduceJobClass(FactDistinctColumnsJob.class);
+        step.setMapReduceParams(params.toString());
+        return step;
+    }
+
+    /** Creates the step running {@link CreateDictionaryJob}, reading from the fact distinct columns path of the job. */
+    public HadoopShellExecutable createBuildDictionaryStep(String jobId) {
+        StringBuilder params = new StringBuilder();
+        appendExecCmdParameters(params, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(params, "segmentname", seg.getName());
+        appendExecCmdParameters(params, "input", getFactDistinctColumnsPath(jobId));
+
+        HadoopShellExecutable step = new HadoopShellExecutable();
+        step.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
+        step.setJobParams(params.toString());
+        step.setJobClass(CreateDictionaryJob.class);
+        return step;
+    }
+
+    /** Creates the step that updates the cube info after a build. */
+    public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
+        UpdateCubeInfoAfterBuildStep step = new UpdateCubeInfoAfterBuildStep();
+        step.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+        step.setCubeName(seg.getCubeInstance().getName());
+        step.setSegmentId(seg.getUuid());
+        step.setCubingJobId(jobId);
+        return step;
+    }
+
+    /** Creates the step that merges the dictionaries of the given segments. */
+    public MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
+        MergeDictionaryStep step = new MergeDictionaryStep();
+        step.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+        step.setCubeName(seg.getCubeInstance().getName());
+        step.setSegmentId(seg.getUuid());
+        step.setMergingSegmentIds(mergingSegmentIds);
+        return step;
+    }
+
+    /** Creates the step that updates the cube info after a merge of the given segments. */
+    public UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
+        UpdateCubeInfoAfterMergeStep step = new UpdateCubeInfoAfterMergeStep();
+        step.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+        step.setCubeName(seg.getCubeInstance().getName());
+        step.setSegmentId(seg.getUuid());
+        step.setMergingSegmentIds(mergingSegmentIds);
+        step.setCubingJobId(jobId);
+        return step;
+    }
+
+    // ============================================================================
+
+    /** @return working directory of the given job, based on the config of this builder */
+    public String getJobWorkingDir(String jobId) {
+        return getJobWorkingDir(config, jobId);
+    }
+
+    /** @return cuboid root path of this builder's cube below the working directory of the given job */
+    public String getCuboidRootPath(String jobId) {
+        String cubeName = seg.getCubeInstance().getName();
+        return getJobWorkingDir(jobId) + "/" + cubeName + "/cuboid/";
+    }
+
+    /** @return cuboid root path derived from the last build job id of the given segment */
+    public String getCuboidRootPath(CubeSegment seg) {
+        String lastBuildJobId = seg.getLastBuildJobID();
+        return getCuboidRootPath(lastBuildJobId);
+    }
+
+    /**
+     * Appends " -conf file" to the buffer if a Hadoop job conf file is configured for the
+     * capacity of the given segment's model. An IOException while looking up the file is
+     * rethrown as RuntimeException.
+     */
+    public void appendMapReduceParameters(StringBuilder buf, CubeSegment seg) {
+        final String jobConf;
+        try {
+            jobConf = config.getHadoopJobConfFilePath(seg.getCubeDesc().getModel().getCapacity());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        if (jobConf == null || jobConf.length() == 0) {
+            return;
+        }
+        buf.append(" -conf ").append(jobConf);
+    }
+
+    /** @return fact distinct columns path of this builder's cube below the working directory of the given job */
+    public String getFactDistinctColumnsPath(String jobId) {
+        String cubeName = seg.getCubeInstance().getName();
+        return getJobWorkingDir(jobId) + "/" + cubeName + "/fact_distinct_columns";
+    }
+
+    /** @return statistics path of this builder's cube below the working directory of the given job */
+    public String getStatisticsPath(String jobId) {
+        String cubeName = seg.getCubeInstance().getName();
+        return getJobWorkingDir(jobId) + "/" + cubeName + "/statistics";
+    }
+
+    // ============================================================================
+    // static methods also shared by other job flow participant
+    // ----------------------------------------------------------------------------
+
+    /** @return the HDFS working directory of the config followed by "/kylin-" and the job id */
+    public static String getJobWorkingDir(JobEngineConfig conf, String jobId) {
+        return conf.getHdfsWorkingDirectory() + "/kylin-" + jobId;
+    }
+
+    /** Appends " -name value" to the buffer and returns the same buffer. */
+    public static StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) {
+        buf.append(" -").append(paraName);
+        buf.append(" ").append(paraValue);
+        return buf;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
new file mode 100644
index 0000000..61328c9
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class MRBatchCubingEngine implements IBatchCubingEngine {
+
+ @Override
+ public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+ return new BatchCubingJobBuilder(newSegment, submitter).build();
+ }
+
+ @Override
+ public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+ return new BatchMergeJobBuilder(mergeSegment, submitter).build();
+ }
+
+ @Override
+ public Class<?> getSourceInterface() {
+ return IMRInput.class;
+ }
+
+ @Override
+ public Class<?> getStorageInterface() {
+ return IMROutput.class;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
new file mode 100644
index 0000000..57ec128
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+/**
+ * {@link IBatchCubingEngine} implementation living in the
+ * {@code org.apache.kylin.engine.mr} package, built on the "2" variants of
+ * the job builders.
+ *
+ * <p>Job construction is delegated to {@link BatchCubingJobBuilder2} and
+ * {@link BatchMergeJobBuilder2}. The engine names {@link IMRInput} as its
+ * source-side interface and {@link IMROutput2} as its storage-side interface.
+ */
+public class MRBatchCubingEngine2 implements IBatchCubingEngine {
+
+    /**
+     * Creates the batch cubing job for the given segment.
+     *
+     * @param newSegment segment passed to the {@link BatchCubingJobBuilder2}
+     * @param submitter  submitter passed to the {@link BatchCubingJobBuilder2}
+     * @return the result of {@code BatchCubingJobBuilder2.build()}
+     */
+    @Override
+    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+        final BatchCubingJobBuilder2 cubingJobBuilder = new BatchCubingJobBuilder2(newSegment, submitter);
+        return cubingJobBuilder.build();
+    }
+
+    /**
+     * Creates the batch merge job for the given segment.
+     *
+     * @param mergeSegment segment passed to the {@link BatchMergeJobBuilder2}
+     * @param submitter    submitter passed to the {@link BatchMergeJobBuilder2}
+     * @return the result of {@code BatchMergeJobBuilder2.build()}
+     */
+    @Override
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+        final BatchMergeJobBuilder2 mergeJobBuilder = new BatchMergeJobBuilder2(mergeSegment, submitter);
+        return mergeJobBuilder.build();
+    }
+
+    /**
+     * @return {@code IMRInput.class}, the interface this engine reports for its source side
+     */
+    @Override
+    public Class<?> getSourceInterface() {
+        return IMRInput.class;
+    }
+
+    /**
+     * @return {@code IMROutput2.class}, the interface this engine reports for its storage side
+     */
+    @Override
+    public Class<?> getStorageInterface() {
+        return IMROutput2.class;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
new file mode 100644
index 0000000..dc0533e
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -0,0 +1,55 @@
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeInputSide2;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.TableSourceFactory;
+import org.apache.kylin.storage.StorageFactory2;
+
+/**
+ * Static shortcuts for obtaining the input and output "sides" used by the
+ * classes of the {@code org.apache.kylin.engine.mr} package.
+ *
+ * <p>Source-related lookups go through {@link TableSourceFactory} with the
+ * {@link IMRInput} interface; storage-related lookups go through
+ * {@link StorageFactory2} with either {@link IMROutput} or {@link IMROutput2}.
+ */
+public class MRUtil {
+
+    /** Not instantiable: the class only exposes static methods. */
+    private MRUtil() {
+    }
+
+    /**
+     * Returns the batch cubing input side for a segment.
+     *
+     * @param seg segment used both to pick the {@link IMRInput} adapter and as
+     *            the argument of {@code getBatchCubingInputSide}
+     * @return the input side reported by the adapter
+     */
+    public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+        return TableSourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg);
+    }
+
+    /**
+     * Returns the table input format for a table identified by name.
+     *
+     * <p>The name is resolved to a {@link TableDesc} through the
+     * {@link MetadataManager} of the environment's {@link KylinConfig}, then
+     * handed to {@link #getTableInputFormat(TableDesc)}.
+     *
+     * @param tableName name passed to {@code MetadataManager.getTableDesc}
+     * @return the input format reported by the adapter
+     */
+    public static IMRTableInputFormat getTableInputFormat(String tableName) {
+        // NOTE(review): the lookup result is forwarded unchecked; confirm what
+        // getTableDesc yields for an unknown table name.
+        final TableDesc tableDesc = getTableDesc(tableName);
+        return getTableInputFormat(tableDesc);
+    }
+
+    /**
+     * Returns the table input format for a table description.
+     *
+     * @param tableDesc description used both to pick the {@link IMRInput}
+     *                  adapter and as the argument of {@code getTableInputFormat}
+     * @return the input format reported by the adapter
+     */
+    public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
+        return TableSourceFactory.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc);
+    }
+
+    /** Looks up a table description in the metadata of the environment's config. */
+    private static TableDesc getTableDesc(String tableName) {
+        final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
+        return MetadataManager.getInstance(envConfig).getTableDesc(tableName);
+    }
+
+    /**
+     * Returns the batch cubing output side of the {@link IMROutput} adapter.
+     *
+     * @param seg segment used both to pick the adapter and as the argument of
+     *            {@code getBatchCubingOutputSide}
+     * @return the output side reported by the adapter
+     */
+    public static IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
+        return outputAdapter(seg).getBatchCubingOutputSide(seg);
+    }
+
+    /**
+     * Returns the batch merge output side of the {@link IMROutput} adapter.
+     *
+     * @param seg segment used both to pick the adapter and as the argument of
+     *            {@code getBatchMergeOutputSide}
+     * @return the output side reported by the adapter
+     */
+    public static IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg) {
+        return outputAdapter(seg).getBatchMergeOutputSide(seg);
+    }
+
+    /**
+     * Returns the batch cubing output side of the {@link IMROutput2} adapter.
+     *
+     * @param seg segment used both to pick the adapter and as the argument of
+     *            {@code getBatchCubingOutputSide}
+     * @return the output side reported by the adapter
+     */
+    public static IMRBatchCubingOutputSide2 getBatchCubingOutputSide2(CubeSegment seg) {
+        return outputAdapter2(seg).getBatchCubingOutputSide(seg);
+    }
+
+    /**
+     * Returns the batch merge input side of the {@link IMROutput2} adapter.
+     *
+     * @param seg segment used both to pick the adapter and as the argument of
+     *            {@code getBatchMergeInputSide}
+     * @return the input side reported by the adapter
+     */
+    public static IMRBatchMergeInputSide2 getBatchMergeInputSide2(CubeSegment seg) {
+        return outputAdapter2(seg).getBatchMergeInputSide(seg);
+    }
+
+    /**
+     * Returns the batch merge output side of the {@link IMROutput2} adapter.
+     *
+     * @param seg segment used both to pick the adapter and as the argument of
+     *            {@code getBatchMergeOutputSide}
+     * @return the output side reported by the adapter
+     */
+    public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg) {
+        return outputAdapter2(seg).getBatchMergeOutputSide(seg);
+    }
+
+    /** Asks {@link StorageFactory2} for the {@link IMROutput} adapter of a segment. */
+    private static IMROutput outputAdapter(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg, IMROutput.class);
+    }
+
+    /** Asks {@link StorageFactory2} for the {@link IMROutput2} adapter of a segment. */
+    private static IMROutput2 outputAdapter2(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg, IMROutput2.class);
+    }
+
+}