You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/21 02:29:31 UTC
[6/7] incubator-kylin git commit: KYLIN-878 HBase storage abstraction
for cubing flow
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 79430f6..3217e4b 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -23,120 +23,117 @@ import java.util.List;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
+import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
+import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
import org.apache.kylin.job.common.HadoopShellExecutable;
import org.apache.kylin.job.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
-import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
-import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
-import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
+import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
+import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
import com.google.common.base.Preconditions;
/**
- * Hold reusable methods for real builders.
+ * Hold reusable steps for builders.
*/
-abstract public class JobBuilderSupport {
+public class JobBuilderSupport {
- protected final JobEngineConfig config;
- protected final CubeSegment seg;
- protected final String submitter;
+ final protected JobEngineConfig config;
+ final protected CubeSegment seg;
+ final protected String submitter;
- protected JobBuilderSupport(CubeSegment seg, String submitter) {
+ public JobBuilderSupport(CubeSegment seg, String submitter) {
Preconditions.checkNotNull(seg, "segment cannot be null");
this.config = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
this.seg = seg;
this.submitter = submitter;
}
- protected MapReduceExecutable createRangeRowkeyDistributionStep(String inputPath, String jobId) {
- MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
- rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
+ public MapReduceExecutable createFactDistinctColumnsStep(String jobId) {
+ return createFactDistinctColumnsStep(jobId, false);
+ }
+
+ public MapReduceExecutable createFactDistinctColumnsStepWithStats(String jobId) {
+ return createFactDistinctColumnsStep(jobId, true);
+ }
+
+ private MapReduceExecutable createFactDistinctColumnsStep(String jobId, boolean withStats) {
+ MapReduceExecutable result = new MapReduceExecutable();
+ result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+ result.setMapReduceJobClass(FactDistinctColumnsJob.class);
StringBuilder cmd = new StringBuilder();
-
appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "input", inputPath);
- appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(jobId));
appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step");
+ appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId));
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
+ appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
+ appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
- rowkeyDistributionStep.setMapReduceParams(cmd.toString());
- rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
- return rowkeyDistributionStep;
+ result.setMapReduceParams(cmd.toString());
+ return result;
}
- protected HadoopShellExecutable createCreateHTableStep(String jobId) {
- HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
- createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
+ public HadoopShellExecutable createBuildDictionaryStep(String jobId) {
+ // base cuboid job
+ HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
+ buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
StringBuilder cmd = new StringBuilder();
appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(config.isInMemCubing()));
-
- createHtableStep.setJobParams(cmd.toString());
- createHtableStep.setJobClass(CreateHTableJob.class);
+ appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId));
- return createHtableStep;
+ buildDictionaryStep.setJobParams(cmd.toString());
+ buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
+ return buildDictionaryStep;
}
- protected MapReduceExecutable createConvertCuboidToHfileStep(String inputPath, String jobId) {
- MapReduceExecutable createHFilesStep = new MapReduceExecutable();
- createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
- StringBuilder cmd = new StringBuilder();
-
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "input", inputPath);
- appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getCubeInstance().getName() + "_Step");
-
- createHFilesStep.setMapReduceParams(cmd.toString());
- createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
- createHFilesStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
-
- return createHFilesStep;
+ public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
+ final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
+ updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+ updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
+ updateCubeInfoStep.setSegmentId(seg.getUuid());
+ updateCubeInfoStep.setCubingJobId(jobId);
+ return updateCubeInfoStep;
}
- protected HadoopShellExecutable createBulkLoadStep(String jobId) {
- HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
- bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-
- StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, "input", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-
- bulkLoadStep.setJobParams(cmd.toString());
- bulkLoadStep.setJobClass(BulkLoadJob.class);
-
- return bulkLoadStep;
+ public MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
+ MergeDictionaryStep result = new MergeDictionaryStep();
+ result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+ result.setCubeName(seg.getCubeInstance().getName());
+ result.setSegmentId(seg.getUuid());
+ result.setMergingSegmentIds(mergingSegmentIds);
+ return result;
}
-
- protected GarbageCollectionStep createGarbageCollectionStep(List<String> oldHtables, String interimTable) {
- GarbageCollectionStep result = new GarbageCollectionStep();
- result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
- result.setOldHTables(oldHtables);
- result.setOldHiveTable(interimTable);
+
+ public UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
+ UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
+ result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+ result.setCubeName(seg.getCubeInstance().getName());
+ result.setSegmentId(seg.getUuid());
+ result.setMergingSegmentIds(mergingSegmentIds);
+ result.setCubingJobId(jobId);
return result;
}
- protected String getJobWorkingDir(String jobId) {
+ // ============================================================================
+
+ public String getJobWorkingDir(String jobId) {
return getJobWorkingDir(config, jobId);
}
- protected String getCuboidRootPath(String jobId) {
+ public String getCuboidRootPath(String jobId) {
return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
}
- protected String getCuboidRootPath(CubeSegment seg) {
+ public String getCuboidRootPath(CubeSegment seg) {
return getCuboidRootPath(seg.getLastBuildJobID());
}
- protected void appendMapReduceParameters(StringBuilder buf, CubeSegment seg) {
+ public void appendMapReduceParameters(StringBuilder buf, CubeSegment seg) {
try {
String jobConf = config.getHadoopJobConfFilePath(seg.getCubeDesc().getModel().getCapacity());
if (jobConf != null && jobConf.length() > 0) {
@@ -146,25 +143,16 @@ abstract public class JobBuilderSupport {
throw new RuntimeException(e);
}
}
-
- protected String getFactDistinctColumnsPath(String jobId) {
+
+ public String getFactDistinctColumnsPath(String jobId) {
return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
}
- protected String getStatisticsPath(String jobId) {
+ public String getStatisticsPath(String jobId) {
return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/statistics";
}
-
- protected String getHFilePath(String jobId) {
- return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/";
- }
-
- protected String getRowkeyDistributionOutputPath(String jobId) {
- return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
- }
-
// ============================================================================
// static methods also shared by other job flow participant
// ----------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
index b374a99..61328c9 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
@@ -18,15 +18,9 @@
package org.apache.kylin.engine.mr;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.IBatchCubingEngine;
-import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.TableSourceFactory;
public class MRBatchCubingEngine implements IBatchCubingEngine {
@@ -40,25 +34,14 @@ public class MRBatchCubingEngine implements IBatchCubingEngine {
return new BatchMergeJobBuilder(mergeSegment, submitter).build();
}
- public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
- TableDesc tableDesc = getTableDesc(seg.getCubeDesc().getFactTable());
- return getMRInput(tableDesc).getBatchCubingInputSide(seg);
- }
-
- public static IMRTableInputFormat getTableInputFormat(String tableName) {
- return getTableInputFormat(getTableDesc(tableName));
- }
-
- public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
- return getMRInput(tableDesc).getTableInputFormat(tableDesc);
- }
-
- private static IMRInput getMRInput(TableDesc tableDesc) {
- return TableSourceFactory.createEngineAdapter(tableDesc, IMRInput.class);
+ @Override
+ public Class<?> getSourceInterface() {
+ return IMRInput.class;
}
- private static TableDesc getTableDesc(String tableName) {
- return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName);
+ @Override
+ public Class<?> getStorageInterface() {
+ return IMROutput.class;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
new file mode 100644
index 0000000..57ec128
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class MRBatchCubingEngine2 implements IBatchCubingEngine {
+
+ @Override
+ public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+ return new BatchCubingJobBuilder2(newSegment, submitter).build();
+ }
+
+ @Override
+ public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+ return new BatchMergeJobBuilder2(mergeSegment, submitter).build();
+ }
+
+ @Override
+ public Class<?> getSourceInterface() {
+ return IMRInput.class;
+ }
+
+ @Override
+ public Class<?> getStorageInterface() {
+ return IMROutput2.class;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
new file mode 100644
index 0000000..dc0533e
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -0,0 +1,55 @@
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeInputSide2;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.TableSourceFactory;
+import org.apache.kylin.storage.StorageFactory2;
+
+public class MRUtil {
+
+ public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+ return TableSourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg);
+ }
+
+ public static IMRTableInputFormat getTableInputFormat(String tableName) {
+ return getTableInputFormat(getTableDesc(tableName));
+ }
+
+ public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
+ return TableSourceFactory.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc);
+ }
+
+ private static TableDesc getTableDesc(String tableName) {
+ return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName);
+ }
+
+ public static IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
+ return StorageFactory2.createEngineAdapter(seg, IMROutput.class).getBatchCubingOutputSide(seg);
+ }
+
+ public static IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg) {
+ return StorageFactory2.createEngineAdapter(seg, IMROutput.class).getBatchMergeOutputSide(seg);
+ }
+
+ public static IMRBatchCubingOutputSide2 getBatchCubingOutputSide2(CubeSegment seg) {
+ return StorageFactory2.createEngineAdapter(seg, IMROutput2.class).getBatchCubingOutputSide(seg);
+ }
+
+ public static IMRBatchMergeInputSide2 getBatchMergeInputSide2(CubeSegment seg) {
+ return StorageFactory2.createEngineAdapter(seg, IMROutput2.class).getBatchMergeInputSide(seg);
+ }
+
+ public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg) {
+ return StorageFactory2.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MergeDictionaryStep.java b/job/src/main/java/org/apache/kylin/engine/mr/MergeDictionaryStep.java
deleted file mode 100644
index 0010174..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/MergeDictionaryStep.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeBuilder;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import java.io.IOException;
-import java.util.*;
-
-public class MergeDictionaryStep extends AbstractExecutable {
-
- private static final String CUBE_NAME = "cubeName";
- private static final String SEGMENT_ID = "segmentId";
- private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-
- public MergeDictionaryStep() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- KylinConfig conf = context.getConfig();
- final CubeManager mgr = CubeManager.getInstance(conf);
- final CubeInstance cube = mgr.getCube(getCubeName());
- final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
- final List<CubeSegment> mergingSegments = getMergingSegments(cube);
-
- Collections.sort(mergingSegments);
-
- try {
- checkLookupSnapshotsMustIncremental(mergingSegments);
-
- makeDictForNewSegment(conf, cube, newSegment, mergingSegments);
- makeSnapshotForNewSegment(cube, newSegment, mergingSegments);
-
- CubeBuilder cubeBuilder = new CubeBuilder(cube);
- cubeBuilder.setToUpdateSegs(newSegment);
- mgr.updateCube(cubeBuilder);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } catch (IOException e) {
- logger.error("fail to merge dictionary or lookup snapshots", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- private List<CubeSegment> getMergingSegments(CubeInstance cube) {
- List<String> mergingSegmentIds = getMergingSegmentIds();
- List<CubeSegment> result = Lists.newArrayListWithCapacity(mergingSegmentIds.size());
- for (String id : mergingSegmentIds) {
- result.add(cube.getSegmentById(id));
- }
- return result;
- }
-
- private void checkLookupSnapshotsMustIncremental(List<CubeSegment> mergingSegments) {
-
- // FIXME check each newer snapshot has only NEW rows but no MODIFIED rows
- }
-
- /**
- * For the new segment, we need to create dictionaries for it, too. For
- * those dictionaries on fact table, create it by merging underlying
- * dictionaries For those dictionaries on lookup table, just copy it from
- * any one of the merging segments, it's guaranteed to be consistent(checked
- * in CubeSegmentValidator)
- *
- * @param cube
- * @param newSeg
- * @throws IOException
- */
- private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) throws IOException {
- HashSet<TblColRef> colsNeedMeringDict = new HashSet<TblColRef>();
- HashSet<TblColRef> colsNeedCopyDict = new HashSet<TblColRef>();
- DictionaryManager dictMgr = DictionaryManager.getInstance(conf);
-
- CubeDesc cubeDesc = cube.getDescriptor();
- for (DimensionDesc dim : cubeDesc.getDimensions()) {
- for (TblColRef col : dim.getColumnRefs()) {
- if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
- String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
- if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
- colsNeedMeringDict.add(col);
- } else {
- colsNeedCopyDict.add(col);
- }
- }
- }
- }
-
- for (TblColRef col : colsNeedMeringDict) {
- logger.info("Merging fact table dictionary on : " + col);
- List<DictionaryInfo> dictInfos = new ArrayList<DictionaryInfo>();
- for (CubeSegment segment : mergingSegments) {
- logger.info("Including fact table dictionary of segment : " + segment);
- if (segment.getDictResPath(col) != null) {
- DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
- dictInfos.add(dictInfo);
- }
- }
- mergeDictionaries(dictMgr, newSeg, dictInfos, col);
- }
-
- for (TblColRef col : colsNeedCopyDict) {
- String path = mergingSegments.get(0).getDictResPath(col);
- newSeg.putDictResPath(col, path);
- }
- }
-
- private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, CubeSegment cubeSeg, List<DictionaryInfo> dicts, TblColRef col) throws IOException {
- DictionaryInfo dictInfo = dictMgr.mergeDictionary(dicts);
- if (dictInfo != null)
- cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
-
- return dictInfo;
- }
-
- /**
- * make snapshots for the new segment by copying from one of the underlying
- * merging segments. it's guaranteed to be consistent(checked in
- * CubeSegmentValidator)
- *
- * @param cube
- * @param newSeg
- */
- private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) {
- CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
- for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
- newSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
- }
- }
-
- public void setCubeName(String cubeName) {
- this.setParam(CUBE_NAME, cubeName);
- }
-
- private String getCubeName() {
- return getParam(CUBE_NAME);
- }
-
- public void setSegmentId(String segmentId) {
- this.setParam(SEGMENT_ID, segmentId);
- }
-
- private String getSegmentId() {
- return getParam(SEGMENT_ID);
- }
-
- public void setMergingSegmentIds(List<String> ids) {
- setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
- }
-
- private List<String> getMergingSegmentIds() {
- final String ids = getParam(MERGING_SEGMENT_IDS);
- if (ids != null) {
- final String[] splitted = StringUtils.split(ids, ",");
- ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
- for (String id: splitted) {
- result.add(id);
- }
- return result;
- } else {
- return Collections.emptyList();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterBuildStep.java b/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterBuildStep.java
deleted file mode 100644
index 8361f7c..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterBuildStep.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.io.IOException;
-
-import org.apache.kylin.cube.CubeBuilder;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-/**
- */
-public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
-
- private static final String SEGMENT_ID = "segmentId";
- private static final String CUBE_NAME = "cubeName";
- private static final String CUBING_JOB_ID = "cubingJobId";
-
- public UpdateCubeInfoAfterBuildStep() {
- super();
- }
-
- public void setCubeName(String cubeName) {
- this.setParam(CUBE_NAME, cubeName);
- }
-
- private String getCubeName() {
- return getParam(CUBE_NAME);
- }
-
- public void setSegmentId(String segmentId) {
- this.setParam(SEGMENT_ID, segmentId);
- }
-
- private String getSegmentId() {
- return getParam(SEGMENT_ID);
- }
-
- public void setCubingJobId(String id) {
- setParam(CUBING_JOB_ID, id);
- }
-
- private String getCubingJobId() {
- return getParam(CUBING_JOB_ID);
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
- final CubeInstance cube = cubeManager.getCube(getCubeName());
- final CubeSegment segment = cube.getSegmentById(getSegmentId());
-
- CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
- long sourceCount = cubingJob.findSourceRecordCount();
- long sourceSizeBytes = cubingJob.findSourceSizeBytes();
- long cubeSizeBytes = cubingJob.findCubeSizeBytes();
- boolean segmentReady = cubeSizeBytes > 0; // for build+merge scenario, convert HFile not happen yet, so cube size is 0
-
- segment.setLastBuildJobID(getCubingJobId());
- segment.setLastBuildTime(System.currentTimeMillis());
- segment.setSizeKB(cubeSizeBytes / 1024);
- segment.setInputRecords(sourceCount);
- segment.setInputRecordsSize(sourceSizeBytes);
-
- try {
- if (segmentReady) {
- cubeManager.promoteNewlyBuiltSegments(cube, segment);
- } else {
- CubeBuilder cubeBuilder = new CubeBuilder(cube);
- cubeBuilder.setToUpdateSegs(segment);
- cubeManager.updateCube(cubeBuilder);
- }
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } catch (IOException e) {
- logger.error("fail to update cube after build", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterMergeStep.java b/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterMergeStep.java
deleted file mode 100644
index 7defec4..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterMergeStep.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
-
- private static final String CUBE_NAME = "cubeName";
- private static final String SEGMENT_ID = "segmentId";
- private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
- private static final String CUBING_JOB_ID = "cubingJobId";
-
- private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
- public UpdateCubeInfoAfterMergeStep() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final CubeInstance cube = cubeManager.getCube(getCubeName());
-
- CubeSegment mergedSegment = cube.getSegmentById(getSegmentId());
- if (mergedSegment == null) {
- return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
- }
-
- CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
- long cubeSizeBytes = cubingJob.findCubeSizeBytes();
-
- // collect source statistics
- List<String> mergingSegmentIds = getMergingSegmentIds();
- if (mergingSegmentIds.isEmpty()) {
- return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
- }
- long sourceCount = 0L;
- long sourceSize = 0L;
- for (String id : mergingSegmentIds) {
- CubeSegment segment = cube.getSegmentById(id);
- sourceCount += segment.getInputRecords();
- sourceSize += segment.getInputRecordsSize();
- }
-
- // update segment info
- mergedSegment.setSizeKB(cubeSizeBytes / 1024);
- mergedSegment.setInputRecords(sourceCount);
- mergedSegment.setInputRecordsSize(sourceSize);
- mergedSegment.setLastBuildJobID(getCubingJobId());
- mergedSegment.setLastBuildTime(System.currentTimeMillis());
-
- try {
- cubeManager.promoteNewlyBuiltSegments(cube, mergedSegment);
- return new ExecuteResult(ExecuteResult.State.SUCCEED);
- } catch (IOException e) {
- logger.error("fail to update cube after merge", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- public void setSegmentId(String segmentId) {
- this.setParam(SEGMENT_ID, segmentId);
- }
-
- private String getSegmentId() {
- return getParam(SEGMENT_ID);
- }
-
- public void setCubeName(String cubeName) {
- this.setParam(CUBE_NAME, cubeName);
- }
-
- private String getCubeName() {
- return getParam(CUBE_NAME);
- }
-
- public void setMergingSegmentIds(List<String> ids) {
- setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
- }
-
- private List<String> getMergingSegmentIds() {
- final String ids = getParam(MERGING_SEGMENT_IDS);
- if (ids != null) {
- final String[] splitted = StringUtils.split(ids, ",");
- ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
- for (String id : splitted) {
- result.add(id);
- }
- return result;
- } else {
- return Collections.emptyList();
- }
- }
-
- public void setCubingJobId(String id) {
- setParam(CUBING_JOB_ID, id);
- }
-
- private String getCubingJobId() {
- return getParam(CUBING_JOB_ID);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
new file mode 100644
index 0000000..13175ec
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class InMemCuboidJob extends AbstractHadoopJob {
+
+ protected static final Logger logger = LoggerFactory.getLogger(InMemCuboidJob.class);
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_JOB_FLOW_ID);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_NAME);
+ parseOptions(options, args);
+
+ String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+ String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeMgr = CubeManager.getInstance(config);
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+ CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+
+ job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ logger.info("Starting: " + job.getJobName());
+
+ setJobClasspath(job);
+
+ // add metadata to distributed cache
+ attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+ // set job configuration
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ long timeout = 1000*60*60L; // 1 hour
+ job.getConfiguration().set("mapred.task.timeout", String.valueOf(timeout));
+
+ // set input
+ IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+ flatTableInputFormat.configureJob(job);
+
+ // set mapper
+ job.setMapperClass(InMemCuboidMapper.class);
+ job.setMapOutputKeyClass(ByteArrayWritable.class);
+ job.setMapOutputValueClass(ByteArrayWritable.class);
+
+ // set output
+ IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
+ storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
+
+ return waitForCompletion(job);
+ } catch (Exception e) {
+ logger.error("error in CuboidJob", e);
+ printUsage(options);
+ throw e;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ InMemCuboidJob job = new InMemCuboidJob();
+ int exitCode = ToolRunner.run(job, args);
+ System.exit(exitCode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
new file mode 100644
index 0000000..5ecd26a
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -0,0 +1,122 @@
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.mr.KylinMapper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.job.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Mapper of the in-memory cubing job.
+ * <p>
+ * {@link #setup} starts a {@link DoggedCubeBuilder} on a background thread. {@link #map}
+ * only parses each input record into a row and hands it to the builder through a bounded
+ * queue; the builder emits its output through a {@link MapContextGTRecordWriter} bound to
+ * the mapper context. {@link #cleanup} queues an empty row and waits for the builder to finish.
+ */
+public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArrayWritable, ByteArrayWritable> {
+
+    private static final Log logger = LogFactory.getLog(InMemCuboidMapper.class);
+    private CubeInstance cube;
+    private CubeDesc cubeDesc;
+    private CubeSegment cubeSegment;
+    private IMRTableInputFormat flatTableInputFormat;
+
+    private int counter;
+    // hand-off buffer between the mapper thread (producer) and the cube builder thread (consumer)
+    private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
+    // handle on the background build; used to detect early termination and to collect failures
+    private Future<?> future;
+
+    /**
+     * Loads cube metadata, collects the dictionaries of all dictionary-encoded dimension
+     * columns and starts the cube builder on a single background thread.
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        Configuration conf = context.getConfiguration();
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+        cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
+
+        Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+
+        for (DimensionDesc dim : cubeDesc.getDimensions()) {
+            // dictionary
+            for (TblColRef col : dim.getColumnRefs()) {
+                if (cubeDesc.getRowkey().isUseDictionary(col)) {
+                    Dictionary<?> dict = cubeSegment.getDictionary(col);
+                    if (dict == null) {
+                        logger.warn("Dictionary for " + col + " was not found.");
+                    }
+
+                    // reuse the instance fetched above instead of looking it up a second time
+                    dictionaryMap.put(col, dict);
+                }
+            }
+        }
+
+        DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
+        // the already submitted build still runs to completion; shutting down here only
+        // ensures the worker thread terminates afterwards instead of lingering in the task JVM
+        executorService.shutdown();
+    }
+
+    /**
+     * Parses one input record and queues it for the cube builder.
+     * <p>
+     * The offer is retried every second for as long as the builder is running. If the
+     * builder has already terminated the row is not queued; a builder failure is reported
+     * by {@link #cleanup}, which calls {@code future.get()}.
+     */
+    @Override
+    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+        // put each row to the queue
+        String[] row = flatTableInputFormat.parseMapperInput(record);
+        List<String> rowAsList = Arrays.asList(row);
+
+        while (!future.isDone()) {
+            if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
+                counter++;
+                if (counter % BatchConstants.COUNTER_MAX == 0) {
+                    logger.info("Handled " + counter + " records!");
+                }
+                break;
+            }
+        }
+    }
+
+    /**
+     * Queues an empty row (presumably the end-of-input marker understood by the builder —
+     * TODO confirm against DoggedCubeBuilder) and waits for the background build to finish.
+     *
+     * @throws IOException if the background build failed or waiting for it was interrupted
+     */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        logger.info("Totally handled " + counter + " records!");
+
+        while (!future.isDone()) {
+            if (queue.offer(new ArrayList<String>(0), 1, TimeUnit.SECONDS)) {
+                break;
+            }
+        }
+
+        try {
+            future.get();
+        } catch (Exception e) {
+            throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
+        }
+        queue.clear();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
new file mode 100644
index 0000000..1b3e2b2
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -0,0 +1,95 @@
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.mr.KylinReducer;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.measure.MeasureAggregators;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Reducer shared by the in-memory cubing job and the merge-from-storage job.
+ * <p>
+ * All encoded measure values that arrive under one key are decoded, aggregated,
+ * and the aggregated result is handed to the storage output format chosen in
+ * {@link #setup}.
+ */
+public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArrayWritable, Object, Object> {
+
+    private static final Logger logger = LoggerFactory.getLogger(InMemCuboidReducer.class);
+
+    private IMRStorageOutputFormat storageOutputFormat;
+    private MeasureCodec codec;
+    private MeasureAggregators aggs;
+
+    private int counter;
+    private Object[] input;
+    private Object[] result;
+
+    /**
+     * Resolves the cube segment named in the job configuration, selects the merge or
+     * the build output side depending on the {@code CFG_IS_MERGE} flag, and prepares
+     * the measure codec, the aggregators and the reusable value arrays.
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        String cube = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+        String segment = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+        boolean merging = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_IS_MERGE));
+
+        CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cube);
+        CubeDesc desc = cubeInstance.getDescriptor();
+        CubeSegment seg = cubeInstance.getSegment(segment, SegmentStatusEnum.NEW);
+
+        storageOutputFormat = merging //
+                ? MRUtil.getBatchMergeOutputSide2(seg).getStorageOutputFormat() //
+                : MRUtil.getBatchCubingOutputSide2(seg).getStorageOutputFormat();
+
+        List<MeasureDesc> measures = collectMeasures(desc);
+        codec = new MeasureCodec(measures);
+        aggs = new MeasureAggregators(measures);
+
+        int measureCount = measures.size();
+        input = new Object[measureCount];
+        result = new Object[measureCount];
+    }
+
+    /**
+     * Lists the measures of every column of every column family of the cube's
+     * HBase mapping, in iteration order.
+     */
+    private static List<MeasureDesc> collectMeasures(CubeDesc desc) {
+        List<MeasureDesc> collected = Lists.newArrayList();
+        for (HBaseColumnFamilyDesc family : desc.getHBaseMapping().getColumnFamily()) {
+            for (HBaseColumnDesc column : family.getColumns()) {
+                for (MeasureDesc m : column.getMeasures()) {
+                    collected.add(m);
+                }
+            }
+        }
+        return collected;
+    }
+
+    /**
+     * Aggregates all values of one key and writes the aggregated measures through
+     * the storage output format; logs progress every {@code COUNTER_MAX} keys.
+     */
+    @Override
+    public void reduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
+        aggs.reset();
+        for (ByteArrayWritable encoded : values) {
+            codec.decode(encoded.asBuffer(), input);
+            aggs.aggregate(input);
+        }
+        aggs.collectStates(result);
+
+        storageOutputFormat.doReducerOutput(key, result, context);
+
+        if (++counter % BatchConstants.COUNTER_MAX == 0) {
+            logger.info("Handled " + counter + " records!");
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
new file mode 100644
index 0000000..f2a5fcf
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -0,0 +1,96 @@
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.job.inmemcubing.ICuboidWriter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+
+/**
+ * {@link ICuboidWriter} that emits cuboid records as map output.
+ * <p>
+ * The output key is the cuboid id followed by the bytes of the record's leading
+ * dimension columns; the output value is the export of the record's measure columns.
+ * Key layout and measure column indexes depend on the cuboid, so they are recomputed
+ * whenever the cuboid id differs from the one of the previous record.
+ */
+public class MapContextGTRecordWriter implements ICuboidWriter {
+
+    private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
+    protected MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext;
+    // cuboid id of the most recently written record; null until the first write
+    private Long lastCuboidId;
+    protected CubeSegment cubeSegment;
+    protected CubeDesc cubeDesc;
+
+    private int dimensions;
+    private int measureCount;
+    private byte[] keyBuf;
+    private int[] measureColumnsIndex;
+    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+    private ByteArrayWritable outputKey = new ByteArrayWritable();
+    private ByteArrayWritable outputValue = new ByteArrayWritable();
+    private long cuboidRowCount = 0;
+
+    /**
+     * @param mapContext  context the records are written to
+     * @param cubeDesc    cube descriptor, used to look up cuboids and the measure count
+     * @param cubeSegment segment that supplies the encoded length of each dimension column
+     */
+    public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
+        this.mapContext = mapContext;
+        this.cubeDesc = cubeDesc;
+        this.cubeSegment = cubeSegment;
+        this.measureCount = cubeDesc.getMeasures().size();
+    }
+
+    /**
+     * Writes one record of the given cuboid to the map context.
+     *
+     * @param cuboidId id of the cuboid the record belongs to
+     * @param record   record whose first columns are the cuboid's dimensions, followed by the measures
+     * @throws IOException      if the map context fails to write
+     * @throws RuntimeException wrapping an {@link InterruptedException} raised by the map context;
+     *                          the thread's interrupt flag is restored before throwing
+     */
+    @Override
+    public void write(long cuboidId, GTRecord record) throws IOException {
+
+        if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
+            // output another cuboid
+            if (lastCuboidId != null) {
+                logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
+                cuboidRowCount = 0;
+            }
+            initVariables(cuboidId);
+            // remember the cuboid so the per-cuboid state is rebuilt only when it changes
+            lastCuboidId = cuboidId;
+        }
+
+        cuboidRowCount++;
+        int offSet = RowConstants.ROWKEY_CUBOIDID_LEN;
+        for (int x = 0; x < dimensions; x++) {
+            System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length());
+            offSet += record.get(x).length();
+        }
+
+        //output measures
+        valueBuf.clear();
+        record.exportColumns(measureColumnsIndex, valueBuf);
+
+        outputKey.set(keyBuf, 0, offSet);
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+        try {
+            mapContext.write(outputKey, outputValue);
+        } catch (InterruptedException e) {
+            // keep the interruption visible to the calling thread
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Rebuilds the per-cuboid state: a key buffer sized for the cuboid id plus the
+     * length of every column of the cuboid, the number of dimensions (bits set in the
+     * cuboid id), and the indexes of the measure columns, which follow the dimensions.
+     */
+    private void initVariables(Long cuboidId) {
+        int bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
+        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+        for (TblColRef column : cuboid.getColumns()) {
+            bytesLength += cubeSegment.getColumnLength(column);
+        }
+
+        keyBuf = new byte[bytesLength];
+        dimensions = BitSet.valueOf(new long[]{cuboidId}).cardinality();
+        measureColumnsIndex = new int[measureCount];
+        for (int i = 0; i < measureCount; i++) {
+            measureColumnsIndex[i] = dimensions + i;
+        }
+
+        // the cuboid id prefix is identical for every row of the cuboid, so write it once
+        System.arraycopy(Bytes.toBytes(cuboidId), 0, keyBuf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
new file mode 100644
index 0000000..3d3b1f4
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.cube.CuboidJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ * @author shaoshi
+ */
+public class MergeCuboidFromStorageJob extends CuboidJob {
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_JOB_FLOW_ID);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_NAME);
+ parseOptions(options, args);
+
+ String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+ String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeMgr = CubeManager.getInstance(config);
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+ CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+
+ Configuration conf = this.getConf();
+ HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+ // start job
+ String jobName = getOptionValue(OPTION_JOB_NAME);
+ System.out.println("Starting: " + jobName);
+ job = Job.getInstance(conf, jobName);
+
+ setJobClasspath(job);
+
+ // add metadata to distributed cache
+ attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+ // set job configuration
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ job.getConfiguration().set(BatchConstants.CFG_IS_MERGE, "true");
+
+ // configure mapper input
+ IMRStorageInputFormat storageInputFormat = MRUtil.getBatchMergeInputSide2(cubeSeg).getStorageInputFormat();
+ storageInputFormat.configureInput(MergeCuboidFromStorageMapper.class, ByteArrayWritable.class, ByteArrayWritable.class, job);
+
+ // configure reducer output
+ IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat();
+ storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
+
+ return waitForCompletion(job);
+ } catch (Exception e) {
+ logger.error("error in MergeCuboidFromHBaseJob", e);
+ printUsage(options);
+ throw e;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
new file mode 100644
index 0000000..6535777
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.mr.KylinMapper;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Mapper of the merge-from-storage job.
+ *
+ * For every row read back from a source segment it rebuilds the row key for the
+ * merged segment: the cuboid id is written first, then each column of the cuboid is
+ * either copied unchanged or, when checkNeedMerging(col) is true, translated from its
+ * id in the source segment's dictionary to its id in the merged segment's dictionary.
+ * The measure values are re-encoded with a MeasureCodec and emitted with the new key.
+ *
+ * setup() resolves the merged segment (status NEW) from the job configuration and asks
+ * the storage input format which source segment this mapper is reading.
+ *
+ * @author shaoshi
+ */
+public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, ByteArrayWritable, ByteArrayWritable> {
+
+ private static final Logger logger = LoggerFactory.getLogger(MergeCuboidFromStorageMapper.class);
+
+ private KylinConfig config;
+ private String cubeName;
+ private String segmentName;
+ private CubeManager cubeManager;
+ private CubeInstance cube;
+ private CubeDesc cubeDesc;
+ private CubeSegment mergedCubeSegment;
+ private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
+ private IMRStorageInputFormat storageInputFormat;
+
+ private ByteArrayWritable outputKey = new ByteArrayWritable();
+ private byte[] newKeyBuf;
+ private RowKeySplitter rowKeySplitter;
+
+ private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>(); // memoized results of checkNeedMerging, one entry per column
+
+ private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ private MeasureCodec codec;
+ private ByteArrayWritable outputValue = new ByteArrayWritable();
+
+ private Boolean checkNeedMerging(TblColRef col) throws IOException { // true when col uses a dictionary whose source table (first element of decideSourceData) is the fact table
+ Boolean ret = dictsNeedMerging.get(col);
+ if (ret != null)
+ return ret;
+ else {
+ ret = cubeDesc.getRowkey().isUseDictionary(col);
+ if (ret) {
+ String dictTable = (String) DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
+ ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
+ }
+ dictsNeedMerging.put(col, ret);
+ return ret;
+ }
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.bindCurrentConfiguration(context.getConfiguration());
+ config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+ cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+ segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
+
+ cubeManager = CubeManager.getInstance(config);
+ cube = cubeManager.getCube(cubeName);
+ cubeDesc = cube.getDescriptor();
+ mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+ storageInputFormat = MRUtil.getBatchMergeInputSide2(mergedCubeSegment).getStorageInputFormat();
+
+ newKeyBuf = new byte[256];// size will auto-grow
+
+ sourceCubeSegment = storageInputFormat.findSourceSegment(context, cube);
+ logger.info(sourceCubeSegment.toString());
+
+ this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255); // NOTE(review): 65 and 255 look like split-count / split-size limits — confirm against RowKeySplitter
+
+ List<MeasureDesc> measuresDescs = Lists.newArrayList();
+ for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+ for (MeasureDesc measure : colDesc.getMeasures()) {
+ measuresDescs.add(measure);
+ }
+ }
+ }
+ codec = new MeasureCodec(measuresDescs);
+ }
+
+ @Override
+ public void map(Object inKey, Object inValue, Context context) throws IOException, InterruptedException {
+ Pair<ByteArrayWritable, Object[]> pair = storageInputFormat.parseMapperInput(inKey, inValue);
+ ByteArrayWritable key = pair.getFirst();
+ Object[] value = pair.getSecond();
+
+ Preconditions.checkState(key.offset() == 0); // split() below is handed the backing array without an offset
+
+ long cuboidID = rowKeySplitter.split(key.array(), key.length());
+ Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
+
+ SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
+ int bufOffset = 0;
+ BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
+ bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
+
+ for (int i = 0; i < cuboid.getColumns().size(); ++i) {
+ TblColRef col = cuboid.getColumns().get(i);
+
+ if (this.checkNeedMerging(col)) {
+ // if dictionary on fact table column, needs rewrite
+ DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+ Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
+ Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
+
+ while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) { // double the buffer until a value of either dictionary fits after bufOffset
+ byte[] oldBuf = newKeyBuf;
+ newKeyBuf = new byte[2 * newKeyBuf.length];
+ System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+ }
+
+ int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length); // column i is read from split i + 1; split 0 is presumably the cuboid id — TODO confirm
+
+ int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset); // value bytes are parked at bufOffset as scratch space
+ int idInMergedDict;
+ if (size < 0) {
+ idInMergedDict = mergedDict.nullId();
+ } else {
+ idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
+ }
+ BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId()); // the merged id overwrites the scratch value bytes at the same offset
+
+ bufOffset += mergedDict.getSizeOfId();
+ } else {
+ // keep as it is
+ while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) {
+ byte[] oldBuf = newKeyBuf;
+ newKeyBuf = new byte[2 * newKeyBuf.length];
+ System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+ }
+
+ System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length);
+ bufOffset += splittedByteses[i + 1].length;
+ }
+ }
+ byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset); // trim to the bytes actually written; newKeyBuf itself is reused for the next row
+ outputKey.set(newKey, 0, newKey.length);
+
+ valueBuf.clear();
+ codec.encode(value, valueBuf);
+ outputValue.set(valueBuf.array(), 0, valueBuf.position());
+
+ context.write(outputKey, outputValue);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
new file mode 100644
index 0000000..d99cb03
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.io.IOException;
+import java.util.*;
+
+public class MergeDictionaryStep extends AbstractExecutable {
+
+ private static final String CUBE_NAME = "cubeName";
+ private static final String SEGMENT_ID = "segmentId";
+ private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
+
+ public MergeDictionaryStep() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ KylinConfig conf = context.getConfig();
+ final CubeManager mgr = CubeManager.getInstance(conf);
+ final CubeInstance cube = mgr.getCube(getCubeName());
+ final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
+ final List<CubeSegment> mergingSegments = getMergingSegments(cube);
+
+ Collections.sort(mergingSegments);
+
+ try {
+ checkLookupSnapshotsMustIncremental(mergingSegments);
+
+ makeDictForNewSegment(conf, cube, newSegment, mergingSegments);
+ makeSnapshotForNewSegment(cube, newSegment, mergingSegments);
+
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToUpdateSegs(newSegment);
+ mgr.updateCube(cubeBuilder);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ } catch (IOException e) {
+ logger.error("fail to merge dictionary or lookup snapshots", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+ }
+
+ private List<CubeSegment> getMergingSegments(CubeInstance cube) {
+ List<String> mergingSegmentIds = getMergingSegmentIds();
+ List<CubeSegment> result = Lists.newArrayListWithCapacity(mergingSegmentIds.size());
+ for (String id : mergingSegmentIds) {
+ result.add(cube.getSegmentById(id));
+ }
+ return result;
+ }
+
+ private void checkLookupSnapshotsMustIncremental(List<CubeSegment> mergingSegments) {
+
+ // FIXME check each newer snapshot has only NEW rows but no MODIFIED rows
+ }
+
+ /**
+ * For the new segment, we need to create dictionaries for it, too. For
+ * those dictionaries on fact table, create it by merging underlying
+ * dictionaries For those dictionaries on lookup table, just copy it from
+ * any one of the merging segments, it's guaranteed to be consistent(checked
+ * in CubeSegmentValidator)
+ *
+ * @param cube
+ * @param newSeg
+ * @throws IOException
+ */
+ private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) throws IOException {
+ HashSet<TblColRef> colsNeedMeringDict = new HashSet<TblColRef>();
+ HashSet<TblColRef> colsNeedCopyDict = new HashSet<TblColRef>();
+ DictionaryManager dictMgr = DictionaryManager.getInstance(conf);
+
+ CubeDesc cubeDesc = cube.getDescriptor();
+ for (DimensionDesc dim : cubeDesc.getDimensions()) {
+ for (TblColRef col : dim.getColumnRefs()) {
+ if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
+ String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
+ if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
+ colsNeedMeringDict.add(col);
+ } else {
+ colsNeedCopyDict.add(col);
+ }
+ }
+ }
+ }
+
+ for (TblColRef col : colsNeedMeringDict) {
+ logger.info("Merging fact table dictionary on : " + col);
+ List<DictionaryInfo> dictInfos = new ArrayList<DictionaryInfo>();
+ for (CubeSegment segment : mergingSegments) {
+ logger.info("Including fact table dictionary of segment : " + segment);
+ if (segment.getDictResPath(col) != null) {
+ DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
+ dictInfos.add(dictInfo);
+ }
+ }
+ mergeDictionaries(dictMgr, newSeg, dictInfos, col);
+ }
+
+ for (TblColRef col : colsNeedCopyDict) {
+ String path = mergingSegments.get(0).getDictResPath(col);
+ newSeg.putDictResPath(col, path);
+ }
+ }
+
+ private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, CubeSegment cubeSeg, List<DictionaryInfo> dicts, TblColRef col) throws IOException {
+ DictionaryInfo dictInfo = dictMgr.mergeDictionary(dicts);
+ if (dictInfo != null)
+ cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
+
+ return dictInfo;
+ }
+
+ /**
+ * make snapshots for the new segment by copying from one of the underlying
+ * merging segments. it's guaranteed to be consistent(checked in
+ * CubeSegmentValidator)
+ *
+ * @param cube
+ * @param newSeg
+ */
+ private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) {
+ CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
+ for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
+ newSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
+ }
+ }
+
+ public void setCubeName(String cubeName) {
+ this.setParam(CUBE_NAME, cubeName);
+ }
+
+ private String getCubeName() {
+ return getParam(CUBE_NAME);
+ }
+
+ public void setSegmentId(String segmentId) {
+ this.setParam(SEGMENT_ID, segmentId);
+ }
+
+ private String getSegmentId() {
+ return getParam(SEGMENT_ID);
+ }
+
+ public void setMergingSegmentIds(List<String> ids) {
+ setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
+ }
+
+ private List<String> getMergingSegmentIds() {
+ final String ids = getParam(MERGING_SEGMENT_IDS);
+ if (ids != null) {
+ final String[] splitted = StringUtils.split(ids, ",");
+ ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+ for (String id: splitted) {
+ result.add(id);
+ }
+ return result;
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
new file mode 100644
index 0000000..9a7f761
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class MergeStatisticsStep extends AbstractExecutable {
+
+ private static final String CUBE_NAME = "cubeName";
+ private static final String SEGMENT_ID = "segmentId";
+ private static final String MERGING_SEGMENT_IS = "mergingSegmentIds";
+ private static final String MERGED_STATISTICS_PATH = "mergedStatisticsPath";
+ protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
+
+ public MergeStatisticsStep() {
+ super();
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ KylinConfig kylinConf = context.getConfig();
+ final CubeManager mgr = CubeManager.getInstance(kylinConf);
+ final CubeInstance cube = mgr.getCube(getCubeName());
+ final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
+
+ Configuration conf = new Configuration();
+ ResourceStore rs = ResourceStore.getStore(kylinConf);
+ try {
+
+ int averageSamplingPercentage = 0;
+ for (String segmentId : this.getMergingSegmentIds()) {
+ String fileKey = CubeSegment.getStatisticsResourcePath(getCubeName(), segmentId);
+ InputStream is = rs.getResource(fileKey);
+ File tempFile = null;
+ FileOutputStream tempFileStream = null;
+ try {
+ tempFile = File.createTempFile(segmentId, ".seq");
+ tempFileStream = new FileOutputStream(tempFile);
+ org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+ } finally {
+ IOUtils.closeStream(is);
+ IOUtils.closeStream(tempFileStream);
+ }
+
+ FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+ SequenceFile.Reader reader = null;
+ try {
+ reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
+ LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ while (reader.next(key, value)) {
+ if (key.get() == 0l) {
+ // sampling percentage;
+ averageSamplingPercentage += Bytes.toInt(value.getBytes());
+ } else {
+ HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
+ ByteArray byteArray = new ByteArray(value.getBytes());
+ hll.readRegisters(byteArray.asBuffer());
+
+ if (cuboidHLLMap.get(key.get()) != null) {
+ cuboidHLLMap.get(key.get()).merge(hll);
+ } else {
+ cuboidHLLMap.put(key.get(), hll);
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+ }
+ averageSamplingPercentage = averageSamplingPercentage / this.getMergingSegmentIds().size();
+ FactDistinctColumnsReducer.writeCuboidStatistics(conf, new Path(getMergedStatisticsPath()), cuboidHLLMap, averageSamplingPercentage);
+ Path statisticsFilePath = new Path(getMergedStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+ FileSystem fs = statisticsFilePath.getFileSystem(conf);
+ FSDataInputStream is = fs.open(statisticsFilePath);
+ try {
+ // put the statistics to metadata store
+ String statisticsFileName = newSegment.getStatisticsResourcePath();
+ rs.putResource(statisticsFileName, is, System.currentTimeMillis());
+ } finally {
+ IOUtils.closeStream(is);
+ }
+
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ } catch (IOException e) {
+ logger.error("fail to merge cuboid statistics", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+ }
+
+
+ public void setCubeName(String cubeName) {
+ this.setParam(CUBE_NAME, cubeName);
+ }
+
+ private String getCubeName() {
+ return getParam(CUBE_NAME);
+ }
+
+ public void setSegmentId(String segmentId) {
+ this.setParam(SEGMENT_ID, segmentId);
+ }
+
+ private String getSegmentId() {
+ return getParam(SEGMENT_ID);
+ }
+
+ public void setMergingSegmentIds(List<String> ids) {
+ setParam(MERGING_SEGMENT_IS, StringUtils.join(ids, ","));
+ }
+
+ private List<String> getMergingSegmentIds() {
+ final String ids = getParam(MERGING_SEGMENT_IS);
+ if (ids != null) {
+ final String[] splitted = StringUtils.split(ids, ",");
+ ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+ for (String id : splitted) {
+ result.add(id);
+ }
+ return result;
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ public void setMergedStatisticsPath(String path) {
+ setParam(MERGED_STATISTICS_PATH, path);
+ }
+
+ private String getMergedStatisticsPath() {
+ return getParam(MERGED_STATISTICS_PATH);
+ }
+}