You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/21 09:47:45 UTC

incubator-kylin git commit: KYLIN-878 fix ci

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 22dc57344 -> 7823dd055


KYLIN-878 fix ci


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7823dd05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7823dd05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7823dd05

Branch: refs/heads/0.8
Commit: 7823dd055fc6f9467342bddcb256541c55996c39
Parents: 22dc573
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Jul 21 15:47:25 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Jul 21 15:47:25 2015 +0800

----------------------------------------------------------------------
 .../kylin/engine/mr/ByteArrayWritable.java      |  2 +-
 .../kylin/engine/mr/steps/InMemCuboidJob.java   |  3 +-
 .../kylin/storage/hbase/HBaseMROutput2.java     | 56 +++++++++++++++++---
 3 files changed, 50 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7823dd05/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java b/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
index 88bb68c..37a8841 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
@@ -141,7 +141,7 @@ public class ByteArrayWritable implements WritableComparable<ByteArrayWritable>
         return sb.length() > 0 ? sb.substring(1) : "";
     }
 
-    /** A Comparator optimized for ImmutableBytesWritable.
+    /** A Comparator optimized for ByteArrayWritable.
      */
     public static class Comparator extends WritableComparator {
         private BytesWritable.Comparator comparator = new BytesWritable.Comparator();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7823dd05/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 13175ec..aae5d89 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -31,7 +31,6 @@ import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,7 +86,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             // set output
             IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
             storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
-
+            
             return waitForCompletion(job);
         } catch (Exception e) {
             logger.error("error in CuboidJob", e);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7823dd05/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
index 1209bc7..cd156b0 100644
--- a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
@@ -23,7 +23,11 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
@@ -36,6 +40,10 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -44,6 +52,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
@@ -173,7 +182,7 @@ public class HBaseMROutput2 implements IMROutput2 {
             TableSplit currentSplit = (TableSplit) context.getInputSplit();
             byte[] tableName = currentSplit.getTableName();
             String htableName = Bytes.toString(tableName);
-            
+
             // decide which source segment
             for (CubeSegment segment : cubeInstance.getSegments()) {
                 String segmentHtable = segment.getStorageLocationIdentifier();
@@ -206,7 +215,7 @@ public class HBaseMROutput2 implements IMROutput2 {
     @SuppressWarnings({ "rawtypes", "unchecked" })
     private static class HBaseOutputFormat implements IMRStorageOutputFormat {
         final CubeSegment seg;
-        
+
         final List<InMemKeyValueCreator> keyValueCreators = Lists.newArrayList();
         final ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
 
@@ -215,7 +224,7 @@ public class HBaseMROutput2 implements IMROutput2 {
         }
 
         @Override
-        public void configureOutput(Class<? extends Reducer> cls, String jobFlowId, Job job) throws IOException {
+        public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException {
             Path hfilePath = new Path(new HBaseMRSteps(seg).getHFilePath(jobFlowId));
             FileOutputFormat.setOutputPath(job, hfilePath);
 
@@ -225,10 +234,10 @@ public class HBaseMROutput2 implements IMROutput2 {
             HFileOutputFormat.configureIncrementalLoad(job, htable);
 
             // set Reducer; This need be after configureIncrementalLoad, to overwrite the default reducer class
-            job.setReducerClass(cls);
+            job.setReducerClass(reducer);
 
-            job.setOutputKeyClass(ImmutableBytesWritable.class);
-            job.setOutputValueClass(KeyValue.class);
+            // kylin uses ByteArrayWritable instead of ImmutableBytesWritable as mapper output key
+            rewriteTotalOrderPartitionerFile(job);
 
             HadoopUtil.deletePath(job.getConfiguration(), hfilePath);
         }
@@ -244,9 +253,9 @@ public class HBaseMROutput2 implements IMROutput2 {
                     }
                 }
             }
-            
+
             outputKey.set(key.array(), key.offset(), key.length());
-            
+
             KeyValue outputValue;
             for (int i = 0; i < keyValueCreators.size(); i++) {
                 outputValue = keyValueCreators.get(i).create(key.array(), key.offset(), key.length(), value);
@@ -254,6 +263,37 @@ public class HBaseMROutput2 implements IMROutput2 {
             }
         }
 
+        private void rewriteTotalOrderPartitionerFile(Job job) throws IOException { // re-encodes the job's TotalOrderPartitioner partition file in place
+            Configuration conf = job.getConfiguration();
+            String partitionsFile = TotalOrderPartitioner.getPartitionFile(conf);
+            if (StringUtils.isBlank(partitionsFile)) // no partition file is configured on this job
+                throw new IllegalStateException("HFileOutputFormat.configureIncrementalLoad don't configure TotalOrderPartitioner any more?");
+
+            Path partitionsPath = new Path(partitionsFile);
+
+            // load every key of the existing partition file (read as ImmutableBytesWritable) into memory as ByteArrayWritable
+            List<ByteArrayWritable> keys = Lists.newArrayList();
+            Reader reader = new SequenceFile.Reader(conf, Reader.file(partitionsPath));
+            try {
+                ImmutableBytesWritable key = new ImmutableBytesWritable();
+                while (reader.next(key, NullWritable.get())) {
+                    keys.add(new ByteArrayWritable(key.copyBytes())); // copy the bytes: the same 'key' instance is passed to every next() call
+                }
+            } finally {
+                reader.close(); // reader is closed before the same path is opened for writing below
+            }
+
+            // write the collected keys back to the same path, this time with ByteArrayWritable as the key class
+            Writer writer = SequenceFile.createWriter(conf, Writer.file(partitionsPath), Writer.keyClass(ByteArrayWritable.class), Writer.valueClass(NullWritable.class));
+            try {
+                for (ByteArrayWritable key : keys) {
+                    writer.append(key, NullWritable.get()); // keys are appended in the order they were read
+                }
+            } finally {
+                writer.close();
+            }
+        }
+
     }
 
 }