You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:05:14 UTC

[37/50] incubator-kylin git commit: KYLIN-653 add ii2basecuboid mapper

KYLIN-653 add ii2basecuboid mapper


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/929b986d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/929b986d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/929b986d

Branch: refs/heads/streaming-localdict
Commit: 929b986d6d7396204d443aa6e420dd745a217611
Parents: d1c115d
Author: honma <ho...@ebay.com>
Authored: Fri Mar 27 15:56:10 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Mar 27 15:56:10 2015 +0800

----------------------------------------------------------------------
 .../invertedindex/index/RawTableRecord.java     |   2 +
 .../kylin/job/hadoop/cube/BaseCuboidJob.java    |   2 +-
 .../kylin/job/hadoop/cube/BaseCuboidMapper.java | 246 -------------------
 .../job/hadoop/cube/BaseCuboidMapperBase.java   | 205 ++++++++++++++++
 .../job/hadoop/cube/HiveToBaseCuboidMapper.java |  49 ++++
 .../job/hadoop/cube/IIToBaseCuboidMapper.java   | 109 ++++++++
 .../kylin/job/hadoop/cubev2/InMemCuboidJob.java |   5 -
 .../cube/BaseCuboidMapperPerformanceTest.java   |  65 -----
 .../job/hadoop/cube/BaseCuboidMapperTest.java   | 145 -----------
 .../HiveToBaseCuboidMapperPerformanceTest.java  |  65 +++++
 .../hadoop/cube/HiveToBaseCuboidMapperTest.java | 145 +++++++++++
 11 files changed, 576 insertions(+), 462 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
index 895fd4f..ccfc5b1 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.invertedindex.index;
 
+import com.google.common.base.Preconditions;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
@@ -100,6 +101,7 @@ public class RawTableRecord implements Cloneable {
         bytes.set(buf, digest.offset(col), digest.length(col));
     }
 
+
     @Override
     public Object clone() {
         return new RawTableRecord(this);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java
index 5f7802a..06046c5 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.util.ToolRunner;
 
 public class BaseCuboidJob extends CuboidJob {
     public BaseCuboidJob() {
-        this.setMapperClass(BaseCuboidMapper.class);
+        this.setMapperClass(HiveToBaseCuboidMapper.class);
     }
 
     public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
deleted file mode 100644
index a023c0c..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.common.util.BytesSplitter;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- * @author George Song (ysong1)
- */
-public class BaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text> {
-
-    private static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapper.class);
-
-    public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
-    public static final byte[] ONE = Bytes.toBytes("1");
-
-    private String cubeName;
-    private String segmentName;
-    private Cuboid baseCuboid;
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private CubeSegment cubeSegment;
-    private List<byte[]> nullBytes;
-
-    private CubeJoinedFlatTableDesc intermediateTableDesc;
-    private String intermediateTableRowDelimiter;
-    private byte byteRowDelimiter;
-
-    private int counter;
-    private int errorRecordCounter;
-    private Text outputKey = new Text();
-    private Text outputValue = new Text();
-    private Object[] measures;
-    private byte[][] keyBytesBuf;
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
-    private BytesSplitter bytesSplitter;
-    private AbstractRowKeyEncoder rowKeyEncoder;
-    private MeasureCodec measureCodec;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
-        intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
-        if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) {
-            throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length);
-        }
-
-        byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0];
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        cube = CubeManager.getInstance(config).getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
-        intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
-
-        bytesSplitter = new BytesSplitter(200, 4096);
-        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
-
-        measureCodec = new MeasureCodec(cubeDesc.getMeasures());
-        measures = new Object[cubeDesc.getMeasures().size()];
-
-        int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
-        keyBytesBuf = new byte[colCount][];
-
-        initNullBytes();
-    }
-
-    private void initNullBytes() {
-        nullBytes = Lists.newArrayList();
-        nullBytes.add(HIVE_NULL);
-        String[] nullStrings = cubeDesc.getNullStrings();
-        if (nullStrings != null) {
-            for (String s : nullStrings) {
-                nullBytes.add(Bytes.toBytes(s));
-            }
-        }
-    }
-
-    private boolean isNull(byte[] v) {
-        for (byte[] nullByte : nullBytes) {
-            if (Bytes.equals(v, nullByte))
-                return true;
-        }
-        return false;
-    }
-
-    private byte[] buildKey(SplittedBytes[] splitBuffers) {
-        int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
-        for (int i = 0; i < baseCuboid.getColumns().size(); i++) {
-            int index = rowKeyColumnIndexes[i];
-            keyBytesBuf[i] = Arrays.copyOf(splitBuffers[index].value, splitBuffers[index].length);
-            if (isNull(keyBytesBuf[i])) {
-                keyBytesBuf[i] = null;
-            }
-        }
-        return rowKeyEncoder.encode(keyBytesBuf);
-    }
-
-    private void buildValue(SplittedBytes[] splitBuffers) {
-
-        for (int i = 0; i < measures.length; i++) {
-            byte[] valueBytes = getValueBytes(splitBuffers, i);
-            measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
-        }
-
-        valueBuf.clear();
-        measureCodec.encode(measures, valueBuf);
-    }
-
-    private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) {
-        MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
-        FunctionDesc func = desc.getFunction();
-        ParameterDesc paramDesc = func.getParameter();
-        int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
-
-        byte[] result = null;
-
-        // constant
-        if (flatTableIdx == null) {
-            result = Bytes.toBytes(paramDesc.getValue());
-        }
-        // column values
-        else {
-            // for multiple columns, their values are joined
-            for (int i = 0; i < flatTableIdx.length; i++) {
-                SplittedBytes split = splitBuffers[flatTableIdx[i]];
-                if (result == null) {
-                    result = Arrays.copyOf(split.value, split.length);
-                } else {
-                    byte[] newResult = new byte[result.length + split.length];
-                    System.arraycopy(result, 0, newResult, 0, result.length);
-                    System.arraycopy(split.value, 0, newResult, result.length, split.length);
-                    result = newResult;
-                }
-            }
-        }
-
-        if (func.isCount() || func.isHolisticCountDistinct()) {
-            // note for holistic count distinct, this value will be ignored
-            result = ONE;
-        }
-
-        if (isNull(result)) {
-            result = null;
-        }
-
-        return result;
-    }
-
-    @Override
-    public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
-        counter++;
-        if (counter % BatchConstants.COUNTER_MAX == 0) {
-            logger.info("Handled " + counter + " records!");
-        }
-
-        try {
-            bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
-            outputKV(context);
-
-        } catch (Exception ex) {
-            handleErrorRecord(bytesSplitter, ex);
-        }
-    }
-
-    private void outputKV(Context context) throws IOException, InterruptedException {
-        intermediateTableDesc.sanityCheck(bytesSplitter);
-
-        byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
-        outputKey.set(rowKey, 0, rowKey.length);
-
-        buildValue(bytesSplitter.getSplitBuffers());
-        outputValue.set(valueBuf.array(), 0, valueBuf.position());
-        context.write(outputKey, outputValue);
-    }
-
-    private void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException {
-
-        System.err.println("Insane record: " + bytesSplitter);
-        ex.printStackTrace(System.err);
-
-        errorRecordCounter++;
-        if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
-            if (ex instanceof IOException)
-                throw (IOException) ex;
-            else if (ex instanceof RuntimeException)
-                throw (RuntimeException) ex;
-            else
-                throw new RuntimeException("", ex);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperBase.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperBase.java
new file mode 100644
index 0000000..e2972dc
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperBase.java
@@ -0,0 +1,205 @@
+package org.apache.kylin.job.hadoop.cube;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.mr.KylinMapper;
+import org.apache.kylin.common.util.BytesSplitter;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 3/27/15.
+ */
+public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
+    protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapperBase.class); // shared with subclasses, so it is named after this base class rather than one particular mapper
+    public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
+    public static final byte[] ONE = Bytes.toBytes("1");
+    protected String cubeName;
+    protected String segmentName;
+    protected Cuboid baseCuboid;
+    protected CubeInstance cube;
+    protected CubeDesc cubeDesc;
+    protected CubeSegment cubeSegment;
+    protected List<byte[]> nullBytes;
+    protected CubeJoinedFlatTableDesc intermediateTableDesc;
+    protected String intermediateTableRowDelimiter;
+    protected byte byteRowDelimiter;
+    protected int counter;
+    protected Object[] measures;
+    protected byte[][] keyBytesBuf;
+    protected BytesSplitter bytesSplitter;
+    protected AbstractRowKeyEncoder rowKeyEncoder;
+    protected MeasureCodec measureCodec;
+    private int errorRecordCounter;
+    private Text outputKey = new Text();
+    private Text outputValue = new Text();
+    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+    @Override
+    protected void setup(Context context) throws IOException { // reads job configuration, loads cube/segment metadata and allocates the per-record working buffers
+        super.publishConfiguration(context.getConfiguration());
+
+        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+        intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
+        if (Bytes.toBytes(intermediateTableRowDelimiter).length != 1) { // exactly one byte is required; this also rejects an empty delimiter, which would otherwise fail at [0] below
+            throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length);
+        }
+
+        byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0];
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+
+        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+
+        intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
+
+        bytesSplitter = new BytesSplitter(200, 4096);
+        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
+
+        measureCodec = new MeasureCodec(cubeDesc.getMeasures());
+        measures = new Object[cubeDesc.getMeasures().size()]; // one slot per measure, refilled for every record by buildValue
+
+        int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
+        keyBytesBuf = new byte[colCount][]; // one slot per row key column, refilled for every record by buildKey
+
+        initNullBytes();
+    }
+
+    private void initNullBytes() {
+        nullBytes = Lists.newArrayList();
+        nullBytes.add(HIVE_NULL);
+        String[] nullStrings = cubeDesc.getNullStrings();
+        if (nullStrings != null) {
+            for (String s : nullStrings) {
+                nullBytes.add(Bytes.toBytes(s));
+            }
+        }
+    }
+
+    private boolean isNull(byte[] v) {
+        for (byte[] nullByte : nullBytes) {
+            if (Bytes.equals(v, nullByte))
+                return true;
+        }
+        return false;
+    }
+
+    private byte[] buildKey(SplittedBytes[] splitBuffers) {
+        int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+        for (int i = 0; i < baseCuboid.getColumns().size(); i++) {
+            int index = rowKeyColumnIndexes[i];
+            keyBytesBuf[i] = Arrays.copyOf(splitBuffers[index].value, splitBuffers[index].length);
+            if (isNull(keyBytesBuf[i])) {
+                keyBytesBuf[i] = null;
+            }
+        }
+        return rowKeyEncoder.encode(keyBytesBuf);
+    }
+
+    private void buildValue(SplittedBytes[] splitBuffers) {
+
+        for (int i = 0; i < measures.length; i++) {
+            byte[] valueBytes = getValueBytes(splitBuffers, i);
+            measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
+        }
+
+        valueBuf.clear();
+        measureCodec.encode(measures, valueBuf);
+    }
+
+    private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) {
+        MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
+        FunctionDesc func = desc.getFunction();
+        ParameterDesc paramDesc = func.getParameter();
+        int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
+
+        byte[] result = null;
+
+        // constant
+        if (flatTableIdx == null) {
+            result = Bytes.toBytes(paramDesc.getValue());
+        }
+        // column values
+        else {
+            // for multiple columns, their values are joined
+            for (int i = 0; i < flatTableIdx.length; i++) {
+                SplittedBytes split = splitBuffers[flatTableIdx[i]];
+                if (result == null) {
+                    result = Arrays.copyOf(split.value, split.length);
+                } else {
+                    byte[] newResult = new byte[result.length + split.length];
+                    System.arraycopy(result, 0, newResult, 0, result.length);
+                    System.arraycopy(split.value, 0, newResult, result.length, split.length);
+                    result = newResult;
+                }
+            }
+        }
+
+        if (func.isCount() || func.isHolisticCountDistinct()) {
+            // note for holistic count distinct, this value will be ignored
+            result = ONE;
+        }
+
+        if (isNull(result)) {
+            result = null;
+        }
+
+        return result;
+    }
+
+    protected void outputKV(Context context) throws IOException, InterruptedException {
+        intermediateTableDesc.sanityCheck(bytesSplitter);
+
+        byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
+        outputKey.set(rowKey, 0, rowKey.length);
+
+        buildValue(bytesSplitter.getSplitBuffers());
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+        context.write(outputKey, outputValue);
+    }
+
+    protected void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException {
+
+        System.err.println("Insane record: " + bytesSplitter);
+        ex.printStackTrace(System.err);
+
+        errorRecordCounter++;
+        if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
+            if (ex instanceof IOException)
+                throw (IOException) ex;
+            else if (ex instanceof RuntimeException)
+                throw (RuntimeException) ex;
+            else
+                throw new RuntimeException("", ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
new file mode 100644
index 0000000..599dde8
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cube;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.job.constant.BatchConstants;
+
+/**
+ * @author George Song (ysong1)
+ */
+public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, Text> {
+
+    @Override
+    public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
+        counter++;
+        if (counter % BatchConstants.COUNTER_MAX == 0) {
+            logger.info("Handled " + counter + " records!");
+        }
+
+        try {
+            //put a record into the shared bytesSplitter
+            bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
+            //take care of the data in bytesSplitter
+            outputKV(context);
+
+        } catch (Exception ex) {
+            handleErrorRecord(bytesSplitter, ex);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/main/java/org/apache/kylin/job/hadoop/cube/IIToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/IIToBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/IIToBaseCuboidMapper.java
new file mode 100644
index 0000000..68886c0
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/IIToBaseCuboidMapper.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cube;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Queue;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.FIFOIterable;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
+import org.apache.kylin.invertedindex.model.*;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+
+/**
+ * honma
+ */
+public class IIToBaseCuboidMapper extends BaseCuboidMapperBase<ImmutableBytesWritable, Result> {
+    private Queue<IIRow> buffer = Lists.newLinkedList();
+    private Iterator<Slice> slices;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.setup(context);
+
+        Configuration conf = context.getConfiguration();
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        String iiName = conf.get(BatchConstants.CFG_II_NAME);
+        IIInstance ii = IIManager.getInstance(config).getII(iiName);
+        IIDesc iiDesc = ii.getDescriptor();
+
+        TableRecordInfo info = new TableRecordInfo(iiDesc);
+        KeyValueCodec codec = new IIKeyValueCodecWithState(info.getDigest());
+        slices = codec.decodeKeyValue(new FIFOIterable<IIRow>(buffer)).iterator();
+    }
+
+    @Override
+    public void map(ImmutableBytesWritable key, Result cells, Context context) throws IOException, InterruptedException { // queues the HBase row as an IIRow and, when the slice iterator yields a slice, emits one base cuboid KV per record
+        try {
+            IIRow iiRow = new IIRow();
+            for (Cell c : cells.rawCells()) {
+                iiRow.updateWith(c);
+            }
+            buffer.add(iiRow); // the slice iterator built in setup() reads from this queue
+
+            if (slices.hasNext()) {
+                Slice slice = slices.next();
+                TableRecordInfoDigest localDigest = slice.getInfo();
+                for (RawTableRecord record : slice) {
+
+                    counter++;
+                    if (counter % BatchConstants.COUNTER_MAX == 0) {
+                        logger.info("Handled " + counter + " records!");
+                    }
+
+                    for (int indexInRecord = 0; indexInRecord < localDigest.getColumnCount(); ++indexInRecord) { // copy each column into the shared bytesSplitter buffers that outputKV reads
+                        SplittedBytes columnBuffer = bytesSplitter.getSplitBuffer(indexInRecord);
+                        if (localDigest.isMetrics(indexInRecord)) { // was negated, which sent non-metric columns through getValueMetric; NOTE(review): confirm against RawTableRecord/Slice
+                            String v = record.getValueMetric(indexInRecord);
+                            byte[] metricBytes = v.getBytes();
+                            System.arraycopy(metricBytes, 0, columnBuffer.value, 0, metricBytes.length);
+                            columnBuffer.length = metricBytes.length;
+                        } else { // non-metric column: resolve the stored value id through the slice's local dictionary
+                            Dictionary<?> dictionary = slice.getLocalDictionaries().get(indexInRecord);
+                            Preconditions.checkArgument(columnBuffer.value.length > dictionary.getSizeOfValue(), "Column length too big");
+                            int vid = record.getValueID(indexInRecord);
+                            columnBuffer.length = dictionary.getValueBytesFromId(vid, columnBuffer.value, 0);
+                        }
+                    }
+
+                    outputKV(context);
+                }
+            }
+        } catch (Exception ex) {
+            handleErrorRecord(bytesSplitter, ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
index f83e9d7..7a7c62e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
@@ -23,10 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
@@ -40,8 +37,6 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.hadoop.cube.BaseCuboidMapper;
-import org.apache.kylin.job.hadoop.cube.CuboidJob;
 import org.apache.kylin.job.hadoop.cube.CuboidReducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperPerformanceTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperPerformanceTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperPerformanceTest.java
deleted file mode 100644
index 7826e86..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperPerformanceTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * @author yangli9
- * 
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class BaseCuboidMapperPerformanceTest {
-
-    String metadataUrl = "hbase:yadesk00:2181:/hbase-unsecure";
-    String cubeName = "test_kylin_cube_with_slr";
-    Path srcPath = new Path("/download/test_kylin_cube_with_slr_intermediate_table_64mb.seq");
-
-    @Ignore("convenient trial tool for dev")
-    @Test
-    public void test() throws IOException, InterruptedException {
-        Configuration hconf = new Configuration();
-        BaseCuboidMapper mapper = new BaseCuboidMapper();
-        Context context = MockupMapContext.create(hconf, metadataUrl, cubeName, null);
-
-        mapper.setup(context);
-
-        Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
-        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
-        Text value = new Text();
-
-        while (reader.next(key, value)) {
-            mapper.map(key, value, context);
-        }
-
-        reader.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java
deleted file mode 100644
index c3632b7..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.math.BigDecimal;
-import java.util.List;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.apache.hadoop.mrunit.types.Pair;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.kv.RowKeyDecoder;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-public class BaseCuboidMapperTest extends LocalFileMetadataTestCase {
-
-    MapDriver<Text, Text, Text, Text> mapDriver;
-    String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
-
-    @Before
-    public void setUp() throws Exception {
-        createTestMetadata();
-
-        // hack for distributed cache
-        FileUtils.deleteDirectory(new File("../job/meta"));
-        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), new File("../job/meta"));
-
-        BaseCuboidMapper<Text> mapper = new BaseCuboidMapper<Text>();
-        mapDriver = MapDriver.newMapDriver(mapper);
-    }
-
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
-        FileUtils.deleteDirectory(new File("../job/meta"));
-    }
-
-    @Test
-    public void testMapperWithHeader() throws Exception {
-        String cubeName = "test_kylin_cube_with_slr_1_new_segment";
-        String segmentName = "20130331080000_20131212080000";
-        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-        // mapDriver.getConfiguration().set(BatchConstants.CFG_METADATA_URL,
-        // metadata);
-        mapDriver.withInput(new Text("key"), new Text("2012-12-15118480Health & BeautyFragrancesWomenAuction15123456789132.33"));
-        List<Pair<Text, Text>> result = mapDriver.run();
-
-        CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
-        CubeInstance cube = cubeMgr.getCube(cubeName);
-
-        assertEquals(1, result.size());
-        Text rowkey = result.get(0).getFirst();
-        byte[] key = rowkey.getBytes();
-        byte[] header = Bytes.head(key, 26);
-        byte[] sellerId = Bytes.tail(header, 18);
-        byte[] cuboidId = Bytes.head(header, 8);
-        byte[] restKey = Bytes.tail(key, rowkey.getLength() - 26);
-
-        RowKeyDecoder decoder = new RowKeyDecoder(cube.getFirstSegment());
-        decoder.decode(key);
-        assertEquals("[123456789, 2012-12-15, 11848, Health & Beauty, Fragrances, Women, Auction, 0, 15]", decoder.getValues().toString());
-
-        assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
-        assertEquals(511, Bytes.toLong(cuboidId));
-        assertEquals(22, restKey.length);
-
-        verifyMeasures(cube.getDescriptor().getMeasures(), result.get(0).getSecond(), "132.33", "132.33", "132.33");
-    }
-
-    private void verifyMeasures(List<MeasureDesc> measures, Text valueBytes, String m1, String m2, String m3) {
-        MeasureCodec codec = new MeasureCodec(measures);
-        Object[] values = new Object[measures.size()];
-        codec.decode(valueBytes, values);
-        assertTrue(new BigDecimal(m1).equals(values[0]));
-        assertTrue(new BigDecimal(m2).equals(values[1]));
-        assertTrue(new BigDecimal(m3).equals(values[2]));
-    }
-
-    @Test
-    public void testMapperWithNull() throws Exception {
-        String cubeName = "test_kylin_cube_with_slr_1_new_segment";
-        String segmentName = "20130331080000_20131212080000";
-        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-        // mapDriver.getConfiguration().set(BatchConstants.CFG_METADATA_URL,
-        // metadata);
-        mapDriver.withInput(new Text("key"), new Text("2012-12-15118480Health & BeautyFragrances\\NAuction15123456789\\N"));
-        List<Pair<Text, Text>> result = mapDriver.run();
-
-        CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
-        CubeInstance cube = cubeMgr.getCube(cubeName);
-
-        assertEquals(1, result.size());
-        Text rowkey = result.get(0).getFirst();
-        byte[] key = rowkey.getBytes();
-        byte[] header = Bytes.head(key, 26);
-        byte[] sellerId = Bytes.tail(header, 18);
-        byte[] cuboidId = Bytes.head(header, 8);
-        byte[] restKey = Bytes.tail(key, rowkey.getLength() - 26);
-
-        RowKeyDecoder decoder = new RowKeyDecoder(cube.getFirstSegment());
-        decoder.decode(key);
-        assertEquals("[123456789, 2012-12-15, 11848, Health & Beauty, Fragrances, null, Auction, 0, 15]", decoder.getValues().toString());
-
-        assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
-        assertEquals(511, Bytes.toLong(cuboidId));
-        assertEquals(22, restKey.length);
-
-        verifyMeasures(cube.getDescriptor().getMeasures(), result.get(0).getSecond(), "0", "0", "0");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperPerformanceTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperPerformanceTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperPerformanceTest.java
new file mode 100644
index 0000000..cf9cfe0
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperPerformanceTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cube;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Dev-only harness, ignored by default: feeds every record of the sequence file at srcPath to HiveToBaseCuboidMapper.map(); makes no assertions.
+ * @author yangli9
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class HiveToBaseCuboidMapperPerformanceTest {
+
+    String metadataUrl = "hbase:yadesk00:2181:/hbase-unsecure"; // hard-coded, environment-specific endpoint
+    String cubeName = "test_kylin_cube_with_slr";
+    Path srcPath = new Path("/download/test_kylin_cube_with_slr_intermediate_table_64mb.seq"); // hard-coded input file
+
+    @Ignore("convenient trial tool for dev")
+    @Test
+    public void test() throws IOException, InterruptedException {
+        Configuration hconf = new Configuration();
+        HiveToBaseCuboidMapper mapper = new HiveToBaseCuboidMapper();
+        Context context = MockupMapContext.create(hconf, metadataUrl, cubeName, null);
+
+        mapper.setup(context);
+
+        Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
+        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf); // key object of the class the file itself declares
+        Text value = new Text();
+
+        while (reader.next(key, value)) { // key and value are reused for every record
+            mapper.map(key, value, context);
+        }
+
+        reader.close(); // NOTE(review): not reached if setup()/map() throws; there is no try/finally around the reader
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperTest.java
new file mode 100644
index 0000000..f906fcb
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cube;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.math.BigDecimal;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.kv.RowKeyDecoder;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ * MRUnit tests for HiveToBaseCuboidMapper: each test maps one input row and checks the single emitted rowkey and its decoded measures.
+ * @author George Song (ysong1)
+ */
+public class HiveToBaseCuboidMapperTest extends LocalFileMetadataTestCase {
+
+    MapDriver<Text, Text, Text, Text> mapDriver;
+    String localTempDir = System.getProperty("java.io.tmpdir") + File.separator; // NOTE(review): not referenced within this class
+
+    @Before
+    public void setUp() throws Exception {
+        createTestMetadata();
+
+        // hack for distributed cache: stage a fresh copy of the test metadata under ../job/meta
+        FileUtils.deleteDirectory(new File("../job/meta"));
+        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), new File("../job/meta"));
+
+        HiveToBaseCuboidMapper<Text> mapper = new HiveToBaseCuboidMapper<Text>();
+        mapDriver = MapDriver.newMapDriver(mapper);
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+        FileUtils.deleteDirectory(new File("../job/meta")); // remove the copy staged by setUp()
+    }
+
+    @Test
+    public void testMapperWithHeader() throws Exception {
+        String cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        String segmentName = "20130331080000_20131212080000";
+        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+        // mapDriver.getConfiguration().set(BatchConstants.CFG_METADATA_URL,
+        // metadata);
+        mapDriver.withInput(new Text("key"), new Text("2012-12-15118480Health & BeautyFragrancesWomenAuction15123456789132.33"));
+        List<Pair<Text, Text>> result = mapDriver.run();
+
+        CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
+        CubeInstance cube = cubeMgr.getCube(cubeName);
+
+        assertEquals(1, result.size());
+        Text rowkey = result.get(0).getFirst();
+        byte[] key = rowkey.getBytes(); // NOTE(review): Text.getBytes() exposes the backing array; only the first getLength() bytes are valid
+        byte[] header = Bytes.head(key, 26); // first 26 bytes of the rowkey
+        byte[] sellerId = Bytes.tail(header, 18); // last 18 bytes of the header
+        byte[] cuboidId = Bytes.head(header, 8); // first 8 bytes of the header
+        byte[] restKey = Bytes.tail(key, rowkey.getLength() - 26); // last (length - 26) bytes of key
+
+        RowKeyDecoder decoder = new RowKeyDecoder(cube.getFirstSegment());
+        decoder.decode(key);
+        assertEquals("[123456789, 2012-12-15, 11848, Health & Beauty, Fragrances, Women, Auction, 0, 15]", decoder.getValues().toString());
+
+        assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
+        assertEquals(511, Bytes.toLong(cuboidId)); // 511 = 2^9 - 1; presumably one bit per each of the 9 decoded columns - TODO confirm
+        assertEquals(22, restKey.length);
+
+        verifyMeasures(cube.getDescriptor().getMeasures(), result.get(0).getSecond(), "132.33", "132.33", "132.33");
+    }
+
+    private void verifyMeasures(List<MeasureDesc> measures, Text valueBytes, String m1, String m2, String m3) { // decodes valueBytes and checks the first three measures
+        MeasureCodec codec = new MeasureCodec(measures);
+        Object[] values = new Object[measures.size()];
+        codec.decode(valueBytes, values);
+        assertTrue(new BigDecimal(m1).equals(values[0])); // BigDecimal.equals also compares scale, so "0" would not match 0.00
+        assertTrue(new BigDecimal(m2).equals(values[1]));
+        assertTrue(new BigDecimal(m3).equals(values[2]));
+    }
+
+    @Test
+    public void testMapperWithNull() throws Exception {
+        String cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        String segmentName = "20130331080000_20131212080000";
+        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+        // mapDriver.getConfiguration().set(BatchConstants.CFG_METADATA_URL,
+        // metadata);
+        mapDriver.withInput(new Text("key"), new Text("2012-12-15118480Health & BeautyFragrances\\NAuction15123456789\\N")); // same row, with two fields replaced by the \N token
+        List<Pair<Text, Text>> result = mapDriver.run();
+
+        CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
+        CubeInstance cube = cubeMgr.getCube(cubeName);
+
+        assertEquals(1, result.size());
+        Text rowkey = result.get(0).getFirst();
+        byte[] key = rowkey.getBytes(); // NOTE(review): backing array; only the first getLength() bytes are valid
+        byte[] header = Bytes.head(key, 26);
+        byte[] sellerId = Bytes.tail(header, 18);
+        byte[] cuboidId = Bytes.head(header, 8);
+        byte[] restKey = Bytes.tail(key, rowkey.getLength() - 26);
+
+        RowKeyDecoder decoder = new RowKeyDecoder(cube.getFirstSegment());
+        decoder.decode(key);
+        assertEquals("[123456789, 2012-12-15, 11848, Health & Beauty, Fragrances, null, Auction, 0, 15]", decoder.getValues().toString()); // the \N dimension is expected to decode as null
+
+        assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
+        assertEquals(511, Bytes.toLong(cuboidId));
+        assertEquals(22, restKey.length);
+
+        verifyMeasures(cube.getDescriptor().getMeasures(), result.get(0).getSecond(), "0", "0", "0"); // the \N measure input is expected to decode as 0
+    }
+}