Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/11 05:34:50 UTC

[3/3] incubator-kylin git commit: KYLIN-1007 Merge from cuboid files if available, and from HTable otherwise

KYLIN-1007 Merge from cuboid files if available, and from HTable otherwise
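
Illustrative sketch (not part of the patch; class and method names are hypothetical): the merge job now prefers the per-segment cuboid files on HDFS and falls back to scanning the merging HTables only when a cuboid directory is missing.

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class MergeSourceProbe {
        /** Returns true only when every merging segment still has its cuboid files on HDFS. */
        static boolean canMergeFromCuboidFiles(FileSystem fs, List<String> cuboidRootPaths) throws IOException {
            for (String root : cuboidRootPaths) {
                if (!fs.exists(new Path(root))) {
                    return false; // any missing directory means: merge from the HTables instead
                }
            }
            return true;
        }
    }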


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/acd8c0df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/acd8c0df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/acd8c0df

Branch: refs/heads/2.x-staging
Commit: acd8c0dfb6a3e7a422b00e943bf0c66d7c9de2b8
Parents: f156d13
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Sep 11 11:27:00 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Sep 11 11:34:07 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/engine/mr/IMROutput2.java  |   5 +-
 .../engine/mr/common/AbstractHadoopJob.java     |   8 +-
 .../mr/steps/MergeCuboidFromStorageMapper.java  |   2 +-
 .../engine/mr/steps/MergeCuboidMapper.java      |  62 +++--
 .../storage/hbase/steps/HBaseMROutput2.java     |  19 +-
 .../hbase/steps/HBaseMROutput2Transition.java   | 237 ++++++++++++++-----
 6 files changed, 230 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index b4aff5d..3ad51c5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -8,7 +8,6 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
@@ -51,10 +50,10 @@ public interface IMROutput2 {
     public interface IMRStorageInputFormat {
 
         /** Configure MR mapper class and input file format. */
-        public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException;
+        public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException;
 
         /** Given a mapper context, figure out which segment the mapper reads from. */
-        public CubeSegment findSourceSegment(Mapper.Context context, CubeInstance cubeInstance) throws IOException;
+        public CubeSegment findSourceSegment(Mapper.Context context) throws IOException;
 
         /**
          * Read in a row of cuboid. Given the input KV, de-serialize back cuboid ID, dimensions, and measures.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 50b0cc1..cd9393a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -200,8 +200,12 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         return classpath;
     }
 
-    public void addInputDirs(String input, Job job) throws IOException {
-        for (String inp : StringSplitter.split(input, ",")) {
+    public static void addInputDirs(String input, Job job) throws IOException {
+        addInputDirs(StringSplitter.split(input, ","), job);
+    }
+    
+    public static void addInputDirs(String[] inputs, Job job) throws IOException {
+        for (String inp : inputs) {
             inp = inp.trim();
             if (inp.endsWith("/*")) {
                 inp = inp.substring(0, inp.length() - 2);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index fa575ca..4598673 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -108,7 +108,7 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
 
         newKeyBuf = new byte[256]; // size will auto-grow
 
-        sourceCubeSegment = storageInputFormat.findSourceSegment(context, cube);
+        sourceCubeSegment = storageInputFormat.findSourceSegment(context);
         logger.info("Source cube segment: " + sourceCubeSegment);
 
         rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index a09ffe1..45f0d32 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -25,7 +25,6 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.BytesUtil;
@@ -66,8 +65,6 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
     private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
 
-    private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
-
     private Boolean checkNeedMerging(TblColRef col) throws IOException {
         Boolean ret = dictsNeedMerging.get(col);
         if (ret != null)
@@ -83,28 +80,6 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         }
     }
 
-    private String extractJobIDFromPath(String path) {
-        Matcher matcher = JOB_NAME_PATTERN.matcher(path);
-        // check the first occurance
-        if (matcher.find()) {
-            return matcher.group(1);
-        } else {
-            throw new IllegalStateException("Can not extract job ID from file path : " + path);
-        }
-    }
-
-    private CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
-        for (CubeSegment segment : cubeInstance.getSegments()) {
-            String lastBuildJobID = segment.getLastBuildJobID();
-            if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
-                return segment;
-            }
-        }
-
-        throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
-
-    }
-
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
         super.bindCurrentConfiguration(context.getConfiguration());
@@ -123,15 +98,38 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         newKeyBuf = new byte[256];// size will auto-grow
 
         // decide which source segment
-        InputSplit inputSplit = context.getInputSplit();
-        String filePath = ((FileSplit) inputSplit).getPath().toString();
-        System.out.println("filePath:" + filePath);
+        FileSplit fileSplit = (FileSplit) context.getInputSplit();
+        sourceCubeSegment = findSourceSegment(fileSplit, cube);
+
+        rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+    }
+    
+    private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
+
+    public static CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
+        String filePath = fileSplit.getPath().toString();
         String jobID = extractJobIDFromPath(filePath);
-        System.out.println("jobID:" + jobID);
-        sourceCubeSegment = findSegmentWithUuid(jobID, cube);
-        System.out.println(sourceCubeSegment);
+        return findSegmentWithUuid(jobID, cube);
+    }
+    
+    private static String extractJobIDFromPath(String path) {
+        Matcher matcher = JOB_NAME_PATTERN.matcher(path);
+        // check the first occurrence
+        if (matcher.find()) {
+            return matcher.group(1);
+        } else {
+            throw new IllegalStateException("Can not extract job ID from file path : " + path);
+        }
+    }
 
-        this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+    private static CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
+        for (CubeSegment segment : cubeInstance.getSegments()) {
+            String lastBuildJobID = segment.getLastBuildJobID();
+            if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
+                return segment;
+            }
+        }
+        throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
     }
 
     @Override
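
Illustrative sketch (not from the patch): how the kylin-<uuid> pattern used by MergeCuboidMapper above resolves a cuboid file path back to its source segment. The sample path is made up; the extracted UUID would then be matched against each merging segment's getLastBuildJobID().

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class JobIdFromPathExample {
        private static final Pattern JOB_NAME_PATTERN =
                Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");

        public static void main(String[] args) {
            // hypothetical cuboid file path written by an earlier build job
            String path = "hdfs:///kylin/kylin_metadata/kylin-88c4b1e1-21a0-4f5c-9d2e-3c4d5e6f7a8b/my_cube/cuboid/level_3/part-r-00000";
            Matcher matcher = JOB_NAME_PATTERN.matcher(path);
            if (matcher.find()) {
                // this UUID is compared against each segment's last build job ID
                System.out.println("source build job ID: " + matcher.group(1));
            }
        }
    }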

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
index 1e414be..b468009 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
@@ -129,16 +128,16 @@ public class HBaseMROutput2 implements IMROutput2 {
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     private static class HBaseInputFormat implements IMRStorageInputFormat {
-        final Iterable<String> htables;
-
+        final CubeSegment seg;
+        
         final RowValueDecoder[] rowValueDecoders;
         final ByteArrayWritable parsedKey;
         final Object[] parsedValue;
         final Pair<ByteArrayWritable, Object[]> parsedPair;
 
         public HBaseInputFormat(CubeSegment seg) {
-            this.htables = new HBaseMRSteps(seg).getMergingHTables();
-
+            this.seg = seg;
+            
             List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
             List<MeasureDesc> measuresDescs = Lists.newArrayList();
             for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
@@ -157,29 +156,29 @@ public class HBaseMROutput2 implements IMROutput2 {
         }
 
         @Override
-        public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException {
+        public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException {
             Configuration conf = job.getConfiguration();
             HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
 
             List<Scan> scans = new ArrayList<Scan>();
-            for (String htable : htables) {
+            for (String htable : new HBaseMRSteps(seg).getMergingHTables()) {
                 Scan scan = new Scan();
                 scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
                 scan.setCacheBlocks(false); // don't set to true for MR jobs
                 scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
                 scans.add(scan);
             }
-            TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapper, outputKeyClass, outputValueClass, job);
+            TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapperClz, outputKeyClz, outputValueClz, job);
         }
 
         @Override
-        public CubeSegment findSourceSegment(Context context, CubeInstance cubeInstance) throws IOException {
+        public CubeSegment findSourceSegment(Context context) throws IOException {
             TableSplit currentSplit = (TableSplit) context.getInputSplit();
             byte[] tableName = currentSplit.getTableName();
             String htableName = Bytes.toString(tableName);
 
             // decide which source segment
-            for (CubeSegment segment : cubeInstance.getSegments()) {
+            for (CubeSegment segment : seg.getCubeInstance().getSegments()) {
                 String segmentHtable = segment.getStorageLocationIdentifier();
                 if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
                     return segment;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 047315b..237f0c9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Result;
@@ -40,10 +41,11 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
@@ -51,24 +53,30 @@ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.steps.MergeCuboidMapper;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
 /**
- * This "Transition" MR output generates cuboid files and then convert to HFile.
- * The additional step slows down build process slightly, but the gains is merge
+ * This "Transition" impl generates cuboid files and then convert to HFile.
+ * The additional step slows down build process, but the gains is merge
  * can read from HDFS instead of over HBase region server. See KYLIN-1007.
  * 
  * This is transitional because finally we want to merge from HTable snapshot.
- * However MR input with multiple snapshots is only supported by HBase 1.x.
+ * However multiple snapshots as MR input is only supported by HBase 1.x.
  * Before most users upgrade to latest HBase, they can only use this transitional
  * cuboid file solution.
  */
 public class HBaseMROutput2Transition implements IMROutput2 {
 
+    private static final Logger logger = LoggerFactory.getLogger(HBaseMROutput2Transition.class);
+
     @Override
     public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
         return new IMRBatchCubingOutputSide2() {
@@ -87,7 +95,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             @Override
             public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
                 String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
-                
+
                 jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
                 jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
             }
@@ -127,7 +135,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             @Override
             public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
                 String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
-                
+
                 jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
                 jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
             }
@@ -141,67 +149,139 @@ public class HBaseMROutput2Transition implements IMROutput2 {
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     private static class HBaseInputFormat implements IMRStorageInputFormat {
-        final Iterable<String> htables;
-
-        final RowValueDecoder[] rowValueDecoders;
-        final ByteArrayWritable parsedKey;
-        final Object[] parsedValue;
-        final Pair<ByteArrayWritable, Object[]> parsedPair;
+        final CubeSegment seg;
 
         public HBaseInputFormat(CubeSegment seg) {
-            this.htables = new HBaseMRSteps(seg).getMergingHTables();
-
-            List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
-            List<MeasureDesc> measuresDescs = Lists.newArrayList();
-            for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
-                for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                    valueDecoderList.add(new RowValueDecoder(colDesc));
-                    for (MeasureDesc measure : colDesc.getMeasures()) {
-                        measuresDescs.add(measure);
-                    }
+            this.seg = seg;
+        }
+
+        @Override
+        public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException {
+            // merge by cuboid files
+            if (isMergeFromCuboidFiles(job.getConfiguration())) {
+                logger.info("Merge from cuboid files for " + seg);
+                
+                job.setInputFormatClass(SequenceFileInputFormat.class);
+                addCuboidInputDirs(job);
+                
+                job.setMapperClass(mapperClz);
+                job.setMapOutputKeyClass(outputKeyClz);
+                job.setMapOutputValueClass(outputValueClz);
+            }
+            // merge from HTable scan
+            else {
+                logger.info("Merge from HTables for " + seg);
+                
+                Configuration conf = job.getConfiguration();
+                HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+                List<Scan> scans = new ArrayList<Scan>();
+                for (String htable : new HBaseMRSteps(seg).getMergingHTables()) {
+                    Scan scan = new Scan();
+                    scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+                    scan.setCacheBlocks(false); // don't set to true for MR jobs
+                    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
+                    scans.add(scan);
                 }
+                TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapperClz, outputKeyClz, outputValueClz, job);
             }
-            this.rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
-
-            this.parsedKey = new ByteArrayWritable();
-            this.parsedValue = new Object[measuresDescs.size()];
-            this.parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue);
         }
 
-        @Override
-        public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException {
-            Configuration conf = job.getConfiguration();
-            HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
-
-            List<Scan> scans = new ArrayList<Scan>();
-            for (String htable : htables) {
-                Scan scan = new Scan();
-                scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
-                scan.setCacheBlocks(false); // don't set to true for MR jobs
-                scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
-                scans.add(scan);
+        private void addCuboidInputDirs(Job job) throws IOException {
+            List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+            HBaseMRSteps steps = new HBaseMRSteps(seg);
+            
+            String[] inputs = new String[mergingSegments.size()];
+            for (int i = 0; i < mergingSegments.size(); i++) {
+                CubeSegment mergingSeg = mergingSegments.get(i);
+                String cuboidPath = steps.getCuboidRootPath(mergingSeg);
+                inputs[i] = cuboidPath + (cuboidPath.endsWith("/") ? "" : "/") + "*";
             }
-            TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapper, outputKeyClass, outputValueClass, job);
+            
+            AbstractHadoopJob.addInputDirs(inputs, job);
         }
 
         @Override
-        public CubeSegment findSourceSegment(Context context, CubeInstance cubeInstance) throws IOException {
-            TableSplit currentSplit = (TableSplit) context.getInputSplit();
-            byte[] tableName = currentSplit.getTableName();
-            String htableName = Bytes.toString(tableName);
-
-            // decide which source segment
-            for (CubeSegment segment : cubeInstance.getSegments()) {
-                String segmentHtable = segment.getStorageLocationIdentifier();
-                if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
-                    return segment;
+        public CubeSegment findSourceSegment(Context context) throws IOException {
+            // merge by cuboid files
+            if (isMergeFromCuboidFiles(context.getConfiguration())) {
+                FileSplit fileSplit = (FileSplit) context.getInputSplit();
+                return MergeCuboidMapper.findSourceSegment(fileSplit, seg.getCubeInstance());
+            }
+            // merge from HTable scan
+            else {
+                TableSplit currentSplit = (TableSplit) context.getInputSplit();
+                byte[] tableName = currentSplit.getTableName();
+                String htableName = Bytes.toString(tableName);
+
+                // decide which source segment
+                for (CubeSegment segment : seg.getCubeInstance().getSegments()) {
+                    String segmentHtable = segment.getStorageLocationIdentifier();
+                    if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
+                        return segment;
+                    }
                 }
+                throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
             }
-            throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
         }
 
+        transient ByteArrayWritable parsedKey;
+        transient Object[] parsedValue;
+        transient Pair<ByteArrayWritable, Object[]> parsedPair;
+        
+        transient MeasureCodec measureCodec;
+        transient RowValueDecoder[] rowValueDecoders;
+
         @Override
         public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue) {
+            // lazy init
+            if (parsedPair == null) {
+                parsedKey = new ByteArrayWritable();
+                parsedValue = new Object[seg.getCubeDesc().getMeasures().size()];
+                parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue);
+            }
+            
+            // merge by cuboid files
+            if (isMergeFromCuboidFiles(null)) {
+                return parseMapperInputFromCuboidFile(inKey, inValue);
+            }
+            // merge from HTable scan
+            else {
+                return parseMapperInputFromHTable(inKey, inValue);
+            }
+        }
+
+        private Pair<ByteArrayWritable, Object[]> parseMapperInputFromCuboidFile(Object inKey, Object inValue) {
+            // lazy init
+            if (measureCodec == null) {
+                measureCodec = new MeasureCodec(seg.getCubeDesc().getMeasures());
+            }
+            
+            Text key = (Text) inKey;
+            parsedKey.set(key.getBytes(), 0, key.getLength());
+            
+            Text value = (Text) inValue;
+            measureCodec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), parsedValue);
+            
+            return parsedPair;
+        }
+
+        private Pair<ByteArrayWritable, Object[]> parseMapperInputFromHTable(Object inKey, Object inValue) {
+            // lazy init
+            if (rowValueDecoders == null) {
+                List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
+                List<MeasureDesc> measuresDescs = Lists.newArrayList();
+                for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
+                    for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+                        valueDecoderList.add(new RowValueDecoder(colDesc));
+                        for (MeasureDesc measure : colDesc.getMeasures()) {
+                            measuresDescs.add(measure);
+                        }
+                    }
+                }
+                rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
+            }
+
             ImmutableBytesWritable key = (ImmutableBytesWritable) inKey;
             parsedKey.set(key.get(), key.getOffset(), key.getLength());
 
@@ -213,6 +293,53 @@ public class HBaseMROutput2Transition implements IMROutput2 {
 
             return parsedPair;
         }
+
+        transient Boolean isMergeFromCuboidFiles;
+
+        // merge from cuboid files is better than merge from HTable, because no workload on HBase region server
+        private boolean isMergeFromCuboidFiles(Configuration jobConf) {
+            // cache in this object?
+            if (isMergeFromCuboidFiles != null)
+                return isMergeFromCuboidFiles.booleanValue();
+
+            final String confKey = "kylin.isMergeFromCuboidFiles";
+
+            // cache in job configuration?
+            if (jobConf != null) {
+                String result = jobConf.get(confKey);
+                if (result != null) {
+                    isMergeFromCuboidFiles = Boolean.valueOf(result);
+                    return isMergeFromCuboidFiles.booleanValue();
+                }
+            }
+
+            boolean result = true;
+
+            try {
+                List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+                HBaseMRSteps steps = new HBaseMRSteps(seg);
+                for (CubeSegment mergingSeg : mergingSegments) {
+                    String cuboidRootPath = steps.getCuboidRootPath(mergingSeg);
+                    FileSystem fs = HadoopUtil.getFileSystem(cuboidRootPath);
+
+                    boolean cuboidFileExist = fs.exists(new Path(cuboidRootPath));
+                    if (cuboidFileExist == false) {
+                        logger.info("Merge from HTable because " + cuboidRootPath + " does not exist");
+                        result = false;
+                        break;
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            // put in cache
+            isMergeFromCuboidFiles = Boolean.valueOf(result);
+            if (jobConf != null) {
+                jobConf.set(confKey, "" + result);
+            }
+            return result;
+        }
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -236,10 +363,10 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             job.setOutputFormatClass(SequenceFileOutputFormat.class);
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(Text.class);
-            
+
             Path output = new Path(new HBaseMRSteps(seg).getCuboidRootPath(jobFlowId));
             FileOutputFormat.setOutputPath(job, output);
-            
+
             HadoopUtil.deletePath(job.getConfiguration(), output);
         }
 
@@ -254,11 +381,11 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             }
 
             outputKey.set(key.array(), key.offset(), key.length());
-         
+
             valueBuf.clear();
             codec.encode(value, valueBuf);
             outputValue.set(valueBuf.array(), 0, valueBuf.position());
-            
+
             context.write(outputKey, outputValue);
         }
     }
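
Illustrative note (not part of the patch): isMergeFromCuboidFiles() above caches its decision in the job configuration under "kylin.isMergeFromCuboidFiles", so the mapper side reuses the probed value instead of re-checking HDFS. A hypothetical driver-side snippet that pins the behavior up front:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    class MergeJobConfExample {
        static void pinMergeSource(Job job, boolean fromCuboidFiles) {
            Configuration conf = job.getConfiguration();
            // pre-setting the key short-circuits the per-segment HDFS existence probe
            conf.set("kylin.isMergeFromCuboidFiles", String.valueOf(fromCuboidFiles));
        }
    }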