You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/20 03:52:25 UTC
[5/5] incubator-kylin git commit: KYLIN-878 half way
KYLIN-878 half way
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/9a82f39b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/9a82f39b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/9a82f39b
Branch: refs/heads/KYLIN-878
Commit: 9a82f39bdfc60347e259547be6460f958986a0e8
Parents: 4d3af3a
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Jul 20 08:59:10 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Jul 20 08:59:57 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/util/BytesSplitter.java | 9 +-
.../apache/kylin/engine/BuildEngineFactory.java | 2 +-
.../kylin/engine/mr/BatchCubingJobBuilder.java | 10 +-
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 100 +++++++++++++++++
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 102 ++++++++++++++++++
.../org/apache/kylin/engine/mr/IMROutput.java | 16 ++-
.../org/apache/kylin/engine/mr/IMROutput2.java | 53 +++++++++
.../kylin/engine/mr/JobBuilderSupport.java | 15 +--
.../kylin/engine/mr/MRBatchCubingEngine2.java | 47 ++++++++
.../java/org/apache/kylin/engine/mr/MRUtil.java | 10 ++
.../engine/mr2/BatchCubingJobBuilder2.java | 102 ------------------
.../kylin/engine/mr2/BatchMergeJobBuilder2.java | 107 -------------------
.../org/apache/kylin/engine/mr2/IMROutput2.java | 5 -
.../kylin/engine/mr2/MRBatchCubingEngine2.java | 48 ---------
.../apache/kylin/job/hadoop/cube/CuboidJob.java | 45 +++++---
.../job/hadoop/cube/HiveToBaseCuboidMapper.java | 28 ++++-
.../kylin/storage/hbase/HBaseMROutput2.java | 71 ++++++++++++
17 files changed, 467 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
index bd16246..7249dcf 100644
--- a/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
+++ b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * @author xjiang
*/
public class BytesSplitter {
private static final Logger logger = LoggerFactory.getLogger(BytesSplitter.class);
@@ -79,6 +78,14 @@ public class BytesSplitter {
return bufferSize;
}
+
+ public void setBuffers(byte[][] buffers) {
+ for (int i = 0; i < buffers.length; i++) {
+ splitBuffers[i].value = buffers[i];
+ splitBuffers[i].length = buffers[i].length;
+ }
+ this.bufferSize = buffers.length;
+ }
public byte inferByteRowDelimiter(byte[] bytes, int byteLen, int expectedSplits) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
index 721b85a..d24c99c 100644
--- a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
+++ b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
@@ -21,7 +21,7 @@ package org.apache.kylin.engine;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.MRBatchCubingEngine;
-import org.apache.kylin.engine.mr2.MRBatchCubingEngine2;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine2;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
public class BuildEngineFactory {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 0ff3bef..a39ac74 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -19,7 +19,6 @@
package org.apache.kylin.engine.mr;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
import org.apache.kylin.job.common.MapReduceExecutable;
@@ -42,13 +41,12 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
final String jobId = result.getId();
final String cuboidRootPath = getCuboidRootPath(jobId);
- final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
// Phase 1: Create Flat Table
inputSide.addStepPhase1_CreateFlatTable(result);
// Phase 2: Build Dictionary
- result.addTask(createFactDistinctColumnsStep(flatHiveTableDesc, jobId));
+ result.addTask(createFactDistinctColumnsStep(jobId));
result.addTask(createBuildDictionaryStep(jobId));
// Phase 3: Build Cube
@@ -56,7 +54,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
// base cuboid step
- result.addTask(createBaseCuboidStep(flatHiveTableDesc, cuboidOutputTempPath, jobId));
+ result.addTask(createBaseCuboidStep(cuboidOutputTempPath, jobId));
// n dim cuboid steps
for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
int dimNum = totalRowkeyColumnsCount - i;
@@ -72,7 +70,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
return result;
}
- private MapReduceExecutable createBaseCuboidStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String[] cuboidOutputTempPath, String jobId) {
+ private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) {
// base cuboid job
MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
@@ -83,7 +81,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
+ appendExecCmdParameters(cmd, "input", ""); // marks flat table input
appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName());
appendExecCmdParameters(cmd, "level", "0");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
new file mode 100644
index 0000000..a83c596
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
+import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
+
+public class BatchCubingJobBuilder2 extends JobBuilderSupport {
+
+ private final IMRBatchCubingInputSide inputSide;
+ private final IMRBatchCubingOutputSide2 outputSide;
+
+ public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
+ super(newSegment, submitter);
+ this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+ this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
+ }
+
+ public CubingJob build() {
+ final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+ final String jobId = result.getId();
+ final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+
+ // Phase 1: Create Flat Table
+ inputSide.addStepPhase1_CreateFlatTable(result);
+
+ // Phase 2: Build Dictionary
+ result.addTask(createFactDistinctColumnsStepWithStats(jobId));
+ result.addTask(createBuildDictionaryStep(jobId));
+ result.addTask(createSaveStatisticsStep(jobId));
+ outputSide.addStepPhase2_BuildDictionary(result);
+
+ // Phase 3: Build Cube
+ result.addTask(createInMemCubingStep(flatHiveTableDesc, result.getId()));
+ outputSide.addStepPhase3_BuildCube(result);
+
+ // Phase 4: Update Metadata & Cleanup
+ result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
+ inputSide.addStepPhase4_Cleanup(result);
+ outputSide.addStepPhase4_Cleanup(result);
+
+ return result;
+ }
+
+ private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
+ SaveStatisticsStep result = new SaveStatisticsStep();
+ result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
+ result.setCubeName(seg.getCubeInstance().getName());
+ result.setSegmentId(seg.getUuid());
+ result.setStatisticsPath(getStatisticsPath(jobId));
+ return result;
+ }
+
+ private MapReduceExecutable createInMemCubingStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
+ // base cuboid job
+ MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
+
+ StringBuilder cmd = new StringBuilder();
+ appendMapReduceParameters(cmd, seg);
+
+ baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
+
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
+ appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
+ appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "level", "0");
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+
+ baseCuboidStep.setMapReduceParams(cmd.toString());
+ baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
+ baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+ return baseCuboidStep;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
new file mode 100644
index 0000000..f97de13
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
+import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class BatchMergeJobBuilder2 extends JobBuilderSupport {
+
+ private final IMRBatchCubingOutputSide2 outputSide;
+
+ public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
+ super(mergeSegment, submitter);
+ this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
+ }
+
+ public CubingJob build() {
+ final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+ final String jobId = result.getId();
+
+ final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+ Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
+ final List<String> mergingSegmentIds = Lists.newArrayList();
+ final List<String> mergingHTables = Lists.newArrayList();
+ for (CubeSegment merging : mergingSegments) {
+ mergingSegmentIds.add(merging.getUuid());
+ mergingHTables.add(merging.getStorageLocationIdentifier());
+ }
+
+ // Phase 1: Merge Dictionary
+ result.addTask(createMergeDictionaryStep(mergingSegmentIds));
+ result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, getStatisticsPath(jobId)));
+ outputSide.addStepPhase2_BuildDictionary(result);
+
+ // Phase 2: Merge Cube
+ String formattedTables = StringUtil.join(mergingHTables, ",");
+ result.addTask(createMergeCuboidDataFromStorageStep(formattedTables, jobId));
+ outputSide.addStepPhase3_BuildCube(result);
+
+ // Phase 3: Update Metadata & Cleanup
+ result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+ outputSide.addStepPhase4_Cleanup(result);
+
+ return result;
+ }
+
+ private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
+ MergeStatisticsStep result = new MergeStatisticsStep();
+ result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+ result.setCubeName(seg.getCubeInstance().getName());
+ result.setSegmentId(seg.getUuid());
+ result.setMergingSegmentIds(mergingSegmentIds);
+ result.setMergedStatisticsPath(mergedStatisticsFolder);
+ return result;
+ }
+
+ private MapReduceExecutable createMergeCuboidDataFromStorageStep(String inputTableNames, String jobId) {
+ MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+ mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+ StringBuilder cmd = new StringBuilder();
+
+ appendMapReduceParameters(cmd, seg);
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "input", inputTableNames);
+ appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+
+ mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+ mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
+ mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+ return mergeCuboidDataStep;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
index 8896a2e..bc6ee1f 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -37,7 +37,13 @@ public interface IMROutput {
*/
public interface IMRBatchCubingOutputSide {
- /** Add step that saves cuboid output from HDFS to storage. */
+ /**
+ * Add step that saves cuboid output from HDFS to storage.
+ *
+ * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn",
+ * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+ * dictionary encoding; Mx is measure value serialization form.
+ */
public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
/** Add step that does any necessary clean up. */
@@ -57,7 +63,13 @@ public interface IMROutput {
*/
public interface IMRBatchMergeOutputSide {
- /** Add step that saves cuboid output from HDFS to storage. */
+ /**
+ * Add step that saves cuboid output from HDFS to storage.
+ *
+ * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn",
+ * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+ * dictionary encoding; Mx is measure value serialization form.
+ */
public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
/** Add step that does any necessary clean up. */
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
new file mode 100644
index 0000000..9aecba9
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -0,0 +1,53 @@
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public interface IMROutput2 {
+
+ /** Return a helper to participate in batch cubing job flow. */
+ public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg);
+
+ /**
+ * Participate the batch cubing flow as the output side.
+ *
+ * - Phase 1: Create Flat Table
+ * - Phase 2: Build Dictionary
+ * - Phase 3: Build Cube
+ * - Phase 4: Update Metadata & Cleanup
+ */
+ public interface IMRBatchCubingOutputSide2 {
+
+ /** Add step that executes after build dictionary and before build cube. */
+ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
+
+ /** Add step that executes after build cube. */
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+ /** Add step that does any necessary clean up. */
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+ }
+
+ /** Return a helper to participate in batch merge job flow. */
+ public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg);
+
+ /**
+ * Participate the batch merge flow as the output side.
+ *
+ * - Phase 1: Merge Dictionary
+ * - Phase 2: Merge Cube
+ * - Phase 3: Update Metadata & Cleanup
+ */
+ public interface IMRBatchMergeOutputSide2 {
+
+ /** Add step that executes after merge dictionary and before merge cube. */
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+ /** Add step that executes after merge cube. */
+ public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow);
+
+ /** Add step that does any necessary clean up. */
+ public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 4652269..42d30c8 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -23,7 +23,6 @@ import java.util.List;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.job.common.HadoopShellExecutable;
import org.apache.kylin.job.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -49,15 +48,15 @@ public class JobBuilderSupport {
this.submitter = submitter;
}
- public MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
- return createFactDistinctColumnsStep(flatHiveTableDesc, jobId, false);
+ public MapReduceExecutable createFactDistinctColumnsStep(String jobId) {
+ return createFactDistinctColumnsStep(jobId, false);
}
- public MapReduceExecutable createFactDistinctColumnsStepWithStats(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
- return createFactDistinctColumnsStep(flatHiveTableDesc, jobId, true);
+ public MapReduceExecutable createFactDistinctColumnsStepWithStats(String jobId) {
+ return createFactDistinctColumnsStep(jobId, true);
}
- private MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId, boolean withStats) {
+ private MapReduceExecutable createFactDistinctColumnsStep(String jobId, boolean withStats) {
MapReduceExecutable result = new MapReduceExecutable();
result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
result.setMapReduceJobClass(FactDistinctColumnsJob.class);
@@ -142,10 +141,6 @@ public class JobBuilderSupport {
}
}
- public String getFlatHiveTableLocation(CubeJoinedFlatTableDesc flatTableDesc, String jobId) {
- return getJobWorkingDir(jobId) + "/" + flatTableDesc.getTableName();
- }
-
public String getFactDistinctColumnsPath(String jobId) {
return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
new file mode 100644
index 0000000..57ec128
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class MRBatchCubingEngine2 implements IBatchCubingEngine {
+
+ @Override
+ public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+ return new BatchCubingJobBuilder2(newSegment, submitter).build();
+ }
+
+ @Override
+ public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+ return new BatchMergeJobBuilder2(mergeSegment, submitter).build();
+ }
+
+ @Override
+ public Class<?> getSourceInterface() {
+ return IMRInput.class;
+ }
+
+ @Override
+ public Class<?> getStorageInterface() {
+ return IMROutput2.class;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 099a614..4c44af7 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -6,6 +6,8 @@ import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.TableSourceFactory;
@@ -42,4 +44,12 @@ public class MRUtil {
return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput.class).getBatchMergeOutputSide(seg);
}
+ public static IMRBatchCubingOutputSide2 getBatchCubingOutputSide2(CubeSegment seg) {
+ return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput2.class).getBatchCubingOutputSide(seg);
+ }
+
+ public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg) {
+ return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput2.class).getBatchMergeOutputSide(seg);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
deleted file mode 100644
index e83db30..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr2;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
-
-public class BatchCubingJobBuilder2 extends JobBuilderSupport {
-
- private final IMRBatchCubingInputSide inputSide;
-
- public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
- super(newSegment, submitter);
- this.inputSide = MRUtil.getBatchCubingInputSide(seg);
- }
-
- public CubingJob build() {
- final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
- final String jobId = result.getId();
- final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
-
- // Phase 1: Create Flat Table
- inputSide.addStepPhase1_CreateFlatTable(result);
-
- // Phase 2: Build Dictionary
- result.addTask(createFactDistinctColumnsStepWithStats(flatHiveTableDesc, jobId));
- result.addTask(createBuildDictionaryStep(jobId));
-
- // Phase 3: Build Cube
- result.addTask(createSaveStatisticsStep(jobId)); //<<<<<
-
- // create htable step
- result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
- result.addTask(createInMemCubingStep(flatHiveTableDesc, result.getId()));
- // bulk load step
- result.addTask(createBulkLoadStep(jobId)); //<<<<<
-
- // Phase 4: Update Metadata & Cleanup
- result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
- inputSide.addStepPhase4_Cleanup(result);
-
- return result;
- }
-
- private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
- SaveStatisticsStep result = new SaveStatisticsStep();
- result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setStatisticsPath(getStatisticsPath(jobId));
- return result;
- }
-
- private MapReduceExecutable createInMemCubingStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
- // base cuboid job
- MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
-
- StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg);
-
- baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
-
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
- appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
- appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "level", "0");
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-
- baseCuboidStep.setMapReduceParams(cmd.toString());
- baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
- baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
- return baseCuboidStep;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
deleted file mode 100644
index 25ff082..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr2;
-
-import java.util.List;
-
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
-import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class BatchMergeJobBuilder2 extends JobBuilderSupport {
-
- public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
- super(mergeSegment, submitter);
- }
-
- public CubingJob build() {
- final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
- final String jobId = result.getId();
-
- final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
- Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
- final List<String> mergingSegmentIds = Lists.newArrayList();
- final List<String> mergingCuboidPaths = Lists.newArrayList();
- final List<String> mergingHTables = Lists.newArrayList();
- for (CubeSegment merging : mergingSegments) {
- mergingSegmentIds.add(merging.getUuid());
- mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
- mergingHTables.add(merging.getStorageLocationIdentifier());
- }
-
- result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-
- String mergedStatisticsFolder = getStatisticsPath(jobId);
- result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, mergedStatisticsFolder));
-
- // create htable step
- result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
-
- String formattedTables = StringUtil.join(mergingHTables, ",");
- result.addTask(createMergeCuboidDataFromHBaseStep(formattedTables, jobId));
-
- // bulk load step
- result.addTask(createBulkLoadStep(jobId)); //<<<<<
-
- // update cube info
- result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
-
- result.addTask(createGarbageCollectionStep(mergingHTables, null)); //<<<<<
-
- return result;
- }
-
- private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
- MergeStatisticsStep result = new MergeStatisticsStep();
- result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setMergingSegmentIds(mergingSegmentIds);
- result.setMergedStatisticsPath(mergedStatisticsFolder);
- return result;
- }
-
- private MapReduceExecutable createMergeCuboidDataFromHBaseStep(String inputTableNames, String jobId) {
- MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
- mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
- StringBuilder cmd = new StringBuilder();
-
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", inputTableNames);
- appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
- mergeCuboidDataStep.setMapReduceParams(cmd.toString());
- mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
- mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
- return mergeCuboidDataStep;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java b/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
deleted file mode 100644
index aeddb9b..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.kylin.engine.mr2;
-
-public interface IMROutput2 {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java b/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
deleted file mode 100644
index 8ec6f69..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr2;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.IBatchCubingEngine;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public class MRBatchCubingEngine2 implements IBatchCubingEngine {
-
- @Override
- public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
- return new BatchCubingJobBuilder2(newSegment, submitter).build();
- }
-
- @Override
- public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
- return new BatchMergeJobBuilder2(mergeSegment, submitter).build();
- }
-
- @Override
- public Class<?> getSourceInterface() {
- return IMRInput.class;
- }
-
- @Override
- public Class<?> getStorageInterface() {
- return IMROutput2.class;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
index 87fc188..46bb9fc 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
@@ -19,6 +19,7 @@
package org.apache.kylin.job.hadoop.cube;
import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -32,11 +33,15 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidCLI;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +60,9 @@ public class CuboidJob extends AbstractHadoopJob {
@Override
public int run(String[] args) throws Exception {
+ if (this.mapperClass == null)
+ throw new Exception("Mapper class is not set!");
+
Options options = new Options();
try {
@@ -67,7 +75,6 @@ public class CuboidJob extends AbstractHadoopJob {
options.addOption(OPTION_INPUT_FORMAT);
parseOptions(options, args);
- Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
@@ -79,26 +86,11 @@ public class CuboidJob extends AbstractHadoopJob {
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
logger.info("Starting: " + job.getJobName());
- FileInputFormat.setInputPaths(job, input);
setJobClasspath(job);
// Mapper
- if (this.mapperClass == null) {
- throw new Exception("Mapper class is not set!");
- }
-
- boolean isInputTextFormat = false;
- if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
- isInputTextFormat = true;
- }
-
- if (isInputTextFormat) {
- job.setInputFormatClass(TextInputFormat.class);
-
- } else {
- job.setInputFormatClass(SequenceFileInputFormat.class);
- }
+ configureMapperInputFormat(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
job.setMapperClass(this.mapperClass);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
@@ -130,6 +122,25 @@ public class CuboidJob extends AbstractHadoopJob {
}
}
+ private void configureMapperInputFormat(CubeSegment cubeSeg) throws IOException {
+ String input = getOptionValue(OPTION_INPUT_PATH);
+
+ if (StringUtils.isBlank(input)) {
+ // base cuboid case
+ IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+ flatTableInputFormat.configureJob(job);
+ }
+ else {
+ // n-dimension cuboid case
+ FileInputFormat.setInputPaths(job, new Path(input));
+ if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
+ job.setInputFormatClass(TextInputFormat.class);
+ } else {
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ }
+ }
+ }
+
protected void setReduceTaskNum(Job job, KylinConfig config, String cubeName, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
Configuration jobConf = job.getConfiguration();
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
index 599dde8..9fa1159 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
@@ -19,25 +19,37 @@
package org.apache.kylin.job.hadoop.cube;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
-import org.apache.hadoop.io.Text;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.job.constant.BatchConstants;
/**
* @author George Song (ysong1)
*/
-public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, Text> {
+public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, Object> {
+
+ private IMRTableInputFormat flatTableInputFormat;
@Override
- public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
+ protected void setup(Context context) throws IOException {
+ super.setup(context);
+ flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
+ }
+
+ @Override
+ public void map(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
counter++;
if (counter % BatchConstants.COUNTER_MAX == 0) {
logger.info("Handled " + counter + " records!");
}
+
try {
//put a record into the shared bytesSplitter
- bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
+ String[] row = flatTableInputFormat.parseMapperInput(value);
+ bytesSplitter.setBuffers(convertUTF8Bytes(row));
//take care of the data in bytesSplitter
outputKV(context);
@@ -46,4 +58,12 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, T
}
}
+ private byte[][] convertUTF8Bytes(String[] row) throws UnsupportedEncodingException {
+ byte[][] result = new byte[row.length][];
+ for (int i = 0; i < row.length; i++) {
+ result[i] = row[i].getBytes("UTF-8");
+ }
+ return result;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
new file mode 100644
index 0000000..63e5902
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class HBaseMROutput2 implements IMROutput2 {
+
+ @Override
+ public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
+ return new IMRBatchCubingOutputSide2() {
+ HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+ @Override
+ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
+ result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
+ }
+
+ @Override
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+ result.addTask(createBulkLoadStep(jobId)); //<<<<<
+ }
+
+ @Override
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+ // nothing to do
+ }
+ };
+ }
+
+ @Override
+ public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) {
+ return new IMRBatchMergeOutputSide2() {
+ HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+ result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
+ }
+
+ @Override
+ public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
+ result.addTask(createBulkLoadStep(jobId)); //<<<<<
+ }
+
+ @Override
+ public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+ result.addTask(createGarbageCollectionStep(mergingHTables, null)); //<<<<<
+ }
+ };
+ }
+}