Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/11 05:34:48 UTC

[1/3] incubator-kylin git commit: KYLIN-1007 Use CubeDesc.getMeasures() as the order of measures on GridTable, as well as on MR KV

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging 0739cc227 -> acd8c0dfb


KYLIN-1007 Use CubeDesc.getMeasures() as the order of measures on
GridTable, as well as on MR KV


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/fe9e02cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/fe9e02cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/fe9e02cd

Branch: refs/heads/2.x-staging
Commit: fe9e02cd3b18d7e25232ca9d50d37637d5721ae8
Parents: 0739cc2
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Sep 7 18:00:07 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Sep 11 11:34:01 2015 +0800

----------------------------------------------------------------------
 .../gridtable/CuboidToGridTableMapping.java     | 66 ++++++++++--------
 .../cube/inmemcubing/DoggedCubeBuilder.java     |  7 +-
 .../cube/inmemcubing/InMemCubeBuilder.java      | 33 ++-------
 .../org/apache/kylin/cube/model/CubeDesc.java   | 17 +++--
 .../kylin/cube/model/HBaseColumnDesc.java       | 37 +++++++---
 .../org/apache/kylin/engine/mr/IMROutput.java   |  8 +--
 .../org/apache/kylin/engine/mr/IMROutput2.java  | 22 ++++++
 .../engine/mr/steps/InMemCuboidReducer.java     | 15 +----
 .../mr/steps/MergeCuboidFromStorageMapper.java  | 21 ++----
 .../engine/mr/steps/MergeCuboidMapper.java      |  3 +-
 .../storage/hbase/cube/v1/CubeStorageQuery.java |  4 +-
 .../hbase/cube/v1/CubeTupleConverter.java       |  2 +-
 .../storage/hbase/steps/CubeHFileMapper.java    |  3 +-
 .../storage/hbase/steps/HBaseCuboidWriter.java  | 18 ++---
 .../storage/hbase/steps/HBaseMROutput2.java     | 19 ++----
 .../hbase/steps/InMemKeyValueCreator.java       | 71 --------------------
 .../storage/hbase/steps/KeyValueCreator.java    | 32 ++-------
 .../storage/hbase/steps/RowValueDecoder.java    | 38 +++++++----
 .../hbase/steps/RowValueDecoderTest.java        |  6 +-
 19 files changed, 167 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
index ab4efca..12eb5f8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
@@ -1,5 +1,6 @@
 package org.apache.kylin.cube.gridtable;
 
+import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.List;
@@ -59,29 +60,48 @@ public class CuboidToGridTableMapping {
         nDimensions = gtColIdx;
         assert nDimensions == cuboid.getColumns().size();
 
+        // column blocks of metrics
+        ArrayList<BitSet> metricsColBlocks = Lists.newArrayList();
+        for (HBaseColumnFamilyDesc familyDesc : cuboid.getCube().getHbaseMapping().getColumnFamily()) {
+            for (int i = 0; i < familyDesc.getColumns().length; i++) {
+                metricsColBlocks.add(new BitSet());
+            }
+        }
+        
         // metrics
         metrics2gt = LinkedListMultimap.create();
-        for (HBaseColumnFamilyDesc familyDesc : cuboid.getCube().getHbaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
-                BitSet colBlock = new BitSet();
-                for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
-                    // Count distinct & holistic count distinct are equals() but different.
-                    // Ensure the holistic version if exists is always the first.
-                    FunctionDesc func = measure.getFunction();
-                    if (func.isHolisticCountDistinct()) {
-                        List<Integer> existing = metrics2gt.removeAll(func);
-                        metrics2gt.put(func, gtColIdx);
-                        metrics2gt.putAll(func, existing);
-                    } else {
-                        metrics2gt.put(func, gtColIdx);
+        for (MeasureDesc measure : cuboid.getCube().getMeasures()) {
+            // Count distinct & holistic count distinct are equals() but different.
+            // Ensure that the holistic version, if it exists, always comes first.
+            FunctionDesc func = measure.getFunction();
+            if (func.isHolisticCountDistinct()) {
+                List<Integer> existing = metrics2gt.removeAll(func);
+                metrics2gt.put(func, gtColIdx);
+                metrics2gt.putAll(func, existing);
+            } else {
+                metrics2gt.put(func, gtColIdx);
+            }
+            
+            gtDataTypes.add(func.getReturnDataType());
+            
+            // map to column block
+            int cbIdx = 0;
+            for (HBaseColumnFamilyDesc familyDesc : cuboid.getCube().getHbaseMapping().getColumnFamily()) {
+                for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+                    if (hbaseColDesc.containsMeasure(measure.getName())) {
+                        metricsColBlocks.get(cbIdx).set(gtColIdx);
                     }
-                    gtDataTypes.add(func.getReturnDataType());
-                    colBlock.set(gtColIdx);
-                    gtColIdx++;
+                    cbIdx++;
                 }
-                gtColBlocks.add(new ImmutableBitSet(colBlock));
             }
+            
+            gtColIdx++;
         }
+        
+        for (BitSet set : metricsColBlocks) {
+            gtColBlocks.add(new ImmutableBitSet(set));
+        }
+        
         nMetrics = gtColIdx - nDimensions;
         assert nMetrics == cuboid.getCube().getMeasures().size();
     }
@@ -157,16 +177,4 @@ public class CuboidToGridTableMapping {
         return result.isEmpty() ? Collections.<Integer, Integer> emptyMap() : result;
     }
 
-    public static MeasureDesc[] getMeasureSequenceOnGridTable(CubeDesc cube) {
-        MeasureDesc[] result = new MeasureDesc[cube.getMeasures().size()];
-        int i = 0;
-        for (HBaseColumnFamilyDesc familyDesc : cube.getHbaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
-                for (MeasureDesc m : hbaseColDesc.getMeasures()) {
-                    result[i++] = m;
-                }
-            }
-        }
-        return result;
-    }
 }
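
The inversion above is the core of this change: grid table metric columns now follow CubeDesc.getMeasures() order, and each HBase column contributes one column-block BitSet marking the GT indices of the measures it stores. Below is a minimal, self-contained sketch of that layout logic, using simplified stand-in types (plain measure names and per-column reference lists) rather than the actual Kylin classes:

    import java.util.BitSet;
    import java.util.List;

    class ColBlockSketch {
        // Cube-ordered measure names plus per-HBase-column reference lists
        // yield one BitSet (column block) per HBase column.
        static BitSet[] buildMetricColBlocks(List<String> measuresInCubeOrder,
                                             List<List<String>> hbaseColumnRefs,
                                             int firstMetricGtIdx) {
            BitSet[] blocks = new BitSet[hbaseColumnRefs.size()];
            for (int c = 0; c < blocks.length; c++)
                blocks[c] = new BitSet();
            int gtColIdx = firstMetricGtIdx;                       // metrics start after dimensions
            for (String measure : measuresInCubeOrder) {           // cube order drives GT order
                for (int c = 0; c < hbaseColumnRefs.size(); c++) { // mark whichever HBase columns
                    if (hbaseColumnRefs.get(c).contains(measure))  // reference this measure
                        blocks[c].set(gtColIdx);
                }
                gtColIdx++;
            }
            return blocks;
        }
    }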

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 30e1e95..d565ab5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -32,14 +32,12 @@ import java.util.concurrent.TimeUnit;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.metadata.measure.MeasureAggregators;
-import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -286,9 +284,8 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
         ImmutableBitSet lastMetricsColumns;
 
         Merger() {
-            MeasureDesc[] measures = CuboidToGridTableMapping.getMeasureSequenceOnGridTable(cubeDesc);
-            reuseAggrs = new MeasureAggregators(measures);
-            reuseMetricsArray = new Object[measures.length];
+            reuseAggrs = new MeasureAggregators(cubeDesc.getMeasures());
+            reuseMetricsArray = new Object[cubeDesc.getMeasures().size()];
         }
 
         public void mergeAndOutput(List<SplitThread> splits, ICuboidWriter output) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 465dd99..662416e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -38,8 +38,6 @@ import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.gridtable.CubeGridTable;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.gridtable.GTAggregateScanner;
 import org.apache.kylin.gridtable.GTBuilder;
@@ -75,7 +73,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private final CubeJoinedFlatTableDesc intermediateTableDesc;
     private final MeasureCodec measureCodec;
     private final String[] metricsAggrFuncs;
-    private final int[] hbaseMeasureRefIndex;
     private final MeasureDesc[] measureDescs;
     private final int measureCount;
 
@@ -97,35 +94,18 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
         this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
 
+        this.measureCount = cubeDesc.getMeasures().size();
+        this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
+        
         Map<String, Integer> measureIndexMap = Maps.newHashMap();
         List<String> metricsAggrFuncsList = Lists.newArrayList();
-        measureCount = cubeDesc.getMeasures().size();
-
-        List<MeasureDesc> measureDescsList = Lists.newArrayList();
-        hbaseMeasureRefIndex = new int[measureCount];
-        int measureRef = 0;
-        for (HBaseColumnFamilyDesc familyDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
-                for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
-                    for (int j = 0; j < measureCount; j++) {
-                        if (cubeDesc.getMeasures().get(j).equals(measure)) {
-                            measureDescsList.add(measure);
-                            hbaseMeasureRefIndex[measureRef] = j;
-                            break;
-                        }
-                    }
-                    measureRef++;
-                }
-            }
-        }
 
         for (int i = 0; i < measureCount; i++) {
-            MeasureDesc measureDesc = measureDescsList.get(i);
+            MeasureDesc measureDesc = measureDescs[i];
             metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
             measureIndexMap.put(measureDesc.getName(), i);
         }
         this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
-        this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
     }
 
     private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
@@ -605,8 +585,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
             Object[] values = new Object[measureCount];
             MeasureDesc measureDesc = null;
 
-            for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
-                int i = hbaseMeasureRefIndex[position];
+            for (int i = 0; i < measureCount; i++) {
                 measureDesc = measureDescs[i];
 
                 Object value = null;
@@ -635,7 +614,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
                     }
                     value = measureCodec.getSerializer(i).valueOf(result);
                 }
-                values[position] = value;
+                values[i] = value;
             }
             return values;
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 84cf842..0060e09 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -651,17 +651,24 @@ public class CubeDesc extends RootPersistentEntity {
         if (measures == null || measures.size() == 0)
             return;
 
-        Map<String, MeasureDesc> measureCache = new HashMap<String, MeasureDesc>();
+        Map<String, MeasureDesc> measureLookup = new HashMap<String, MeasureDesc>();
         for (MeasureDesc m : measures)
-            measureCache.put(m.getName(), m);
+            measureLookup.put(m.getName(), m);
+        Map<String, Integer> measureIndexLookup = new HashMap<String, Integer>();
+        for (int i = 0; i < measures.size(); i++)
+            measureIndexLookup.put(measures.get(i).getName(), i);
 
         for (HBaseColumnFamilyDesc cf : getHBaseMapping().getColumnFamily()) {
             for (HBaseColumnDesc c : cf.getColumns()) {
-                MeasureDesc[] measureDescs = new MeasureDesc[c.getMeasureRefs().length];
-                for (int i = 0; i < c.getMeasureRefs().length; i++) {
-                    measureDescs[i] = measureCache.get(c.getMeasureRefs()[i]);
+                String[] colMeasureRefs = c.getMeasureRefs();
+                MeasureDesc[] measureDescs = new MeasureDesc[colMeasureRefs.length];
+                int[] measureIndex = new int[colMeasureRefs.length];
+                for (int i = 0; i < colMeasureRefs.length; i++) {
+                    measureDescs[i] = measureLookup.get(colMeasureRefs[i]);
+                    measureIndex[i] = measureIndexLookup.get(colMeasureRefs[i]);
                 }
                 c.setMeasures(measureDescs);
+                c.setMeasureIndex(measureIndex);
                 c.setColumnFamilyName(cf.getName());
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
index 9aa6727..e82ad53 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
@@ -37,8 +37,9 @@ public class HBaseColumnDesc {
     @JsonProperty("measure_refs")
     private String[] measureRefs;
 
-    // these two will be assemble at runtime
+    // these two will be assembled at runtime
     private MeasureDesc[] measures;
+    private int[] measureIndex; // the index on CubeDesc.getMeasures()
     private String columnFamilyName;
 
     public String getQualifier() {
@@ -56,20 +57,19 @@ public class HBaseColumnDesc {
     public void setMeasureRefs(String[] measureRefs) {
         this.measureRefs = measureRefs;
     }
+    
+    public int[] getMeasureIndex() {
+        return measureIndex;
+    }
+    
+    public void setMeasureIndex(int[] index) {
+        this.measureIndex = index;
+    }
 
     public MeasureDesc[] getMeasures() {
         return measures;
     }
 
-    public int findMeasureIndex(FunctionDesc function) {
-        for (int i = 0; i < measures.length; i++) {
-            if (measures[i].getFunction().equals(function)) {
-                return i;
-            }
-        }
-        return -1;
-    }
-
     public void setMeasures(MeasureDesc[] measures) {
         this.measures = measures;
     }
@@ -82,6 +82,23 @@ public class HBaseColumnDesc {
         this.columnFamilyName = columnFamilyName;
     }
 
+    public int findMeasure(FunctionDesc function) {
+        for (int i = 0; i < measures.length; i++) {
+            if (measures[i].getFunction().equals(function)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    public boolean containsMeasure(String refName) {
+        for (String ref : measureRefs) {
+            if (ref.equals(refName))
+                return true;
+        }
+        return false;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
index e6282f7..577a836 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -40,8 +40,8 @@ public interface IMROutput {
         /**
          * Add step that saves cuboid output from HDFS to storage.
          * 
-         * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn", 
-         * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+         * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn, 
+         * value is M1+M2+..+Mm. CUBOID is the 8-byte cuboid ID; Dx is the dimension value with
          * dictionary encoding; Mx is measure value serialization form.
          */
         public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
@@ -66,8 +66,8 @@ public interface IMROutput {
         /**
          * Add step that saves cuboid output from HDFS to storage.
          * 
-         * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn", 
-         * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+         * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn, 
+         * value is M1+M2+..+Mm. CUBOID is the 8-byte cuboid ID; Dx is the dimension value with
          * dictionary encoding; Mx is measure value serialization form.
          */
         public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
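
For concreteness, here is a hedged sketch of the key layout the javadoc above describes: an 8-byte cuboid ID followed by dictionary-encoded dimension values. The helper name and the byte[][] representation of the encoded dimensions are illustrative, not Kylin API:

    import java.nio.ByteBuffer;

    class CuboidKeySketch {
        static byte[] encodeKey(long cuboidId, byte[][] dictEncodedDims) {
            int len = 8;
            for (byte[] d : dictEncodedDims)
                len += d.length;
            ByteBuffer buf = ByteBuffer.allocate(len);
            buf.putLong(cuboidId);           // CUBOID: 8-byte cuboid ID
            for (byte[] d : dictEncodedDims) // D1..Dn: dictionary-encoded dimension values
                buf.put(d);
            return buf.array();
        }
    }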

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index 456df82..b4aff5d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -27,6 +27,7 @@ public interface IMROutput2 {
      */
     public interface IMRBatchCubingOutputSide2 {
 
+        /** Return an output format for Phase 3: Build Cube MR */
         public IMRStorageOutputFormat getStorageOutputFormat();
 
         /** Add step that executes after build dictionary and before build cube. */
@@ -45,13 +46,22 @@ public interface IMROutput2 {
         public IMRStorageInputFormat getStorageInputFormat();
     }
 
+    /** Read in a cube as the input of a merge. Configure the input file format of the mapper. */
     @SuppressWarnings("rawtypes")
     public interface IMRStorageInputFormat {
 
+        /** Configure MR mapper class and input file format. */
         public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException;
 
+        /** Given a mapper context, figure out which segment the mapper reads from. */
         public CubeSegment findSourceSegment(Mapper.Context context, CubeInstance cubeInstance) throws IOException;
 
+        /**
+         * Read in a row of a cuboid. Given the input KV, deserialize the cuboid ID, dimensions, and measures.
+         * 
+         * @return <code>ByteArrayWritable</code> is the cuboid ID (8 bytes) + dimension values in dictionary encoding
+         *         <code>Object[]</code> is the measure values in order of <code>CubeDesc.getMeasures()</code>
+         */
         public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue);
     }
 
@@ -67,6 +77,7 @@ public interface IMROutput2 {
      */
     public interface IMRBatchMergeOutputSide2 {
 
+        /** Return an output format for Phase 2: Merge Cube MR */
         public IMRStorageOutputFormat getStorageOutputFormat();
 
         /** Add step that executes after merge dictionary and before merge cube. */
@@ -79,10 +90,21 @@ public interface IMROutput2 {
         public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
+    /** Write out a cube. Configure the output file format of the reducer and do the actual K-V output. */
     @SuppressWarnings("rawtypes")
     public interface IMRStorageOutputFormat {
+        
+        /** Configure MR reducer class and output file format. */
         public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException;
 
+        /**
+         * Write out a row of a cuboid. Given the cuboid ID, dimensions, and measures, serialize them
+         * in any storage-specific way and write to the reducer context.
+         * 
+         * @param key     The cuboid ID (8 bytes) + dimension values in dictionary encoding
+         * @param value   The measure values in order of <code>CubeDesc.getMeasures()</code>
+         * @param context The reducer context that the output goes to
+         */
         public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException;
     }
 }
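
The essential contract of the two interfaces is a round trip: whatever bytes doReducerOutput writes for the cube-ordered measure array, parseMapperInput must turn back into the same array. A toy illustration of that symmetry, with JDK object serialization standing in for Kylin's MeasureCodec purely for demonstration:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    class MeasureRoundTripSketch {
        static byte[] encodeMeasures(Object[] cubeOrdered) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(cubeOrdered); // output side: serialize in cube order
            oos.flush();
            return bos.toByteArray();
        }

        static Object[] decodeMeasures(byte[] bytes) throws IOException, ClassNotFoundException {
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            return (Object[]) ois.readObject(); // input side: same cube-ordered array back
        }
    }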

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index 443ecd8..db254f6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -8,8 +8,6 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
 import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
 import org.apache.kylin.engine.mr.KylinReducer;
@@ -23,8 +21,6 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 /**
  */
 public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArrayWritable, Object, Object> {
@@ -56,18 +52,9 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
         else
             storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
 
-        List<MeasureDesc> measuresDescs = Lists.newArrayList();
-        for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                for (MeasureDesc measure : colDesc.getMeasures()) {
-                    measuresDescs.add(measure);
-                }
-            }
-        }
-
+        List<MeasureDesc> measuresDescs = cubeDesc.getMeasures();
         codec = new MeasureCodec(measuresDescs);
         aggs = new MeasureAggregators(measuresDescs);
-
         input = new Object[measuresDescs.size()];
         result = new Object[measuresDescs.size()];
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index b57e484..fa575ca 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.BytesUtil;
@@ -35,8 +34,6 @@ import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
@@ -46,14 +43,12 @@ import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 /**
  * @author shaoshi
@@ -111,22 +106,14 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
         mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
         storageInputFormat = MRUtil.getBatchMergeInputSide2(mergedCubeSegment).getStorageInputFormat();
 
-        newKeyBuf = new byte[256];// size will auto-grow
+        newKeyBuf = new byte[256]; // size will auto-grow
 
         sourceCubeSegment = storageInputFormat.findSourceSegment(context, cube);
-        logger.info(sourceCubeSegment.toString());
+        logger.info("Source cube segment: " + sourceCubeSegment);
 
-        this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+        rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
 
-        List<MeasureDesc> measuresDescs = Lists.newArrayList();
-        for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                for (MeasureDesc measure : colDesc.getMeasures()) {
-                    measuresDescs.add(measure);
-                }
-            }
-        }
-        codec = new MeasureCodec(measuresDescs);
+        codec = new MeasureCodec(cubeDesc.getMeasures());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 5327dc4..a09ffe1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -57,8 +57,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     private CubeInstance cube;
     private CubeDesc cubeDesc;
     private CubeSegment mergedCubeSegment;
-    private CubeSegment sourceCubeSegment;// Must be unique during a mapper's
-    // life cycle
+    private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
 
     private Text outputKey = new Text();
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 0514b45..836f142 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -373,7 +373,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
             int bestIndex = -1;
             for (HBaseColumnDesc hbCol : hbCols) {
                 bestHBCol = hbCol;
-                bestIndex = hbCol.findMeasureIndex(aggrFunc);
+                bestIndex = hbCol.findMeasure(aggrFunc);
                 MeasureDesc measure = hbCol.getMeasures()[bestIndex];
                 // criteria for holistic measure: Exact Aggregation && Exact Cuboid
                 if (measure.getFunction().isHolisticCountDistinct() && context.isExactAggregation()) {
@@ -387,7 +387,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
                 codec = new RowValueDecoder(bestHBCol);
                 codecMap.put(bestHBCol, codec);
             }
-            codec.setIndex(bestIndex);
+            codec.setProjectIndex(bestIndex);
         }
         return new ArrayList<RowValueDecoder>(codecMap.values());
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
index 27f4f10..e569cbd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
@@ -118,7 +118,7 @@ public class CubeTupleConverter {
         // measures
         for (int i = 0; i < rowValueDecoders.size(); i++) {
             RowValueDecoder rowValueDecoder = rowValueDecoders.get(i);
-            rowValueDecoder.decode(hbaseRow);
+            rowValueDecoder.decodeAndConvertJavaObj(hbaseRow);
             Object[] measureValues = rowValueDecoder.getValues();
 
             int[] measureIdx = metricsMeasureIdx[i];

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
index 65b56b9..9f97b0e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
@@ -79,8 +79,7 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita
         KeyValue outputValue;
 
         int n = keyValueCreators.size();
-        if (n == 1 && keyValueCreators.get(0).isFullCopy) { // shortcut for
-                                                            // simple full copy
+        if (n == 1 && keyValueCreators.get(0).isFullCopy) { // shortcut for simple full copy
 
             outputValue = keyValueCreators.get(0).create(key, value.getBytes(), 0, value.getLength());
             context.write(outputKey, outputValue);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index d313233..1271070 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -45,7 +45,6 @@ import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
@@ -64,26 +63,26 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
 
     private static final int BATCH_PUT_THRESHOLD = 10000;
 
-    private final List<InMemKeyValueCreator> keyValueCreators;
+    private final List<KeyValueCreator> keyValueCreators;
     private final int nColumns;
     private final HTableInterface hTable;
     private final ByteBuffer byteBuffer;
     private final CubeDesc cubeDesc;
+    private final Object[] measureValues;
     private List<Put> puts = Lists.newArrayList();
 
     public HBaseCuboidWriter(CubeDesc cubeDesc, HTableInterface hTable) {
         this.keyValueCreators = Lists.newArrayList();
         this.cubeDesc = cubeDesc;
-        int startPosition = 0;
         for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
             for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition));
-                startPosition += colDesc.getMeasures().length;
+                keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
             }
         }
         this.nColumns = keyValueCreators.size();
         this.hTable = hTable;
         this.byteBuffer = ByteBuffer.allocate(1 << 20);
+        this.measureValues = new Object[cubeDesc.getMeasures().size()];
     }
 
     private byte[] copy(byte[] array, int offset, int length) {
@@ -106,10 +105,13 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
     @Override
     public void write(long cuboidId, GTRecord record) throws IOException {
         final ByteBuffer key = createKey(cuboidId, record);
-        final CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(Cuboid.findById(cubeDesc, cuboidId));
-        final ImmutableBitSet bitSet = new ImmutableBitSet(mapping.getDimensionCount(), mapping.getColumnCount());
+        final Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+        final int nDims = cuboid.getColumns().size();
+        final ImmutableBitSet bitSet = new ImmutableBitSet(nDims, nDims + cubeDesc.getMeasures().size());
+        
         for (int i = 0; i < nColumns; i++) {
-            final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), record.getValues(bitSet, new Object[bitSet.cardinality()]));
+            final Object[] values = record.getValues(bitSet, measureValues);
+            final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), values);
             final Put put = new Put(copy(key.array(), 0, key.position()));
             byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength());
             byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());
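
Because measures now occupy the contiguous GT columns [nDims, nDims + nMetrics) in CubeDesc.getMeasures() order, selecting them is a single bit range and the same preallocated buffer can be reused for every row. A standalone sketch of that selection, with java.util.BitSet standing in for Kylin's ImmutableBitSet:

    import java.util.BitSet;

    class MetricSelectSketch {
        static Object[] selectMetrics(Object[] gtRow, int nDims, int nMetrics, Object[] reuse) {
            BitSet metrics = new BitSet();
            metrics.set(nDims, nDims + nMetrics); // one contiguous range, cube order
            int j = 0;
            for (int i = metrics.nextSetBit(0); i >= 0; i = metrics.nextSetBit(i + 1))
                reuse[j++] = gtRow[i];            // copy into the reusable buffer
            return reuse;
        }
    }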

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
index 6cbcec3..1e414be 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
@@ -58,9 +58,6 @@ import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.storage.hbase.steps.HBaseMRSteps;
-import org.apache.kylin.storage.hbase.steps.InMemKeyValueCreator;
-import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
 
 import com.google.common.collect.Lists;
 
@@ -196,14 +193,10 @@ public class HBaseMROutput2 implements IMROutput2 {
             ImmutableBytesWritable key = (ImmutableBytesWritable) inKey;
             parsedKey.set(key.get(), key.getOffset(), key.getLength());
 
-            Result value = (Result) inValue;
-            int position = 0;
+            Result hbaseRow = (Result) inValue;
             for (int i = 0; i < rowValueDecoders.length; i++) {
-                rowValueDecoders[i].decode(value, false);
-                Object[] measureValues = rowValueDecoders[i].getValues();
-                for (int j = 0; j < measureValues.length; j++) {
-                    parsedValue[position++] = measureValues[j];
-                }
+                rowValueDecoders[i].decode(hbaseRow);
+                rowValueDecoders[i].loadCubeMeasureArray(parsedValue);
             }
 
             return parsedPair;
@@ -214,7 +207,7 @@ public class HBaseMROutput2 implements IMROutput2 {
     private static class HBaseOutputFormat implements IMRStorageOutputFormat {
         final CubeSegment seg;
 
-        final List<InMemKeyValueCreator> keyValueCreators = Lists.newArrayList();
+        final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
         final ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
 
         public HBaseOutputFormat(CubeSegment seg) {
@@ -243,11 +236,9 @@ public class HBaseMROutput2 implements IMROutput2 {
         @Override
         public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException {
             if (keyValueCreators.size() == 0) {
-                int startPosition = 0;
                 for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
                     for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                        keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition));
-                        startPosition += colDesc.getMeasures().length;
+                        keyValueCreators.add(new KeyValueCreator(seg.getCubeDesc(), colDesc));
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java
deleted file mode 100644
index d96dfcc..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package org.apache.kylin.storage.hbase.steps;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-import com.google.common.collect.Lists;
-
-public class InMemKeyValueCreator {
-    byte[] cfBytes;
-    byte[] qBytes;
-    long timestamp;
-
-    MeasureCodec codec;
-    Object[] colValues;
-    ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
-    int startPosition = 0;
-
-    public InMemKeyValueCreator(HBaseColumnDesc colDesc, int startPosition) {
-
-        cfBytes = Bytes.toBytes(colDesc.getColumnFamilyName());
-        qBytes = Bytes.toBytes(colDesc.getQualifier());
-        timestamp = System.currentTimeMillis();
-
-        List<MeasureDesc> measures = Lists.newArrayList();
-        for (MeasureDesc measure : colDesc.getMeasures()) {
-            measures.add(measure);
-        }
-        codec = new MeasureCodec(measures);
-        colValues = new Object[measures.size()];
-
-        this.startPosition = startPosition;
-
-    }
-
-    public KeyValue create(Text key, Object[] measureValues) {
-        return create(key.getBytes(), 0, key.getLength(), measureValues);
-    }
-
-    public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, Object[] measureValues) {
-        for (int i = 0; i < colValues.length; i++) {
-            colValues[i] = measureValues[startPosition + i];
-        }
-
-        valueBuf.clear();
-        codec.encode(colValues, valueBuf);
-
-        return create(keyBytes, keyOffset, keyLength, valueBuf.array(), 0, valueBuf.position());
-    }
-
-    public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, byte[] value, int voffset, int vlen) {
-        return new KeyValue(keyBytes, keyOffset, keyLength, //
-                cfBytes, 0, cfBytes.length, //
-                qBytes, 0, qBytes.length, //
-                timestamp, KeyValue.Type.Put, //
-                value, voffset, vlen);
-    }
-
-    public KeyValue create(Text key, byte[] value, int voffset, int vlen) {
-        return create(key.getBytes(), 0, key.getLength(), value, voffset, vlen);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
index a24fe39..7f40259 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
@@ -1,7 +1,6 @@
 package org.apache.kylin.storage.hbase.steps;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
@@ -36,21 +35,14 @@ public class KeyValueCreator {
         qBytes = Bytes.toBytes(colDesc.getQualifier());
         timestamp = System.currentTimeMillis();
 
-        List<MeasureDesc> measures = cubeDesc.getMeasures();
-        String[] measureNames = getMeasureNames(cubeDesc);
-        String[] refs = colDesc.getMeasureRefs();
-
-        refIndex = new int[refs.length];
-        refMeasures = new MeasureDesc[refs.length];
-        for (int i = 0; i < refs.length; i++) {
-            refIndex[i] = indexOf(measureNames, refs[i]);
-            refMeasures[i] = measures.get(refIndex[i]);
-        }
+        refIndex = colDesc.getMeasureIndex();
+        refMeasures = colDesc.getMeasures();
 
         codec = new MeasureCodec(refMeasures);
-        colValues = new Object[refs.length];
+        colValues = new Object[refMeasures.length];
 
         isFullCopy = true;
+        List<MeasureDesc> measures = cubeDesc.getMeasures();
         for (int i = 0; i < measures.size(); i++) {
             if (refIndex.length <= i || refIndex[i] != i)
                 isFullCopy = false;
@@ -84,20 +76,4 @@ public class KeyValueCreator {
         return create(key.getBytes(), 0, key.getLength(), value, voffset, vlen);
     }
 
-    private int indexOf(String[] measureNames, String ref) {
-        for (int i = 0; i < measureNames.length; i++)
-            if (measureNames[i].equalsIgnoreCase(ref))
-                return i;
-
-        throw new IllegalArgumentException("Measure '" + ref + "' not found in " + Arrays.toString(measureNames));
-    }
-
-    private String[] getMeasureNames(CubeDesc cubeDesc) {
-        List<MeasureDesc> measures = cubeDesc.getMeasures();
-        String[] result = new String[measures.size()];
-        for (int i = 0; i < measures.size(); i++)
-            result[i] = measures.get(i).getName();
-        return result;
-    }
-
 }
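
With measureIndex precomputed on HBaseColumnDesc, the full-copy test reduces to an identity check: a column is a "full copy" when it stores every cube measure at its own global position, so its serialized value can be passed through without re-encoding. A hedged sketch of that check (class and method names are illustrative):

    class FullCopySketch {
        static boolean isFullCopy(int[] measureIndex, int totalMeasures) {
            if (measureIndex.length < totalMeasures)
                return false;
            for (int i = 0; i < totalMeasures; i++)
                if (measureIndex[i] != i)
                    return false; // any permutation or subset breaks the shortcut
            return true;
        }
    }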

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
index 9fd4e22..7c1a19f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
@@ -42,7 +42,7 @@ public class RowValueDecoder implements Cloneable {
     private final MeasureCodec codec;
     private final BitSet projectionIndex;
     private final MeasureDesc[] measures;
-    private Object[] values;
+    private final Object[] values;
 
     public RowValueDecoder(HBaseColumnDesc hbaseColumn) {
         this.hbaseColumn = hbaseColumn;
@@ -54,22 +54,26 @@ public class RowValueDecoder implements Cloneable {
         this.values = new Object[measures.length];
     }
 
-    public void decode(Result hbaseRow) {
+    public void decodeAndConvertJavaObj(Result hbaseRow) {
         decode(hbaseRow, true);
     }
+    
+    public void decode(Result hbaseRow) {
+        decode(hbaseRow, false);
+    }
 
-    public void decode(Result hbaseRow, boolean convertToJavaObject) {
+    private void decode(Result hbaseRow, boolean convertToJavaObject) {
         decode(hbaseRow.getValueAsByteBuffer(hbaseColumnFamily, hbaseColumnQualifier), convertToJavaObject);
     }
 
-    public void decode(byte[] bytes) {
-        decode(bytes, true);
+    public void decodeAndConvertJavaObj(byte[] bytes) {
+        decode(ByteBuffer.wrap(bytes), true);
     }
 
-    public void decode(byte[] bytes, boolean convertToJavaObject) {
-        decode(ByteBuffer.wrap(bytes), convertToJavaObject);
+    public void decode(byte[] bytes) {
+        decode(ByteBuffer.wrap(bytes), false);
     }
-
+    
     private void decode(ByteBuffer buffer, boolean convertToJavaObject) {
         codec.decode(buffer, values);
         if (convertToJavaObject) {
@@ -90,18 +94,18 @@ public class RowValueDecoder implements Cloneable {
         }
     }
 
-    public void setIndex(int bitIndex) {
+    public void setProjectIndex(int bitIndex) {
         projectionIndex.set(bitIndex);
     }
 
-    public HBaseColumnDesc getHBaseColumn() {
-        return hbaseColumn;
-    }
-
     public BitSet getProjectionIndex() {
         return projectionIndex;
     }
 
+    public HBaseColumnDesc getHBaseColumn() {
+        return hbaseColumn;
+    }
+
     public Object[] getValues() {
         return values;
     }
@@ -109,6 +113,14 @@ public class RowValueDecoder implements Cloneable {
     public MeasureDesc[] getMeasures() {
         return measures;
     }
+    
+    // result is in order of <code>CubeDesc.getMeasures()</code>
+    public void loadCubeMeasureArray(Object[] result) {
+        int[] measureIndex = hbaseColumn.getMeasureIndex();
+        for (int i = 0; i < measureIndex.length; i++) {
+            result[measureIndex[i]] = values[i];
+        }
+    }
 
     public boolean hasMemHungryCountDistinct() {
         for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
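
loadCubeMeasureArray is the scatter step that makes the cube-wide measure order work: values decoded in the HBase column's local order land at their global CubeDesc.getMeasures() positions, so several decoders (one per HBase column) can fill a single shared array. A standalone sketch of the same scatter:

    class MeasureScatterSketch {
        static void scatterToCubeOrder(Object[] localValues, int[] measureIndex, Object[] result) {
            for (int i = 0; i < measureIndex.length; i++)
                result[measureIndex[i]] = localValues[i]; // local slot i -> global position
        }
    }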

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
index d62da80..ba79305 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
@@ -73,11 +73,11 @@ public class RowValueDecoderTest extends LocalFileMetadataTestCase {
         RowValueDecoder rowValueDecoder = new RowValueDecoder(hbaseCol);
         for (MeasureDesc measure : cubeDesc.getMeasures()) {
             FunctionDesc aggrFunc = measure.getFunction();
-            int index = hbaseCol.findMeasureIndex(aggrFunc);
-            rowValueDecoder.setIndex(index);
+            int index = hbaseCol.findMeasure(aggrFunc);
+            rowValueDecoder.setProjectIndex(index);
         }
 
-        rowValueDecoder.decode(valueBytes);
+        rowValueDecoder.decodeAndConvertJavaObj(valueBytes);
         Object[] measureValues = rowValueDecoder.getValues();
         //BigDecimal.ROUND_HALF_EVEN in BigDecimalSerializer
         assertEquals("[333.1235, 333.1111, 333.2000, 2, 100]", Arrays.toString(measureValues));


[2/3] incubator-kylin git commit: KYLIN-1007 Generates cuboid file in build and merge

Posted by li...@apache.org.
KYLIN-1007 Generates cuboid file in build and merge


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f156d138
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f156d138
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f156d138

Branch: refs/heads/2.x-staging
Commit: f156d138d34b58e5c620c0d1c8678029b5aa41a0
Parents: fe9e02c
Author: Li, Yang <ya...@ebay.com>
Authored: Thu Sep 10 14:31:03 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Sep 11 11:34:04 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/engine/mr/CubingJob.java   |  24 +-
 .../kylin/storage/hbase/HBaseStorage.java       |   3 +-
 .../hbase/steps/HBaseMROutput2Transition.java   | 266 +++++++++++++++++++
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |  12 +-
 4 files changed, 297 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f156d138/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index b2f0a06..ed5c036 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -21,6 +21,8 @@ package org.apache.kylin.engine.mr;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.TimeZone;
@@ -179,11 +181,26 @@ public class CubingJob extends DefaultChainedExecutable {
     }
 
     public long findCubeSizeBytes() {
-        return Long.parseLong(findExtraInfo(CUBE_SIZE_BYTES, "0"));
+        // look for the info BACKWARD, so the last step that claims the cube size wins
+        return Long.parseLong(findExtraInfoBackward(CUBE_SIZE_BYTES, "0"));
+    }
+    
+    public String findExtraInfo(String key, String dft) {
+        return findExtraInfo(key, dft, false);
     }
 
-    private String findExtraInfo(String key, String dft) {
-        for (AbstractExecutable child : getTasks()) {
+    public String findExtraInfoBackward(String key, String dft) {
+        return findExtraInfo(key, dft, true);
+    }
+    
+    private String findExtraInfo(String key, String dft, boolean backward) {
+        ArrayList<AbstractExecutable> tasks = new ArrayList<AbstractExecutable>(getTasks());
+        
+        if (backward) {
+            Collections.reverse(tasks);
+        }
+        
+        for (AbstractExecutable child : tasks) {
             Output output = executableManager.getOutput(child.getId());
             String value = output.getExtra().get(key);
             if (value != null)
@@ -191,4 +208,5 @@ public class CubingJob extends DefaultChainedExecutable {
         }
         return dft;
     }
+    
 }
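
The backward variant matters because, presumably, more than one step in the chain can record CUBE_SIZE_BYTES (for example a cuboid-file step and a later HFile step); scanning the task list in reverse lets the last writer win. A minimal sketch of the same lookup over plain maps:

    import java.util.List;
    import java.util.Map;

    class ExtraInfoSketch {
        static String findLast(List<Map<String, String>> stepOutputs, String key, String dft) {
            for (int i = stepOutputs.size() - 1; i >= 0; i--) { // backward: last claim wins
                String v = stepOutputs.get(i).get(key);
                if (v != null)
                    return v;
            }
            return dft; // default when no step recorded the key
        }
    }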

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f156d138/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
index 1d8c8c8..421f648 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -36,6 +36,7 @@ import org.apache.kylin.storage.cache.CacheFledgedDynamicQuery;
 import org.apache.kylin.storage.cache.CacheFledgedStaticQuery;
 import org.apache.kylin.storage.hbase.steps.HBaseMROutput;
 import org.apache.kylin.storage.hbase.steps.HBaseMROutput2;
+import org.apache.kylin.storage.hbase.steps.HBaseMROutput2Transition;
 import org.apache.kylin.storage.hybrid.HybridInstance;
 import org.apache.kylin.storage.hybrid.HybridStorageQuery;
 
@@ -105,7 +106,7 @@ public class HBaseStorage implements IStorage {
         if (engineInterface == IMROutput.class) {
             return (I) new HBaseMROutput();
         } else if (engineInterface == IMROutput2.class) {
-            return (I) new HBaseMROutput2();
+            return (I) new HBaseMROutput2Transition();
         } else {
             throw new RuntimeException("Cannot adapt to " + engineInterface);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f156d138/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
new file mode 100644
index 0000000..047315b
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import com.google.common.collect.Lists;
+
+/**
+ * This "Transition" MR output generates cuboid files and then convert to HFile.
+ * The additional step slows down build process slightly, but the gains is merge
+ * can read from HDFS instead of over HBase region server. See KYLIN-1007.
+ * 
+ * This is transitional because finally we want to merge from HTable snapshot.
+ * However MR input with multiple snapshots is only supported by HBase 1.x.
+ * Before most users upgrade to latest HBase, they can only use this transitional
+ * cuboid file solution.
+ */
+public class HBaseMROutput2Transition implements IMROutput2 {
+
+    @Override
+    public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
+        return new IMRBatchCubingOutputSide2() {
+            HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+            @Override
+            public IMRStorageOutputFormat getStorageOutputFormat() {
+                return new HBaseOutputFormat(seg);
+            }
+
+            @Override
+            public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
+            }
+
+            @Override
+            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+                String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
+                
+                jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
+                jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
+            }
+
+            @Override
+            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+                // nothing to do
+            }
+        };
+    }
+
+    @Override
+    public IMRBatchMergeInputSide2 getBatchMergeInputSide(final CubeSegment seg) {
+        return new IMRBatchMergeInputSide2() {
+            @Override
+            public IMRStorageInputFormat getStorageInputFormat() {
+                return new HBaseInputFormat(seg);
+            }
+        };
+    }
+
+    @Override
+    public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) {
+        return new IMRBatchMergeOutputSide2() {
+            HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+            @Override
+            public IMRStorageOutputFormat getStorageOutputFormat() {
+                return new HBaseOutputFormat(seg);
+            }
+
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
+            }
+
+            @Override
+            public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
+                String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
+                
+                jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
+                jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
+            }
+
+            @Override
+            public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createMergeGCStep());
+            }
+        };
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private static class HBaseInputFormat implements IMRStorageInputFormat {
+        final Iterable<String> htables;
+
+        final RowValueDecoder[] rowValueDecoders;
+        final ByteArrayWritable parsedKey;
+        final Object[] parsedValue;
+        final Pair<ByteArrayWritable, Object[]> parsedPair;
+
+        public HBaseInputFormat(CubeSegment seg) {
+            this.htables = new HBaseMRSteps(seg).getMergingHTables();
+
+            List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
+            List<MeasureDesc> measuresDescs = Lists.newArrayList();
+            for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
+                for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+                    valueDecoderList.add(new RowValueDecoder(colDesc));
+                    for (MeasureDesc measure : colDesc.getMeasures()) {
+                        measuresDescs.add(measure);
+                    }
+                }
+            }
+            this.rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
+
+            this.parsedKey = new ByteArrayWritable();
+            this.parsedValue = new Object[measuresDescs.size()];
+            this.parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue);
+        }
+
+        @Override
+        public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException {
+            Configuration conf = job.getConfiguration();
+            HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+            List<Scan> scans = new ArrayList<Scan>();
+            for (String htable : htables) {
+                Scan scan = new Scan();
+                scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+                scan.setCacheBlocks(false); // don't set to true for MR jobs
+                scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
+                scans.add(scan);
+            }
+            TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapper, outputKeyClass, outputValueClass, job);
+        }
+
+        @Override
+        public CubeSegment findSourceSegment(Context context, CubeInstance cubeInstance) throws IOException {
+            TableSplit currentSplit = (TableSplit) context.getInputSplit();
+            byte[] tableName = currentSplit.getTableName();
+            String htableName = Bytes.toString(tableName);
+
+            // decide which source segment
+            for (CubeSegment segment : cubeInstance.getSegments()) {
+                String segmentHtable = segment.getStorageLocationIdentifier();
+                if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
+                    return segment;
+                }
+            }
+            throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
+        }
+
+        @Override
+        public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue) {
+            ImmutableBytesWritable key = (ImmutableBytesWritable) inKey;
+            parsedKey.set(key.get(), key.getOffset(), key.getLength());
+
+            Result hbaseRow = (Result) inValue;
+            for (int i = 0; i < rowValueDecoders.length; i++) {
+                rowValueDecoders[i].decode(hbaseRow);
+                rowValueDecoders[i].loadCubeMeasureArray(parsedValue);
+            }
+
+            return parsedPair;
+        }
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private static class HBaseOutputFormat implements IMRStorageOutputFormat {
+        final CubeSegment seg;
+
+        Text outputKey;
+        Text outputValue;
+        ByteBuffer valueBuf;
+        MeasureCodec codec;
+
+        public HBaseOutputFormat(CubeSegment seg) {
+            this.seg = seg;
+        }
+
+        @Override
+        public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException {
+            job.setReducerClass(reducer);
+
+            // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Text.class);
+            
+            Path output = new Path(new HBaseMRSteps(seg).getCuboidRootPath(jobFlowId));
+            FileOutputFormat.setOutputPath(job, output);
+            
+            HadoopUtil.deletePath(job.getConfiguration(), output);
+        }
+
+        @Override
+        public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException {
+            // lazy init
+            if (outputKey == null) {
+                outputKey = new Text();
+                outputValue = new Text();
+                valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+                codec = new MeasureCodec(seg.getCubeDesc().getMeasures());
+            }
+
+            outputKey.set(key.array(), key.offset(), key.length());
+         
+            valueBuf.clear();
+            codec.encode(value, valueBuf);
+            outputValue.set(valueBuf.array(), 0, valueBuf.position());
+            
+            context.write(outputKey, outputValue);
+        }
+    }
+
+}
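
The doReducerOutput above packs all measures of a row into one Text value; a hedged fragment showing the encode/decode round trip with the same calls used in this file and in the merge mapper (seg stands for a CubeSegment, and the measure values are assumed to be produced by upstream aggregation):

// encode: reducer side, as in HBaseOutputFormat.doReducerOutput above
MeasureCodec codec = new MeasureCodec(seg.getCubeDesc().getMeasures());
ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
Object[] measures = new Object[seg.getCubeDesc().getMeasures().size()]; // filled upstream, in CubeDesc.getMeasures() order
valueBuf.clear();
codec.encode(measures, valueBuf);
Text outputValue = new Text();
outputValue.set(valueBuf.array(), 0, valueBuf.position());

// decode: merge-mapper side, reversing the encoding when reading cuboid files
Object[] decoded = new Object[measures.length];
codec.decode(ByteBuffer.wrap(outputValue.getBytes(), 0, outputValue.getLength()), decoded);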

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f156d138/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 64374e6..03b4361 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -24,16 +24,18 @@ public class HBaseMRSteps extends JobBuilderSupport {
         String jobId = jobFlow.getId();
 
         // calculate key distribution
-        jobFlow.addTask(createRangeRowkeyDistributionStep(cuboidRootPath + "*", jobId));
+        jobFlow.addTask(createRangeRowkeyDistributionStep(cuboidRootPath, jobId));
         // create htable step
         jobFlow.addTask(createCreateHTableStep(jobId));
         // generate hfiles step
-        jobFlow.addTask(createConvertCuboidToHfileStep(cuboidRootPath + "*", jobId));
+        jobFlow.addTask(createConvertCuboidToHfileStep(cuboidRootPath, jobId));
         // bulk load step
         jobFlow.addTask(createBulkLoadStep(jobId));
     }
 
-    public MapReduceExecutable createRangeRowkeyDistributionStep(String inputPath, String jobId) {
+    public MapReduceExecutable createRangeRowkeyDistributionStep(String cuboidRootPath, String jobId) {
+        String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
+        
         MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
         rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
         StringBuilder cmd = new StringBuilder();
@@ -73,7 +75,9 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return createHtableStep;
     }
 
-    public MapReduceExecutable createConvertCuboidToHfileStep(String inputPath, String jobId) {
+    public MapReduceExecutable createConvertCuboidToHfileStep(String cuboidRootPath, String jobId) {
+        String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
+        
         MapReduceExecutable createHFilesStep = new MapReduceExecutable();
         createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
         StringBuilder cmd = new StringBuilder();
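
Moving the "*" suffix inside these methods also normalizes a possible trailing slash; a tiny self-contained sketch of the same glob construction:

// mirrors the inputPath construction in the two methods above
static String toCuboidGlob(String cuboidRootPath) {
    return cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
}

// toCuboidGlob("/kylin/cuboid")  -> "/kylin/cuboid/*"
// toCuboidGlob("/kylin/cuboid/") -> "/kylin/cuboid/*"  (no double slash)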


[3/3] incubator-kylin git commit: KYLIN-1007 Merge from cuboid files if available, and from HTable otherwise

Posted by li...@apache.org.
KYLIN-1007 Merge from cuboid files if available, and from HTable otherwise


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/acd8c0df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/acd8c0df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/acd8c0df

Branch: refs/heads/2.x-staging
Commit: acd8c0dfb6a3e7a422b00e943bf0c66d7c9de2b8
Parents: f156d13
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Sep 11 11:27:00 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Sep 11 11:34:07 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/engine/mr/IMROutput2.java  |   5 +-
 .../engine/mr/common/AbstractHadoopJob.java     |   8 +-
 .../mr/steps/MergeCuboidFromStorageMapper.java  |   2 +-
 .../engine/mr/steps/MergeCuboidMapper.java      |  62 +++--
 .../storage/hbase/steps/HBaseMROutput2.java     |  19 +-
 .../hbase/steps/HBaseMROutput2Transition.java   | 237 ++++++++++++++-----
 6 files changed, 230 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index b4aff5d..3ad51c5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -8,7 +8,6 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
@@ -51,10 +50,10 @@ public interface IMROutput2 {
     public interface IMRStorageInputFormat {
 
         /** Configure MR mapper class and input file format. */
-        public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException;
+        public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException;
 
         /** Given a mapper context, figure out which segment the mapper reads from. */
-        public CubeSegment findSourceSegment(Mapper.Context context, CubeInstance cubeInstance) throws IOException;
+        public CubeSegment findSourceSegment(Mapper.Context context) throws IOException;
 
         /**
          * Read in a row of cuboid. Given the input KV, de-serialize back cuboid ID, dimensions, and measures.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 50b0cc1..cd9393a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -200,8 +200,12 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         return classpath;
     }
 
-    public void addInputDirs(String input, Job job) throws IOException {
-        for (String inp : StringSplitter.split(input, ",")) {
+    public static void addInputDirs(String input, Job job) throws IOException {
+        addInputDirs(StringSplitter.split(input, ","), job);
+    }
+    
+    public static void addInputDirs(String[] inputs, Job job) throws IOException {
+        for (String inp : inputs) {
             inp = inp.trim();
             if (inp.endsWith("/*")) {
                 inp = inp.substring(0, inp.length() - 2);
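
Making addInputDirs static lets the new merge flow feed one glob per merging segment straight into the job; a hedged usage sketch (the paths are hypothetical, and job is an existing org.apache.hadoop.mapreduce.Job):

// one "/*" glob per merging segment, as addCuboidInputDirs builds further below
String[] inputs = new String[] {
    "/kylin/kylin_metadata/kylin-job1/cube_name/cuboid/*",
    "/kylin/kylin_metadata/kylin-job2/cube_name/cuboid/*"
};
AbstractHadoopJob.addInputDirs(inputs, job); // each "/*" entry is expanded to its subdirectories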

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index fa575ca..4598673 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -108,7 +108,7 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
 
         newKeyBuf = new byte[256]; // size will auto-grow
 
-        sourceCubeSegment = storageInputFormat.findSourceSegment(context, cube);
+        sourceCubeSegment = storageInputFormat.findSourceSegment(context);
         logger.info("Source cube segment: " + sourceCubeSegment);
 
         rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index a09ffe1..45f0d32 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -25,7 +25,6 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.BytesUtil;
@@ -66,8 +65,6 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
     private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
 
-    private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
-
     private Boolean checkNeedMerging(TblColRef col) throws IOException {
         Boolean ret = dictsNeedMerging.get(col);
         if (ret != null)
@@ -83,28 +80,6 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         }
     }
 
-    private String extractJobIDFromPath(String path) {
-        Matcher matcher = JOB_NAME_PATTERN.matcher(path);
-        // check the first occurance
-        if (matcher.find()) {
-            return matcher.group(1);
-        } else {
-            throw new IllegalStateException("Can not extract job ID from file path : " + path);
-        }
-    }
-
-    private CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
-        for (CubeSegment segment : cubeInstance.getSegments()) {
-            String lastBuildJobID = segment.getLastBuildJobID();
-            if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
-                return segment;
-            }
-        }
-
-        throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
-
-    }
-
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
         super.bindCurrentConfiguration(context.getConfiguration());
@@ -123,15 +98,38 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         newKeyBuf = new byte[256];// size will auto-grow
 
         // decide which source segment
-        InputSplit inputSplit = context.getInputSplit();
-        String filePath = ((FileSplit) inputSplit).getPath().toString();
-        System.out.println("filePath:" + filePath);
+        FileSplit fileSplit = (FileSplit) context.getInputSplit();
+        sourceCubeSegment = findSourceSegment(fileSplit, cube);
+
+        rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+    }
+    
+    private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
+
+    public static CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
+        String filePath = fileSplit.getPath().toString();
         String jobID = extractJobIDFromPath(filePath);
-        System.out.println("jobID:" + jobID);
-        sourceCubeSegment = findSegmentWithUuid(jobID, cube);
-        System.out.println(sourceCubeSegment);
+        return findSegmentWithUuid(jobID, cube);
+    }
+    
+    private static String extractJobIDFromPath(String path) {
+        Matcher matcher = JOB_NAME_PATTERN.matcher(path);
+        // check the first occurrence
+        if (matcher.find()) {
+            return matcher.group(1);
+        } else {
+            throw new IllegalStateException("Can not extract job ID from file path : " + path);
+        }
+    }
 
-        this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+    private static CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
+        for (CubeSegment segment : cubeInstance.getSegments()) {
+            String lastBuildJobID = segment.getLastBuildJobID();
+            if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
+                return segment;
+            }
+        }
+        throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
     }
 
     @Override
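
The extracted findSourceSegment relies on JOB_NAME_PATTERN to pull the build job's UUID out of the cuboid file path; a self-contained sketch of just the regex step (the sample path is hypothetical):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobIdExtractSketch {
    // same pattern as MergeCuboidMapper.JOB_NAME_PATTERN
    private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");

    public static void main(String[] args) {
        String path = "/kylin/kylin_metadata/kylin-0a8d71e8-df77-4a27-9fb1-12f2feb4c4a5/cube_name/cuboid/level_1/part-00000";
        Matcher matcher = JOB_NAME_PATTERN.matcher(path);
        if (matcher.find())
            System.out.println(matcher.group(1)); // the job UUID, later matched against segment.getLastBuildJobID()
        else
            throw new IllegalStateException("Can not extract job ID from file path : " + path);
    }
}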

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
index 1e414be..b468009 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
@@ -129,16 +128,16 @@ public class HBaseMROutput2 implements IMROutput2 {
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     private static class HBaseInputFormat implements IMRStorageInputFormat {
-        final Iterable<String> htables;
-
+        final CubeSegment seg;
+        
         final RowValueDecoder[] rowValueDecoders;
         final ByteArrayWritable parsedKey;
         final Object[] parsedValue;
         final Pair<ByteArrayWritable, Object[]> parsedPair;
 
         public HBaseInputFormat(CubeSegment seg) {
-            this.htables = new HBaseMRSteps(seg).getMergingHTables();
-
+            this.seg = seg;
+            
             List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
             List<MeasureDesc> measuresDescs = Lists.newArrayList();
             for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
@@ -157,29 +156,29 @@ public class HBaseMROutput2 implements IMROutput2 {
         }
 
         @Override
-        public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException {
+        public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException {
             Configuration conf = job.getConfiguration();
             HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
 
             List<Scan> scans = new ArrayList<Scan>();
-            for (String htable : htables) {
+            for (String htable : new HBaseMRSteps(seg).getMergingHTables()) {
                 Scan scan = new Scan();
                 scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
                 scan.setCacheBlocks(false); // don't set to true for MR jobs
                 scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
                 scans.add(scan);
             }
-            TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapper, outputKeyClass, outputValueClass, job);
+            TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapperClz, outputKeyClz, outputValueClz, job);
         }
 
         @Override
-        public CubeSegment findSourceSegment(Context context, CubeInstance cubeInstance) throws IOException {
+        public CubeSegment findSourceSegment(Context context) throws IOException {
             TableSplit currentSplit = (TableSplit) context.getInputSplit();
             byte[] tableName = currentSplit.getTableName();
             String htableName = Bytes.toString(tableName);
 
             // decide which source segment
-            for (CubeSegment segment : cubeInstance.getSegments()) {
+            for (CubeSegment segment : seg.getCubeInstance().getSegments()) {
                 String segmentHtable = segment.getStorageLocationIdentifier();
                 if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
                     return segment;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 047315b..237f0c9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Result;
@@ -40,10 +41,11 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
@@ -51,24 +53,30 @@ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.steps.MergeCuboidMapper;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
 /**
- * This "Transition" MR output generates cuboid files and then convert to HFile.
- * The additional step slows down build process slightly, but the gains is merge
+ * This "Transition" impl generates cuboid files and then convert to HFile.
+ * The additional step slows down build process, but the gains is merge
  * can read from HDFS instead of over HBase region server. See KYLIN-1007.
  * 
  * This is transitional because finally we want to merge from HTable snapshot.
- * However MR input with multiple snapshots is only supported by HBase 1.x.
+ * However multiple snapshots as MR input is only supported by HBase 1.x.
  * Before most users upgrade to latest HBase, they can only use this transitional
  * cuboid file solution.
  */
 public class HBaseMROutput2Transition implements IMROutput2 {
 
+    private static final Logger logger = LoggerFactory.getLogger(HBaseMROutput2Transition.class);
+
     @Override
     public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
         return new IMRBatchCubingOutputSide2() {
@@ -87,7 +95,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             @Override
             public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
                 String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
-                
+
                 jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
                 jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
             }
@@ -127,7 +135,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             @Override
             public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
                 String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
-                
+
                 jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
                 jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
             }
@@ -141,67 +149,139 @@ public class HBaseMROutput2Transition implements IMROutput2 {
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     private static class HBaseInputFormat implements IMRStorageInputFormat {
-        final Iterable<String> htables;
-
-        final RowValueDecoder[] rowValueDecoders;
-        final ByteArrayWritable parsedKey;
-        final Object[] parsedValue;
-        final Pair<ByteArrayWritable, Object[]> parsedPair;
+        final CubeSegment seg;
 
         public HBaseInputFormat(CubeSegment seg) {
-            this.htables = new HBaseMRSteps(seg).getMergingHTables();
-
-            List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
-            List<MeasureDesc> measuresDescs = Lists.newArrayList();
-            for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
-                for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                    valueDecoderList.add(new RowValueDecoder(colDesc));
-                    for (MeasureDesc measure : colDesc.getMeasures()) {
-                        measuresDescs.add(measure);
-                    }
+            this.seg = seg;
+        }
+
+        @Override
+        public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException {
+            // merge from cuboid files
+            if (isMergeFromCuboidFiles(job.getConfiguration())) {
+                logger.info("Merge from cuboid files for " + seg);
+                
+                job.setInputFormatClass(SequenceFileInputFormat.class);
+                addCuboidInputDirs(job);
+                
+                job.setMapperClass(mapperClz);
+                job.setMapOutputKeyClass(outputKeyClz);
+                job.setMapOutputValueClass(outputValueClz);
+            }
+            // merge from HTable scan
+            else {
+                logger.info("Merge from HTables for " + seg);
+                
+                Configuration conf = job.getConfiguration();
+                HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+                List<Scan> scans = new ArrayList<Scan>();
+                for (String htable : new HBaseMRSteps(seg).getMergingHTables()) {
+                    Scan scan = new Scan();
+                    scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+                    scan.setCacheBlocks(false); // don't set to true for MR jobs
+                    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
+                    scans.add(scan);
                 }
+                TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapperClz, outputKeyClz, outputValueClz, job);
             }
-            this.rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
-
-            this.parsedKey = new ByteArrayWritable();
-            this.parsedValue = new Object[measuresDescs.size()];
-            this.parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue);
         }
 
-        @Override
-        public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException {
-            Configuration conf = job.getConfiguration();
-            HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
-
-            List<Scan> scans = new ArrayList<Scan>();
-            for (String htable : htables) {
-                Scan scan = new Scan();
-                scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
-                scan.setCacheBlocks(false); // don't set to true for MR jobs
-                scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
-                scans.add(scan);
+        private void addCuboidInputDirs(Job job) throws IOException {
+            List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+            HBaseMRSteps steps = new HBaseMRSteps(seg);
+            
+            String[] inputs = new String[mergingSegments.size()];
+            for (int i = 0; i < mergingSegments.size(); i++) {
+                CubeSegment mergingSeg = mergingSegments.get(i);
+                String cuboidPath = steps.getCuboidRootPath(mergingSeg);
+                inputs[i] = cuboidPath + (cuboidPath.endsWith("/") ? "" : "/") + "*";
             }
-            TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapper, outputKeyClass, outputValueClass, job);
+            
+            AbstractHadoopJob.addInputDirs(inputs, job);
         }
 
         @Override
-        public CubeSegment findSourceSegment(Context context, CubeInstance cubeInstance) throws IOException {
-            TableSplit currentSplit = (TableSplit) context.getInputSplit();
-            byte[] tableName = currentSplit.getTableName();
-            String htableName = Bytes.toString(tableName);
-
-            // decide which source segment
-            for (CubeSegment segment : cubeInstance.getSegments()) {
-                String segmentHtable = segment.getStorageLocationIdentifier();
-                if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
-                    return segment;
+        public CubeSegment findSourceSegment(Context context) throws IOException {
+            // merge from cuboid files
+            if (isMergeFromCuboidFiles(context.getConfiguration())) {
+                FileSplit fileSplit = (FileSplit) context.getInputSplit();
+                return MergeCuboidMapper.findSourceSegment(fileSplit, seg.getCubeInstance());
+            }
+            // merge from HTable scan
+            else {
+                TableSplit currentSplit = (TableSplit) context.getInputSplit();
+                byte[] tableName = currentSplit.getTableName();
+                String htableName = Bytes.toString(tableName);
+
+                // decide which source segment
+                for (CubeSegment segment : seg.getCubeInstance().getSegments()) {
+                    String segmentHtable = segment.getStorageLocationIdentifier();
+                    if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
+                        return segment;
+                    }
                 }
+                throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
             }
-            throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
         }
 
+        transient ByteArrayWritable parsedKey;
+        transient Object[] parsedValue;
+        transient Pair<ByteArrayWritable, Object[]> parsedPair;
+        
+        transient MeasureCodec measureCodec;
+        transient RowValueDecoder[] rowValueDecoders;
+
         @Override
         public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue) {
+            // lazy init
+            if (parsedPair == null) {
+                parsedKey = new ByteArrayWritable();
+                parsedValue = new Object[seg.getCubeDesc().getMeasures().size()];
+                parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue);
+            }
+            
+            // merge from cuboid files
+            if (isMergeFromCuboidFiles(null)) {
+                return parseMapperInputFromCuboidFile(inKey, inValue);
+            }
+            // merge from HTable scan
+            else {
+                return parseMapperInputFromHTable(inKey, inValue);
+            }
+        }
+
+        private Pair<ByteArrayWritable, Object[]> parseMapperInputFromCuboidFile(Object inKey, Object inValue) {
+            // lazy init
+            if (measureCodec == null) {
+                measureCodec = new MeasureCodec(seg.getCubeDesc().getMeasures());
+            }
+            
+            Text key = (Text) inKey;
+            parsedKey.set(key.getBytes(), 0, key.getLength());
+            
+            Text value = (Text) inValue;
+            measureCodec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), parsedValue);
+            
+            return parsedPair;
+        }
+
+        private Pair<ByteArrayWritable, Object[]> parseMapperInputFromHTable(Object inKey, Object inValue) {
+            // lazy init
+            if (rowValueDecoders == null) {
+                List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
+                List<MeasureDesc> measuresDescs = Lists.newArrayList();
+                for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
+                    for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+                        valueDecoderList.add(new RowValueDecoder(colDesc));
+                        for (MeasureDesc measure : colDesc.getMeasures()) {
+                            measuresDescs.add(measure);
+                        }
+                    }
+                }
+                rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
+            }
+
             ImmutableBytesWritable key = (ImmutableBytesWritable) inKey;
             parsedKey.set(key.get(), key.getOffset(), key.getLength());
 
@@ -213,6 +293,53 @@ public class HBaseMROutput2Transition implements IMROutput2 {
 
             return parsedPair;
         }
+
+        transient Boolean isMergeFromCuboidFiles;
+
+        // merging from cuboid files is better than merging from HTable, because it puts no workload on the HBase region servers
+        private boolean isMergeFromCuboidFiles(Configuration jobConf) {
+            // cache in this object?
+            if (isMergeFromCuboidFiles != null)
+                return isMergeFromCuboidFiles.booleanValue();
+
+            final String confKey = "kylin.isMergeFromCuboidFiles";
+
+            // cache in job configuration?
+            if (jobConf != null) {
+                String result = jobConf.get(confKey);
+                if (result != null) {
+                    isMergeFromCuboidFiles = Boolean.valueOf(result);
+                    return isMergeFromCuboidFiles.booleanValue();
+                }
+            }
+
+            boolean result = true;
+
+            try {
+                List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+                HBaseMRSteps steps = new HBaseMRSteps(seg);
+                for (CubeSegment mergingSeg : mergingSegments) {
+                    String cuboidRootPath = steps.getCuboidRootPath(mergingSeg);
+                    FileSystem fs = HadoopUtil.getFileSystem(cuboidRootPath);
+
+                    boolean cuboidFileExist = fs.exists(new Path(cuboidRootPath));
+                    if (cuboidFileExist == false) {
+                        logger.info("Merge from HTable because " + cuboidRootPath + " does not exist");
+                        result = false;
+                        break;
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            // put in cache
+            isMergeFromCuboidFiles = Boolean.valueOf(result);
+            if (jobConf != null) {
+                jobConf.set(confKey, "" + result);
+            }
+            return result;
+        }
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -236,10 +363,10 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             job.setOutputFormatClass(SequenceFileOutputFormat.class);
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(Text.class);
-            
+
             Path output = new Path(new HBaseMRSteps(seg).getCuboidRootPath(jobFlowId));
             FileOutputFormat.setOutputPath(job, output);
-            
+
             HadoopUtil.deletePath(job.getConfiguration(), output);
         }
 
@@ -254,11 +381,11 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             }
 
             outputKey.set(key.array(), key.offset(), key.length());
-         
+
             valueBuf.clear();
             codec.encode(value, valueBuf);
             outputValue.set(valueBuf.array(), 0, valueBuf.position());
-            
+
             context.write(outputKey, outputValue);
         }
     }
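
To restate the decision logic that closes this commit: merging reads cuboid files only when every merging segment still has its cuboid directory on HDFS; one missing directory and the job falls back to scanning the merging HTables. A simplified stand-in for that check (Hadoop FileSystem/Path as in the class above, the segment path list assumed precomputed):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MergeSourceCheckSketch {
    // simplified stand-in for HBaseMROutput2Transition.isMergeFromCuboidFiles
    static boolean mergeFromCuboidFiles(List<String> cuboidRootPaths, FileSystem fs) throws IOException {
        for (String root : cuboidRootPaths) {
            if (!fs.exists(new Path(root))) {
                return false; // any missing cuboid dir forces the HTable-scan fallback
            }
        }
        return true; // all cuboid files present: merge from HDFS, sparing the region servers
    }
}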