Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/21 12:53:15 UTC
[1/2] kylin git commit: KYLIN-1245 Put layer cubing and in-mem cubing side by side, random switch between them
Repository: kylin
Updated Branches:
refs/heads/2.x-staging 2ef13d08e -> 3fc3883a4
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index ac304f5..4c2737d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -18,53 +18,12 @@
package org.apache.kylin.storage.hbase.steps;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMROutput2;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.steps.MergeCuboidMapper;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-
/**
* This "Transition" impl generates cuboid files and then convert to HFile.
* The additional step slows down build process, but the gains is merge
@@ -77,6 +36,7 @@ import com.google.common.collect.Lists;
*/
public class HBaseMROutput2Transition implements IMROutput2 {
+ @SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(HBaseMROutput2Transition.class);
@Override
@@ -85,19 +45,12 @@ public class HBaseMROutput2Transition implements IMROutput2 {
HBaseMRSteps steps = new HBaseMRSteps(seg);
@Override
- public IMRStorageOutputFormat getStorageOutputFormat() {
- return new HBaseOutputFormat(seg);
- }
-
- @Override
public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
}
@Override
- public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
- String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
-
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}
@@ -110,34 +63,17 @@ public class HBaseMROutput2Transition implements IMROutput2 {
}
@Override
- public IMRBatchMergeInputSide2 getBatchMergeInputSide(final CubeSegment seg) {
- return new IMRBatchMergeInputSide2() {
- @Override
- public IMRStorageInputFormat getStorageInputFormat() {
- return new HBaseInputFormat(seg);
- }
- };
- }
-
- @Override
public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) {
return new IMRBatchMergeOutputSide2() {
HBaseMRSteps steps = new HBaseMRSteps(seg);
@Override
- public IMRStorageOutputFormat getStorageOutputFormat() {
- return new HBaseOutputFormat(seg);
- }
-
- @Override
public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
}
@Override
- public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
- String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
-
+ public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}
@@ -148,263 +84,4 @@ public class HBaseMROutput2Transition implements IMROutput2 {
}
};
}
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private static class HBaseInputFormat implements IMRStorageInputFormat {
- final CubeSegment seg;
-
- public HBaseInputFormat(CubeSegment seg) {
- this.seg = seg;
- }
-
- @Override
- public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException {
- // merge by cuboid files
- if (isMergeFromCuboidFiles(job.getConfiguration())) {
- logger.info("Merge from cuboid files for " + seg);
-
- job.setInputFormatClass(SequenceFileInputFormat.class);
- addCuboidInputDirs(job);
-
- job.setMapperClass(mapperClz);
- job.setMapOutputKeyClass(outputKeyClz);
- job.setMapOutputValueClass(outputValueClz);
- }
- // merge from HTable scan
- else {
- logger.info("Merge from HTables for " + seg);
-
- Configuration conf = job.getConfiguration();
- HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
-
- List<Scan> scans = new ArrayList<Scan>();
- for (String htable : new HBaseMRSteps(seg).getMergingHTables()) {
- Scan scan = new Scan();
- scan.setCaching(512); // 1 is the default in Scan, which will be bad for MapReduce jobs
- scan.setCacheBlocks(false); // don't set to true for MR jobs
- scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
- scans.add(scan);
- }
- TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapperClz, outputKeyClz, outputValueClz, job);
- TableMapReduceUtil.initCredentials(job);
- }
- }
-
- private void addCuboidInputDirs(Job job) throws IOException {
- List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
- HBaseMRSteps steps = new HBaseMRSteps(seg);
-
- String[] inputs = new String[mergingSegments.size()];
- for (int i = 0; i < mergingSegments.size(); i++) {
- CubeSegment mergingSeg = mergingSegments.get(i);
- String cuboidPath = steps.getCuboidRootPath(mergingSeg);
- inputs[i] = cuboidPath + (cuboidPath.endsWith("/") ? "" : "/") + "*";
- }
-
- AbstractHadoopJob.addInputDirs(inputs, job);
- }
-
- @Override
- public CubeSegment findSourceSegment(Context context) throws IOException {
- // merge by cuboid files
- if (isMergeFromCuboidFiles(context.getConfiguration())) {
- FileSplit fileSplit = (FileSplit) context.getInputSplit();
- return MergeCuboidMapper.findSourceSegment(fileSplit, seg.getCubeInstance());
- }
- // merge from HTable scan
- else {
- TableSplit currentSplit = (TableSplit) context.getInputSplit();
- byte[] tableName = currentSplit.getTableName();
- String htableName = Bytes.toString(tableName);
-
- // decide which source segment
- for (CubeSegment segment : seg.getCubeInstance().getSegments()) {
- String segmentHtable = segment.getStorageLocationIdentifier();
- if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
- return segment;
- }
- }
- throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
- }
- }
-
- transient ByteArrayWritable parsedKey;
- transient Object[] parsedValue;
- transient Pair<ByteArrayWritable, Object[]> parsedPair;
-
- transient MeasureCodec measureCodec;
- transient RowValueDecoder[] rowValueDecoders;
-
- @Override
- public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue) {
- // lazy init
- if (parsedPair == null) {
- parsedKey = new ByteArrayWritable();
- parsedValue = new Object[seg.getCubeDesc().getMeasures().size()];
- parsedPair = Pair.newPair(parsedKey, parsedValue);
- }
-
- // merge by cuboid files
- if (isMergeFromCuboidFiles(null)) {
- return parseMapperInputFromCuboidFile(inKey, inValue);
- }
- // merge from HTable scan
- else {
- return parseMapperInputFromHTable(inKey, inValue);
- }
- }
-
- private Pair<ByteArrayWritable, Object[]> parseMapperInputFromCuboidFile(Object inKey, Object inValue) {
- // lazy init
- if (measureCodec == null) {
- measureCodec = new MeasureCodec(seg.getCubeDesc().getMeasures());
- }
-
- Text key = (Text) inKey;
- parsedKey.set(key.getBytes(), 0, key.getLength());
-
- Text value = (Text) inValue;
- measureCodec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), parsedValue);
-
- return parsedPair;
- }
-
- private Pair<ByteArrayWritable, Object[]> parseMapperInputFromHTable(Object inKey, Object inValue) {
- // lazy init
- if (rowValueDecoders == null) {
- List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
- List<MeasureDesc> measuresDescs = Lists.newArrayList();
- for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- valueDecoderList.add(new RowValueDecoder(colDesc));
- for (MeasureDesc measure : colDesc.getMeasures()) {
- measuresDescs.add(measure);
- }
- }
- }
- rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
- }
-
- ImmutableBytesWritable key = (ImmutableBytesWritable) inKey;
- parsedKey.set(key.get(), key.getOffset(), key.getLength());
-
- Result hbaseRow = (Result) inValue;
- for (int i = 0; i < rowValueDecoders.length; i++) {
- rowValueDecoders[i].decode(hbaseRow);
- rowValueDecoders[i].loadCubeMeasureArray(parsedValue);
- }
-
- return parsedPair;
- }
-
- transient Boolean isMergeFromCuboidFiles;
-
- // merge from cuboid files is better than merge from HTable, because no workload on HBase region server
- private boolean isMergeFromCuboidFiles(Configuration jobConf) {
- // cache in this object?
- if (isMergeFromCuboidFiles != null)
- return isMergeFromCuboidFiles.booleanValue();
-
- final String confKey = "kylin.isMergeFromCuboidFiles";
-
- // cache in job configuration?
- if (jobConf != null) {
- String result = jobConf.get(confKey);
- if (result != null) {
- isMergeFromCuboidFiles = Boolean.valueOf(result);
- return isMergeFromCuboidFiles.booleanValue();
- }
- }
-
- boolean result = true;
-
- try {
- List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
- HBaseMRSteps steps = new HBaseMRSteps(seg);
- for (CubeSegment mergingSeg : mergingSegments) {
- String cuboidRootPath = steps.getCuboidRootPath(mergingSeg);
- FileSystem fs = HadoopUtil.getFileSystem(cuboidRootPath);
-
- boolean cuboidFileExist = fs.exists(new Path(cuboidRootPath));
- if (cuboidFileExist == false) {
- logger.info("Merge from HTable because " + cuboidRootPath + " does not exist");
- result = false;
- break;
- }
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- // put in cache
- isMergeFromCuboidFiles = Boolean.valueOf(result);
- if (jobConf != null) {
- jobConf.set(confKey, "" + result);
- }
- return result;
- }
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private static class HBaseOutputFormat implements IMRStorageOutputFormat {
- final CubeSegment seg;
-
- Text outputKey;
- Text outputValue;
- ByteBuffer valueBuf;
- MeasureCodec codec;
-
- public HBaseOutputFormat(CubeSegment seg) {
- this.seg = seg;
- }
-
- @Override
- public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException {
- job.setReducerClass(reducer);
-
- // estimate the reducer number
- String htableName = seg.getStorageLocationIdentifier();
- Configuration conf = HBaseConfiguration.create(job.getConfiguration());
- HTable htable = new HTable(conf, htableName);
- int regions = htable.getStartKeys().length + 1;
- htable.close();
-
- int reducerNum = regions * 3;
- reducerNum = Math.max(1, reducerNum);
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- reducerNum = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), reducerNum);
-
- job.setNumReduceTasks(reducerNum);
-
- // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
-
- Path output = new Path(new HBaseMRSteps(seg).getCuboidRootPath(jobFlowId));
- FileOutputFormat.setOutputPath(job, output);
-
- HadoopUtil.deletePath(job.getConfiguration(), output);
- }
-
- @Override
- public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException {
- // lazy init
- if (outputKey == null) {
- outputKey = new Text();
- outputValue = new Text();
- valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- codec = new MeasureCodec(seg.getCubeDesc().getMeasures());
- }
-
- outputKey.set(key.array(), key.offset(), key.length());
-
- valueBuf.clear();
- codec.encode(value, valueBuf);
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
- context.write(outputKey, outputValue);
- }
- }
-
-}
+}
\ No newline at end of file
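Note on the net effect of this file: with the StorageInputFormat/StorageOutputFormat plumbing gone, the engine writes plain cuboid sequence files and the HBase side only appends HFile conversion plus bulk load for a caller-supplied cuboidRootPath. A minimal sketch of what a storage plugin must implement after this change (class name and step bodies are hypothetical; only the interface comes from the patch):

import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMROutput2;
import org.apache.kylin.job.execution.DefaultChainedExecutable;

// Sketch only: a storage plugin never sees mapper/reducer I/O classes any more;
// it just consumes the cuboid files found under cuboidRootPath.
public class DemoStorageOutput implements IMROutput2 {

    @Override
    public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
        return new IMRBatchCubingOutputSide2() {
            @Override
            public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
                // e.g. create the target table, sized from segment statistics
            }

            @Override
            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
                // convert the sequence files under cuboidRootPath to the native
                // storage format and load them (HFile + bulk load for HBase)
            }

            @Override
            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
                // drop intermediate files
            }
        };
    }

    @Override
    public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) {
        return new IMRBatchMergeOutputSide2() {
            @Override
            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
            }

            @Override
            public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
                // same contract as the build side, reading merged cuboid files
            }

            @Override
            public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
            }
        };
    }
}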
[2/2] kylin git commit: KYLIN-1245 Put layer cubing and in-mem cubing side by side, random switch between them
Posted by li...@apache.org.
KYLIN-1245 Put layer cubing and in-mem cubing side by side, random switch between them
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3fc3883a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3fc3883a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3fc3883a
Branch: refs/heads/2.x-staging
Commit: 3fc3883a41bfab899c31b300db4c0757bd5589d3
Parents: 2ef13d0
Author: Yang Li <li...@apache.org>
Authored: Mon Dec 21 19:52:31 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Dec 21 19:52:31 2015 +0800
----------------------------------------------------------------------
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 97 +++++-
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 35 +-
.../org/apache/kylin/engine/mr/IMROutput2.java | 104 +++---
.../java/org/apache/kylin/engine/mr/MRUtil.java | 6 -
.../engine/mr/common/AbstractHadoopJob.java | 1 -
.../kylin/engine/mr/steps/InMemCuboidJob.java | 201 ++++++++++-
.../engine/mr/steps/InMemCuboidReducer.java | 28 +-
.../mr/steps/MergeCuboidFromStorageJob.java | 94 ------
.../mr/steps/MergeCuboidFromStorageMapper.java | 239 -------------
.../apache/kylin/engine/spark/SparkCubing.java | 4 +-
.../storage/hbase/steps/CreateHTableJob.java | 143 +-------
.../storage/hbase/steps/HBaseMROutput2.java | 290 ----------------
.../hbase/steps/HBaseMROutput2Transition.java | 331 +------------------
13 files changed, 378 insertions(+), 1195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index f8fbc33..476a763 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -18,11 +18,16 @@
package org.apache.kylin.engine.mr;
+import java.util.Random;
+
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.RowKeyDesc;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
+import org.apache.kylin.engine.mr.steps.NDCuboidJob;
import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.slf4j.Logger;
@@ -37,14 +42,15 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
super(newSegment, submitter);
this.inputSide = MRUtil.getBatchCubingInputSide(seg);
- this.outputSide = MRUtil.getBatchCubingOutputSide2((CubeSegment)seg);
+ this.outputSide = MRUtil.getBatchCubingOutputSide2((CubeSegment) seg);
}
public CubingJob build() {
logger.info("MR_V2 new job to BUILD segment " + seg);
-
- final CubingJob result = CubingJob.createBuildJob((CubeSegment)seg, submitter, config);
+
+ final CubingJob result = CubingJob.createBuildJob((CubeSegment) seg, submitter, config);
final String jobId = result.getId();
+ final String cuboidRootPath = getCuboidRootPath(jobId);
// Phase 1: Create Flat Table
inputSide.addStepPhase1_CreateFlatTable(result);
@@ -56,8 +62,14 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
outputSide.addStepPhase2_BuildDictionary(result);
// Phase 3: Build Cube
- result.addTask(createInMemCubingStep(jobId));
- outputSide.addStepPhase3_BuildCube(result);
+ if (new Random().nextBoolean()) {
+ // layer cubing
+ addLayerCubingSteps(result, jobId, cuboidRootPath);
+ } else {
+ // or in-mem cubing
+ result.addTask(createInMemCubingStep(jobId, cuboidRootPath));
+ }
+ outputSide.addStepPhase3_BuildCube(result, cuboidRootPath);
// Phase 4: Update Metadata & Cleanup
result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
@@ -67,6 +79,20 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
return result;
}
+ private void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
+ RowKeyDesc rowKeyDesc = ((CubeSegment) seg).getCubeDesc().getRowkey();
+ final int groupRowkeyColumnsCount = ((CubeSegment) seg).getCubeDesc().getBuildLevel();
+ final int totalRowkeyColumnsCount = rowKeyDesc.getRowKeyColumns().length;
+ final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
+ // base cuboid step
+ result.addTask(createBaseCuboidStep(cuboidOutputTempPath, jobId));
+ // n dim cuboid steps
+ for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
+ int dimNum = totalRowkeyColumnsCount - i;
+ result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
+ }
+ }
+
private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
SaveStatisticsStep result = new SaveStatisticsStep();
result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
@@ -76,19 +102,19 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
return result;
}
- private MapReduceExecutable createInMemCubingStep(String jobId) {
+ private MapReduceExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
// base cuboid job
MapReduceExecutable cubeStep = new MapReduceExecutable();
StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
+ appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "output", cuboidRootPath);
appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getRealization().getName());
- appendExecCmdParameters(cmd, "jobflowid", jobId);
cubeStep.setMapReduceParams(cmd.toString());
cubeStep.setMapReduceJobClass(InMemCuboidJob.class);
@@ -96,4 +122,59 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
return cubeStep;
}
+ private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) {
+ // base cuboid job
+ MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
+
+ StringBuilder cmd = new StringBuilder();
+ appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
+
+ baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
+
+ appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "input", "FLAT_TABLE"); // marks flat table input
+ appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
+ appendExecCmdParameters(cmd, "level", "0");
+
+ baseCuboidStep.setMapReduceParams(cmd.toString());
+ baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
+ baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+ return baseCuboidStep;
+ }
+
+ private MapReduceExecutable createNDimensionCuboidStep(String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount) {
+ // ND cuboid job
+ MapReduceExecutable ndCuboidStep = new MapReduceExecutable();
+
+ ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
+ StringBuilder cmd = new StringBuilder();
+
+ appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
+ appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
+ appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
+ appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
+ appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
+
+ ndCuboidStep.setMapReduceParams(cmd.toString());
+ ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);
+ return ndCuboidStep;
+ }
+
+ private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
+ String[] paths = new String[groupRowkeyColumnsCount + 1];
+ for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
+ int dimNum = totalRowkeyColumnCount - i;
+ if (dimNum == totalRowkeyColumnCount) {
+ paths[i] = cuboidRootPath + "base_cuboid";
+ } else {
+ paths[i] = cuboidRootPath + dimNum + "d_cuboid";
+ }
+ }
+ return paths;
+ }
+
}
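Reviewer note: the new Random().nextBoolean() branch above means every build randomly exercises either the layered MR cubing or the in-mem cubing path, and both funnel into the same outputSide.addStepPhase3_BuildCube(result, cuboidRootPath). The path layout from getCuboidOutputPaths() is easy to verify in isolation; this standalone sketch mirrors the loop with illustrative numbers (5 rowkey columns, build level 3):

// Standalone mirror of getCuboidOutputPaths() above, for illustration only.
public class CuboidPathDemo {

    static String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
        String[] paths = new String[groupRowkeyColumnsCount + 1];
        for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
            int dimNum = totalRowkeyColumnCount - i;
            if (dimNum == totalRowkeyColumnCount) {
                paths[i] = cuboidRootPath + "base_cuboid";       // level 0 = base cuboid
            } else {
                paths[i] = cuboidRootPath + dimNum + "d_cuboid"; // level i = (total - i) dims
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        // prints base_cuboid, 4d_cuboid, 3d_cuboid, 2d_cuboid under the root
        for (String p : getCuboidOutputPaths("/kylin/job_x/cuboid/", 5, 3)) {
            System.out.println(p);
        }
    }
}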
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 48a717f..008d489 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -23,7 +23,7 @@ import java.util.List;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.MergeCuboidFromStorageJob;
+import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.slf4j.Logger;
@@ -39,23 +39,24 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
super(mergeSegment, submitter);
- this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment)seg);
+ this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment) seg);
}
public CubingJob build() {
logger.info("MR_V2 new job to MERGE segment " + seg);
-
- final CubeSegment cubeSegment = (CubeSegment)seg;
+
+ final CubeSegment cubeSegment = (CubeSegment) seg;
final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
final String jobId = result.getId();
+ final String cuboidRootPath = getCuboidRootPath(jobId);
final List<CubeSegment> mergingSegments = cubeSegment.getCubeInstance().getMergingSegments(cubeSegment);
Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
final List<String> mergingSegmentIds = Lists.newArrayList();
- final List<String> mergingHTables = Lists.newArrayList();
+ final List<String> mergingCuboidPaths = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
mergingSegmentIds.add(merging.getUuid());
- mergingHTables.add(merging.getStorageLocationIdentifier());
+ mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
}
// Phase 1: Merge Dictionary
@@ -63,10 +64,10 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
outputSide.addStepPhase1_MergeDictionary(result);
- // Phase 2: Merge Cube
- String formattedTables = StringUtil.join(mergingHTables, ",");
- result.addTask(createMergeCuboidDataFromStorageStep(formattedTables, jobId));
- outputSide.addStepPhase2_BuildCube(result);
+ // Phase 2: Merge Cube Files
+ String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
+ result.addTask(createMergeCuboidDataStep(cubeSegment, formattedPath, cuboidRootPath));
+ outputSide.addStepPhase2_BuildCube(result, cuboidRootPath);
// Phase 3: Update Metadata & Cleanup
result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
@@ -85,20 +86,20 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
return result;
}
- private MapReduceExecutable createMergeCuboidDataFromStorageStep(String inputTableNames, String jobId) {
+ private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
- appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+ appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getRealization().getName() + "_Step");
- appendExecCmdParameters(cmd, "jobflowid", jobId);
+ appendExecCmdParameters(cmd, "input", inputPath);
+ appendExecCmdParameters(cmd, "output", outputPath);
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
mergeCuboidDataStep.setMapReduceParams(cmd.toString());
- mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromStorageJob.class);
- mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+ mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
return mergeCuboidDataStep;
}
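Reviewer note: merging now reads the source segments' cuboid files straight from HDFS instead of scanning their HTables, so region servers stay out of the merge path entirely. The -input argument is just a comma-joined list of per-segment globs; a self-contained sketch (paths made up, the real ones come from getCuboidRootPath(merging)):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Mirrors how the merge job's input string is assembled above.
public class MergeInputDemo {
    public static void main(String[] args) {
        List<String> mergingCuboidPaths = new ArrayList<String>();
        for (String root : Arrays.asList("/kylin/cube1/seg_20150101/cuboid/",
                "/kylin/cube1/seg_20150201/cuboid/")) {
            mergingCuboidPaths.add(root + "*"); // glob every cuboid level in the segment
        }
        StringBuilder formattedPath = new StringBuilder();
        for (String p : mergingCuboidPaths) {   // stand-in for StringUtil.join(paths, ",")
            if (formattedPath.length() > 0)
                formattedPath.append(",");
            formattedPath.append(p);
        }
        // -> /kylin/cube1/seg_20150101/cuboid/*,/kylin/cube1/seg_20150201/cuboid/*
        System.out.println(formattedPath);
    }
}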
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index 3ad51c5..844eb07 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -1,13 +1,23 @@
-package org.apache.kylin.engine.mr;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
-import java.io.IOException;
+package org.apache.kylin.engine.mr;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@ -17,93 +27,59 @@ public interface IMROutput2 {
public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg);
/**
- * Participate the batch cubing flow as the output side.
+ * Participates in the batch cubing flow as the output side. Responsible for saving
+ * the cuboid output to storage at the end of Phase 3.
*
* - Phase 1: Create Flat Table
* - Phase 2: Build Dictionary
- * - Phase 3: Build Cube (with StorageOutputFormat)
+ * - Phase 3: Build Cube
* - Phase 4: Update Metadata & Cleanup
*/
public interface IMRBatchCubingOutputSide2 {
- /** Return an output format for Phase 3: Build Cube MR */
- public IMRStorageOutputFormat getStorageOutputFormat();
-
/** Add step that executes after build dictionary and before build cube. */
public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
- /** Add step that executes after build cube. */
- public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
-
- /** Add step that does any necessary clean up. */
- public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
- }
-
- public IMRBatchMergeInputSide2 getBatchMergeInputSide(CubeSegment seg);
-
- public interface IMRBatchMergeInputSide2 {
- public IMRStorageInputFormat getStorageInputFormat();
- }
-
- /** Read in a cube as input of merge. Configure the input file format of mapper. */
- @SuppressWarnings("rawtypes")
- public interface IMRStorageInputFormat {
-
- /** Configure MR mapper class and input file format. */
- public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException;
-
- /** Given a mapper context, figure out which segment the mapper reads from. */
- public CubeSegment findSourceSegment(Mapper.Context context) throws IOException;
-
/**
- * Read in a row of cuboid. Given the input KV, de-serialize back cuboid ID, dimensions, and measures.
+ * Add step that saves cuboids from HDFS to storage.
*
- * @return <code>ByteArrayWritable</code> is the cuboid ID (8 bytes) + dimension values in dictionary encoding
- * <code>Object[]</code> is the measure values in order of <code>CubeDesc.getMeasures()</code>
+ * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn,
+ * value is M1+M2+..+Mm. CUBOID is the 8-byte cuboid ID; Dx is a dimension value in
+ * dictionary encoding; Mx is a measure value in serialized form.
*/
- public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue);
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+
+ /** Add step that does any necessary clean up. */
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
}
/** Return a helper to participate in batch merge job flow. */
public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg);
/**
- * Participate the batch merge flow as the output side.
+ * Participates in the batch merge flow as the output side. Responsible for saving
+ * the cuboid output to storage at the end of Phase 2.
*
* - Phase 1: Merge Dictionary
- * - Phase 2: Merge Cube (with StorageInputFormat & StorageOutputFormat)
+ * - Phase 2: Merge Cube
* - Phase 3: Update Metadata & Cleanup
*/
public interface IMRBatchMergeOutputSide2 {
- /** Return an input format for Phase 2: Merge Cube MR */
- public IMRStorageOutputFormat getStorageOutputFormat();
-
/** Add step that executes after merge dictionary and before merge cube. */
public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
- /** Add step that executes after merge cube. */
- public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow);
+ /**
+ * Add step that saves cuboid output from HDFS to storage.
+ *
+ * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn,
+ * value is M1+M2+..+Mm. CUBOID is the 8-byte cuboid ID; Dx is a dimension value in
+ * dictionary encoding; Mx is a measure value in serialized form.
+ */
+ public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
/** Add step that does any necessary clean up. */
public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
}
- /** Write out a cube. Configure the output file format of reducer and do the actual K-V output. */
- @SuppressWarnings("rawtypes")
- public interface IMRStorageOutputFormat {
-
- /** Configure MR reducer class and output file format. */
- public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException;
-
- /**
- * Write out a row of cuboid. Given the cuboid ID, dimensions, and measures, serialize in whatever
- * way and output to reducer context.
- *
- * @param key The cuboid ID (8 bytes) + dimension values in dictionary encoding
- * @param value The measure values in order of <code>CubeDesc.getMeasures()</code>
- * @param context The reducer context output goes to
- */
- public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException;
- }
}
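The record layout both Javadoc blocks above describe can be pictured in a few lines of plain Java. Widths below are made up; real dimension widths come from the segment's dictionaries and the measure bytes from MeasureCodec:

import java.nio.ByteBuffer;

// Toy version of one cuboid row in the documented layout:
// key = 8-byte cuboid ID + dictionary-encoded dimension values (value = measures, not shown).
public class CuboidRecordDemo {
    public static void main(String[] args) {
        int[] dimWidths = { 2, 4 };   // hypothetical dictionary code widths in bytes

        ByteBuffer key = ByteBuffer.allocate(8 + 2 + 4);
        key.putLong(0b110L);          // cuboid ID: set bits mark the dimensions present
        key.putShort((short) 17);     // D1 as a 2-byte dictionary code
        key.putInt(42);               // D2 as a 4-byte dictionary code
        key.flip();

        long cuboidId = key.getLong(); // the first 8 bytes are always the cuboid ID
        System.out.println("cuboid ID bits = " + Long.toBinaryString(cuboidId));
        for (int w : dimWidths) {
            byte[] dim = new byte[w];
            key.get(dim);              // the next w bytes are one dimension value
            System.out.println("read " + w + "-byte dimension code");
        }
    }
}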
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 55fa9e2..41c8b6b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -7,12 +7,10 @@ import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
-import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeInputSide2;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.IRealizationSegment;
import org.apache.kylin.source.SourceFactory;
import org.apache.kylin.storage.StorageFactory;
@@ -47,10 +45,6 @@ public class MRUtil {
return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchCubingOutputSide(seg);
}
- public static IMRBatchMergeInputSide2 getBatchMergeInputSide2(CubeSegment seg) {
- return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeInputSide(seg);
- }
-
public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg) {
return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
}
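With getBatchMergeInputSide2() gone, job builders only need the two remaining output-side accessors. A hedged sketch of the call sites (the wrapper methods are illustrative; the accessors and step signatures come from the diffs above):

import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.job.execution.DefaultChainedExecutable;

// Sketch only: the two MRUtil entry points a job builder still uses.
public class MRUtilUsageDemo {

    static void wireBuild(CubeSegment seg, DefaultChainedExecutable jobFlow, String cuboidRootPath) {
        IMRBatchCubingOutputSide2 out = MRUtil.getBatchCubingOutputSide2(seg);
        out.addStepPhase3_BuildCube(jobFlow, cuboidRootPath); // storage consumes cuboid files
    }

    static void wireMerge(CubeSegment seg, DefaultChainedExecutable jobFlow, String cuboidRootPath) {
        IMRBatchMergeOutputSide2 out = MRUtil.getBatchMergeOutputSide2(seg);
        out.addStepPhase2_BuildCube(jobFlow, cuboidRootPath);
    }
}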
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index f031f76..21ceff7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -72,7 +72,6 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D").create("jobname");
- protected static final Option OPTION_JOB_FLOW_ID = OptionBuilder.withArgName("job flow ID").hasArg().isRequired(true).withDescription("job flow ID").create("jobflowid");
protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For example, flat_item_cube").create("cubename");
protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For example, some_ii").create("iiname");
protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name").create("segmentname");
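With -jobflowid removed, jobs no longer derive their output location from the flow ID; the builder passes an explicit -output path instead. An illustrative parameter string for InMemCuboidJob after this change (values made up; the flags match the builder diff above):

-cubename my_cube -segmentname seg_20150101 -output /kylin/job_x/cuboid/ -jobname Kylin_Cube_Builder_my_cube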
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 8f2b810..95eb725 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -18,23 +18,57 @@
package org.apache.kylin.engine.mr.steps;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
/**
*/
public class InMemCuboidJob extends AbstractHadoopJob {
@@ -47,13 +81,14 @@ public class InMemCuboidJob extends AbstractHadoopJob {
try {
options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_JOB_FLOW_ID);
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_SEGMENT_NAME);
+ options.addOption(OPTION_OUTPUT_PATH);
parseOptions(options, args);
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+ String output = getOptionValue(OPTION_OUTPUT_PATH);
KylinConfig config = KylinConfig.getInstanceFromEnv();
CubeManager cubeMgr = CubeManager.getInstance(config);
@@ -84,9 +119,19 @@ public class InMemCuboidJob extends AbstractHadoopJob {
job.setMapOutputValueClass(ByteArrayWritable.class);
// set output
- IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
- storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
+ job.setReducerClass(InMemCuboidReducer.class);
+ job.setNumReduceTasks(calculateReducerNum(cubeSeg));
+
+ // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ Path outputPath = new Path(output);
+ FileOutputFormat.setOutputPath(job, outputPath);
+ HadoopUtil.deletePath(job.getConfiguration(), outputPath);
+
return waitForCompletion(job);
} catch (Exception e) {
logger.error("error in CuboidJob", e);
@@ -98,6 +143,154 @@ public class InMemCuboidJob extends AbstractHadoopJob {
}
}
+ private int calculateReducerNum(CubeSegment cubeSeg) throws IOException {
+ Configuration jobConf = job.getConfiguration();
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
+ Map<Long, Double> cubeSizeMap = getCubeSizeMapFromCuboidStatistics(cubeSeg, kylinConfig, jobConf);
+ double totalSizeInM = 0;
+ for (Double cuboidSize : cubeSizeMap.values()) {
+ totalSizeInM += cuboidSize;
+ }
+
+ double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+
+ // number of reduce tasks
+ int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB);
+
+ // at least 1 reducer
+ numReduceTasks = Math.max(1, numReduceTasks);
+ // no more than 5000 reducer by default
+ numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+ logger.info("Having total map input MB " + Math.round(totalSizeInM));
+ logger.info("Having per reduce MB " + perReduceInputMB);
+ logger.info("Setting " + "mapred.reduce.tasks" + "=" + numReduceTasks);
+ return numReduceTasks;
+ }
+
+ public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(CubeSegment cubeSegment, KylinConfig kylinConfig, Configuration conf) throws IOException {
+ ResourceStore rs = ResourceStore.getStore(kylinConfig);
+ String fileKey = cubeSegment.getStatisticsResourcePath();
+ InputStream is = rs.getResource(fileKey).inputStream;
+ File tempFile = null;
+ FileOutputStream tempFileStream = null;
+ try {
+ tempFile = File.createTempFile(cubeSegment.getUuid(), ".seq");
+ tempFileStream = new FileOutputStream(tempFile);
+ org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+ } finally {
+ IOUtils.closeStream(is);
+ IOUtils.closeStream(tempFileStream);
+ }
+ Map<Long, HyperLogLogPlusCounter> counterMap = Maps.newHashMap();
+
+ FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+ int samplingPercentage = 25;
+ SequenceFile.Reader reader = null;
+ try {
+ reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
+ LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ while (reader.next(key, value)) {
+ if (key.get() == 0L) {
+ samplingPercentage = Bytes.toInt(value.getBytes());
+ } else {
+ HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
+ ByteArray byteArray = new ByteArray(value.getBytes());
+ hll.readRegisters(byteArray.asBuffer());
+ counterMap.put(key.get(), hll);
+ }
+
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ IOUtils.closeStream(reader);
+ tempFile.delete();
+ }
+ return getCubeRowCountMapFromCuboidStatistics(counterMap, samplingPercentage);
+ }
+
+ public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(Map<Long, HyperLogLogPlusCounter> counterMap, final int samplingPercentage) throws IOException {
+ Preconditions.checkArgument(samplingPercentage > 0);
+ return Maps.transformValues(counterMap, new Function<HyperLogLogPlusCounter, Long>() {
+ @Nullable
+ @Override
+ public Long apply(HyperLogLogPlusCounter input) {
+ return input.getCountEstimate() * 100 / samplingPercentage;
+ }
+ });
+ }
+
+ // return map of Cuboid ID => MB
+ public static Map<Long, Double> getCubeSizeMapFromCuboidStatistics(CubeSegment cubeSegment, KylinConfig kylinConfig, Configuration conf) throws IOException {
+ Map<Long, Long> rowCountMap = getCubeRowCountMapFromCuboidStatistics(cubeSegment, kylinConfig, conf);
+ Map<Long, Double> sizeMap = getCubeSizeMapFromRowCount(cubeSegment, rowCountMap);
+ return sizeMap;
+ }
+
+ public static Map<Long, Double> getCubeSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) {
+ final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+ final List<Integer> rowkeyColumnSize = Lists.newArrayList();
+ final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+ final List<TblColRef> columnList = baseCuboid.getColumns();
+
+ for (int i = 0; i < columnList.size(); i++) {
+ rowkeyColumnSize.add(cubeSegment.getColumnLength(columnList.get(i)));
+ }
+
+ Map<Long, Double> sizeMap = Maps.newHashMap();
+ for (Map.Entry<Long, Long> entry : rowCountMap.entrySet()) {
+ sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), baseCuboidId, rowkeyColumnSize));
+ }
+ return sizeMap;
+ }
+
+ /**
+ * Estimate the cuboid's size
+ *
+ * @return the cuboid size in M bytes
+ */
+ private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) {
+
+ int bytesLength = cubeSegment.getRowKeyPreambleSize();
+
+ long mask = Long.highestOneBit(baseCuboidId);
+ long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);
+ for (int i = 0; i < parentCuboidIdActualLength; i++) {
+ if ((mask & cuboidId) > 0) {
+ bytesLength += rowKeyColumnLength.get(i); //colIO.getColumnLength(columnList.get(i));
+ }
+ mask = mask >> 1;
+ }
+
+ // add the measure length
+ int space = 0;
+ boolean isMemoryHungry = false;
+ for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
+ if (measureDesc.getFunction().getMeasureType().isMemoryHungry()) {
+ isMemoryHungry = true;
+ }
+ DataType returnType = measureDesc.getFunction().getReturnDataType();
+ space += returnType.getStorageBytesEstimate();
+ }
+ bytesLength += space;
+
+ double ret = 1.0 * bytesLength * rowCount / (1024L * 1024L);
+ if (isMemoryHungry) {
+ logger.info("Cube is memory hungry, storage size estimation multiply 0.05");
+ ret *= 0.05;
+ } else {
+ logger.info("Cube is not memory hungry, storage size estimation multiply 0.25");
+ ret *= 0.25;
+ }
+ logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + bytesLength + " bytes." + " Total size is " + ret + "M.");
+ return ret;
+ }
+
public static void main(String[] args) throws Exception {
InMemCuboidJob job = new InMemCuboidJob();
int exitCode = ToolRunner.run(job, args);
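Two pieces of arithmetic above deserve a worked example: row counts come from HyperLogLog estimates over a sample, so they are scaled back up by 100 / samplingPercentage, and the reducer count is the total estimated cuboid size in MB divided by the per-reducer input MB, clamped to [1, max]. A sketch with made-up numbers:

// Made-up numbers; mirrors calculateReducerNum() and the sampling scale-up above.
public class ReducerEstimateDemo {
    public static void main(String[] args) {
        // HLL estimate taken over a 25% sample, scaled to the full data set:
        long sampledEstimate = 1000000L;
        int samplingPercentage = 25;
        long fullRows = sampledEstimate * 100 / samplingPercentage; // 4,000,000

        // Reducer count: estimated output MB over per-reducer MB, then clamped.
        double totalSizeInM = 480.0;     // sum of per-cuboid size estimates
        double perReduceInputMB = 100.0; // getDefaultHadoopJobReducerInputMB()
        int maxReducers = 5000;          // getHadoopJobMaxReducerNumber()
        int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB); // 5
        numReduceTasks = Math.max(1, numReduceTasks);
        numReduceTasks = Math.min(maxReducers, numReduceTasks);

        System.out.println(fullRows + " estimated rows, " + numReduceTasks + " reducers");
    }
}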
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index c35e77f..9beacbb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -1,17 +1,18 @@
package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.measure.MeasureAggregators;
@@ -27,13 +28,16 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
private static final Logger logger = LoggerFactory.getLogger(InMemCuboidReducer.class);
- private IMRStorageOutputFormat storageOutputFormat;
private MeasureCodec codec;
private MeasureAggregators aggs;
private int counter;
private Object[] input;
private Object[] result;
+
+ private Text outputKey;
+ private Text outputValue;
+ private ByteBuffer valueBuf;
@Override
protected void setup(Context context) throws IOException {
@@ -47,16 +51,16 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
CubeDesc cubeDesc = cube.getDescriptor();
CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
- if (isMerge)
- storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat();
- else
- storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
List<MeasureDesc> measuresDescs = cubeDesc.getMeasures();
codec = new MeasureCodec(measuresDescs);
aggs = new MeasureAggregators(measuresDescs);
input = new Object[measuresDescs.size()];
result = new Object[measuresDescs.size()];
+
+ outputKey = new Text();
+ outputValue = new Text();
+ valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
}
@Override
@@ -70,8 +74,16 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
}
aggs.collectStates(result);
- storageOutputFormat.doReducerOutput(key, result, context);
+ // output key
+ outputKey.set(key.array(), key.offset(), key.length());
+ // output value
+ valueBuf.clear();
+ codec.encode(result, valueBuf);
+ outputValue.set(valueBuf.array(), 0, valueBuf.position());
+
+ context.write(outputKey, outputValue);
+
counter++;
if (counter % BatchConstants.COUNTER_MAX == 0) {
logger.info("Handled " + counter + " records!");
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
deleted file mode 100644
index 4485d17..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- */
-public class MergeCuboidFromStorageJob extends CuboidJob {
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_JOB_FLOW_ID);
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_SEGMENT_NAME);
- parseOptions(options, args);
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- CubeManager cubeMgr = CubeManager.getInstance(config);
- CubeInstance cube = cubeMgr.getCube(cubeName);
- CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
- Configuration conf = this.getConf();
-
- // start job
- String jobName = getOptionValue(OPTION_JOB_NAME);
- System.out.println("Starting: " + jobName);
- job = Job.getInstance(conf, jobName);
-
- setJobClasspath(job);
-
- // add metadata to distributed cache
- attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
- // set job configuration
- job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
- job.getConfiguration().set(BatchConstants.CFG_IS_MERGE, "true");
-
- // configure mapper input
- IMRStorageInputFormat storageInputFormat = MRUtil.getBatchMergeInputSide2(cubeSeg).getStorageInputFormat();
- storageInputFormat.configureInput(MergeCuboidFromStorageMapper.class, ByteArrayWritable.class, ByteArrayWritable.class, job);
-
- // configure reducer output
- IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat();
- storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
-
- return waitForCompletion(job);
- } catch (Exception e) {
- logger.error("error in MergeCuboidFromHBaseJob", e);
- printUsage(options);
- throw e;
- } finally {
- if (job != null)
- cleanupTempConfFile(job.getConfiguration());
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
deleted file mode 100644
index 18bce34..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowKeyEncoder;
-import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * @author shaoshi
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, ByteArrayWritable, ByteArrayWritable> {
-
- private static final Logger logger = LoggerFactory.getLogger(MergeCuboidFromStorageMapper.class);
-
- private KylinConfig config;
- private String cubeName;
- private String segmentName;
- private CubeManager cubeManager;
- private CubeInstance cube;
- private CubeDesc cubeDesc;
- private CubeSegment mergedCubeSegment;
- private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
- private IMRStorageInputFormat storageInputFormat;
-
- private ByteArrayWritable outputKey = new ByteArrayWritable();
- private byte[] newKeyBodyBuf;
- private ByteArray newKeyBuf;
- private RowKeySplitter rowKeySplitter;
- private RowKeyEncoderProvider rowKeyEncoderProvider;
-
- private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>();
-
- private List<MeasureDesc> measureDescs;
- private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- private MeasureCodec codec;
- private ByteArrayWritable outputValue = new ByteArrayWritable();
-
- private List<Pair<Integer, MeasureIngester>> dictMeasures;
- private Map<TblColRef, Dictionary<String>> oldDicts;
- private Map<TblColRef, Dictionary<String>> newDicts;
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.bindCurrentConfiguration(context.getConfiguration());
- config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
-
- cubeManager = CubeManager.getInstance(config);
- cube = cubeManager.getCube(cubeName);
- cubeDesc = cube.getDescriptor();
- mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
- storageInputFormat = MRUtil.getBatchMergeInputSide2(mergedCubeSegment).getStorageInputFormat();
-
- newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE]; // size will auto-grow
- newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
-
- sourceCubeSegment = storageInputFormat.findSourceSegment(context);
- logger.info("Source cube segment: " + sourceCubeSegment);
-
- rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
- rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
-
- measureDescs = cubeDesc.getMeasures();
- codec = new MeasureCodec(measureDescs);
-
- dictMeasures = Lists.newArrayList();
- for (int i = 0; i < measureDescs.size(); i++) {
- MeasureDesc measureDesc = measureDescs.get(i);
- MeasureType measureType = measureDesc.getFunction().getMeasureType();
- if (measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == false) {
- dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
- }
- }
- if (dictMeasures.size() > 0) {
- oldDicts = sourceCubeSegment.buildDictionaryMap();
- newDicts = mergedCubeSegment.buildDictionaryMap();
- }
- }
-
- @Override
- public void map(Object inKey, Object inValue, Context context) throws IOException, InterruptedException {
- Pair<ByteArrayWritable, Object[]> pair = storageInputFormat.parseMapperInput(inKey, inValue);
- ByteArrayWritable key = pair.getFirst();
- Object[] value = pair.getSecond();
-
- Preconditions.checkState(key.offset() == 0);
-
- long cuboidID = rowKeySplitter.split(key.array());
- Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
- RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
-
- SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
- int bufOffset = 0;
- int bodySplitOffset = rowKeySplitter.getBodySplitOffset();
-
- for (int i = 0; i < cuboid.getColumns().size(); ++i) {
- int useSplit = i + bodySplitOffset;
- TblColRef col = cuboid.getColumns().get(i);
-
- if (this.checkNeedMerging(col)) {
- // if the dictionary is on a fact table column, its ID needs rewriting
- DictionaryManager dictMgr = DictionaryManager.getInstance(config);
- Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
- Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
-
- while (sourceDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || //
- mergedDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || //
- mergedDict.getSizeOfId() > newKeyBodyBuf.length - bufOffset) {
- // also use this buf to hold the value before translating
- byte[] oldBuf = newKeyBodyBuf;
- newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
- System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
- }
-
- int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, splittedByteses[useSplit].length);
- int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
-
- int idInMergedDict;
- if (size < 0) {
- idInMergedDict = mergedDict.nullId();
- } else {
- idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
- }
- BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-
- bufOffset += mergedDict.getSizeOfId();
- } else {
- // keep as it is
- while (splittedByteses[useSplit].length > newKeyBodyBuf.length - bufOffset) {
- byte[] oldBuf = newKeyBodyBuf;
- newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
- System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
- }
-
- System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset, splittedByteses[useSplit].length);
- bufOffset += splittedByteses[useSplit].length;
- }
- }
-
- int fullKeySize = rowkeyEncoder.getBytesLength();
- while (newKeyBuf.array().length < fullKeySize) {
- newKeyBuf.set(new byte[newKeyBuf.length() * 2]);
- }
- newKeyBuf.set(0, fullKeySize);
-
- rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
- outputKey.set(newKeyBuf.array(), 0, fullKeySize);
-
- // re-encode measures if dictionary is used
- if (dictMeasures.size() > 0) {
- reEncodeMeasure(value);
- }
-
- valueBuf.clear();
- codec.encode(value, valueBuf);
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
- context.write(outputKey, outputValue);
- }
-
- private Boolean checkNeedMerging(TblColRef col) throws IOException {
- Boolean ret = dimensionsNeedDict.get(col);
- if (ret != null)
- return ret;
- else {
- ret = cubeDesc.getRowkey().isUseDictionary(col);
- if (ret) {
- String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().isUseDictionary(col), col).getTable();
- ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
- }
- dimensionsNeedDict.put(col, ret);
- return ret;
- }
- }
-
- private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException {
- for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
- int i = pair.getFirst();
- MeasureIngester ingester = pair.getSecond();
- measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
- }
- }
-}
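
The densest part of the deleted mapper above is the per-dimension dictionary rewrite. Below is a minimal sketch of that translation, using only the Dictionary and BytesUtil calls visible in the removed code; the standalone helper class and its name are illustrative, not part of Kylin:

    import org.apache.kylin.common.util.BytesUtil;
    import org.apache.kylin.common.util.Dictionary;

    public class DictIdRewriteSketch {
        // Translate one dimension value's ID from the source segment's
        // dictionary into the merged segment's dictionary. A dictionary ID
        // is only meaningful within one segment, hence the decode/re-encode.
        // Returns the number of bytes written at buf[offset].
        public static int rewriteId(Dictionary<String> sourceDict, Dictionary<String> mergedDict,
                                    int idInSourceDict, byte[] buf, int offset) {
            // decode the old ID back to its value bytes (negative size means null)
            int size = sourceDict.getValueBytesFromId(idInSourceDict, buf, offset);

            // look the value up in the merged dictionary
            int idInMergedDict = (size < 0) ? mergedDict.nullId()
                    : mergedDict.getIdFromValueBytes(buf, offset, size);

            // overwrite the value bytes in place with the new fixed-length ID
            BytesUtil.writeUnsigned(idInMergedDict, buf, offset, mergedDict.getSizeOfId());
            return mergedDict.getSizeOfId();
        }
    }

The caller must guarantee buf has room for max(value bytes, ID bytes) at offset, which is what the buffer-doubling loops in the deleted map() take care of.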
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index be81c2b..59a19d3 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -56,6 +56,7 @@ import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.*;
import org.apache.kylin.cube.util.CubingUtils;
import org.apache.kylin.dict.*;
+import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
import org.apache.kylin.engine.spark.util.IteratorUtils;
@@ -453,7 +454,8 @@ public class SparkCubing extends AbstractApplication {
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
- final Map<Long, Long> cubeSizeMap = CreateHTableJob.getCubeRowCountMapFromCuboidStatistics(samplingResult, 100);
+ final Map<Long, Long> rowCountMap = InMemCuboidJob.getCubeRowCountMapFromCuboidStatistics(samplingResult, 100);
+ final Map<Long, Double> cubeSizeMap = InMemCuboidJob.getCubeSizeMapFromRowCount(cubeSegment, rowCountMap);
System.out.println("cube size estimation:" + cubeSizeMap);
final byte[][] splitKeys = CreateHTableJob.getSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment);
CubeHTableUtil.createHTable(cubeSegment, splitKeys);
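
The two added lines above are the substance of this hunk: SparkCubing now sizes HBase regions by estimated bytes rather than raw row counts. Restated as a commented sketch, with the names and types taken from the hunk itself (samplingResult is assumed to be the per-cuboid HyperLogLogPlusCounter map produced by the sampling step):

    // per-cuboid row counts, scaled up from the sampling percentage (100 = full data)
    final Map<Long, Long> rowCountMap =
            InMemCuboidJob.getCubeRowCountMapFromCuboidStatistics(samplingResult, 100);
    // row counts -> estimated cuboid sizes for this segment
    final Map<Long, Double> cubeSizeMap =
            InMemCuboidJob.getCubeSizeMapFromRowCount(cubeSegment, rowCountMap);
    // region split keys are now derived from byte sizes, not row counts
    final byte[][] splitKeys =
            CreateHTableJob.getSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment);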
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 4fac4fc..85c9200 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -18,26 +18,19 @@
package org.apache.kylin.storage.hbase.steps;
-import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
-
import org.apache.commons.cli.Options;
import org.apache.commons.math3.primes.Primes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -45,31 +38,22 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.CuboidShardUtil;
-import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -110,7 +94,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
try {
byte[][] splitKeys;
if (statsEnabled) {
- final Map<Long, Long> cuboidSizeMap = getCubeRowCountMapFromCuboidStatistics(cubeSegment, kylinConfig, conf);
+ final Map<Long, Double> cuboidSizeMap = InMemCuboidJob.getCubeSizeMapFromCuboidStatistics(cubeSegment, kylinConfig, conf);
splitKeys = getSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment);
} else {
splitKeys = getSplits(conf, partitionFilePath);
@@ -160,50 +144,6 @@ public class CreateHTableJob extends AbstractHadoopJob {
return retValue.length == 0 ? null : retValue;
}
- public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(CubeSegment cubeSegment, KylinConfig kylinConfig, Configuration conf) throws IOException {
- ResourceStore rs = ResourceStore.getStore(kylinConfig);
- String fileKey = cubeSegment.getStatisticsResourcePath();
- InputStream is = rs.getResource(fileKey).inputStream;
- File tempFile = null;
- FileOutputStream tempFileStream = null;
- try {
- tempFile = File.createTempFile(cubeSegment.getUuid(), ".seq");
- tempFileStream = new FileOutputStream(tempFile);
- org.apache.commons.io.IOUtils.copy(is, tempFileStream);
- } finally {
- IOUtils.closeStream(is);
- IOUtils.closeStream(tempFileStream);
- }
- Map<Long, HyperLogLogPlusCounter> counterMap = Maps.newHashMap();
-
- FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
- int samplingPercentage = 25;
- SequenceFile.Reader reader = null;
- try {
- reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
- LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- while (reader.next(key, value)) {
- if (key.get() == 0L) {
- samplingPercentage = Bytes.toInt(value.getBytes());
- } else {
- HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
- ByteArray byteArray = new ByteArray(value.getBytes());
- hll.readRegisters(byteArray.asBuffer());
- counterMap.put(key.get(), hll);
- }
-
- }
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- } finally {
- IOUtils.closeStream(reader);
- tempFile.delete();
- }
- return getCubeRowCountMapFromCuboidStatistics(counterMap, samplingPercentage);
- }
-
//one region for one shard
private static byte[][] getSplitsByRegionCount(int regionCount) {
byte[][] result = new byte[regionCount - 1][];
@@ -215,51 +155,24 @@ public class CreateHTableJob extends AbstractHadoopJob {
return result;
}
- public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(Map<Long, HyperLogLogPlusCounter> counterMap, final int samplingPercentage) throws IOException {
- Preconditions.checkArgument(samplingPercentage > 0);
- return Maps.transformValues(counterMap, new Function<HyperLogLogPlusCounter, Long>() {
- @Nullable
- @Override
- public Long apply(HyperLogLogPlusCounter input) {
- return input.getCountEstimate() * 100 / samplingPercentage;
- }
- });
- }
-
- public static byte[][] getSplitsFromCuboidStatistics(final Map<Long, Long> cubeRowCountMap, KylinConfig kylinConfig, CubeSegment cubeSegment) throws IOException {
+ public static byte[][] getSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap, KylinConfig kylinConfig, CubeSegment cubeSegment) throws IOException {
final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
- final List<Integer> rowkeyColumnSize = Lists.newArrayList();
- final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
- List<TblColRef> columnList = baseCuboid.getColumns();
-
- for (int i = 0; i < columnList.size(); i++) {
- logger.info("Rowkey column " + i + " length " + cubeSegment.getColumnLength(columnList.get(i)));
- rowkeyColumnSize.add(cubeSegment.getColumnLength(columnList.get(i)));
- }
-
DataModelDesc.RealizationCapacity cubeCapacity = cubeDesc.getModel().getCapacity();
int cut = kylinConfig.getHBaseRegionCut(cubeCapacity.toString());
logger.info("Cube capacity " + cubeCapacity.toString() + ", chosen cut for HTable is " + cut + "GB");
double totalSizeInM = 0;
-
- List<Long> allCuboids = Lists.newArrayList();
- allCuboids.addAll(cubeRowCountMap.keySet());
- Collections.sort(allCuboids);
-
- Map<Long, Double> cubeSizeMap = Maps.newHashMap();
- for (Map.Entry<Long, Long> entry : cubeRowCountMap.entrySet()) {
- cubeSizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), baseCuboidId, rowkeyColumnSize));
- }
-
for (Double cuboidSize : cubeSizeMap.values()) {
totalSizeInM += cuboidSize;
}
+ List<Long> allCuboids = Lists.newArrayList();
+ allCuboids.addAll(cubeSizeMap.keySet());
+ Collections.sort(allCuboids);
+
int nRegion = Math.round((float) (totalSizeInM / (cut * 1024L)));
nRegion = Math.max(kylinConfig.getHBaseRegionCountMin(), nRegion);
nRegion = Math.min(kylinConfig.getHBaseRegionCountMax(), nRegion);
@@ -350,48 +263,6 @@ public class CreateHTableJob extends AbstractHadoopJob {
}
}
- /**
- * Estimate the cuboid's size
- *
- * @return the cuboid size in M bytes
- */
- private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) {
-
- int bytesLength = cubeSegment.getRowKeyPreambleSize();
-
- long mask = Long.highestOneBit(baseCuboidId);
- long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);
- for (int i = 0; i < parentCuboidIdActualLength; i++) {
- if ((mask & cuboidId) > 0) {
- bytesLength += rowKeyColumnLength.get(i); //colIO.getColumnLength(columnList.get(i));
- }
- mask = mask >> 1;
- }
-
- // add the measure length
- int space = 0;
- boolean isMemoryHungry = false;
- for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
- if (measureDesc.getFunction().getMeasureType().isMemoryHungry()) {
- isMemoryHungry = true;
- }
- DataType returnType = measureDesc.getFunction().getReturnDataType();
- space += returnType.getStorageBytesEstimate();
- }
- bytesLength += space;
-
- double ret = 1.0 * bytesLength * rowCount / (1024L * 1024L);
- if (isMemoryHungry) {
- logger.info("Cube is memory hungry, storage size estimation multiply 0.05");
- ret *= 0.05;
- } else {
- logger.info("Cube is not memory hungry, storage size estimation multiply 0.25");
- ret *= 0.25;
- }
- logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + bytesLength + " bytes." + " Total size is " + ret + "M.");
- return ret;
- }
-
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new CreateHTableJob(), args);
System.exit(exitCode);
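
For reference, the estimateCuboidStorageSize() removed above (which this commit appears to relocate into InMemCuboidJob, per the new imports) computes: row-key bytes derived from the cuboid's dimension bits, plus each measure's storage estimate, times row count, scaled by an empirical factor. A hedged re-sketch under those assumptions; all helper calls are the ones visible in the removed code:

    // Returns the estimated size of one cuboid in MB.
    private static double estimateCuboidMB(CubeSegment seg, long cuboidId, long rowCount,
                                           long baseCuboidId, List<Integer> rowKeyColLen) {
        int bytesPerRow = seg.getRowKeyPreambleSize();

        // walk the base cuboid's dimension bits high-to-low; each bit set in
        // this cuboid adds that column's encoded length to the row key
        long mask = Long.highestOneBit(baseCuboidId);
        int nBits = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);
        for (int i = 0; i < nBits; i++) {
            if ((mask & cuboidId) > 0)
                bytesPerRow += rowKeyColLen.get(i);
            mask >>= 1;
        }

        // every measure contributes its return type's storage estimate
        boolean memoryHungry = false;
        for (MeasureDesc m : seg.getCubeDesc().getMeasures()) {
            memoryHungry |= m.getFunction().getMeasureType().isMemoryHungry();
            bytesPerRow += m.getFunction().getReturnDataType().getStorageBytesEstimate();
        }

        // raw bytes -> MB, then the empirical compression/aggregation factor
        double mb = 1.0 * bytesPerRow * rowCount / (1024L * 1024L);
        return mb * (memoryHungry ? 0.05 : 0.25);
    }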
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
deleted file mode 100644
index 397f4fe..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.IMROutput2;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-import com.google.common.collect.Lists;
-
-public class HBaseMROutput2 implements IMROutput2 {
-
- @Override
- public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
- return new IMRBatchCubingOutputSide2() {
- HBaseMRSteps steps = new HBaseMRSteps(seg);
-
- @Override
- public IMRStorageOutputFormat getStorageOutputFormat() {
- return new HBaseOutputFormat(seg);
- }
-
- @Override
- public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
- }
-
- @Override
- public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
- }
-
- @Override
- public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
- // nothing to do
- }
- };
- }
-
- @Override
- public IMRBatchMergeInputSide2 getBatchMergeInputSide(final CubeSegment seg) {
- return new IMRBatchMergeInputSide2() {
- @Override
- public IMRStorageInputFormat getStorageInputFormat() {
- return new HBaseInputFormat(seg);
- }
- };
- }
-
- @Override
- public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) {
- return new IMRBatchMergeOutputSide2() {
- HBaseMRSteps steps = new HBaseMRSteps(seg);
-
- @Override
- public IMRStorageOutputFormat getStorageOutputFormat() {
- return new HBaseOutputFormat(seg);
- }
-
- @Override
- public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
- }
-
- @Override
- public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
- }
-
- @Override
- public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(steps.createMergeGCStep());
- }
- };
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private static class HBaseInputFormat implements IMRStorageInputFormat {
- final CubeSegment seg;
-
- final RowValueDecoder[] rowValueDecoders;
- final ByteArrayWritable parsedKey;
- final Object[] parsedValue;
- final Pair<ByteArrayWritable, Object[]> parsedPair;
-
- public HBaseInputFormat(CubeSegment seg) {
- this.seg = seg;
-
- List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
- List<MeasureDesc> measuresDescs = Lists.newArrayList();
- for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- valueDecoderList.add(new RowValueDecoder(colDesc));
- for (MeasureDesc measure : colDesc.getMeasures()) {
- measuresDescs.add(measure);
- }
- }
- }
- this.rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
-
- this.parsedKey = new ByteArrayWritable();
- this.parsedValue = new Object[measuresDescs.size()];
- this.parsedPair = Pair.newPair(parsedKey, parsedValue);
- }
-
- @Override
- public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException {
- Configuration conf = job.getConfiguration();
- HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
-
- List<Scan> scans = new ArrayList<Scan>();
- for (String htable : new HBaseMRSteps(seg).getMergingHTables()) {
- Scan scan = new Scan();
- scan.setCaching(512); // 1 is the default in Scan, which will be bad for MapReduce jobs
- scan.setCacheBlocks(false); // don't set to true for MR jobs
- scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
- scans.add(scan);
- }
-
- TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapperClz, outputKeyClz, outputValueClz, job);
- TableMapReduceUtil.initCredentials(job);
-
- }
-
- @Override
- public CubeSegment findSourceSegment(Context context) throws IOException {
- TableSplit currentSplit = (TableSplit) context.getInputSplit();
- byte[] tableName = currentSplit.getTableName();
- String htableName = Bytes.toString(tableName);
-
- // decide which source segment
- for (CubeSegment segment : seg.getCubeInstance().getSegments()) {
- String segmentHtable = segment.getStorageLocationIdentifier();
- if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
- return segment;
- }
- }
- throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
- }
-
- @Override
- public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue) {
- ImmutableBytesWritable key = (ImmutableBytesWritable) inKey;
- parsedKey.set(key.get(), key.getOffset(), key.getLength());
-
- Result hbaseRow = (Result) inValue;
- for (int i = 0; i < rowValueDecoders.length; i++) {
- rowValueDecoders[i].decode(hbaseRow);
- rowValueDecoders[i].loadCubeMeasureArray(parsedValue);
- }
-
- return parsedPair;
- }
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private static class HBaseOutputFormat implements IMRStorageOutputFormat {
- final CubeSegment seg;
-
- final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
- final ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
-
- public HBaseOutputFormat(CubeSegment seg) {
- this.seg = seg;
- }
-
- @Override
- public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException {
- Path hfilePath = new Path(new HBaseMRSteps(seg).getHFilePath(jobFlowId));
- FileOutputFormat.setOutputPath(job, hfilePath);
-
- String htableName = seg.getStorageLocationIdentifier();
- Configuration conf = HBaseConfiguration.create(job.getConfiguration());
- HTable htable = new HTable(conf, htableName);
- HFileOutputFormat.configureIncrementalLoad(job, htable);
-
- // set Reducer; this needs to run after configureIncrementalLoad, to override the default reducer class
- job.setReducerClass(reducer);
-
- // kylin uses ByteArrayWritable instead of ImmutableBytesWritable as mapper output key
- rewriteTotalOrderPartitionerFile(job);
-
- HadoopUtil.deletePath(job.getConfiguration(), hfilePath);
- }
-
- @Override
- public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException {
- if (keyValueCreators.size() == 0) {
- for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- keyValueCreators.add(new KeyValueCreator(seg.getCubeDesc(), colDesc));
- }
- }
- }
-
- outputKey.set(key.array(), key.offset(), key.length());
-
- KeyValue outputValue;
- for (int i = 0; i < keyValueCreators.size(); i++) {
- outputValue = keyValueCreators.get(i).create(key.array(), key.offset(), key.length(), value);
- context.write(outputKey, outputValue);
- }
- }
-
- private void rewriteTotalOrderPartitionerFile(Job job) throws IOException {
- Configuration conf = job.getConfiguration();
- String partitionsFile = TotalOrderPartitioner.getPartitionFile(conf);
- if (StringUtils.isBlank(partitionsFile))
- throw new IllegalStateException("HFileOutputFormat.configureIncrementalLoad don't configure TotalOrderPartitioner any more?");
-
- Path partitionsPath = new Path(partitionsFile);
-
- // read in partition file in ImmutableBytesWritable
- List<ByteArrayWritable> keys = Lists.newArrayList();
- Reader reader = new SequenceFile.Reader(conf, Reader.file(partitionsPath));
- try {
- ImmutableBytesWritable key = new ImmutableBytesWritable();
- while (reader.next(key, NullWritable.get())) {
- keys.add(new ByteArrayWritable(key.copyBytes()));
- }
- } finally {
- reader.close();
- }
-
- // write out again in ByteArrayWritable
- Writer writer = SequenceFile.createWriter(conf, Writer.file(partitionsPath), Writer.keyClass(ByteArrayWritable.class), Writer.valueClass(NullWritable.class));
- try {
- for (ByteArrayWritable key : keys) {
- writer.append(key, NullWritable.get());
- }
- } finally {
- writer.close();
- }
- }
-
- }
-
-}
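
One design note on the rewriteTotalOrderPartitionerFile() deleted just above: HFileOutputFormat.configureIncrementalLoad writes the region-boundary partition file with ImmutableBytesWritable keys, while this output side keys mapper output with Kylin's ByteArrayWritable; since TotalOrderPartitioner reads the partition file using the job's map output key class, the file had to be rewritten with the matching key type before the job could run.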