You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/05 02:33:30 UTC
[07/19] incubator-kylin git commit: KYLIN-1112 make code compile
KYLIN-1112 make code compile
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/aa4944d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/aa4944d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/aa4944d4
Branch: refs/heads/KYLIN-1112-2
Commit: aa4944d41daafcf63bbfaf1c757f466b2a3a204d
Parents: d921f3c
Author: shaofengshi <sh...@apache.org>
Authored: Fri Oct 30 22:01:27 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Nov 5 09:24:17 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/job/BuildIIWithEngineTest.java | 7 +-
.../apache/kylin/job/BuildIIWithStreamTest.java | 2 +-
.../kylin/common/util/ImplementationSwitch.java | 4 +-
.../java/org/apache/kylin/cube/CubeSegment.java | 18 +-
.../kylin/job/constant/ExecutableConstants.java | 1 +
.../kylin/metadata/model/IEngineAware.java | 1 +
.../realization/IRealizationSegment.java | 20 +
.../kylin/engine/mr/BatchCubingJobBuilder.java | 23 +-
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 12 +-
.../kylin/engine/mr/BatchMergeJobBuilder.java | 12 +-
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 15 +-
.../org/apache/kylin/engine/mr/IMRInput.java | 4 +-
.../org/apache/kylin/engine/mr/IMROutput.java | 27 ++
.../kylin/engine/mr/JobBuilderSupport.java | 46 +-
.../java/org/apache/kylin/engine/mr/MRUtil.java | 11 +-
.../mr/invertedindex/BatchIIJobBuilder.java | 132 ++++++
.../engine/mr/invertedindex/IIBulkLoadJob.java | 74 ----
.../mr/invertedindex/IICreateHFileJob.java | 81 ----
.../mr/invertedindex/IICreateHFileMapper.java | 55 ---
.../mr/invertedindex/IICreateHTableJob.java | 148 -------
.../invertedindex/IIDeployCoprocessorCLI.java | 157 -------
.../mr/invertedindex/IIDistinctColumnsJob.java | 22 +-
.../invertedindex/IIDistinctColumnsMapper.java | 41 +-
.../kylin/engine/mr/invertedindex/IIJob.java | 23 +
.../engine/mr/invertedindex/IIJobBuilder.java | 443 +++++++++----------
.../mr/invertedindex/InvertedIndexJob.java | 17 +-
.../mr/invertedindex/InvertedIndexMapper.java | 22 +-
.../UpdateInvertedIndexInfoAfterBuildStep.java | 93 ++++
.../engine/spark/SparkCubingJobBuilder.java | 7 +-
invertedindex/pom.xml | 4 +
.../apache/kylin/invertedindex/IIInstance.java | 18 +-
.../apache/kylin/invertedindex/IISegment.java | 34 +-
.../kylin/invertedindex/model/IIDesc.java | 32 +-
server/pom.xml | 5 +
.../apache/kylin/source/hive/HiveMRInput.java | 13 +-
.../kylin/storage/hbase/ii/IIBulkLoadJob.java | 64 +++
.../storage/hbase/ii/IICreateHFileJob.java | 81 ++++
.../storage/hbase/ii/IICreateHFileMapper.java | 55 +++
.../storage/hbase/ii/IICreateHTableJob.java | 149 +++++++
.../storage/hbase/steps/HBaseMROutput.java | 18 +
.../kylin/storage/hbase/steps/HBaseMRSteps.java | 94 +++-
.../hbase/util/IIDeployCoprocessorCLI.java | 157 +++++++
42 files changed, 1362 insertions(+), 880 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
index 47415a8..3c49c74 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.engine.mr.invertedindex.BatchIIJobBuilder;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
@@ -45,7 +46,6 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.engine.mr.invertedindex.IIJob;
-import org.apache.kylin.engine.mr.invertedindex.IIJobBuilder;
import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
@@ -225,8 +225,9 @@ public class BuildIIWithEngineTest {
IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
iiInstance.getSegments().add(segment);
iiManager.updateII(iiInstance);
- IIJobBuilder iiJobBuilder = new IIJobBuilder(jobEngineConfig);
- IIJob job = iiJobBuilder.buildJob(segment, "TEST");
+
+ BatchIIJobBuilder batchIIJobBuilder = new BatchIIJobBuilder(segment, "SYSTEM");
+ IIJob job = batchIIJobBuilder.build();
jobService.addJob(job);
waitForJob(job.getId());
return job.getId();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 4ccc6b4..d97b644 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -65,7 +65,7 @@ import org.apache.kylin.engine.streaming.invertedindex.SliceBuilder;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.engine.mr.invertedindex.IICreateHTableJob;
+import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.source.hive.HiveTableReader;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
index f6924a0..4a47b83 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
@@ -44,7 +44,7 @@ public class ImplementationSwitch<I> {
maxId = Math.max(maxId, id);
}
if (maxId > 100)
- throw new IllegalArgumentException("you have more than 100 implentations?");
+ throw new IllegalArgumentException("you have more than 100 implementations?");
Object[] result = new Object[maxId + 1];
@@ -65,7 +65,7 @@ public class ImplementationSwitch<I> {
I result = (I) instances[id];
if (result == null)
- throw new IllegalArgumentException("Implementations missing, ID " + id + ", interafce " + interfaceClz.getName());
+ throw new IllegalArgumentException("Implementations missing, ID " + id + ", interface " + interfaceClz.getName());
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 1a44fcf..7d17d30 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -27,9 +27,10 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.IDictionaryAware;
-import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
@@ -39,9 +40,11 @@ import com.fasterxml.jackson.annotation.JsonBackReference;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.collect.Maps;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IBuildable {
+public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IRealizationSegment {
@JsonBackReference
private CubeInstance cubeInstance;
@@ -115,6 +118,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
this.uuid = id;
}
+ @Override
public String getName() {
return name;
}
@@ -211,6 +215,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
this.cubeInstance = cubeInstance;
}
+ @Override
public String getStorageLocationIdentifier() {
return storageLocationIdentifier;
@@ -410,4 +415,13 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
return ret;
}
+ @Override
+ public IRealization getRealization() {
+ return cubeInstance;
+ }
+
+ @Override
+ public IJoinedFlatTableDesc getJoinedFlatTableDesc() {
+ return new CubeJoinedFlatTableDesc(this.getCubeDesc(), this);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 30cefaf..ba50880 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -59,6 +59,7 @@ public final class ExecutableConstants {
public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
+ public static final String STEP_NAME_UPDATE_II_INFO = "Update Inverted Index Info";
public static final String PROP_ENGINE_CONTEXT = "jobengineConfig";
public static final String PROP_JOB_FLOW = "jobFlow";
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java
index 60bd825..882b2e3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java
@@ -22,6 +22,7 @@ public interface IEngineAware {
public static final int ID_MR_V1 = 0;
public static final int ID_MR_V2 = 2;
+ public static final int ID_MR_II = 3;
public static final int ID_SPARK = 5;
int getEngineType();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
new file mode 100644
index 0000000..afab86b
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
@@ -0,0 +1,20 @@
+package org.apache.kylin.metadata.realization;
+
+import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+
+/**
+ * Created by shaoshi on 10/30/15.
+ */
+public interface IRealizationSegment extends IBuildable {
+
+ public String getUuid();
+
+ public String getName();
+
+ public String getStorageLocationIdentifier();
+
+ public IRealization getRealization();
+
+ public IJoinedFlatTableDesc getJoinedFlatTableDesc();
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index d00f592..dcb887d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -19,6 +19,7 @@
package org.apache.kylin.engine.mr;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.RowKeyDesc;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
@@ -38,13 +39,12 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
public BatchCubingJobBuilder(CubeSegment newSegment, String submitter) {
super(newSegment, submitter);
this.inputSide = MRUtil.getBatchCubingInputSide(seg);
- this.outputSide = MRUtil.getBatchCubingOutputSide(seg);
+ this.outputSide = MRUtil.getBatchCubingOutputSide((CubeSegment)seg);
}
public CubingJob build() {
logger.info("MR_V1 new job to BUILD segment " + seg);
-
- final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+ final CubingJob result = CubingJob.createBuildJob((CubeSegment)seg, submitter, config);
final String jobId = result.getId();
final String cuboidRootPath = getCuboidRootPath(jobId);
@@ -56,8 +56,9 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
result.addTask(createBuildDictionaryStep(jobId));
// Phase 3: Build Cube
- final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
- final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
+ RowKeyDesc rowKeyDesc = ((CubeSegment)seg).getCubeDesc().getRowkey();
+ final int groupRowkeyColumnsCount = rowKeyDesc.getNCuboidBuildLevels();
+ final int totalRowkeyColumnsCount = rowKeyDesc.getRowKeyColumns().length;
final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
// base cuboid step
result.addTask(createBaseCuboidStep(cuboidOutputTempPath, jobId));
@@ -81,15 +82,15 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg);
+ appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
appendExecCmdParameters(cmd, "input", "FLAT_TABLE"); // marks flat table input
appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
- appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
appendExecCmdParameters(cmd, "level", "0");
baseCuboidStep.setMapReduceParams(cmd.toString());
@@ -105,12 +106,12 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
+ appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
- appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getCubeInstance().getName() + "_Step");
+ appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
ndCuboidStep.setMapReduceParams(cmd.toString());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index fe9f1d6..f8fbc33 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -37,13 +37,13 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
super(newSegment, submitter);
this.inputSide = MRUtil.getBatchCubingInputSide(seg);
- this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
+ this.outputSide = MRUtil.getBatchCubingOutputSide2((CubeSegment)seg);
}
public CubingJob build() {
logger.info("MR_V2 new job to BUILD segment " + seg);
- final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+ final CubingJob result = CubingJob.createBuildJob((CubeSegment)seg, submitter, config);
final String jobId = result.getId();
// Phase 1: Create Flat Table
@@ -70,7 +70,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
SaveStatisticsStep result = new SaveStatisticsStep();
result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
- result.setCubeName(seg.getCubeInstance().getName());
+ result.setCubeName(seg.getRealization().getName());
result.setSegmentId(seg.getUuid());
result.setStatisticsPath(getStatisticsPath(jobId));
return result;
@@ -81,13 +81,13 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
MapReduceExecutable cubeStep = new MapReduceExecutable();
StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg);
+ appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getRealization().getName());
appendExecCmdParameters(cmd, "jobflowid", jobId);
cubeStep.setMapReduceParams(cmd.toString());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
index bc377ed..4b93b5d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -39,17 +39,17 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
public BatchMergeJobBuilder(CubeSegment mergeSegment, String submitter) {
super(mergeSegment, submitter);
- this.outputSide = MRUtil.getBatchMergeOutputSide(seg);
+ this.outputSide = MRUtil.getBatchMergeOutputSide((CubeSegment)seg);
}
public CubingJob build() {
logger.info("MR_V1 new job to MERGE segment " + seg);
-
- final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+ final CubeSegment cubeSegment = (CubeSegment)seg;
+ final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
final String jobId = result.getId();
final String cuboidRootPath = getCuboidRootPath(jobId);
- final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+ final List<CubeSegment> mergingSegments = cubeSegment.getCubeInstance().getMergingSegments(cubeSegment);
Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
final List<String> mergingSegmentIds = Lists.newArrayList();
final List<String> mergingCuboidPaths = Lists.newArrayList();
@@ -63,7 +63,7 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
// Phase 2: Merge Cube Files
String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
- result.addTask(createMergeCuboidDataStep(seg, formattedPath, cuboidRootPath));
+ result.addTask(createMergeCuboidDataStep(cubeSegment, formattedPath, cuboidRootPath));
outputSide.addStepPhase2_BuildCube(result, cuboidRootPath);
// Phase 3: Update Metadata & Cleanup
@@ -78,7 +78,7 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg);
+ appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
appendExecCmdParameters(cmd, "input", inputPath);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 443cf95..48a717f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -39,16 +39,17 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
super(mergeSegment, submitter);
- this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
+ this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment)seg);
}
public CubingJob build() {
logger.info("MR_V2 new job to MERGE segment " + seg);
- final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+ final CubeSegment cubeSegment = (CubeSegment)seg;
+ final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
final String jobId = result.getId();
- final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+ final List<CubeSegment> mergingSegments = cubeSegment.getCubeInstance().getMergingSegments(cubeSegment);
Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
final List<String> mergingSegmentIds = Lists.newArrayList();
final List<String> mergingHTables = Lists.newArrayList();
@@ -59,7 +60,7 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
// Phase 1: Merge Dictionary
result.addTask(createMergeDictionaryStep(mergingSegmentIds));
- result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, getStatisticsPath(jobId)));
+ result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
outputSide.addStepPhase1_MergeDictionary(result);
// Phase 2: Merge Cube
@@ -89,10 +90,10 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
+ appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getRealization().getName() + "_Step");
appendExecCmdParameters(cmd, "jobflowid", jobId);
mergeCuboidDataStep.setMapReduceParams(cmd.toString());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 336a66f..a61e0dd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -19,9 +19,9 @@
package org.apache.kylin.engine.mr;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
/**
* Any ITableSource that wishes to serve as input of MapReduce build engine must adapt to this interface.
@@ -29,7 +29,7 @@ import org.apache.kylin.metadata.model.TableDesc;
public interface IMRInput {
/** Return a helper to participate in batch cubing job flow. */
- public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg);
+ public IMRBatchCubingInputSide getBatchCubingInputSide(IRealizationSegment seg);
/** Return an InputFormat that reads from specified table. */
public IMRTableInputFormat getTableInputFormat(TableDesc table);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
index 577a836..e989042 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -19,6 +19,7 @@
package org.apache.kylin.engine.mr;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
public interface IMROutput {
@@ -26,6 +27,10 @@ public interface IMROutput {
/** Return a helper to participate in batch cubing job flow. */
public IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg);
+
+ /** Return a helper to participate in batch cubing job flow. */
+ public IMRBatchInvertedIndexingOutputSide getBatchInvertedIndexingOutputSide(IISegment seg);
+
/**
* Participate the batch cubing flow as the output side. Responsible for saving
* the cuboid output to storage (Phase 3).
@@ -75,4 +80,26 @@ public interface IMROutput {
/** Add step that does any necessary clean up. */
public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
}
+
+
+ /**
+ * Participate the batch inverted indexing flow as the output side. Responsible for saving
+ * the output to storage (Phase 3).
+ *
+ * - Phase 1: Create Flat Table
+ * - Phase 2: Build Dictionary
+ * - Phase 3: Build II
+ * - Phase 4: Update Metadata & Cleanup
+ */
+ public interface IMRBatchInvertedIndexingOutputSide {
+
+ /**
+ * Add step that saves II output from HDFS to storage.
+ *
+ */
+ public void addStepPhase3_BuildII(DefaultChainedExecutable jobFlow, String rootPath);
+
+ /** Add step that does any necessary clean up. */
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 5fba37c..8c770f9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -25,15 +25,19 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.invertedindex.UpdateInvertedIndexInfoAfterBuildStep;
import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
+import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
import com.google.common.base.Preconditions;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
/**
* Hold reusable steps for builders.
@@ -41,10 +45,10 @@ import com.google.common.base.Preconditions;
public class JobBuilderSupport {
final protected JobEngineConfig config;
- final protected CubeSegment seg;
+ final protected IRealizationSegment seg;
final protected String submitter;
- public JobBuilderSupport(CubeSegment seg, String submitter) {
+ public JobBuilderSupport(IRealizationSegment seg, String submitter) {
Preconditions.checkNotNull(seg, "segment cannot be null");
this.config = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
this.seg = seg;
@@ -64,14 +68,14 @@ public class JobBuilderSupport {
result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
result.setMapReduceJobClass(FactDistinctColumnsJob.class);
StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
+ appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId));
appendExecCmdParameters(cmd, "segmentname", seg.getName());
appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
- appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
result.setMapReduceParams(cmd.toString());
return result;
@@ -82,7 +86,7 @@ public class JobBuilderSupport {
HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId));
@@ -94,7 +98,7 @@ public class JobBuilderSupport {
public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
- updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
+ updateCubeInfoStep.setCubeName(seg.getRealization().getName());
updateCubeInfoStep.setSegmentId(seg.getUuid());
updateCubeInfoStep.setCubingJobId(jobId);
return updateCubeInfoStep;
@@ -103,7 +107,7 @@ public class JobBuilderSupport {
public MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
MergeDictionaryStep result = new MergeDictionaryStep();
result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
- result.setCubeName(seg.getCubeInstance().getName());
+ result.setCubeName(seg.getRealization().getName());
result.setSegmentId(seg.getUuid());
result.setMergingSegmentIds(mergingSegmentIds);
return result;
@@ -112,30 +116,44 @@ public class JobBuilderSupport {
public UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
- result.setCubeName(seg.getCubeInstance().getName());
+ result.setCubeName(seg.getRealization().getName()); // seg is now typed IRealizationSegment, so the cube name is read from its realization
result.setSegmentId(seg.getUuid());
result.setMergingSegmentIds(mergingSegmentIds);
result.setCubingJobId(jobId);
return result;
}
+
+
+    public UpdateInvertedIndexInfoAfterBuildStep createUpdateInvertedIndexInfoAfterBuildStep(String jobId) {
+        UpdateInvertedIndexInfoAfterBuildStep step = new UpdateInvertedIndexInfoAfterBuildStep(); // II counterpart of createUpdateCubeInfoAfterBuildStep
+        step.setName(ExecutableConstants.STEP_NAME_UPDATE_II_INFO);
+        step.setInvertedIndexName(seg.getRealization().getName()); // assumes seg belongs to an II instance; not checked here
+        step.setSegmentId(seg.getUuid());
+        step.setJobId(jobId); // presumably the id of the build job this step is added to; verify against callers
+        return step;
+    }
+
// ============================================================================
public String getJobWorkingDir(String jobId) {
return getJobWorkingDir(config, jobId);
}
+ public String getRealizationRootPath(String jobId) { // <job working dir>/<realization name>; parent of the cuboid, fact_distinct_columns and statistics paths
+ return getJobWorkingDir(jobId) + "/" + seg.getRealization().getName();
+ }
public String getCuboidRootPath(String jobId) {
- return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
+ return getRealizationRootPath(jobId) + "/cuboid/"; // same path as before, assuming a CubeSegment's getRealization() is its CubeInstance
}
public String getCuboidRootPath(CubeSegment seg) {
return getCuboidRootPath(seg.getLastBuildJobID());
}
- public void appendMapReduceParameters(StringBuilder buf, CubeSegment seg) {
+ public void appendMapReduceParameters(StringBuilder buf, DataModelDesc modelDesc) {
try {
- String jobConf = config.getHadoopJobConfFilePath(seg.getCubeDesc().getModel().getCapacity());
+ String jobConf = config.getHadoopJobConfFilePath(modelDesc.getCapacity());
if (jobConf != null && jobConf.length() > 0) {
buf.append(" -conf ").append(jobConf);
}
@@ -145,11 +163,11 @@ public class JobBuilderSupport {
}
public String getFactDistinctColumnsPath(String jobId) {
- return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
+ return getRealizationRootPath(jobId) + "/fact_distinct_columns"; // <job working dir>/<realization name>/fact_distinct_columns
}
public String getStatisticsPath(String jobId) {
- return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/statistics";
+ return getRealizationRootPath(jobId) + "/statistics"; // <job working dir>/<realization name>/statistics
}
// ============================================================================
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 9a1c1f5..55fa9e2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -9,17 +9,20 @@ import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeInputSide2;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
+import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
import org.apache.kylin.source.SourceFactory;
import org.apache.kylin.storage.StorageFactory;
public class MRUtil {
- public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+ public static IMRBatchCubingInputSide getBatchCubingInputSide(IRealizationSegment seg) { // widened from CubeSegment so inverted-index segments can reuse the batch input side
return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg);
}
-
+
public static IMRTableInputFormat getTableInputFormat(String tableName) {
return getTableInputFormat(getTableDesc(tableName));
}
@@ -52,4 +55,8 @@ public class MRUtil {
return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
}
+ public static IMROutput.IMRBatchInvertedIndexingOutputSide getBatchInvertedIndexingOutputSide(IISegment seg) { // II counterpart of the cubing/merge output-side lookups; adapter obtained from StorageFactory for this segment
+ return StorageFactory.createEngineAdapter(seg, IMROutput.class).getBatchInvertedIndexingOutputSide(seg);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
new file mode 100644
index 0000000..97e27d0
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMROutput;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
+import org.apache.kylin.engine.mr.steps.NDCuboidJob;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Assembles the MapReduce job that builds a single inverted-index (II) segment.
+ *
+ * <p>The job runs in four phases: create the intermediate flat table (input side),
+ * extract distinct column values and build dictionaries, build the inverted index and
+ * hand it to the storage (output) side, then update II metadata and clean up.
+ */
+public class BatchIIJobBuilder extends JobBuilderSupport {
+
+    private static final Logger logger = LoggerFactory.getLogger(BatchIIJobBuilder.class);
+
+    /** Same object as the inherited {@code seg}, kept with its concrete type to avoid casts. */
+    private final IISegment iiSegment;
+    private final IMRBatchCubingInputSide inputSide;
+    private final IMROutput.IMRBatchInvertedIndexingOutputSide outputSide;
+
+    /**
+     * @param newSegment the II segment to build; the super constructor rejects {@code null}
+     * @param submitter  submitter passed on to the job created by {@link #build()}
+     */
+    public BatchIIJobBuilder(IISegment newSegment, String submitter) {
+        super(newSegment, submitter);
+        this.iiSegment = newSegment;
+        this.inputSide = MRUtil.getBatchCubingInputSide(newSegment);
+        this.outputSide = MRUtil.getBatchInvertedIndexingOutputSide(newSegment);
+    }
+
+    /**
+     * Creates the build job and appends all of its steps in execution order.
+     *
+     * @return a new {@link IIJob} that builds the segment given to the constructor
+     */
+    public IIJob build() {
+        logger.info("MR new job to BUILD segment {}", iiSegment);
+
+        final IIJob result = IIJob.createBuildJob(iiSegment, submitter, config);
+        final String jobId = result.getId();
+        final String iiRootPath = getRealizationRootPath(jobId) + "/";
+        final String factDistinctColumnsPath = getFactDistinctColumnsPath(jobId);
+
+        // Phase 1: Create Flat Table
+        inputSide.addStepPhase1_CreateFlatTable(result);
+        final String intermediateTableIdentity = iiSegment.getJoinedFlatTableDesc().getTableName();
+
+        // Phase 2: Build Dictionary
+        result.addTask(createIIFactDistinctColumnsStep(iiSegment, intermediateTableIdentity, factDistinctColumnsPath));
+        result.addTask(createIIBuildDictionaryStep(iiSegment, factDistinctColumnsPath));
+
+        // Phase 3: Build Inverted Index, then let the storage side append its own steps
+        result.addTask(createInvertedIndexStep(iiSegment, intermediateTableIdentity, iiRootPath));
+        outputSide.addStepPhase3_BuildII(result, iiRootPath);
+
+        // Phase 4: Update Metadata & Cleanup
+        result.addTask(createUpdateInvertedIndexInfoAfterBuildStep(jobId));
+        inputSide.addStepPhase4_Cleanup(result);
+        outputSide.addStepPhase4_Cleanup(result);
+
+        return result;
+    }
+
+    /**
+     * MapReduce step running {@link IIDistinctColumnsJob} over the intermediate flat table;
+     * its output directory is the input of the dictionary step.
+     *
+     * @param seg           segment being built
+     * @param factTableName intermediate flat table, passed as {@code tablename}
+     * @param output        output directory, passed as {@code output}
+     */
+    private MapReduceExecutable createIIFactDistinctColumnsStep(IRealizationSegment seg, String factTableName, String output) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+        result.setMapReduceJobClass(IIDistinctColumnsJob.class);
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
+        appendExecCmdParameters(cmd, "tablename", factTableName);
+        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, "output", output);
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
+
+        result.setMapReduceParams(cmd.toString());
+        return result;
+    }
+
+    /**
+     * Shell step running {@link CreateInvertedIndexDictionaryJob} on the distinct-column output.
+     *
+     * @param seg                     segment being built
+     * @param factDistinctColumnsPath directory written by the fact-distinct-columns step
+     */
+    private HadoopShellExecutable createIIBuildDictionaryStep(IRealizationSegment seg, String factDistinctColumnsPath) {
+        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
+        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
+        StringBuilder cmd = new StringBuilder();
+        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
+
+        buildDictionaryStep.setJobParams(cmd.toString());
+        buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class);
+        return buildDictionaryStep;
+    }
+
+    /**
+     * MapReduce step running {@link InvertedIndexJob} over the intermediate flat table.
+     *
+     * @param seg                   segment being built
+     * @param intermediateHiveTable intermediate flat table, passed as {@code tablename}
+     * @param iiOutputTempPath      output directory, passed as {@code output}
+     */
+    private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) {
+        MapReduceExecutable buildIIStep = new MapReduceExecutable();
+        buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
+
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
+        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, "tablename", intermediateHiveTable);
+        appendExecCmdParameters(cmd, "output", iiOutputTempPath);
+        // All parameters are concatenated into one command-line string, so a job name that may
+        // contain spaces (the step display name used before) risks being split when parsed.
+        // Use the space-free naming pattern of the other steps instead.
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Build_II_" + seg.getRealization().getName() + "_Step");
+
+        buildIIStep.setMapReduceParams(cmd.toString());
+        buildIIStep.setMapReduceJobClass(InvertedIndexJob.class);
+        return buildIIStep;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIBulkLoadJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIBulkLoadJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIBulkLoadJob.java
deleted file mode 100644
index a0a5ca6..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIBulkLoadJob.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- */
-public class IIBulkLoadJob extends AbstractHadoopJob {
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_HTABLE_NAME);
- options.addOption(OPTION_II_NAME);
- parseOptions(options, args);
-
- String tableName = getOptionValue(OPTION_HTABLE_NAME);
- String input = getOptionValue(OPTION_INPUT_PATH);
- String iiname = getOptionValue(OPTION_II_NAME);
-
- FileSystem fs = FileSystem.get(getConf());
- FsPermission permission = new FsPermission((short) 0777);
- fs.setPermission(new Path(input, IIDesc.HBASE_FAMILY), permission);
-
- int hbaseExitCode = ToolRunner.run(new LoadIncrementalHFiles(getConf()), new String[] { input, tableName });
-
- IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
- IIInstance ii = mgr.getII(iiname);
- IISegment seg = ii.getFirstSegment();
- seg.setStorageLocationIdentifier(tableName);
- seg.setStatus(SegmentStatusEnum.READY);
- mgr.updateII(ii);
-
- return hbaseExitCode;
-
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileJob.java
deleted file mode 100644
index 4ab3051..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileJob.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- *
- */
-public class IICreateHFileJob extends AbstractHadoopJob {
-
- protected static final Logger logger = LoggerFactory.getLogger(IICreateHFileJob.class);
-
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_II_NAME);
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_HTABLE_NAME);
- parseOptions(options, args);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
- job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-
- setJobClasspath(job);
-
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
- FileOutputFormat.setOutputPath(job, output);
-
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(IICreateHFileMapper.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(KeyValue.class);
-
- String tableName = getOptionValue(OPTION_HTABLE_NAME);
- HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName);
- HFileOutputFormat.configureIncrementalLoad(job, htable);
-
- this.deletePath(job.getConfiguration(), output);
-
- return waitForCompletion(job);
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileMapper.java
deleted file mode 100644
index fdcc138..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileMapper.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.invertedindex.model.IIDesc;
-
-/**
- * @author yangli9
- */
-public class IICreateHFileMapper extends KylinMapper<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> {
-
- long timestamp;
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.bindCurrentConfiguration(context.getConfiguration());
-
- timestamp = System.currentTimeMillis();
- }
-
- @Override
- protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException {
-
- KeyValue kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
- IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, //
- IIDesc.HBASE_QUALIFIER_BYTES, 0, IIDesc.HBASE_QUALIFIER_BYTES.length, //
- timestamp, Type.Put, //
- value.get(), value.getOffset(), value.getLength());
-
- context.write(key, kv);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHTableJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHTableJob.java
deleted file mode 100644
index 3ccd701..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHTableJob.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-
-/**
- * @author George Song (ysong1)
- */
-public class IICreateHTableJob extends AbstractHadoopJob {
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_II_NAME);
- options.addOption(OPTION_HTABLE_NAME);
- parseOptions(options, args);
-
- String tableName = getOptionValue(OPTION_HTABLE_NAME);
- String iiName = getOptionValue(OPTION_II_NAME);
-
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- IIManager iiManager = IIManager.getInstance(config);
- IIInstance ii = iiManager.getII(iiName);
- int sharding = ii.getDescriptor().getSharding();
-
- HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
- HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY);
- cf.setMaxVersions(1);
-
- String hbaseDefaultCC = config.getHbaseDefaultCompressionCodec().toLowerCase();
-
- switch (hbaseDefaultCC) {
- case "snappy": {
- logger.info("hbase will use snappy to compress data");
- cf.setCompressionType(Compression.Algorithm.SNAPPY);
- break;
- }
- case "lzo": {
- logger.info("hbase will use lzo to compress data");
- cf.setCompressionType(Compression.Algorithm.LZO);
- break;
- }
- case "gz":
- case "gzip": {
- logger.info("hbase will use gzip to compress data");
- cf.setCompressionType(Compression.Algorithm.GZ);
- break;
- }
- case "lz4": {
- logger.info("hbase will use lz4 to compress data");
- cf.setCompressionType(Compression.Algorithm.LZ4);
- break;
- }
- default: {
- logger.info("hbase will not user any compression codec to compress data");
- }
- }
-
- cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
- tableDesc.addFamily(cf);
- tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix());
- tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
- tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
-
- Configuration conf = HBaseConfiguration.create(getConf());
- if (User.isHBaseSecurityEnabled(conf)) {
- // add coprocessor for bulk load
- tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
- }
-
- IIDeployCoprocessorCLI.deployCoprocessor(tableDesc);
-
- // drop the table first
- HBaseAdmin admin = new HBaseAdmin(conf);
- if (admin.tableExists(tableName)) {
- admin.disableTable(tableName);
- admin.deleteTable(tableName);
- }
-
- // create table
- byte[][] splitKeys = getSplits(sharding);
- if (splitKeys.length == 0)
- splitKeys = null;
- admin.createTable(tableDesc, splitKeys);
- if (splitKeys != null) {
- for (int i = 0; i < splitKeys.length; i++) {
- System.out.println("split key " + i + ": " + BytesUtil.toHex(splitKeys[i]));
- }
- }
- System.out.println("create hbase table " + tableName + " done.");
- admin.close();
-
- return 0;
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
- }
-
- //one region for one shard
- private byte[][] getSplits(int shard) {
- byte[][] result = new byte[shard - 1][];
- for (int i = 1; i < shard; ++i) {
- byte[] split = new byte[IIKeyValueCodec.SHARD_LEN];
- BytesUtil.writeUnsigned(i, split, 0, IIKeyValueCodec.SHARD_LEN);
- result[i - 1] = split;
- }
- return result;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDeployCoprocessorCLI.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDeployCoprocessorCLI.java
deleted file mode 100644
index da4b95b..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDeployCoprocessorCLI.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * THIS IS A TAILORED DUPLICATE OF org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI TO AVOID CYCLIC
- * DEPENDENCY. INVERTED-INDEX CODE NOW SPLITTED ACROSS kylin-invertedindex AND kylin-storage-hbase.
- * DEFENITELY NEED FURTHER REFACTOR.
- */
-public class IIDeployCoprocessorCLI {
-
- private static final Logger logger = LoggerFactory.getLogger(IIDeployCoprocessorCLI.class);
-
- public static final String CubeObserverClass = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver";
- public static final String CubeEndpointClass = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService";
- public static final String IIEndpointClass = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint";
-
- public static void deployCoprocessor(HTableDescriptor tableDesc) {
- try {
- initHTableCoprocessor(tableDesc);
- logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
-
- } catch (Exception ex) {
- logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
- logger.error("Will try creating the table without coprocessor.");
- }
- }
-
- private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- Configuration hconf = HadoopUtil.getCurrentConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
-
- String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
- Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
-
- addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
- }
-
- private static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
- logger.info("Add coprocessor on " + desc.getNameAsString());
- desc.addCoprocessor(IIEndpointClass, hdfsCoprocessorJar, 1000, null);
- desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
- desc.addCoprocessor(CubeObserverClass, hdfsCoprocessorJar, 1002, null);
- }
-
- private static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
- Path uploadPath = null;
- File localCoprocessorFile = new File(localCoprocessorJar);
-
- // check existing jars
- if (oldJarPaths == null) {
- oldJarPaths = new HashSet<String>();
- }
- Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
- for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
- if (isSame(localCoprocessorFile, fileStatus)) {
- uploadPath = fileStatus.getPath();
- break;
- }
- String filename = fileStatus.getPath().toString();
- if (filename.endsWith(".jar")) {
- oldJarPaths.add(filename);
- }
- }
-
- // upload if not existing
- if (uploadPath == null) {
- // figure out a unique new jar file name
- Set<String> oldJarNames = new HashSet<String>();
- for (String path : oldJarPaths) {
- oldJarNames.add(new Path(path).getName());
- }
- String baseName = getBaseFileName(localCoprocessorJar);
- String newName = null;
- int i = 0;
- while (newName == null) {
- newName = baseName + "-" + (i++) + ".jar";
- if (oldJarNames.contains(newName))
- newName = null;
- }
-
- // upload
- uploadPath = new Path(coprocessorDir, newName);
- FileInputStream in = null;
- FSDataOutputStream out = null;
- try {
- in = new FileInputStream(localCoprocessorFile);
- out = fileSystem.create(uploadPath);
- IOUtils.copy(in, out);
- } finally {
- IOUtils.closeQuietly(in);
- IOUtils.closeQuietly(out);
- }
-
- fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
-
- }
-
- uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
- return uploadPath;
- }
-
- private static boolean isSame(File localCoprocessorFile, FileStatus fileStatus) {
- return fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified();
- }
-
- private static String getBaseFileName(String localCoprocessorJar) {
- File localJar = new File(localCoprocessorJar);
- String baseName = localJar.getName();
- if (baseName.endsWith(".jar"))
- baseName = baseName.substring(0, baseName.length() - ".jar".length());
- return baseName;
- }
-
- private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
- String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
- Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
- fileSystem.mkdirs(coprocessorDir);
- return coprocessorDir;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
index c9f5375..b691dc1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
@@ -29,15 +29,18 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,7 +77,8 @@ public class IIDistinctColumnsJob extends AbstractHadoopJob {
setJobClasspath(job);
- setupMapper();
+
+ setupMapper(ii.getFirstSegment());
setupReducer(output);
return waitForCompletion(job);
@@ -97,16 +101,10 @@ public class IIDistinctColumnsJob extends AbstractHadoopJob {
return buf.toString();
}
- private void setupMapper() throws IOException {
-
- String tableName = job.getConfiguration().get(BatchConstants.TABLE_NAME);
- String[] dbTableNames = HadoopUtil.parseHiveTableName(tableName);
-
- logger.info("setting hcat input format, db name {} , table name {}", dbTableNames[0], dbTableNames[1]);
-
- HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
-
- job.setInputFormatClass(HCatInputFormat.class);
+ private void setupMapper(IISegment segment) throws IOException {
+
+ IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
+ flatTableInputFormat.configureJob(job);
job.setMapperClass(IIDistinctColumnsMapper.class);
job.setCombinerClass(IIDistinctColumnsCombiner.class);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
index 0f0c731..c431ecd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
@@ -20,43 +20,52 @@ package org.apache.kylin.engine.mr.invertedindex;
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
/**
* @author yangli9
*/
-public class IIDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, ShortWritable, Text> {
+public class IIDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, Object, ShortWritable, Text> {
private ShortWritable outputKey = new ShortWritable();
private Text outputValue = new Text();
- private HCatSchema schema = null;
- private int columnSize = 0;
+ protected IMRInput.IMRTableInputFormat flatTableInputFormat;
+
@Override
protected void setup(Context context) throws IOException {
super.bindCurrentConfiguration(context.getConfiguration());
- schema = HCatInputFormat.getTableSchema(context.getConfiguration());
- columnSize = schema.getFields().size();
+ Configuration conf = context.getConfiguration();
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+ String iiName = conf.get(BatchConstants.CFG_II_NAME);
+ IIInstance iiInstance = IIManager.getInstance(config).getII(iiName);
+ IISegment seg = iiInstance.getFirstSegment();
+ flatTableInputFormat = MRUtil.getBatchCubingInputSide(seg).getFlatTableInputFormat();
}
@Override
- public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
+ public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
- HCatFieldSchema fieldSchema = null;
- for (short i = 0; i < columnSize; i++) {
+ String[] row = flatTableInputFormat.parseMapperInput(record);
+
+ for (short i = 0; i < row.length; i++) {
outputKey.set(i);
- fieldSchema = schema.get(i);
- Object fieldValue = record.get(fieldSchema.getName(), schema);
- if (fieldValue == null)
+ if (row[i] == null)
continue;
- byte[] bytes = Bytes.toBytes(fieldValue.toString());
+ byte[] bytes = Bytes.toBytes(row[i].toString());
outputValue.set(bytes, 0, bytes.length);
context.write(outputKey, outputValue);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aa4944d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java
index 30e653b..86fedf0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java
@@ -18,8 +18,15 @@
package org.apache.kylin.engine.mr.invertedindex;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
/**
*/
public class IIJob extends DefaultChainedExecutable {
@@ -47,4 +54,20 @@ public class IIJob extends DefaultChainedExecutable {
return getParam(SEGMENT_ID);
}
+
+ public static IIJob createBuildJob(IISegment seg, String submitter, JobEngineConfig config) {
+ return initialJob(seg, "BUILD", submitter, config);
+ }
+
+ private static IIJob initialJob(IISegment seg, String type, String submitter, JobEngineConfig config) {
+ IIJob result = new IIJob();
+ SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
+ format.setTimeZone(TimeZone.getTimeZone(config.getTimeZone()));
+ result.setIIName(seg.getIIInstance().getName());
+ result.setSegmentId(seg.getUuid());
+ result.setName(seg.getIIInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis())));
+ result.setSubmitter(submitter);
+ return result;
+ }
+
}