You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/09/23 08:17:17 UTC

[2/3] incubator-kylin git commit: add shard as header

add shard as header


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1d351433
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1d351433
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1d351433

Branch: refs/heads/KYLIN-942
Commit: 1d35143323efefdb681fc50c50e6f67f764b9d09
Parents: ec307f6
Author: honma <ho...@ebay.com>
Authored: Tue Sep 22 18:44:26 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Sep 23 14:19:45 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/BytesUtil.java | 20 ++++++-
 .../apache/kylin/common/util/ShardingHash.java  | 23 +++++++-
 .../org/apache/kylin/common/util/BasicTest.java | 20 ++-----
 .../java/org/apache/kylin/cube/CubeSegment.java | 45 +++++++++++++---
 .../kylin/cube/common/RowKeySplitter.java       | 31 +++++++----
 .../org/apache/kylin/cube/cuboid/Cuboid.java    |  4 +-
 .../kylin/cube/kv/AbstractRowKeyEncoder.java    |  5 ++
 .../apache/kylin/cube/kv/FuzzyMaskEncoder.java  |  6 +--
 .../org/apache/kylin/cube/kv/RowConstants.java  |  3 ++
 .../org/apache/kylin/cube/kv/RowKeyDecoder.java |  4 +-
 .../org/apache/kylin/cube/kv/RowKeyEncoder.java | 56 +++++++++-----------
 .../kylin/cube/common/RowKeySplitterTest.java   | 20 +++----
 .../apache/kylin/cube/kv/RowKeyDecoderTest.java | 10 ++--
 .../apache/kylin/cube/kv/RowKeyEncoderTest.java | 32 +++++------
 .../kylin/storage/translate/HBaseKeyRange.java  |  6 ++-
 .../kylin/engine/mr/common/CuboidShardUtil.java |  9 ++--
 .../mr/steps/MapContextGTRecordWriter.java      | 24 ++++-----
 .../mr/steps/MergeCuboidFromStorageMapper.java  |  7 ++-
 .../engine/mr/steps/MergeCuboidMapper.java      |  9 ++--
 .../kylin/engine/mr/steps/NDCuboidMapper.java   | 25 +++++++--
 .../coprocessor/CoprocessorProjector.java       |  2 +-
 .../common/coprocessor/CoprocessorRowType.java  |  2 +-
 .../storage/hbase/cube/v1/CubeStorageQuery.java | 50 +++++++++++++++--
 .../storage/hbase/steps/CreateHTableJob.java    |  2 +-
 24 files changed, 271 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index 0880da1..0d4dba9 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -25,14 +25,23 @@ public class BytesUtil {
 
     public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
 
-    public static void writeLong(long num, byte[] bytes, int offset, int size) {
+    public static void writeShort(short num, byte[] bytes, int offset, int size) {
         for (int i = offset + size - 1; i >= offset; i--) {
             bytes[i] = (byte) num;
             num >>>= 8;
         }
     }
 
-    public static void writeUnsigned(int num, byte[] bytes, int offset, int size) {
+    public static long readShort(byte[] bytes, int offset, int size) {
+        short num = 0;
+        for (int i = offset, n = offset + size; i < n; i++) {
+            num <<= 8;
+            num |= (short) bytes[i] & 0xFF;
+        }
+        return num;
+    }
+
+    public static void writeLong(long num, byte[] bytes, int offset, int size) {
         for (int i = offset + size - 1; i >= offset; i--) {
             bytes[i] = (byte) num;
             num >>>= 8;
@@ -48,6 +57,13 @@ public class BytesUtil {
         return integer;
     }
 
+    public static void writeUnsigned(int num, byte[] bytes, int offset, int size) {
+        for (int i = offset + size - 1; i >= offset; i--) {
+            bytes[i] = (byte) num;
+            num >>>= 8;
+        }
+    }
+
     public static int readUnsigned(byte[] bytes, int offset, int size) {
         int integer = 0;
         for (int i = offset, n = offset + size; i < n; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java b/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
index 42eb443..97feda1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
@@ -26,21 +26,42 @@ public class ShardingHash {
     static HashFunction hashFunc = Hashing.murmur3_128();
 
     public static short getShard(int integerValue, int totalShards) {
+        if (totalShards <= 1) {
+            return 0;
+        }
         long hash = hashFunc.hashInt(integerValue).asLong();
         return _getShard(hash, totalShards);
     }
 
     public static short getShard(long longValue, int totalShards) {
+        if (totalShards <= 1) {
+            return 0;
+        }
         long hash = hashFunc.hashLong(longValue).asLong();
         return _getShard(hash, totalShards);
     }
 
     public static short getShard(byte[] byteValues, int offset, int length, int totalShards) {
+        if (totalShards <= 1) {
+            return 0;
+        }
+
         long hash = hashFunc.hashBytes(byteValues, offset, length).asLong();
         return _getShard(hash, totalShards);
     }
 
+    public static short getShard(short cuboidShardBase, short shardOffset, int totalShards) {
+        if (totalShards <= 1) {
+            return 0;
+        }
+        return (short) ((cuboidShardBase + shardOffset) % totalShards);
+    }
+
     private static short _getShard(long hash, int totalShard) {
-        return (short) (Math.abs(hash) % totalShard);
+        long x = hash % totalShard;
+        if (x < 0) {
+            x += totalShard;
+        }
+        return (short) x;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 21deaba..1590c92 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -27,7 +27,6 @@ import java.util.Calendar;
 import java.util.IdentityHashMap;
 
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.math3.primes.Primes;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -65,21 +64,9 @@ public class BasicTest {
 
     @Test
     public void testxx() {
-        byte[] temp = new byte[] { 1, 2, 3 };
-        byte[] temp2 = new byte[] { 1, 2, 3 };
-
-        System.out.println(temp.hashCode());
-        System.out.println(temp2.hashCode());
-
-        ByteBuffer buffer = ByteBuffer.allocateDirect(3);
-        buffer.put((byte) 1);
-        buffer.put((byte) 1);
-        buffer.put((byte) 1);
-        buffer.put((byte) 1);
-        System.out.println(buffer.position());
-        System.out.println(buffer.limit());
-        System.out.println(buffer.capacity());
-
+        short x = -64;
+        int y = (int)x;
+        System.out.println(y);
     }
 
     @Test
@@ -103,6 +90,7 @@ public class BasicTest {
         a.put(s1, null);
         b.put(s2, null);
     }
+
     @Test
     @Ignore("convenient trial tool for dev")
     public void testX() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index d324b9c..1a44fcf 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -25,6 +25,7 @@ import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.IDictionaryAware;
@@ -37,6 +38,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonBackReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
 
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IBuildable {
@@ -67,8 +69,8 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
     private String lastBuildJobID;
     @JsonProperty("create_time_utc")
     private long createTimeUTC;
-    @JsonProperty("cuboid_shards")
-    private Map<Long, Short> cuboidShards = null;
+    @JsonProperty("cuboid_shard_nums")
+    private Map<Long, Short> cuboidShardNums = Maps.newHashMap();
     @JsonProperty("total_shards")
     private int totalShards = 0;
 
@@ -80,6 +82,8 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
     @JsonProperty("snapshots")
     private ConcurrentHashMap<String, String> snapshots; // table name ==> snapshot resource path
 
+    private volatile Map<Long, Short> cuboidBaseShards = Maps.newHashMap();//cuboid id ==> base(starting) shard for this cuboid
+
     public CubeDesc getCubeDesc() {
         return getCubeInstance().getDescriptor();
     }
@@ -364,14 +368,31 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
         return cubeInstance.getStorageType();
     }
 
-    public Map<Long, Short> getCuboidShards() {
-        return cuboidShards;
+    /**
+     * get the number of shards where each cuboid will distribute
+     * @return
+     */
+    public Short getCuboidShardNum(Long cuboidId) {
+        Short ret = this.cuboidShardNums.get(cuboidId);
+        if (ret == null) {
+            return 1;
+        } else {
+            return ret;
+        }
     }
 
-    public void setCuboidShards(Map<Long, Short> newCuboidShards) {
-        this.cuboidShards = newCuboidShards;
+    //    /**
+    //     * get the number of shards where each cuboid will distribute
+    //     * @return
+    //     */
+    //    public Map<Long, Short> getCuboidShards() {
+    //        return this.cuboidShards;
+    //    }
+
+    public void setCuboidShardNums(Map<Long, Short> newCuboidShards) {
+        this.cuboidShardNums = newCuboidShards;
     }
-    
+
     public int getTotalShards() {
         return totalShards;
     }
@@ -379,4 +400,14 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
     public void setTotalShards(int totalShards) {
         this.totalShards = totalShards;
     }
+
+    public short getCuboidBaseShard(Long cuboidId) {
+        Short ret = cuboidBaseShards.get(cuboidId);
+        if (ret == null) {
+            ret = ShardingHash.getShard(cuboidId, totalShards);
+            cuboidBaseShards.put(cuboidId, ret);
+        }
+        return ret;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
index 7e379dd..0111cee 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
@@ -27,10 +27,6 @@ import org.apache.kylin.cube.kv.RowKeyColumnIO;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
-/**
- * @author George Song (ysong1)
- * 
- */
 public class RowKeySplitter {
 
     private CubeDesc cubeDesc;
@@ -39,6 +35,9 @@ public class RowKeySplitter {
     private SplittedBytes[] splitBuffers;
     private int bufferSize;
 
+    private long lastSplittedCuboidId;
+    private short lastSplittedShard;
+
     public SplittedBytes[] getSplitBuffers() {
         return splitBuffers;
     }
@@ -47,6 +46,14 @@ public class RowKeySplitter {
         return bufferSize;
     }
 
+    public long getLastSplittedCuboidId() {
+        return lastSplittedCuboidId;
+    }
+
+    public short getLastSplittedShard() {
+        return lastSplittedShard;
+    }
+
     public RowKeySplitter(CubeSegment cubeSeg, int splitLen, int bytesLen) {
         this.cubeDesc = cubeSeg.getCubeDesc();
         this.colIO = new RowKeyColumnIO(cubeSeg);
@@ -60,21 +67,27 @@ public class RowKeySplitter {
 
     /**
      * @param bytes
-     * @param byteLen
      * @return cuboid ID
      */
-    public long split(byte[] bytes, int byteLen) {
+    public long split(byte[] bytes) {
         this.bufferSize = 0;
         int offset = 0;
 
+        // extract shard
+        SplittedBytes shardSplit = this.splitBuffers[this.bufferSize++];
+        shardSplit.length = RowConstants.ROWKEY_SHARDID_LEN;
+        System.arraycopy(bytes, offset, shardSplit.value, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        offset += RowConstants.ROWKEY_SHARDID_LEN;
+
         // extract cuboid id
         SplittedBytes cuboidIdSplit = this.splitBuffers[this.bufferSize++];
         cuboidIdSplit.length = RowConstants.ROWKEY_CUBOIDID_LEN;
         System.arraycopy(bytes, offset, cuboidIdSplit.value, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
         offset += RowConstants.ROWKEY_CUBOIDID_LEN;
 
-        long cuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length);
-        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+        lastSplittedCuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length);
+        lastSplittedShard = Bytes.toShort(shardSplit.value, 0, shardSplit.length);
+        Cuboid cuboid = Cuboid.findById(cubeDesc, lastSplittedCuboidId);
 
         // rowkey columns
         for (int i = 0; i < cuboid.getColumns().size(); i++) {
@@ -86,6 +99,6 @@ public class RowKeySplitter {
             offset += colLength;
         }
 
-        return cuboidId;
+        return lastSplittedCuboidId;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index a7b2de4..9ee2315 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -28,6 +28,7 @@ import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.RowKeyColDesc;
@@ -36,9 +37,6 @@ import org.apache.kylin.cube.model.RowKeyDesc.AggrGroupMask;
 import org.apache.kylin.cube.model.RowKeyDesc.HierarchyMask;
 import org.apache.kylin.metadata.model.TblColRef;
 
-/**
- * @author George Song (ysong1)
- */
 public class Cuboid implements Comparable<Cuboid> {
 
     private final static Map<String, Map<Long, Cuboid>> CUBOID_CACHE = new ConcurrentHashMap<String, Map<Long, Cuboid>>();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
index f566f5c..1e24432 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
@@ -62,6 +62,7 @@ public abstract class AbstractRowKeyEncoder {
 
     protected final Cuboid cuboid;
     protected byte blankByte = DEFAULT_BLANK_BYTE;
+    protected boolean encodeShard = true;
 
     protected AbstractRowKeyEncoder(Cuboid cuboid) {
         this.cuboid = cuboid;
@@ -71,6 +72,10 @@ public abstract class AbstractRowKeyEncoder {
         this.blankByte = blankByte;
     }
 
+    public void setEncodeShard(boolean encodeShard) {
+        this.encodeShard = encodeShard;
+    }
+
     abstract public byte[] encode(Map<TblColRef, String> valueMap);
 
     abstract public byte[] encode(byte[][] values);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
index 5077287..1c2f4ee 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
@@ -36,11 +36,9 @@ public class FuzzyMaskEncoder extends RowKeyEncoder {
     }
 
     @Override
-    protected int fillHeader(byte[] bytes, byte[][] values) {
+    protected int fillHeader(byte[] bytes) {
         // always fuzzy match cuboid ID to lock on the selected cuboid
-        int cuboidStart = this.headerLength - RowConstants.ROWKEY_CUBOIDID_LEN;
-        Arrays.fill(bytes, 0, cuboidStart, RowConstants.FUZZY_MASK_ONE);
-        Arrays.fill(bytes, cuboidStart, this.headerLength, RowConstants.FUZZY_MASK_ZERO);
+        Arrays.fill(bytes, 0, RowConstants.ROWKEY_HEADER_LEN, RowConstants.FUZZY_MASK_ZERO);
         return this.headerLength;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
index 5db37aa..af4adbb 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
@@ -26,11 +26,14 @@ public class RowConstants {
     public static final byte ROWKEY_LOWER_BYTE = 0;
     // row key upper bound
     public static final byte ROWKEY_UPPER_BYTE = (byte) 0xff;
+
     // row key cuboid id length
     public static final int ROWKEY_CUBOIDID_LEN = 8;
     // row key shard length
     public static final int ROWKEY_SHARDID_LEN = 2;
 
+    public static final int ROWKEY_HEADER_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN;
+
     // fuzzy mask
     public static final byte FUZZY_MASK_ZERO = 0;
     public static final byte FUZZY_MASK_ONE = 1;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
index 1b896a0..3506845 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
@@ -53,12 +53,12 @@ public class RowKeyDecoder {
     public long decode(byte[] bytes) throws IOException {
         this.values.clear();
 
-        long cuboidId = rowKeySplitter.split(bytes, bytes.length);
+        long cuboidId = rowKeySplitter.split(bytes);
         initCuboid(cuboidId);
 
         SplittedBytes[] splits = rowKeySplitter.getSplitBuffers();
 
-        int offset = 1; // skip cuboid id part
+        int offset = 2; // skip shard and cuboid id part
 
         for (int i = 0; i < this.cuboid.getColumns().size(); i++) {
             TblColRef col = this.cuboid.getColumns().get(i);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
index 7f8bbd3..736a2b9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -24,56 +24,33 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.TblColRef;
 
-/**
- * @author George Song (ysong1)
- */
 public class RowKeyEncoder extends AbstractRowKeyEncoder {
 
     private int bytesLength;
     protected int headerLength;
     private RowKeyColumnIO colIO;
+    CubeSegment cubeSeg;
 
     protected RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) {
         super(cuboid);
+        this.cubeSeg = cubeSeg;
         colIO = new RowKeyColumnIO(cubeSeg);
-        bytesLength = headerLength = RowConstants.ROWKEY_CUBOIDID_LEN; // header
+        bytesLength = headerLength = RowConstants.ROWKEY_HEADER_LEN; // include shard and cuboidid 
         for (TblColRef column : cuboid.getColumns()) {
             bytesLength += colIO.getColumnLength(column);
         }
     }
 
-    public RowKeyColumnIO getColumnIO() {
-        return colIO;
-    }
-
-    public int getColumnOffset(TblColRef col) {
-        int offset = RowConstants.ROWKEY_CUBOIDID_LEN;
-
-        for (TblColRef dimCol : cuboid.getColumns()) {
-            if (col.equals(dimCol))
-                return offset;
-            offset += colIO.getColumnLength(dimCol);
-        }
-
-        throw new IllegalArgumentException("Column " + col + " not found on cuboid " + cuboid);
-    }
-
     public int getColumnLength(TblColRef col) {
         return colIO.getColumnLength(col);
     }
 
-    public int getRowKeyLength() {
-        return bytesLength;
-    }
-
-    public int getHeaderLength() {
-        return headerLength;
-    }
-
     @Override
     public byte[] encode(Map<TblColRef, String> valueMap) {
         List<byte[]> valueList = new ArrayList<byte[]>();
@@ -95,7 +72,8 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
     @Override
     public byte[] encode(byte[][] values) {
         byte[] bytes = new byte[this.bytesLength];
-        int offset = fillHeader(bytes, values);
+        int bodyOffset = RowConstants.ROWKEY_HEADER_LEN;
+        int offset = bodyOffset;
 
         for (int i = 0; i < cuboid.getColumns().size(); i++) {
             TblColRef column = cuboid.getColumns().get(i);
@@ -107,18 +85,34 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
                 fillColumnValue(column, colLength, value, value.length, bytes, offset);
             }
             offset += colLength;
-
         }
+
+        //fill shard and cuboid
+        fillHeader(bytes);
+
         return bytes;
     }
 
-    protected int fillHeader(byte[] bytes, byte[][] values) {
+    protected int fillHeader(byte[] bytes) {
         int offset = 0;
+
+        if (encodeShard) {
+            short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
+            short shardOffset = ShardingHash.getShard(bytes, RowConstants.ROWKEY_HEADER_LEN, bytes.length - RowConstants.ROWKEY_HEADER_LEN, cuboidShardNum);
+            short finalShard = ShardingHash.getShard(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards());
+            BytesUtil.writeShort(finalShard, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+        } else {
+            BytesUtil.writeShort((short) 0, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+        }
+        offset += RowConstants.ROWKEY_SHARDID_LEN;
+
         System.arraycopy(cuboid.getBytes(), 0, bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
         offset += RowConstants.ROWKEY_CUBOIDID_LEN;
+
         if (this.headerLength != offset) {
             throw new IllegalStateException("Expected header length is " + headerLength + ". But the offset is " + offset);
         }
+
         return offset;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java b/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
index 9a7970c..98f1eef 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
@@ -28,10 +28,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-/**
- * @author George Song (ysong1)
- * 
- */
 public class RowKeySplitterTest extends LocalFileMetadataTestCase {
 
     @Before
@@ -49,23 +45,23 @@ public class RowKeySplitterTest extends LocalFileMetadataTestCase {
     public void testWithSlr() throws Exception {
         CubeInstance cube = CubeManager.getInstance(getTestConfig()).getCube("TEST_KYLIN_CUBE_WITH_SLR_READY");
 
-        RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 10, 20);
+        RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 11, 20);
         // base cuboid rowkey
-        byte[] input = { 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
-        rowKeySplitter.split(input, input.length);
+        byte[] input = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+        rowKeySplitter.split(input);
 
-        assertEquals(10, rowKeySplitter.getBufferSize());
+        assertEquals(11, rowKeySplitter.getBufferSize());
     }
 
     @Test
     public void testWithoutSlr() throws Exception {
         CubeInstance cube = CubeManager.getInstance(getTestConfig()).getCube("TEST_KYLIN_CUBE_WITHOUT_SLR_READY");
 
-        RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 10, 20);
+        RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 11, 20);
         // base cuboid rowkey
-        byte[] input = { 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
-        rowKeySplitter.split(input, input.length);
+        byte[] input = { 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
+        rowKeySplitter.split(input);
 
-        assertEquals(9, rowKeySplitter.getBufferSize());
+        assertEquals(10, rowKeySplitter.getBufferSize());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
index 3704e03..d6b1718 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
@@ -34,10 +34,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-/**
- * @author George Song (ysong1)
- * 
- */
 public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
 
     @Before
@@ -57,7 +53,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
 
         RowKeyDecoder rowKeyDecoder = new RowKeyDecoder(cube.getFirstSegment());
 
-        byte[] key = { 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
+        byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
 
         rowKeyDecoder.decode(key);
         List<String> values = rowKeyDecoder.getValues();
@@ -70,7 +66,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
 
         RowKeyDecoder rowKeyDecoder = new RowKeyDecoder(cube.getFirstSegment());
 
-        byte[] key = { 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+        byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
 
         rowKeyDecoder.decode(key);
         List<String> values = rowKeyDecoder.getValues();
@@ -97,7 +93,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
         AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
 
         byte[] encodedKey = rowKeyEncoder.encode(data);
-        assertEquals(30, encodedKey.length);
+        assertEquals(22 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
 
         RowKeyDecoder rowKeyDecoder = new RowKeyDecoder(cube.getFirstSegment());
         rowKeyDecoder.decode(encodedKey);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
index c50b8c9..45c8108 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
@@ -35,10 +35,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-/**
- * @author George Song (ysong1)
- * 
- */
 public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
 
     @Before
@@ -74,9 +70,11 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
         AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
 
         byte[] encodedKey = rowKeyEncoder.encode(data);
-        assertEquals(30, encodedKey.length);
-        byte[] cuboidId = Arrays.copyOfRange(encodedKey, 0, 8);
-        byte[] rest = Arrays.copyOfRange(encodedKey, 8, encodedKey.length);
+        assertEquals(22 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+        byte[] shard = Arrays.copyOfRange(encodedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        byte[] cuboidId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN);
+        byte[] rest = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+        assertEquals(0, Bytes.toShort(shard));
         assertEquals(255, Bytes.toLong(cuboidId));
         assertArrayEquals(new byte[] { 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 }, rest);
     }
@@ -104,10 +102,12 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
         AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
 
         byte[] encodedKey = rowKeyEncoder.encode(data);
-        assertEquals(48, encodedKey.length);
-        byte[] sellerId = Arrays.copyOfRange(encodedKey, 8, 26);
-        byte[] cuboidId = Arrays.copyOfRange(encodedKey, 0, 8);
-        byte[] rest = Arrays.copyOfRange(encodedKey, 26, encodedKey.length);
+        assertEquals(40 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+        byte[] shard = Arrays.copyOfRange(encodedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        byte[] sellerId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_HEADER_LEN, 18 + RowConstants.ROWKEY_HEADER_LEN);
+        byte[] cuboidId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN);
+        byte[] rest = Arrays.copyOfRange(encodedKey, 18 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+        assertEquals(0, Bytes.toShort(shard));
         assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
         assertEquals(511, Bytes.toLong(cuboidId));
         assertArrayEquals(new byte[] { 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 }, rest);
@@ -136,10 +136,12 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
         AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
 
         byte[] encodedKey = rowKeyEncoder.encode(data);
-        assertEquals(48, encodedKey.length);
-        byte[] sellerId = Arrays.copyOfRange(encodedKey, 8, 26);
-        byte[] cuboidId = Arrays.copyOfRange(encodedKey, 0, 8);
-        byte[] rest = Arrays.copyOfRange(encodedKey, 26, encodedKey.length);
+        assertEquals(40 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+        byte[] shard = Arrays.copyOfRange(encodedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        byte[] cuboidId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN);
+        byte[] sellerId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_HEADER_LEN, 18 + RowConstants.ROWKEY_HEADER_LEN);
+        byte[] rest = Arrays.copyOfRange(encodedKey, 18 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+        assertEquals(0, Bytes.toShort(shard));
         assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
         assertEquals(511, Bytes.toLong(cuboidId));
         assertArrayEquals(new byte[] { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 }, rest);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
index 47553ad..bdcd257 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
@@ -119,7 +119,8 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
         }
 
         AbstractRowKeyEncoder encoder = AbstractRowKeyEncoder.createInstance(cubeSeg, cuboid);
-
+        encoder.setEncodeShard(false);//will enumerate all possible shards when scanning
+        
         encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE);
 
         this.startKey = encoder.encode(startValues);
@@ -133,7 +134,8 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
         // restore encoder defaults for later reuse (note
         // AbstractRowKeyEncoder.createInstance() caches instances)
         encoder.setBlankByte(AbstractRowKeyEncoder.DEFAULT_BLANK_BYTE);
-
+        
+        encoder.setEncodeShard(true);
         // always fuzzy match cuboid ID to lock on the selected cuboid
         this.fuzzyKeys = buildFuzzyKeys(fuzzyValues);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
index 3fb3f93..4839ab0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr.common;
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.commons.collections4.map.DefaultedMap;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -35,9 +34,9 @@ import com.google.common.collect.Maps;
 public class CuboidShardUtil {
     protected static final Logger logger = LoggerFactory.getLogger(CuboidShardUtil.class);
 
-    public static Map<Long, Short> loadCuboidShards(CubeSegment segment) {
-        return DefaultedMap.defaultedMap(segment.getCuboidShards(), (short) 1);
-    }
+//    public static Map<Long, Short> loadCuboidShards(CubeSegment segment) {
+//        return DefaultedMap.defaultedMap(segment.getCuboidShards(), (short) 1);
+//    }
 
     public static void saveCuboidShards(CubeSegment segment, Map<Long, Short> cuboidShards,int totalShards) throws IOException {
         CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
@@ -53,7 +52,7 @@ public class CuboidShardUtil {
                 }
             }
         });
-        segment.setCuboidShards(filered);
+        segment.setCuboidShardNums(filered);
         segment.setTotalShards(totalShards);
 
         CubeUpdate cubeBuilder = new CubeUpdate(segment.getCubeInstance());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index 24043fc..58e820f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -7,7 +7,7 @@ import java.util.BitSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -39,14 +39,12 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
     private long cuboidRowCount = 0;
 
     //for shard
-    private int totalShards = 0;
 
     public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
         this.mapContext = mapContext;
         this.cubeDesc = cubeDesc;
         this.cubeSegment = cubeSegment;
         this.measureCount = cubeDesc.getMeasures().size();
-        this.totalShards = cubeSegment.getTotalShards();
     }
 
     @Override
@@ -63,19 +61,19 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
         }
 
         cuboidRowCount++;
-        int preamble = RowConstants.ROWKEY_CUBOIDID_LEN + RowConstants.ROWKEY_SHARDID_LEN;
+        int preamble = RowConstants.ROWKEY_HEADER_LEN;
         int offSet = preamble;
         for (int x = 0; x < dimensions; x++) {
             System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length());
             offSet += record.get(x).length();
         }
 
-        short shardForThisRec = 0;
-        if (totalShards != 0) {
-            shardForThisRec = ShardingHash.getShard(keyBuf, preamble, offSet, totalShards);
-        }
-        com.google.common.primitives.Bytes.
-        System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length());
+        //fill shard
+        short cuboidShardNum = cubeSegment.getCuboidShardNum(cuboidId);
+        short shardOffset = ShardingHash.getShard(keyBuf, preamble, offSet - preamble, cuboidShardNum);
+        Short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId);
+        short finalShard = ShardingHash.getShard(cuboidShardBase, shardOffset, cubeSegment.getTotalShards());
+        BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN);
 
         //output measures
         valueBuf.clear();
@@ -96,8 +94,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
     }
 
     private void initVariables(Long cuboidId) {
-        bytesLength = RowConstants.ROWKEY_SHARDID_LEN;
-        bytesLength += RowConstants.ROWKEY_CUBOIDID_LEN;
+        bytesLength = RowConstants.ROWKEY_HEADER_LEN;
         Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
         for (TblColRef column : cuboid.getColumns()) {
             bytesLength += cubeSegment.getColumnLength(column);
@@ -110,6 +107,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
             measureColumnsIndex[i] = dimensions + i;
         }
 
-        System.arraycopy(Bytes.toBytes(cuboidId), 0, keyBuf, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
+        //write cuboid id first
+        BytesUtil.writeLong(cuboidId, keyBuf, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index 4598673..9b25c97 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -124,11 +124,16 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
 
         Preconditions.checkState(key.offset() == 0);
 
-        long cuboidID = rowKeySplitter.split(key.array(), key.length());
+        long cuboidID = rowKeySplitter.split(key.array());
+        short shard = rowKeySplitter.getLastSplittedShard();
         Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
 
         SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
         int bufOffset = 0;
+
+        BytesUtil.writeShort(shard, newKeyBuf, bufOffset, RowConstants.ROWKEY_SHARDID_LEN);
+        bufOffset += RowConstants.ROWKEY_SHARDID_LEN;
+        
         BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
         bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 45f0d32..6301f3d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -103,7 +103,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
         rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
     }
-    
+
     private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
 
     public static CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
@@ -111,7 +111,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         String jobID = extractJobIDFromPath(filePath);
         return findSegmentWithUuid(jobID, cube);
     }
-    
+
     private static String extractJobIDFromPath(String path) {
         Matcher matcher = JOB_NAME_PATTERN.matcher(path);
         // check the first occurrence
@@ -134,11 +134,14 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
     @Override
     public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        long cuboidID = rowKeySplitter.split(key.getBytes(), key.getBytes().length);
+        long cuboidID = rowKeySplitter.split(key.getBytes());
+        short shard = rowKeySplitter.getLastSplittedShard();
         Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
 
         SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
         int bufOffset = 0;
+        BytesUtil.writeShort(shard, newKeyBuf, bufOffset, RowConstants.ROWKEY_SHARDID_LEN);
+        bufOffset += RowConstants.ROWKEY_SHARDID_LEN;
         BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
         bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index c47d090..e7db1fb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -23,6 +23,8 @@ import java.util.Collection;
 
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -30,6 +32,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@ -38,6 +41,8 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * @author George Song (ysong1)
  * 
@@ -49,6 +54,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     private Text outputKey = new Text();
     private String cubeName;
     private String segmentName;
+    private CubeSegment cubeSegment;
     private CubeDesc cubeDesc;
     private CuboidScheduler cuboidScheduler;
 
@@ -68,7 +74,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
-        CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
         cubeDesc = cube.getDescriptor();
 
         // initialize CubiodScheduler
@@ -80,16 +86,21 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
         int offset = 0;
 
+        //shard id will be filled after other contents
+        offset += RowConstants.ROWKEY_SHARDID_LEN;
+
         // cuboid id
         System.arraycopy(childCuboid.getBytes(), 0, keyBuf, offset, childCuboid.getBytes().length);
-        offset += childCuboid.getBytes().length;
+        offset += RowConstants.ROWKEY_CUBOIDID_LEN;
+
+        int bodyOffset = offset;
 
         // rowkey columns
         long mask = Long.highestOneBit(parentCuboid.getId());
         long parentCuboidId = parentCuboid.getId();
         long childCuboidId = childCuboid.getId();
         long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
-        int index = 1; // skip cuboidId
+        int index = 2; // skip shard and cuboidId
         for (int i = 0; i < parentCuboidIdActualLength; i++) {
             if ((mask & parentCuboidId) > 0) {// if the this bit position equals
                                               // 1
@@ -103,12 +114,18 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
             mask = mask >> 1;
         }
 
+        //fill shard
+        short cuboidShardNum = cubeSegment.getCuboidShardNum(childCuboidId);
+        short shardOffset = ShardingHash.getShard(keyBuf, bodyOffset, offset - bodyOffset, cuboidShardNum);
+        short finalShard = ShardingHash.getShard(cubeSegment.getCuboidBaseShard(childCuboidId), shardOffset, cubeSegment.getTotalShards());
+        BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN);
+
         return offset;
     }
 
     @Override
     public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        long cuboidId = rowKeySplitter.split(key.getBytes(), key.getLength());
+        long cuboidId = rowKeySplitter.split(key.getBytes());
         Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId);
 
         Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
index 09295b0..9b839c3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
@@ -41,7 +41,7 @@ public class CoprocessorProjector {
 
         RowKeyEncoder rowKeyMaskEncoder = new RowKeyEncoder(cubeSegment, cuboid) {
             @Override
-            protected int fillHeader(byte[] bytes, byte[][] values) {
+            protected int fillHeader(byte[] bytes) {
                 Arrays.fill(bytes, 0, this.headerLength, (byte) 0xff);
                 return this.headerLength;
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
index 4b7c4dc..7ec97c0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
@@ -131,7 +131,7 @@ public class CoprocessorRowType {
 
     private void init() {
         int[] offsets = new int[columns.length];
-        int o = RowConstants.ROWKEY_CUBOIDID_LEN;
+        int o = RowConstants.ROWKEY_HEADER_LEN;
         for (int i = 0; i < columns.length; i++) {
             offsets[i] = o;
             o += columnSizes[i];

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 836f142..1e796ca 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.storage.hbase.cube.v1;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
@@ -34,11 +35,13 @@ import java.util.TreeSet;
 
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
@@ -437,27 +440,30 @@ public class CubeStorageQuery implements ICachableStorageQuery {
             }
 
             //log
-            sb.append(scanRanges.size() + "=>");
+            sb.append(scanRanges.size() + "=(mergeoverlap)>");
 
             List<HBaseKeyRange> mergedRanges = mergeOverlapRanges(scanRanges);
 
             //log
-            sb.append(mergedRanges.size() + "=>");
+            sb.append(mergedRanges.size() + "=(mergetoomany)>");
 
             mergedRanges = mergeTooManyRanges(mergedRanges);
 
             //log
-            sb.append(mergedRanges.size() + ", ");
+            sb.append(mergedRanges.size() + ",");
 
             result.addAll(mergedRanges);
         }
-
         logger.info(sb.toString());
 
         logger.info("hbasekeyrange count: " + result.size());
+
         dropUnhitSegments(result);
         logger.info("hbasekeyrange count after dropping unhit :" + result.size());
 
+        result = duplicateRangeByShard(result);
+        logger.info("hbasekeyrange count after dropping duplicatebyshard :" + result.size());
+
         return result;
     }
 
@@ -667,6 +673,42 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         }
     }
 
+    /** Expands every scan range into one copy per shard its cuboid occupies, then sorts all copies by start key. */
+    private List<HBaseKeyRange> duplicateRangeByShard(List<HBaseKeyRange> scans) {
+        List<HBaseKeyRange> ret = Lists.newArrayList();
+
+        for (HBaseKeyRange scan : scans) {
+            CubeSegment segment = scan.getCubeSegment();
+            byte[] startKey = scan.getStartKey();
+            byte[] stopKey = scan.getStopKey();
+            short cuboidShardNum = segment.getCuboidShardNum(scan.getCuboid().getId());
+            short cuboidShardBase = segment.getCuboidBaseShard(scan.getCuboid().getId());
+            for (short i = 0; i < cuboidShardNum; ++i) {
+                // map offset i through the base shard exactly like the writers' finalShard; raw i would scan other shards
+                short shard = org.apache.kylin.common.util.ShardingHash.getShard(cuboidShardBase, i, segment.getTotalShards());
+                byte[] newStartKey = duplicateKeyAndChangeShard(shard, startKey);
+                byte[] newStopKey = duplicateKeyAndChangeShard(shard, stopKey);
+                HBaseKeyRange newRange = new HBaseKeyRange(segment, scan.getCuboid(), newStartKey, newStopKey, //
+                        scan.getFuzzyKeys(), scan.getFlatOrAndFilter(), scan.getPartitionColumnStartDate(), scan.getPartitionColumnEndDate());
+                ret.add(newRange);
+            }
+        }
+        Collections.sort(ret, new Comparator<HBaseKeyRange>() {
+            @Override
+            public int compare(HBaseKeyRange o1, HBaseKeyRange o2) {
+                return Bytes.compareTo(o1.getStartKey(), o2.getStartKey());
+            }
+        });
+
+        return ret;
+    }
+
+    private byte[] duplicateKeyAndChangeShard(short newShard, byte[] bytes) {
+        byte[] ret = Arrays.copyOf(bytes, bytes.length);
+        BytesUtil.writeShort(newShard, ret, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        return ret;
+    }
+
     private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) {
         if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) {
             return;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1d351433/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 086acb0..45a5f96 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -355,7 +355,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
      */
     private static long estimateCuboidStorageSize(CubeDesc cubeDesc, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) {
 
-        int bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
+        int bytesLength = RowConstants.ROWKEY_HEADER_LEN;
 
         long mask = Long.highestOneBit(baseCuboidId);
         long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);