You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/21 09:47:45 UTC
incubator-kylin git commit: KYLIN-878 fix ci
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8 22dc57344 -> 7823dd055
KYLIN-878 fix ci
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7823dd05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7823dd05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7823dd05
Branch: refs/heads/0.8
Commit: 7823dd055fc6f9467342bddcb256541c55996c39
Parents: 22dc573
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Jul 21 15:47:25 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Jul 21 15:47:25 2015 +0800
----------------------------------------------------------------------
.../kylin/engine/mr/ByteArrayWritable.java | 2 +-
.../kylin/engine/mr/steps/InMemCuboidJob.java | 3 +-
.../kylin/storage/hbase/HBaseMROutput2.java | 56 +++++++++++++++++---
3 files changed, 50 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7823dd05/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java b/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
index 88bb68c..37a8841 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
@@ -141,7 +141,7 @@ public class ByteArrayWritable implements WritableComparable<ByteArrayWritable>
return sb.length() > 0 ? sb.substring(1) : "";
}
- /** A Comparator optimized for ImmutableBytesWritable.
+ /** A Comparator optimized for ByteArrayWritable.
*/
public static class Comparator extends WritableComparator {
private BytesWritable.Comparator comparator = new BytesWritable.Comparator();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7823dd05/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 13175ec..aae5d89 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -31,7 +31,6 @@ import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,7 +86,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
// set output
IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
-
+
return waitForCompletion(job);
} catch (Exception e) {
logger.error("error in CuboidJob", e);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7823dd05/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
index 1209bc7..cd156b0 100644
--- a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
@@ -23,7 +23,11 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
@@ -36,6 +40,10 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -44,6 +52,7 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
@@ -173,7 +182,7 @@ public class HBaseMROutput2 implements IMROutput2 {
TableSplit currentSplit = (TableSplit) context.getInputSplit();
byte[] tableName = currentSplit.getTableName();
String htableName = Bytes.toString(tableName);
-
+
// decide which source segment
for (CubeSegment segment : cubeInstance.getSegments()) {
String segmentHtable = segment.getStorageLocationIdentifier();
@@ -206,7 +215,7 @@ public class HBaseMROutput2 implements IMROutput2 {
@SuppressWarnings({ "rawtypes", "unchecked" })
private static class HBaseOutputFormat implements IMRStorageOutputFormat {
final CubeSegment seg;
-
+
final List<InMemKeyValueCreator> keyValueCreators = Lists.newArrayList();
final ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
@@ -215,7 +224,7 @@ public class HBaseMROutput2 implements IMROutput2 {
}
@Override
- public void configureOutput(Class<? extends Reducer> cls, String jobFlowId, Job job) throws IOException {
+ public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException {
Path hfilePath = new Path(new HBaseMRSteps(seg).getHFilePath(jobFlowId));
FileOutputFormat.setOutputPath(job, hfilePath);
@@ -225,10 +234,10 @@ public class HBaseMROutput2 implements IMROutput2 {
HFileOutputFormat.configureIncrementalLoad(job, htable);
// set Reducer; This need be after configureIncrementalLoad, to overwrite the default reducer class
- job.setReducerClass(cls);
+ job.setReducerClass(reducer);
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(KeyValue.class);
+ // kylin uses ByteArrayWritable instead of ImmutableBytesWritable as mapper output key
+ rewriteTotalOrderPartitionerFile(job);
HadoopUtil.deletePath(job.getConfiguration(), hfilePath);
}
@@ -244,9 +253,9 @@ public class HBaseMROutput2 implements IMROutput2 {
}
}
}
-
+
outputKey.set(key.array(), key.offset(), key.length());
-
+
KeyValue outputValue;
for (int i = 0; i < keyValueCreators.size(); i++) {
outputValue = keyValueCreators.get(i).create(key.array(), key.offset(), key.length(), value);
@@ -254,6 +263,37 @@ public class HBaseMROutput2 implements IMROutput2 {
}
}
+ private void rewriteTotalOrderPartitionerFile(Job job) throws IOException { // re-encodes the job's TotalOrderPartitioner partition file so its keys are ByteArrayWritable rather than ImmutableBytesWritable
+ Configuration conf = job.getConfiguration();
+ String partitionsFile = TotalOrderPartitioner.getPartitionFile(conf); // location of the partition file as recorded in the job configuration
+ if (StringUtils.isBlank(partitionsFile)) // no partition file configured: fail fast instead of continuing with nothing to rewrite
+ throw new IllegalStateException("HFileOutputFormat.configureIncrementalLoad don't configure TotalOrderPartitioner any more?");
+
+ Path partitionsPath = new Path(partitionsFile);
+
+ // load every key of the partition file, reading each one as ImmutableBytesWritable, into memory
+ List<ByteArrayWritable> keys = Lists.newArrayList();
+ Reader reader = new SequenceFile.Reader(conf, Reader.file(partitionsPath));
+ try {
+ ImmutableBytesWritable key = new ImmutableBytesWritable(); // one instance, passed to every next() call below
+ while (reader.next(key, NullWritable.get())) { // only keys matter; values are read as NullWritable
+ keys.add(new ByteArrayWritable(key.copyBytes())); // copyBytes(): presumably gives each list entry its own array, since `key` is reused per record
+ }
+ } finally {
+ reader.close(); // reader is closed before the same path is opened for writing below
+ }
+
+ // write the collected keys back to the same path, now with ByteArrayWritable as the key class
+ Writer writer = SequenceFile.createWriter(conf, Writer.file(partitionsPath), Writer.keyClass(ByteArrayWritable.class), Writer.valueClass(NullWritable.class));
+ try {
+ for (ByteArrayWritable key : keys) { // appended in the order they were read
+ writer.append(key, NullWritable.get());
+ }
+ } finally {
+ writer.close(); // closed even if an append fails
+ }
+ }
+
}
}