Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/11 05:34:50 UTC
[3/3] incubator-kylin git commit: KYLIN-1007 Merge from cuboid files if available, and from HTable otherwise
KYLIN-1007 Merge from cuboid files if available, and from HTable otherwise
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/acd8c0df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/acd8c0df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/acd8c0df
Branch: refs/heads/2.x-staging
Commit: acd8c0dfb6a3e7a422b00e943bf0c66d7c9de2b8
Parents: f156d13
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Sep 11 11:27:00 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Sep 11 11:34:07 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/engine/mr/IMROutput2.java | 5 +-
.../engine/mr/common/AbstractHadoopJob.java | 8 +-
.../mr/steps/MergeCuboidFromStorageMapper.java | 2 +-
.../engine/mr/steps/MergeCuboidMapper.java | 62 +++--
.../storage/hbase/steps/HBaseMROutput2.java | 19 +-
.../hbase/steps/HBaseMROutput2Transition.java | 237 ++++++++++++++-----
6 files changed, 230 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
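In short: the merge job now prefers reading each merging segment's cuboid files from HDFS, and falls back to scanning the source HTables only when those files are missing. A condensed sketch of that decision, pieced together from the hunks below (all names appear in the diff; this is a sketch, not a verbatim excerpt):

    // Sketch only, condensed from the diff below.
    boolean mergeFromCuboidFiles = true;
    for (CubeSegment mergingSeg : seg.getCubeInstance().getMergingSegments(seg)) {
        String cuboidRoot = new HBaseMRSteps(seg).getCuboidRootPath(mergingSeg);
        if (!HadoopUtil.getFileSystem(cuboidRoot).exists(new Path(cuboidRoot))) {
            mergeFromCuboidFiles = false; // any missing dir forces the HTable fallback
            break;
        }
    }
    if (mergeFromCuboidFiles) {
        job.setInputFormatClass(SequenceFileInputFormat.class); // read cuboid files off HDFS
    } else {
        TableMapReduceUtil.initTableMapperJob(scans, tableMapper, keyClz, valueClz, job); // scan HTables
    }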
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index b4aff5d..3ad51c5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -8,7 +8,6 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@ -51,10 +50,10 @@ public interface IMROutput2 {
public interface IMRStorageInputFormat {
/** Configure MR mapper class and input file format. */
- public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException;
+ public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException;
/** Given a mapper context, figure out which segment the mapper reads from. */
- public CubeSegment findSourceSegment(Mapper.Context context, CubeInstance cubeInstance) throws IOException;
+ public CubeSegment findSourceSegment(Mapper.Context context) throws IOException;
/**
* Read in a row of cuboid. Given the input KV, de-serialize back cuboid ID, dimensions, and measures.
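The removed CubeInstance parameter reflects that input formats are now constructed per segment, so the cube is derivable from the captured segment. At the call site (see MergeCuboidFromStorageMapper below) the effect is simply:

    // before: the mapper had to pass the cube in
    sourceCubeSegment = storageInputFormat.findSourceSegment(context, cube);
    // after: the input format derives the cube from the segment it was built for
    sourceCubeSegment = storageInputFormat.findSourceSegment(context);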
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 50b0cc1..cd9393a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -200,8 +200,12 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
return classpath;
}
- public void addInputDirs(String input, Job job) throws IOException {
- for (String inp : StringSplitter.split(input, ",")) {
+ public static void addInputDirs(String input, Job job) throws IOException {
+ addInputDirs(StringSplitter.split(input, ","), job);
+ }
+
+ public static void addInputDirs(String[] inputs, Job job) throws IOException {
+ for (String inp : inputs) {
inp = inp.trim();
if (inp.endsWith("/*")) {
inp = inp.substring(0, inp.length() - 2);
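Making addInputDirs static, with an array overload, lets other classes reuse it; HBaseMROutput2Transition below does exactly that. A small usage sketch with made-up paths; a trailing "/*" tells the method to enumerate the sub-directories underneath:

    // Hypothetical paths, for illustration only.
    String[] inputs = {
        "/kylin/cuboid/seg_20150801/*",
        "/kylin/cuboid/seg_20150901/*"
    };
    AbstractHadoopJob.addInputDirs(inputs, job);                         // new array overload
    AbstractHadoopJob.addInputDirs("/kylin/cuboid/seg_20151001/*", job); // comma-separated form still works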
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index fa575ca..4598673 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -108,7 +108,7 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
newKeyBuf = new byte[256]; // size will auto-grow
- sourceCubeSegment = storageInputFormat.findSourceSegment(context, cube);
+ sourceCubeSegment = storageInputFormat.findSourceSegment(context);
logger.info("Source cube segment: " + sourceCubeSegment);
rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index a09ffe1..45f0d32 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -25,7 +25,6 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.BytesUtil;
@@ -66,8 +65,6 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
- private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
-
private Boolean checkNeedMerging(TblColRef col) throws IOException {
Boolean ret = dictsNeedMerging.get(col);
if (ret != null)
@@ -83,28 +80,6 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
}
}
- private String extractJobIDFromPath(String path) {
- Matcher matcher = JOB_NAME_PATTERN.matcher(path);
- // check the first occurance
- if (matcher.find()) {
- return matcher.group(1);
- } else {
- throw new IllegalStateException("Can not extract job ID from file path : " + path);
- }
- }
-
- private CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
- for (CubeSegment segment : cubeInstance.getSegments()) {
- String lastBuildJobID = segment.getLastBuildJobID();
- if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
- return segment;
- }
- }
-
- throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
-
- }
-
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.bindCurrentConfiguration(context.getConfiguration());
@@ -123,15 +98,38 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
newKeyBuf = new byte[256];// size will auto-grow
// decide which source segment
- InputSplit inputSplit = context.getInputSplit();
- String filePath = ((FileSplit) inputSplit).getPath().toString();
- System.out.println("filePath:" + filePath);
+ FileSplit fileSplit = (FileSplit) context.getInputSplit();
+ sourceCubeSegment = findSourceSegment(fileSplit, cube);
+
+ rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+ }
+
+ private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
+
+ public static CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
+ String filePath = fileSplit.getPath().toString();
String jobID = extractJobIDFromPath(filePath);
- System.out.println("jobID:" + jobID);
- sourceCubeSegment = findSegmentWithUuid(jobID, cube);
- System.out.println(sourceCubeSegment);
+ return findSegmentWithUuid(jobID, cube);
+ }
+
+ private static String extractJobIDFromPath(String path) {
+ Matcher matcher = JOB_NAME_PATTERN.matcher(path);
+ // check the first occurrence
+ if (matcher.find()) {
+ return matcher.group(1);
+ } else {
+ throw new IllegalStateException("Can not extract job ID from file path : " + path);
+ }
+ }
- this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+ private static CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
+ for (CubeSegment segment : cubeInstance.getSegments()) {
+ String lastBuildJobID = segment.getLastBuildJobID();
+ if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
+ return segment;
+ }
+ }
+ throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
}
@Override
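For reference, the relocated helpers resolve a source segment from a cuboid file path in two steps: extract the build job's UUID from the path, then match it against each segment's last build job ID. A sketch with a fabricated path:

    // The path below is a made-up example of a cuboid file location.
    String path = "/kylin-0a8d71e8-df77-4d88-9f35-ae13de3ab479/cube/cuboid/base_cuboid/part-00000";
    Matcher m = JOB_NAME_PATTERN.matcher(path);
    if (m.find()) {
        String jobID = m.group(1); // "0a8d71e8-df77-4d88-9f35-ae13de3ab479"
        // findSegmentWithUuid(jobID, cube) then compares this against
        // segment.getLastBuildJobID() for each segment of the cube
    }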
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
index 1e414be..b468009 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
@@ -129,16 +128,16 @@ public class HBaseMROutput2 implements IMROutput2 {
@SuppressWarnings({ "rawtypes", "unchecked" })
private static class HBaseInputFormat implements IMRStorageInputFormat {
- final Iterable<String> htables;
-
+ final CubeSegment seg;
+
final RowValueDecoder[] rowValueDecoders;
final ByteArrayWritable parsedKey;
final Object[] parsedValue;
final Pair<ByteArrayWritable, Object[]> parsedPair;
public HBaseInputFormat(CubeSegment seg) {
- this.htables = new HBaseMRSteps(seg).getMergingHTables();
-
+ this.seg = seg;
+
List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
List<MeasureDesc> measuresDescs = Lists.newArrayList();
for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
@@ -157,29 +156,29 @@ public class HBaseMROutput2 implements IMROutput2 {
}
@Override
- public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException {
+ public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException {
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
List<Scan> scans = new ArrayList<Scan>();
- for (String htable : htables) {
+ for (String htable : new HBaseMRSteps(seg).getMergingHTables()) {
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
scans.add(scan);
}
- TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapper, outputKeyClass, outputValueClass, job);
+ TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapperClz, outputKeyClz, outputValueClz, job);
}
@Override
- public CubeSegment findSourceSegment(Context context, CubeInstance cubeInstance) throws IOException {
+ public CubeSegment findSourceSegment(Context context) throws IOException {
TableSplit currentSplit = (TableSplit) context.getInputSplit();
byte[] tableName = currentSplit.getTableName();
String htableName = Bytes.toString(tableName);
// decide which source segment
- for (CubeSegment segment : cubeInstance.getSegments()) {
+ for (CubeSegment segment : seg.getCubeInstance().getSegments()) {
String segmentHtable = segment.getStorageLocationIdentifier();
if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
return segment;
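When merging over HBase, each mapper's TableSplit names the HTable it reads, and HTable names map one-to-one to segments via getStorageLocationIdentifier(). Condensed from the method above:

    // Sketch: resolve the source segment from the split's table name.
    TableSplit split = (TableSplit) context.getInputSplit();
    String htableName = Bytes.toString(split.getTableName());
    for (CubeSegment segment : seg.getCubeInstance().getSegments()) {
        if (htableName.equalsIgnoreCase(segment.getStorageLocationIdentifier()))
            return segment; // this mapper is reading this segment's HTable
    }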
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 047315b..237f0c9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
@@ -40,10 +41,11 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.HBaseColumnDesc;
@@ -51,24 +53,30 @@ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.engine.mr.ByteArrayWritable;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.steps.MergeCuboidMapper;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.measure.MeasureCodec;
import org.apache.kylin.metadata.model.MeasureDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
/**
- * This "Transition" MR output generates cuboid files and then convert to HFile.
- * The additional step slows down build process slightly, but the gains is merge
+ * This "Transition" impl generates cuboid files and then convert to HFile.
+ * The additional step slows down build process, but the gains is merge
* can read from HDFS instead of over HBase region server. See KYLIN-1007.
*
* This is transitional because finally we want to merge from HTable snapshot.
- * However MR input with multiple snapshots is only supported by HBase 1.x.
+ * However, multiple snapshots as MR input are only supported by HBase 1.x.
* Before most users upgrade to latest HBase, they can only use this transitional
* cuboid file solution.
*/
public class HBaseMROutput2Transition implements IMROutput2 {
+ private static final Logger logger = LoggerFactory.getLogger(HBaseMROutput2Transition.class);
+
@Override
public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
return new IMRBatchCubingOutputSide2() {
@@ -87,7 +95,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
@Override
public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
-
+
jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}
@@ -127,7 +135,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
@Override
public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
-
+
jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}
@@ -141,67 +149,139 @@ public class HBaseMROutput2Transition implements IMROutput2 {
@SuppressWarnings({ "rawtypes", "unchecked" })
private static class HBaseInputFormat implements IMRStorageInputFormat {
- final Iterable<String> htables;
-
- final RowValueDecoder[] rowValueDecoders;
- final ByteArrayWritable parsedKey;
- final Object[] parsedValue;
- final Pair<ByteArrayWritable, Object[]> parsedPair;
+ final CubeSegment seg;
public HBaseInputFormat(CubeSegment seg) {
- this.htables = new HBaseMRSteps(seg).getMergingHTables();
-
- List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
- List<MeasureDesc> measuresDescs = Lists.newArrayList();
- for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- valueDecoderList.add(new RowValueDecoder(colDesc));
- for (MeasureDesc measure : colDesc.getMeasures()) {
- measuresDescs.add(measure);
- }
+ this.seg = seg;
+ }
+
+ @Override
+ public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException {
+ // merge by cuboid files
+ if (isMergeFromCuboidFiles(job.getConfiguration())) {
+ logger.info("Merge from cuboid files for " + seg);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ addCuboidInputDirs(job);
+
+ job.setMapperClass(mapperClz);
+ job.setMapOutputKeyClass(outputKeyClz);
+ job.setMapOutputValueClass(outputValueClz);
+ }
+ // merge from HTable scan
+ else {
+ logger.info("Merge from HTables for " + seg);
+
+ Configuration conf = job.getConfiguration();
+ HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+ List<Scan> scans = new ArrayList<Scan>();
+ for (String htable : new HBaseMRSteps(seg).getMergingHTables()) {
+ Scan scan = new Scan();
+ scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+ scan.setCacheBlocks(false); // don't set to true for MR jobs
+ scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
+ scans.add(scan);
}
+ TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapperClz, outputKeyClz, outputValueClz, job);
}
- this.rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
-
- this.parsedKey = new ByteArrayWritable();
- this.parsedValue = new Object[measuresDescs.size()];
- this.parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue);
}
- @Override
- public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException {
- Configuration conf = job.getConfiguration();
- HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
-
- List<Scan> scans = new ArrayList<Scan>();
- for (String htable : htables) {
- Scan scan = new Scan();
- scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
- scan.setCacheBlocks(false); // don't set to true for MR jobs
- scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
- scans.add(scan);
+ private void addCuboidInputDirs(Job job) throws IOException {
+ List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+ HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+ String[] inputs = new String[mergingSegments.size()];
+ for (int i = 0; i < mergingSegments.size(); i++) {
+ CubeSegment mergingSeg = mergingSegments.get(i);
+ String cuboidPath = steps.getCuboidRootPath(mergingSeg);
+ inputs[i] = cuboidPath + (cuboidPath.endsWith("/") ? "" : "/") + "*";
}
- TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapper, outputKeyClass, outputValueClass, job);
+
+ AbstractHadoopJob.addInputDirs(inputs, job);
}
@Override
- public CubeSegment findSourceSegment(Context context, CubeInstance cubeInstance) throws IOException {
- TableSplit currentSplit = (TableSplit) context.getInputSplit();
- byte[] tableName = currentSplit.getTableName();
- String htableName = Bytes.toString(tableName);
-
- // decide which source segment
- for (CubeSegment segment : cubeInstance.getSegments()) {
- String segmentHtable = segment.getStorageLocationIdentifier();
- if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
- return segment;
+ public CubeSegment findSourceSegment(Context context) throws IOException {
+ // merge by cuboid files
+ if (isMergeFromCuboidFiles(context.getConfiguration())) {
+ FileSplit fileSplit = (FileSplit) context.getInputSplit();
+ return MergeCuboidMapper.findSourceSegment(fileSplit, seg.getCubeInstance());
+ }
+ // merge from HTable scan
+ else {
+ TableSplit currentSplit = (TableSplit) context.getInputSplit();
+ byte[] tableName = currentSplit.getTableName();
+ String htableName = Bytes.toString(tableName);
+
+ // decide which source segment
+ for (CubeSegment segment : seg.getCubeInstance().getSegments()) {
+ String segmentHtable = segment.getStorageLocationIdentifier();
+ if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
+ return segment;
+ }
}
+ throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
}
- throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
}
+ transient ByteArrayWritable parsedKey;
+ transient Object[] parsedValue;
+ transient Pair<ByteArrayWritable, Object[]> parsedPair;
+
+ transient MeasureCodec measureCodec;
+ transient RowValueDecoder[] rowValueDecoders;
+
@Override
public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue) {
+ // lazy init
+ if (parsedPair == null) {
+ parsedKey = new ByteArrayWritable();
+ parsedValue = new Object[seg.getCubeDesc().getMeasures().size()];
+ parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue);
+ }
+
+ // merge by cuboid files
+ if (isMergeFromCuboidFiles(null)) {
+ return parseMapperInputFromCuboidFile(inKey, inValue);
+ }
+ // merge from HTable scan
+ else {
+ return parseMapperInputFromHTable(inKey, inValue);
+ }
+ }
+
+ private Pair<ByteArrayWritable, Object[]> parseMapperInputFromCuboidFile(Object inKey, Object inValue) {
+ // lazy init
+ if (measureCodec == null) {
+ measureCodec = new MeasureCodec(seg.getCubeDesc().getMeasures());
+ }
+
+ Text key = (Text) inKey;
+ parsedKey.set(key.getBytes(), 0, key.getLength());
+
+ Text value = (Text) inValue;
+ measureCodec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), parsedValue);
+
+ return parsedPair;
+ }
+
+ private Pair<ByteArrayWritable, Object[]> parseMapperInputFromHTable(Object inKey, Object inValue) {
+ // lazy init
+ if (rowValueDecoders == null) {
+ List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
+ List<MeasureDesc> measuresDescs = Lists.newArrayList();
+ for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+ valueDecoderList.add(new RowValueDecoder(colDesc));
+ for (MeasureDesc measure : colDesc.getMeasures()) {
+ measuresDescs.add(measure);
+ }
+ }
+ }
+ rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
+ }
+
ImmutableBytesWritable key = (ImmutableBytesWritable) inKey;
parsedKey.set(key.get(), key.getOffset(), key.getLength());
@@ -213,6 +293,53 @@ public class HBaseMROutput2Transition implements IMROutput2 {
return parsedPair;
}
+
+ transient Boolean isMergeFromCuboidFiles;
+
+ // merging from cuboid files is better than merging from HTable, because it puts no workload on HBase region servers
+ private boolean isMergeFromCuboidFiles(Configuration jobConf) {
+ // cache in this object?
+ if (isMergeFromCuboidFiles != null)
+ return isMergeFromCuboidFiles.booleanValue();
+
+ final String confKey = "kylin.isMergeFromCuboidFiles";
+
+ // cache in job configuration?
+ if (jobConf != null) {
+ String result = jobConf.get(confKey);
+ if (result != null) {
+ isMergeFromCuboidFiles = Boolean.valueOf(result);
+ return isMergeFromCuboidFiles.booleanValue();
+ }
+ }
+
+ boolean result = true;
+
+ try {
+ List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+ HBaseMRSteps steps = new HBaseMRSteps(seg);
+ for (CubeSegment mergingSeg : mergingSegments) {
+ String cuboidRootPath = steps.getCuboidRootPath(mergingSeg);
+ FileSystem fs = HadoopUtil.getFileSystem(cuboidRootPath);
+
+ boolean cuboidFileExist = fs.exists(new Path(cuboidRootPath));
+ if (cuboidFileExist == false) {
+ logger.info("Merge from HTable because " + cuboidRootPath + " does not exist");
+ result = false;
+ break;
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ // put in cache
+ isMergeFromCuboidFiles = Boolean.valueOf(result);
+ if (jobConf != null) {
+ jobConf.set(confKey, "" + result);
+ }
+ return result;
+ }
}
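One subtlety in the lazy caching above: mapper side, findSourceSegment(context) runs first (during setup) and seeds the object-level isMergeFromCuboidFiles cache from the task configuration, which is why parseMapperInput can pass null. A sketch of the assumed mapper flow (not verbatim from this commit):

    @Override
    protected void setup(Context context) throws IOException {
        // consults "kylin.isMergeFromCuboidFiles" via the task conf and
        // caches the verdict on this input format instance
        sourceCubeSegment = storageInputFormat.findSourceSegment(context);
    }

    @Override
    public void map(Object inKey, Object inValue, Context context) {
        // isMergeFromCuboidFiles(null) now hits the object-level cache
        Pair<ByteArrayWritable, Object[]> parsed = storageInputFormat.parseMapperInput(inKey, inValue);
        // ... merge and write ...
    }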
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -236,10 +363,10 @@ public class HBaseMROutput2Transition implements IMROutput2 {
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
-
+
Path output = new Path(new HBaseMRSteps(seg).getCuboidRootPath(jobFlowId));
FileOutputFormat.setOutputPath(job, output);
-
+
HadoopUtil.deletePath(job.getConfiguration(), output);
}
@@ -254,11 +381,11 @@ public class HBaseMROutput2Transition implements IMROutput2 {
}
outputKey.set(key.array(), key.offset(), key.length());
-
+
valueBuf.clear();
codec.encode(value, valueBuf);
outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
+
context.write(outputKey, outputValue);
}
}
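Finally, the write side in this last hunk is what makes the cuboid files re-readable at merge time: measures are encoded with MeasureCodec into the sequence file value, and parseMapperInputFromCuboidFile above decodes them with a codec built from the same measure list. A hedged round-trip sketch (assuming both sides share the cube descriptor):

    MeasureCodec codec = new MeasureCodec(seg.getCubeDesc().getMeasures());

    // write side (cubing job): encode Object[] measures into the Text value
    valueBuf.clear();
    codec.encode(measures, valueBuf);
    outputValue.set(valueBuf.array(), 0, valueBuf.position());

    // read side (merge mapper): decode the same bytes back into Object[]
    codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), parsedValue);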