Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:36 UTC

[24/50] [abbrv] incubator-kylin git commit: KYLIN-750 Merge cube segments from HBase table

KYLIN-750 Merge cube segments from HBase table
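
In short: when in-memory cubing is enabled, merging segments no longer reads cuboid files from HDFS; a new MapReduce job scans the merging segments' HBase tables directly. The patch adds MergeCuboidFromHBaseJob and MergeCuboidFromHBaseMapper, routes CubingJobBuilder's merge flow to them under inMemoryCubing(), moves the bulk-load task into addCubingSteps(), extends RowValueDecoder with a convertToJavaObject flag, and makes MeasureDesc.equals() compare measures by name (case-insensitive) instead of by id.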


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/39ada70f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/39ada70f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/39ada70f

Branch: refs/heads/streaming-localdict
Commit: 39ada70f4732203fa599a1cf323f6ddecd73a8ff
Parents: 32ceeee
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Thu May 7 15:05:29 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Thu May 7 17:25:56 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/cube/kv/RowValueDecoder.java   |  34 ++-
 .../apache/kylin/job/cube/CubingJobBuilder.java |  56 ++++-
 .../hadoop/cubev2/MergeCuboidFromHBaseJob.java  | 123 ++++++++++
 .../cubev2/MergeCuboidFromHBaseMapper.java      | 246 +++++++++++++++++++
 .../kylin/metadata/model/MeasureDesc.java       |   2 +-
 5 files changed, 440 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/39ada70f/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java b/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
index ace347a..99c2485 100644
--- a/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
+++ b/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
@@ -18,10 +18,6 @@
 
 package org.apache.kylin.cube.kv;
 
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.Collection;
-
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.DoubleWritable;
@@ -33,17 +29,19 @@ import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+
 /**
- * 
  * @author xjiang
- * 
  */
 public class RowValueDecoder implements Cloneable {
 
     private final HBaseColumnDesc hbaseColumn;
     private final byte[] hbaseColumnFamily;
     private final byte[] hbaseColumnQualifier;
-    
+
     private final MeasureCodec codec;
     private final BitSet projectionIndex;
     private final MeasureDesc[] measures;
@@ -60,19 +58,29 @@ public class RowValueDecoder implements Cloneable {
     }
 
     public void decode(Result hbaseRow) {
-        decode(hbaseRow.getValueAsByteBuffer(hbaseColumnFamily, hbaseColumnQualifier));
+        decode(hbaseRow, true);
+    }
+
+    public void decode(Result hbaseRow, boolean convertToJavaObject) {
+        decode(hbaseRow.getValueAsByteBuffer(hbaseColumnFamily, hbaseColumnQualifier), convertToJavaObject);
     }
-    
+
     public void decode(byte[] bytes) {
-        decode(ByteBuffer.wrap(bytes));
+        decode(bytes, true);
+    }
+
+    public void decode(byte[] bytes, boolean convertToJavaObject) {
+        decode(ByteBuffer.wrap(bytes), convertToJavaObject);
     }
 
-    private void decode(ByteBuffer buffer) {
+    private void decode(ByteBuffer buffer, boolean convertToJavaObject) {
         codec.decode(buffer, values);
-        convertToJavaObjects(values, values);
+        if (convertToJavaObject) {
+            convertToJavaObjects(values, values, convertToJavaObject);
+        }
     }
 
-    private void convertToJavaObjects(Object[] mapredObjs, Object[] results) {
+    private void convertToJavaObjects(Object[] mapredObjs, Object[] results, boolean convertToJavaObject) {
         for (int i = 0; i < mapredObjs.length; i++) {
             Object o = mapredObjs[i];
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/39ada70f/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index 27613df..e4a0fae 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -32,6 +32,7 @@ import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.hadoop.cube.*;
 import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
+import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
 import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
 import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
 import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
@@ -69,8 +70,6 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
 
         if (this.inMemoryCubing()) {
-            // bulk load step
-            result.addTask(createBulkLoadStep(seg, result.getId()));
             result.addTask(createUpdateCubeInfoAfterBuildStep(seg, intermediateHiveTableStepId, baseCuboidStepId, baseCuboidStepId, jobId));
         } else {
             // convert htable
@@ -103,16 +102,24 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
         List<String> mergingSegmentIds = Lists.newArrayList();
         List<String> mergingCuboidPaths = Lists.newArrayList();
+        List<String> mergingHTables = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
+            mergingHTables.add(merging.getStorageLocationIdentifier());
             if (merging.equals(appendSegment))
                 mergingCuboidPaths.add(appendRootPath + "*");
             else
                 mergingCuboidPaths.add(getPathToMerge(merging));
         }
 
-        // merge cuboid
-        addMergeSteps(mergeSegment, mergingSegmentIds, mergingCuboidPaths, mergedRootPath, result);
+        if(this.inMemoryCubing()) {
+            // merge from HTable
+            addMergeFromHBaseSteps(mergeSegment, mergingSegmentIds, mergingHTables, mergedRootPath, result);
+        } else {
+            // merge cuboid
+            addMergeSteps(mergeSegment, mergingSegmentIds, mergingCuboidPaths, mergedRootPath, result);
+        }
+
 
         // convert htable
         AbstractExecutable convertCuboidToHfileStep = addHTableSteps(mergeSegment, mergedRootPath, result);
@@ -134,14 +141,20 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
         List<String> mergingSegmentIds = Lists.newArrayList();
         List<String> mergingCuboidPaths = Lists.newArrayList();
+        List<String> mergingHTables = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
             mergingCuboidPaths.add(getPathToMerge(merging));
+            mergingHTables.add(merging.getStorageLocationIdentifier());
         }
 
-        // merge cuboid
-        addMergeSteps(seg, mergingSegmentIds, mergingCuboidPaths, mergedCuboidPath, result);
-
+        if(this.inMemoryCubing()) {
+            // merge from HTable
+            addMergeFromHBaseSteps(seg, mergingSegmentIds, mergingHTables, mergedCuboidPath, result);
+        } else {
+            // merge cuboid
+            addMergeSteps(seg, mergingSegmentIds, mergingCuboidPaths, mergedCuboidPath, result);
+        }
         // convert htable
         AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, mergedCuboidPath, result);
 
@@ -159,6 +172,15 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         result.addTask(createMergeCuboidDataStep(seg, formattedPath, mergedCuboidPath));
     }
 
+
+    void addMergeFromHBaseSteps(CubeSegment seg, List<String> mergingSegmentIds, List<String> mergingHTables, String mergedCuboidPath, CubingJob result) {
+
+        result.addTask(createMergeDictionaryStep(seg, mergingSegmentIds));
+
+        String formattedTables = StringUtils.join(mergingHTables, ",");
+        result.addTask(createMergeCuboidDataFromHBaseStep(seg, formattedTables, mergedCuboidPath));
+    }
+
     Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
         final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
         final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
@@ -190,6 +212,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
             result.addTask(createCreateHTableStep(seg, jobId));
             baseCuboidStep = createInMemCubingStep(seg, intermediateHiveTableLocation, intermediateHiveTableName, cuboidOutputTempPath, result.getId());
             result.addTask(baseCuboidStep);
+            // bulk load step
+            result.addTask(createBulkLoadStep(seg, result.getId()));
         }
 
         return new Pair<AbstractExecutable, AbstractExecutable>(intermediateHiveTableStep, baseCuboidStep);
@@ -480,6 +504,24 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         return mergeCuboidDataStep;
     }
 
+
+    private MapReduceExecutable createMergeCuboidDataFromHBaseStep(CubeSegment seg, String inputPath, String outputPath) {
+        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, seg);
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", inputPath);
+        appendExecCmdParameters(cmd, "output", outputPath);
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+
+        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
+        return mergeCuboidDataStep;
+    }
+
     private UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(CubeSegment seg, List<String> mergingSegmentIds, String convertToHFileStepId, String jobId) {
         UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
         result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/39ada70f/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
new file mode 100644
index 0000000..07d16f8
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cubev2;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.cube.CuboidJob;
+import org.apache.kylin.job.hadoop.cube.CuboidReducer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author shaoshi
+ */
+public class MergeCuboidFromHBaseJob extends CuboidJob {
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            parseOptions(options, args);
+
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            CubeManager cubeMgr = CubeManager.getInstance(config);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+
+
+            Configuration conf = this.getConf();
+            HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+            // start job
+            String jobName = getOptionValue(OPTION_JOB_NAME);
+            System.out.println("Starting: " + jobName);
+            job = Job.getInstance(conf, jobName);
+
+            setJobClasspath(job);
+
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            FileOutputFormat.setOutputPath(job, output);
+
+
+            List<Scan> scans = new ArrayList<Scan>();
+            for (String htable : StringSplitter.split(getOptionValue(OPTION_INPUT_PATH), ",")) {
+                Scan scan = new Scan();
+                scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
+                scan.setCacheBlocks(false);  // don't set to true for MR jobs
+                scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
+                scans.add(scan);
+            }
+
+            TableMapReduceUtil.initTableMapperJob(scans, MergeCuboidFromHBaseMapper.class, Text.class,
+                    Text.class, job);
+
+            // Reducer - only one
+            job.setReducerClass(CuboidReducer.class);
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Text.class);
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+
+            // add metadata to distributed cache
+            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+            setReduceTaskNum(job, config, cubeName, 0);
+
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            logger.error("error in MergeCuboidFromHBaseJob", e);
+            printUsage(options);
+            throw e;
+        }
+
+    }
+
+}
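
A note on the job wiring above: the input option carries the comma-joined HTable names (the merging segments' storage location identifiers) rather than an HDFS path; the job splits that list on "," and builds one Scan per table, so a single MR job reads all segments through TableMapReduceUtil.initTableMapperJob(scans, ...). The reducer remains the existing CuboidReducer writing SequenceFiles, so the downstream convert-to-HFile and bulk-load steps are unchanged.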

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/39ada70f/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
new file mode 100644
index 0000000..cc71351
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cubev2;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowValueDecoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.job.hadoop.cube.KeyValueCreator;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * @author shaoshi
+ */
+public class MergeCuboidFromHBaseMapper extends TableMapper<Text, Text> {
+
+    private KylinConfig config;
+    private String cubeName;
+    private String segmentName;
+    private CubeManager cubeManager;
+    private CubeInstance cube;
+    private CubeDesc cubeDesc;
+    private CubeSegment mergedCubeSegment;
+    private CubeSegment sourceCubeSegment;// Must be unique during a mapper's
+    // life cycle
+
+    private Text outputKey = new Text();
+
+    private byte[] newKeyBuf;
+    private RowKeySplitter rowKeySplitter;
+
+    private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
+
+    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+    private Text outputValue = new Text();
+    private RowValueDecoder[] rowValueDecoders;
+    private boolean simpleFullCopy = false;
+    private Object[] result;
+    private MeasureCodec codec;
+    private int[][] hbaseColumnToMeasureMapping;
+
+    private Boolean checkNeedMerging(TblColRef col) throws IOException {
+        Boolean ret = dictsNeedMerging.get(col);
+        if (ret != null)
+            return ret;
+        else {
+            ret = cubeDesc.getRowkey().isUseDictionary(col);
+            if (ret) {
+                String dictTable = (String) DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
+                ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
+            }
+            dictsNeedMerging.put(col, ret);
+            return ret;
+        }
+    }
+
+
+    private CubeSegment findSegmentWithHTable(String htable, CubeInstance cubeInstance) {
+        for (CubeSegment segment : cubeInstance.getSegments()) {
+            String segmentHtable = segment.getStorageLocationIdentifier();
+            if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htable)) {
+                return segment;
+            }
+        }
+
+        throw new IllegalStateException("No merging segment's storage location identifier equals " + htable);
+
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        HadoopUtil.setCurrentConfiguration(context.getConfiguration());
+
+        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
+
+        config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        cubeManager = CubeManager.getInstance(config);
+        cube = cubeManager.getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+
+        newKeyBuf = new byte[256];// size will auto-grow
+
+        TableSplit currentSplit = (TableSplit) context.getInputSplit();
+        byte[] tableName = currentSplit.getTableName();
+        String htableName = Bytes.toString(tableName);
+        // decide which source segment
+        System.out.println("htable:" + htableName);
+        sourceCubeSegment = findSegmentWithHTable(htableName, cube);
+        System.out.println(sourceCubeSegment);
+
+        this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+
+        List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
+        List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
+        for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+                valueDecoderList.add(new RowValueDecoder(colDesc));
+                keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+            }
+        }
+
+        rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
+
+        simpleFullCopy = (keyValueCreators.size() == 1 && keyValueCreators.get(0).isFullCopy);
+        result = new Object[cubeDesc.getMeasures().size()];
+        codec = new MeasureCodec(cubeDesc.getMeasures());
+
+        hbaseColumnToMeasureMapping = new int[rowValueDecoders.length][];
+
+        for (int i = 0; i < rowValueDecoders.length; i++) {
+            hbaseColumnToMeasureMapping[i] = new int[rowValueDecoders[i].getMeasures().length];
+            for (int j = 0; j < rowValueDecoders[i].getMeasures().length; j++) {
+                int positionInCubeMeasures = 0;
+                for (MeasureDesc m : cubeDesc.getMeasures()) {
+                    if (m.equals(rowValueDecoders[i].getMeasures()[j])) {
+                        hbaseColumnToMeasureMapping[i][j] = positionInCubeMeasures;
+                        break;
+                    }
+                    positionInCubeMeasures++;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
+        long cuboidID = rowKeySplitter.split(key.get(), key.get().length);
+        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
+
+        SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
+        int bufOffset = 0;
+        BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
+        bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
+
+        for (int i = 0; i < cuboid.getColumns().size(); ++i) {
+            TblColRef col = cuboid.getColumns().get(i);
+
+            if (this.checkNeedMerging(col)) {
+                // if dictionary on fact table column, needs rewrite
+                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+                Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
+                Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
+
+                while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) {
+                    byte[] oldBuf = newKeyBuf;
+                    newKeyBuf = new byte[2 * newKeyBuf.length];
+                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+                }
+
+                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length);
+                int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset);
+                int idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
+                BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId());
+
+                bufOffset += mergedDict.getSizeOfId();
+            } else {
+                // keep as it is
+                while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) {
+                    byte[] oldBuf = newKeyBuf;
+                    newKeyBuf = new byte[2 * newKeyBuf.length];
+                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+                }
+
+                System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length);
+                bufOffset += splittedByteses[i + 1].length;
+            }
+        }
+        byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
+        outputKey.set(newKey, 0, newKey.length);
+
+        valueBuf.clear();
+        if (simpleFullCopy) {
+            // simple case: there should be only one HBase column and its byte sequence matches the output value
+            for (Cell cell : value.rawCells()) {
+                valueBuf.put(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+            }
+        } else {
+            // complex case: need to re-arrange the byte sequence
+            for (int i = 0; i < rowValueDecoders.length; i++) {
+                rowValueDecoders[i].decode(value, false);
+                Object[] measureValues = rowValueDecoders[i].getValues();
+
+                for (int j = 0; j < measureValues.length; j++) {
+                    result[hbaseColumnToMeasureMapping[i][j]] = measureValues[j];
+                }
+            }
+            codec.encode(result, valueBuf);
+        }
+
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+        context.write(outputKey, outputValue);
+    }
+
+
+}
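
Per row, the mapper first rewrites the row key: every dictionary-encoded rowkey column is translated from the source segment's dictionary ID to the merged segment's ID (growing newKeyBuf as needed), while other columns are copied as-is. It then rebuilds the value: with a single full-copy HBase column (simpleFullCopy) the cell bytes are copied verbatim; otherwise each column is decoded with convertToJavaObject=false, the measures are re-ordered via hbaseColumnToMeasureMapping, and the row is re-encoded with MeasureCodec.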

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/39ada70f/metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java b/metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
index fd42d45..ee4a972 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
@@ -80,7 +80,7 @@ public class MeasureDesc {
 
         MeasureDesc that = (MeasureDesc) o;
 
-        if (id != that.id)
+        if (!name.equalsIgnoreCase(that.getName()))
             return false;
 
         return true;