You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/10/25 14:12:19 UTC

[6/7] incubator-kylin git commit: KYLIN-942 support parallel scan for grid table

KYLIN-942 support parallel scan for grid table


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/fabdd5cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/fabdd5cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/fabdd5cd

Branch: refs/heads/2.x-staging
Commit: fabdd5cd9f752a617fca6670acb9b7af190be45b
Parents: 5c9c097
Author: honma <ho...@ebay.com>
Authored: Thu Oct 22 18:18:02 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Sun Oct 25 21:15:21 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/hadoop/invertedindex/IITest.java  |   6 +-
 commit_SHA1                                     |  16 +
 core-common/pom.xml                             |   9 +
 .../org/apache/kylin/common/util/BitSets.java   |  39 +
 .../org/apache/kylin/common/util/BytesUtil.java |  20 +-
 .../kylin/common/util/CompressionUtils.java     |   6 +-
 .../kylin/common/util/ImplementationSwitch.java |  12 +-
 .../apache/kylin/common/util/ShardingHash.java  |  67 ++
 .../apache/kylin/common/util/BitSetsTest.java   |  36 +
 .../apache/kylin/common/util/BytesUtilTest.java |  11 +-
 .../common/util/ImplementationSwitchTest.java   |  28 +-
 .../org/apache/kylin/cube/CubeInstance.java     |  12 +-
 .../java/org/apache/kylin/cube/CubeSegment.java |  50 +
 .../cube/common/FuzzyValueCombination.java      | 130 +++
 .../kylin/cube/common/RowKeySplitter.java       |  31 +-
 .../org/apache/kylin/cube/cuboid/Cuboid.java    |   4 +-
 .../kylin/cube/gridtable/CubeCodeSystem.java    |   9 +-
 .../cube/gridtable/TrimmedCubeCodeSystem.java   |  15 +-
 .../cube/inmemcubing/ConcurrentDiskStore.java   |   4 +-
 .../kylin/cube/inmemcubing/MemDiskStore.java    |   4 +-
 .../kylin/cube/kv/AbstractRowKeyEncoder.java    |   5 +
 .../apache/kylin/cube/kv/FuzzyKeyEncoder.java   |   2 +-
 .../apache/kylin/cube/kv/FuzzyMaskEncoder.java  |  11 +-
 .../org/apache/kylin/cube/kv/RowConstants.java  |  11 +-
 .../org/apache/kylin/cube/kv/RowKeyDecoder.java |   4 +-
 .../org/apache/kylin/cube/kv/RowKeyEncoder.java |  56 +-
 .../org/apache/kylin/gridtable/GTBuilder.java   |   4 +-
 .../java/org/apache/kylin/gridtable/GTInfo.java |  30 +-
 .../org/apache/kylin/gridtable/GTRecord.java    |  75 +-
 .../org/apache/kylin/gridtable/GTScanRange.java |  30 +-
 .../kylin/gridtable/GTScanRangePlanner.java     | 244 +++--
 .../apache/kylin/gridtable/GTScanRequest.java   |   9 +-
 .../java/org/apache/kylin/gridtable/GTUtil.java |   4 -
 .../org/apache/kylin/gridtable/GridTable.java   |   3 -
 .../org/apache/kylin/gridtable/IGTStore.java    |   4 +-
 .../gridtable/memstore/GTSimpleMemStore.java    |   7 +-
 .../kylin/cube/common/RowKeySplitterTest.java   |  20 +-
 .../apache/kylin/cube/kv/RowKeyDecoderTest.java |  10 +-
 .../apache/kylin/cube/kv/RowKeyEncoderTest.java |  32 +-
 .../kylin/gridtable/DictGridTableTest.java      | 152 ++-
 .../org/apache/kylin/engine/EngineFactory.java  |  28 +-
 .../kylin/metadata/filter/TupleFilter.java      |  11 +
 .../metadata/filter/TupleFilterSerializer.java  |  10 +-
 .../org/apache/kylin/source/SourceFactory.java  |   9 +-
 .../apache/kylin/storage/StorageFactory.java    |  15 +-
 .../kylin/storage/hybrid/HybridInstance.java    |   2 +-
 .../translate/FuzzyValueCombination.java        |  30 +-
 .../kylin/storage/translate/HBaseKeyRange.java  |   6 +-
 dev-support/test_all.sh                         |  11 +
 .../kylin/engine/mr/common/CuboidShardUtil.java |  56 ++
 .../kylin/engine/mr/common/CuboidStatsUtil.java |  61 ++
 .../mr/steps/FactDistinctColumnsReducer.java    |  26 +-
 .../mr/steps/MapContextGTRecordWriter.java      |  21 +-
 .../mr/steps/MergeCuboidFromStorageMapper.java  |   7 +-
 .../engine/mr/steps/MergeCuboidMapper.java      |   9 +-
 .../engine/mr/steps/MergeStatisticsStep.java    |   3 +-
 .../kylin/engine/mr/steps/NDCuboidJob.java      |   5 -
 .../kylin/engine/mr/steps/NDCuboidMapper.java   |  23 +-
 .../steps/FactDistinctColumnsReducerTest.java   |   3 +-
 .../engine/mr/steps/NDCuboidMapperTest.java     |   7 +-
 .../test/resources/data/8d_cuboid/part-r-00000  | Bin 1476517 -> 913974 bytes
 .../resources/data/base_cuboid/part-r-00000     | Bin 394644 -> 243816 bytes
 .../apache/kylin/engine/spark/SparkCubing.java  |   4 +-
 .../spark/cube/DefaultTupleConverter.java       |  39 +-
 .../kylin/invertedindex/index/ShardingHash.java |  32 -
 .../kylin/invertedindex/index/TableRecord.java  |   3 +-
 pom.xml                                         |   7 +
 .../apache/kylin/query/routing/RoutingRule.java |   9 +-
 .../src/test/resources/query/debug/query78.sql  |  22 +
 query/src/test/resources/query/sql/query01.sql  |   4 +-
 query/src/test/resources/query/sql/query85.sql  |  26 +
 query/src/test/resources/query/sql/query86.sql  |  24 +
 .../resources/kylin-server-log4j.properties     |   2 +-
 .../kylin/storage/hbase/HBaseStorage.java       |   5 +-
 .../coprocessor/CoprocessorProjector.java       |   2 +-
 .../common/coprocessor/CoprocessorRowType.java  |   2 +-
 .../hbase/cube/v1/CubeSegmentTupleIterator.java |   2 +-
 .../storage/hbase/cube/v1/CubeStorageQuery.java |  80 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     | 136 ++-
 .../storage/hbase/cube/v2/CubeHBaseRPC.java     | 155 ++-
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java |  98 +-
 .../storage/hbase/cube/v2/CubeScanner.java      | 265 -----
 .../hbase/cube/v2/CubeSegmentScanner.java       | 290 ++++++
 .../storage/hbase/cube/v2/CubeStorageQuery.java |   7 +-
 .../hbase/cube/v2/HBaseReadonlyStore.java       |  47 +-
 .../kylin/storage/hbase/cube/v2/HBaseScan.java  |  88 ++
 .../kylin/storage/hbase/cube/v2/RawScan.java    |  22 +-
 .../cube/v2/SequentialCubeTupleIterator.java    |   8 +-
 .../coprocessor/endpoint/CubeVisitService.java  |  29 +-
 .../endpoint/generated/CubeVisitProtos.java     | 981 ++++++++++++++++++-
 .../endpoint/protobuf/CubeVisit.proto           |   4 +
 .../endpoint/EndpointTupleIterator.java         |  21 +-
 .../ii/coprocessor/endpoint/IIEndpoint.java     |  22 +-
 .../storage/hbase/steps/CreateHTableJob.java    | 153 ++-
 .../storage/hbase/steps/HBaseCuboidWriter.java  |  26 +-
 .../hbase/steps/HBaseStreamingOutput.java       |   6 +-
 .../kylin/storage/hbase/steps/MergeGCStep.java  |  10 +-
 .../hbase/steps/SandboxMetastoreCLI.java        |   2 +-
 98 files changed, 3158 insertions(+), 1040 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 913a2f7..2a643c8 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -10,9 +10,13 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
+
 import com.google.common.base.Function;
 import kafka.message.Message;
 import kafka.message.MessageAndOffset;
+
+import com.google.protobuf.HBaseZeroCopyByteString;
+
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -232,7 +236,7 @@ public class IITest extends LocalFileMetadataTestCase {
         System.out.println(response.getRowsList().size());
         Set<String> answers = Sets.newHashSet("120.4747", "26.8551");
         for (IIProtos.IIResponseInternal.IIRow responseRow : response.getRowsList()) {
-            byte[] measuresBytes = responseRow.getMeasures().toByteArray();
+            byte[] measuresBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(responseRow.getMeasures());
             List<Object> metrics = aggregators.deserializeMetricValues(measuresBytes, 0);
             Assert.assertTrue(answers.contains(metrics.get(0)));
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/commit_SHA1
----------------------------------------------------------------------
diff --git a/commit_SHA1 b/commit_SHA1
new file mode 100644
index 0000000..d2f3970
--- /dev/null
+++ b/commit_SHA1
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+b66c25803a2f976cca067148278dbe7d7b0d79ef

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 7c78723..ebcb676 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -49,6 +49,10 @@
             <artifactId>commons-lang3</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+        </dependency>
+        <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
@@ -65,6 +69,11 @@
             <artifactId>commons-email</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <version>${commons-collections4.version}</version>
+        </dependency>
+        <dependency>
             <groupId>commons-httpclient</groupId>
             <artifactId>commons-httpclient</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/main/java/org/apache/kylin/common/util/BitSets.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BitSets.java b/core-common/src/main/java/org/apache/kylin/common/util/BitSets.java
new file mode 100644
index 0000000..b8a6de7
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BitSets.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import java.util.BitSet;
+
+public class BitSets {
+    public static BitSet valueOf(int[] indexes) {
+        if (indexes == null || indexes.length == 0) {
+            return new BitSet();
+        }
+
+        int maxIndex = Integer.MIN_VALUE;
+        for (int index : indexes) {
+            maxIndex = Math.max(maxIndex, index);
+        }
+        BitSet set = new BitSet(maxIndex);
+        for (int index : indexes) {
+            set.set(index);
+        }
+        return set;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index 0880da1..0d4dba9 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -25,14 +25,23 @@ public class BytesUtil {
 
     public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
 
-    public static void writeLong(long num, byte[] bytes, int offset, int size) {
+    public static void writeShort(short num, byte[] bytes, int offset, int size) {
         for (int i = offset + size - 1; i >= offset; i--) {
             bytes[i] = (byte) num;
             num >>>= 8;
         }
     }
 
-    public static void writeUnsigned(int num, byte[] bytes, int offset, int size) {
+    public static long readShort(byte[] bytes, int offset, int size) {
+        short num = 0;
+        for (int i = offset, n = offset + size; i < n; i++) {
+            num <<= 8;
+            num |= (short) bytes[i] & 0xFF;
+        }
+        return num;
+    }
+
+    public static void writeLong(long num, byte[] bytes, int offset, int size) {
         for (int i = offset + size - 1; i >= offset; i--) {
             bytes[i] = (byte) num;
             num >>>= 8;
@@ -48,6 +57,13 @@ public class BytesUtil {
         return integer;
     }
 
+    public static void writeUnsigned(int num, byte[] bytes, int offset, int size) {
+        for (int i = offset + size - 1; i >= offset; i--) {
+            bytes[i] = (byte) num;
+            num >>>= 8;
+        }
+    }
+
     public static int readUnsigned(byte[] bytes, int offset, int size) {
         int integer = 0;
         for (int i = offset, n = offset + size; i < n; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
index 13abab5..c9838e4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
@@ -45,8 +45,7 @@ public class CompressionUtils {
         outputStream.close();
         byte[] output = outputStream.toByteArray();
 
-        logger.info("Original: " + data.length + " bytes");
-        logger.info("Compressed: " + output.length + " bytes");
+        logger.info("Original: " + data.length + " bytes. " + "Compressed: " + output.length + " bytes ");
         return output;
     }
 
@@ -63,8 +62,7 @@ public class CompressionUtils {
         outputStream.close();
         byte[] output = outputStream.toByteArray();
 
-        logger.info("Original: " + data.length + " bytes");
-        logger.info("Decompressed: " + output.length + " bytes");
+        logger.info("Original: " + data.length + " bytes. " + "Decompressed: " + output.length + " bytes");
         return output;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
index 3101c81..f6924a0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
@@ -26,14 +26,16 @@ import org.slf4j.LoggerFactory;
  * Provide switch between different implementations of a same interface.
  * Each implementation is identified by an integer ID.
  */
-public class ImplementationSwitch {
+public class ImplementationSwitch<I> {
 
     private static final Logger logger = LoggerFactory.getLogger(ImplementationSwitch.class);
 
     final private Object[] instances;
+    private Class<I> interfaceClz;
 
-    public ImplementationSwitch(Map<Integer, String> impls) {
-        instances = initInstances(impls);
+    public ImplementationSwitch(Map<Integer, String> impls, Class<I> interfaceClz) {
+        this.interfaceClz = interfaceClz;
+        this.instances = initInstances(impls);
     }
 
     private Object[] initInstances(Map<Integer, String> impls) {
@@ -58,13 +60,13 @@ public class ImplementationSwitch {
         return result;
     }
 
-    public <I> I get(int id, Class<I> interfaceClz) {
+    public I get(int id) {
         @SuppressWarnings("unchecked")
         I result = (I) instances[id];
 
         if (result == null)
             throw new IllegalArgumentException("Implementations missing, ID " + id + ", interafce " + interfaceClz.getName());
-        
+
         return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java b/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
new file mode 100644
index 0000000..8d728c8
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+public class ShardingHash {
+
+    static HashFunction hashFunc = Hashing.murmur3_128();
+
+    public static short getShard(int integerValue, int totalShards) {
+        if (totalShards <= 1) {
+            return 0;
+        }
+        long hash = hashFunc.hashInt(integerValue).asLong();
+        return _getShard(hash, totalShards);
+    }
+
+    public static short getShard(long longValue, int totalShards) {
+        if (totalShards <= 1) {
+            return 0;
+        }
+        long hash = hashFunc.hashLong(longValue).asLong();
+        return _getShard(hash, totalShards);
+    }
+
+    public static short getShard(byte[] byteValues, int offset, int length, int totalShards) {
+        if (totalShards <= 1) {
+            return 0;
+        }
+
+        long hash = hashFunc.hashBytes(byteValues, offset, length).asLong();
+        return _getShard(hash, totalShards);
+    }
+
+    public static short normalize(short cuboidShardBase, short shardOffset, int totalShards) {
+        if (totalShards <= 1) {
+            return 0;
+        }
+        return (short) ((cuboidShardBase + shardOffset) % totalShards);
+    }
+
+    private static short _getShard(long hash, int totalShard) {
+        long x = hash % totalShard;
+        if (x < 0) {
+            x += totalShard;
+        }
+        return (short) x;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/test/java/org/apache/kylin/common/util/BitSetsTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BitSetsTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BitSetsTest.java
new file mode 100644
index 0000000..c923969
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BitSetsTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import java.util.BitSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BitSetsTest {
+
+    @Test
+    public void basicTest() {
+        BitSet a = BitSets.valueOf(new int[] { 1, 3, 10 });
+        Assert.assertEquals(3, a.cardinality());
+        Assert.assertTrue(10 < a.size());
+        Assert.assertTrue(a.get(3));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
index 7436de9..79bc9f1 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
@@ -18,17 +18,15 @@
 
 package org.apache.kylin.common.util;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
-import junit.framework.TestCase;
-
 import org.junit.Test;
 
-/**
- * by honma
- */
-public class BytesUtilTest extends TestCase {
+public class BytesUtilTest {
     @Test
     public void test() {
         ByteBuffer buffer = ByteBuffer.allocate(10000);
@@ -77,6 +75,7 @@ public class BytesUtilTest extends TestCase {
         assertTrue(Arrays.equals(anOtherNewBytes, ba.array()));
     }
 
+    @Test
     public void testReadable() {
         String x = "\\x00\\x00\\x00\\x00\\x00\\x01\\xFC\\xA8";
         byte[] bytes = BytesUtil.fromReadableText(x);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java b/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java
index 4c69eeb..ab0b565 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.kylin.common.util;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -25,34 +25,34 @@ import java.util.Map;
 import org.junit.Test;
 
 public class ImplementationSwitchTest {
-    
-    ImplementationSwitch sw;
+
+    ImplementationSwitch<I> sw;
 
     public ImplementationSwitchTest() {
         Map<Integer, String> impls = new HashMap<>();
         impls.put(0, "non.exist.class");
         impls.put(1, Impl1.class.getName());
         impls.put(2, Impl2.class.getName());
-        sw = new ImplementationSwitch(impls);
+        sw = new ImplementationSwitch<I>(impls, I.class);
     }
-    
-    public static interface I {
+
+    public interface I {
     }
-    
+
     public static class Impl1 implements I {
     }
-    
+
     public static class Impl2 implements I {
     }
-    
+
     @Test
     public void test() {
-        assertTrue(sw.get(1, I.class) instanceof Impl1);
-        assertTrue(sw.get(2, I.class) instanceof Impl2);
+        assertTrue(sw.get(1) instanceof Impl1);
+        assertTrue(sw.get(2) instanceof Impl2);
     }
-    
-    @Test(expected = IllegalArgumentException.class)  
+
+    @Test(expected = IllegalArgumentException.class)
     public void testException() {
-        sw.get(0, I.class);
+        sw.get(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 8b6653b..f8e37f3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -48,10 +48,9 @@ import com.google.common.collect.Lists;
 @SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class CubeInstance extends RootPersistentEntity implements IRealization, IBuildable {
-    private static final int COST_WEIGHT_DIMENSION = 1;
     private static final int COST_WEIGHT_MEASURE = 1;
-    private static final int COST_WEIGHT_LOOKUP_TABLE = 1;
-    private static final int COST_WEIGHT_INNER_JOIN = 2;
+    private static final int COST_WEIGHT_DIMENSION = 10;
+    private static final int COST_WEIGHT_INNER_JOIN = 100;
 
     public static CubeInstance create(String cubeName, String projectName, CubeDesc cubeDesc) {
         CubeInstance cubeInstance = new CubeInstance();
@@ -67,7 +66,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
 
         return cubeInstance;
     }
-    
+
     @JsonIgnore
     private KylinConfig config;
     @JsonProperty("name")
@@ -122,7 +121,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         }
         return mergingSegments;
     }
-    
+
     public CubeDesc getDescriptor() {
         return CubeDescManager.getInstance(config).getCubeDesc(descName);
     }
@@ -355,7 +354,6 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
 
         for (LookupDesc lookupDesc : this.getDescriptor().getModel().getLookups()) {
             // more tables, more cost
-            calculatedCost += COST_WEIGHT_LOOKUP_TABLE;
             if ("inner".equals(lookupDesc.getJoin().getType())) {
                 // inner join cost is bigger than left join, as it will filter some records
                 calculatedCost += COST_WEIGHT_INNER_JOIN;
@@ -444,12 +442,10 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
     public int getStorageType() {
         return getDescriptor().getStorageType();
     }
-    
 
     @Override
     public int getEngineType() {
         return getDescriptor().getEngineType();
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 7d89470..1a44fcf 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -25,6 +25,7 @@ import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.IDictionaryAware;
@@ -37,6 +38,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonBackReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
 
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IBuildable {
@@ -67,6 +69,10 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
     private String lastBuildJobID;
     @JsonProperty("create_time_utc")
     private long createTimeUTC;
+    @JsonProperty("cuboid_shard_nums")
+    private Map<Long, Short> cuboidShardNums = Maps.newHashMap();
+    @JsonProperty("total_shards")
+    private int totalShards = 0;
 
     @JsonProperty("binary_signature")
     private String binarySignature; // a hash of cube schema and dictionary ID, used for sanity check
@@ -76,6 +82,8 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
     @JsonProperty("snapshots")
     private ConcurrentHashMap<String, String> snapshots; // table name ==> snapshot resource path
 
+    private volatile Map<Long, Short> cuboidBaseShards = Maps.newHashMap();//cuboid id ==> base(starting) shard for this cuboid
+
     public CubeDesc getCubeDesc() {
         return getCubeInstance().getDescriptor();
     }
@@ -360,4 +368,46 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
         return cubeInstance.getStorageType();
     }
 
+    /**
+     * Returns the number of shards over which the given cuboid is distributed.
+     * @return the shard count recorded for cuboidId, or 1 if none is recorded
+     */
+    public Short getCuboidShardNum(Long cuboidId) {
+        Short ret = this.cuboidShardNums.get(cuboidId);
+        if (ret == null) {
+            return 1;
+        } else {
+            return ret;
+        }
+    }
+
+    //    /**
+    //     * get the number of shards where each cuboid will distribute
+    //     * @return
+    //     */
+    //    public Map<Long, Short> getCuboidShards() {
+    //        return this.cuboidShards;
+    //    }
+
+    public void setCuboidShardNums(Map<Long, Short> newCuboidShards) {
+        this.cuboidShardNums = newCuboidShards;
+    }
+
+    public int getTotalShards() {
+        return totalShards;
+    }
+
+    public void setTotalShards(int totalShards) {
+        this.totalShards = totalShards;
+    }
+
+    public short getCuboidBaseShard(Long cuboidId) {
+        Short ret = cuboidBaseShards.get(cuboidId);
+        if (ret == null) {
+            ret = ShardingHash.getShard(cuboidId, totalShards);
+            cuboidBaseShards.put(cuboidId, ret);
+        }
+        return ret;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java b/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java
new file mode 100644
index 0000000..4ddb06a
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.common;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class FuzzyValueCombination {
+
+    private static class Dim<K, V> {
+        K col;
+        Set<V> values;
+    }
+
+    private static final Set SINGLE_NULL_SET = Sets.newHashSet();
+
+    static {
+        SINGLE_NULL_SET.add(null);
+    }
+
+    public static <K, V> List<Map<K, V>> calculate(Map<K, Set<V>> fuzzyValues, long cap) {
+        Collections.emptyMap();
+        Dim<K, V>[] dims = toDims(fuzzyValues);
+        // If a query has many IN clauses and each IN clause has many values, it can easily generate
+        // thousands of fuzzy keys. With that many fuzzy keys, scan performance becomes bottlenecked
+        // on them, so simply abandon all fuzzy keys in this case.
+        if (exceedCap(dims, cap)) {
+            return Lists.newArrayList();
+        } else {
+            return combination(dims);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <K, V> List<Map<K, V>> combination(Dim<K, V>[] dims) {
+
+        List<Map<K, V>> result = Lists.newArrayList();
+
+        int emptyDims = 0;
+        for (Dim dim : dims) {
+            if (dim.values.isEmpty()) {
+                dim.values = SINGLE_NULL_SET;
+                emptyDims++;
+            }
+        }
+        if (emptyDims == dims.length) {
+            return result;
+        }
+
+        Map<K, V> r = Maps.newHashMap();
+        Iterator<V>[] iters = new Iterator[dims.length];
+        int level = 0;
+        while (true) {
+            Dim<K, V> dim = dims[level];
+            if (iters[level] == null) {
+                iters[level] = dim.values.iterator();
+            }
+
+            Iterator<V> it = iters[level];
+            if (it.hasNext() == false) {
+                if (level == 0)
+                    break;
+                r.remove(dim.col);
+                iters[level] = null;
+                level--;
+                continue;
+            }
+
+            r.put(dim.col, it.next());
+            if (level == dims.length - 1) {
+                result.add(new HashMap<K, V>(r));
+            } else {
+                level++;
+            }
+        }
+        return result;
+    }
+
+    private static <K, V> Dim<K, V>[] toDims(Map<K, Set<V>> fuzzyValues) {
+        Dim[] dims = new Dim[fuzzyValues.size()];
+        int i = 0;
+        for (Entry<K, Set<V>> entry : fuzzyValues.entrySet()) {
+            dims[i] = new Dim<K, V>();
+            dims[i].col = entry.getKey();
+            dims[i].values = entry.getValue();
+            if (dims[i].values == null)
+                dims[i].values = Collections.emptySet();
+            i++;
+        }
+        return dims;
+    }
+
+    private static boolean exceedCap(Dim[] dims, long cap) {
+        return combCount(dims) > cap;
+    }
+
+    private static long combCount(Dim[] dims) {
+        long count = 1;
+        for (Dim dim : dims) {
+            count *= Math.max(dim.values.size(), 1);
+        }
+        return count;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
index 7e379dd..0111cee 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
@@ -27,10 +27,6 @@ import org.apache.kylin.cube.kv.RowKeyColumnIO;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
-/**
- * @author George Song (ysong1)
- * 
- */
 public class RowKeySplitter {
 
     private CubeDesc cubeDesc;
@@ -39,6 +35,9 @@ public class RowKeySplitter {
     private SplittedBytes[] splitBuffers;
     private int bufferSize;
 
+    private long lastSplittedCuboidId;
+    private short lastSplittedShard;
+
     public SplittedBytes[] getSplitBuffers() {
         return splitBuffers;
     }
@@ -47,6 +46,14 @@ public class RowKeySplitter {
         return bufferSize;
     }
 
+    public long getLastSplittedCuboidId() {
+        return lastSplittedCuboidId;
+    }
+
+    public short getLastSplittedShard() {
+        return lastSplittedShard;
+    }
+
     public RowKeySplitter(CubeSegment cubeSeg, int splitLen, int bytesLen) {
         this.cubeDesc = cubeSeg.getCubeDesc();
         this.colIO = new RowKeyColumnIO(cubeSeg);
@@ -60,21 +67,27 @@ public class RowKeySplitter {
 
     /**
      * @param bytes
-     * @param byteLen
      * @return cuboid ID
      */
-    public long split(byte[] bytes, int byteLen) {
+    public long split(byte[] bytes) {
         this.bufferSize = 0;
         int offset = 0;
 
+        // extract shard
+        SplittedBytes shardSplit = this.splitBuffers[this.bufferSize++];
+        shardSplit.length = RowConstants.ROWKEY_SHARDID_LEN;
+        System.arraycopy(bytes, offset, shardSplit.value, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        offset += RowConstants.ROWKEY_SHARDID_LEN;
+
         // extract cuboid id
         SplittedBytes cuboidIdSplit = this.splitBuffers[this.bufferSize++];
         cuboidIdSplit.length = RowConstants.ROWKEY_CUBOIDID_LEN;
         System.arraycopy(bytes, offset, cuboidIdSplit.value, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
         offset += RowConstants.ROWKEY_CUBOIDID_LEN;
 
-        long cuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length);
-        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+        lastSplittedCuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length);
+        lastSplittedShard = Bytes.toShort(shardSplit.value, 0, shardSplit.length);
+        Cuboid cuboid = Cuboid.findById(cubeDesc, lastSplittedCuboidId);
 
         // rowkey columns
         for (int i = 0; i < cuboid.getColumns().size(); i++) {
@@ -86,6 +99,6 @@ public class RowKeySplitter {
             offset += colLength;
         }
 
-        return cuboidId;
+        return lastSplittedCuboidId;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index 3ff1d61..2c8680d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -22,15 +22,13 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.model.*;
 import org.apache.kylin.cube.model.RowKeyDesc.AggrGroupMask;
 import org.apache.kylin.cube.model.RowKeyDesc.HierarchyMask;
 import org.apache.kylin.metadata.model.TblColRef;
 
-/**
- * @author George Song (ysong1)
- */
 public class Cuboid implements Comparable<Cuboid> {
 
     private final static Map<String, Map<Long, Cuboid>> CUBOID_CACHE = new ConcurrentHashMap<String, Map<Long, Cuboid>>();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index b0554ae..9e45fe0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -20,9 +20,10 @@ import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
 import org.apache.kylin.metadata.measure.serializer.StringSerializer;
 
 /**
- * Created by shaoshi on 3/23/15.
- * This implementation uses Dictionary to encode and decode the table; If a column doesn't have dictionary, will check
- * its data type to serialize/deserialize it;
+ * Defines how column values are encoded to / decoded from GTRecord.
+ * 
+ * Cube metadata indicates which columns are dictionary encoded (dict encoded dimensions) or fixed-length encoded (fixed length dimensions).
+ * Metric columns are more flexible: they use a DataTypeSerializer chosen according to their data type.
  */
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class CubeCodeSystem implements IGTCodeSystem {
@@ -110,7 +111,7 @@ public class CubeCodeSystem implements IGTCodeSystem {
         if (serializer instanceof DictionarySerializer) {
             ((DictionarySerializer) serializer).serializeWithRounding(value, roundingFlag, buf);
         } else {
-            if ((!(serializer instanceof StringSerializer || serializer instanceof FixLenSerializer)) && (value instanceof String)) {
+            if ((value instanceof String) && (!(serializer instanceof StringSerializer || serializer instanceof FixLenSerializer))) {
                 value = serializer.valueOf((String) value);
             }
             serializer.serialize(value, buf);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
index e662a82..e4f32fb 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
@@ -74,15 +74,12 @@ public class TrimmedCubeCodeSystem implements IGTCodeSystem {
     @Override
     public void encodeColumnValue(int col, Object value, int roundingFlag, ByteBuffer buf) {
         DataTypeSerializer serializer = serializers[col];
-        if (serializer instanceof CubeCodeSystem.TrimmedDictionarySerializer || serializer instanceof CubeCodeSystem.DictionarySerializer) {
-            //TODO: remove this check
-            throw new IllegalStateException("Encode dictionary value in coprocessor");
-        } else {
-            if ((!(serializer instanceof StringSerializer || serializer instanceof CubeCodeSystem.FixLenSerializer)) && (value instanceof String)) {
-                value = serializer.valueOf((String) value);
-            }
-            serializer.serialize(value, buf);
-        }
+
+//        if (((value instanceof String) && !(serializer instanceof StringSerializer || serializer instanceof CubeCodeSystem.FixLenSerializer))) {
+//            value = serializer.valueOf((String) value);
+//        }
+        
+        serializer.serialize(value, buf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java
index ebff9c8..8b95b4f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java
@@ -92,12 +92,12 @@ public class ConcurrentDiskStore implements IGTStore, Closeable {
     }
 
     @Override
-    public IGTWriter rebuild(int shard) throws IOException {
+    public IGTWriter rebuild() throws IOException {
         return newWriter(0);
     }
 
     @Override
-    public IGTWriter append(int shard) throws IOException {
+    public IGTWriter append() throws IOException {
         return newWriter(diskFile.length());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java
index 2a12d1b..166ae76 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java
@@ -88,12 +88,12 @@ public class MemDiskStore implements IGTStore, Closeable {
     }
 
     @Override
-    public IGTWriter rebuild(int shard) throws IOException {
+    public IGTWriter rebuild() throws IOException {
         return newWriter(0);
     }
 
     @Override
-    public IGTWriter append(int shard) throws IOException {
+    public IGTWriter append() throws IOException {
         return newWriter(length());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
index f566f5c..1e24432 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
@@ -62,6 +62,7 @@ public abstract class AbstractRowKeyEncoder {
 
     protected final Cuboid cuboid;
     protected byte blankByte = DEFAULT_BLANK_BYTE;
+    protected boolean encodeShard = true;
 
     protected AbstractRowKeyEncoder(Cuboid cuboid) {
         this.cuboid = cuboid;
@@ -71,6 +72,10 @@ public abstract class AbstractRowKeyEncoder {
         this.blankByte = blankByte;
     }
 
+    public void setEncodeShard(boolean encodeShard) {
+        this.encodeShard = encodeShard;
+    }
+
     abstract public byte[] encode(Map<TblColRef, String> valueMap);
 
     abstract public byte[] encode(byte[][] values);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
index a17bb28..2185bc5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
@@ -37,7 +37,7 @@ public class FuzzyKeyEncoder extends RowKeyEncoder {
     @Override
     protected byte[] defaultValue(int length) {
         byte[] keyBytes = new byte[length];
-        Arrays.fill(keyBytes, RowConstants.FUZZY_MASK_ZERO);
+        Arrays.fill(keyBytes, RowConstants.BYTE_ZERO);
         return keyBytes;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
index 5077287..bf67538 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
@@ -36,20 +36,19 @@ public class FuzzyMaskEncoder extends RowKeyEncoder {
     }
 
     @Override
-    protected int fillHeader(byte[] bytes, byte[][] values) {
+    protected int fillHeader(byte[] bytes) {
+        Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);
         // always fuzzy match cuboid ID to lock on the selected cuboid
-        int cuboidStart = this.headerLength - RowConstants.ROWKEY_CUBOIDID_LEN;
-        Arrays.fill(bytes, 0, cuboidStart, RowConstants.FUZZY_MASK_ONE);
-        Arrays.fill(bytes, cuboidStart, this.headerLength, RowConstants.FUZZY_MASK_ZERO);
+        Arrays.fill(bytes, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN, RowConstants.BYTE_ZERO);
         return this.headerLength;
     }
 
     @Override
     protected void fillColumnValue(TblColRef column, int columnLen, byte[] value, int valueLen, byte[] outputValue, int outputValueOffset) {
         if (value == null) {
-            Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.FUZZY_MASK_ONE);
+            Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.BYTE_ONE);
         } else {
-            Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.FUZZY_MASK_ZERO);
+            Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.BYTE_ZERO);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
index 7607edf..6a8eeb5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
@@ -26,18 +26,23 @@ public class RowConstants {
     public static final byte ROWKEY_LOWER_BYTE = 0;
     // row key upper bound
     public static final byte ROWKEY_UPPER_BYTE = (byte) 0xff;
+
     // row key cuboid id length
     public static final int ROWKEY_CUBOIDID_LEN = 8;
+    // row key shard length
+    public static final int ROWKEY_SHARDID_LEN = 2;
 
-    // fuzzy mask
-    public static final byte FUZZY_MASK_ZERO = 0;
-    public static final byte FUZZY_MASK_ONE = 1;
+    public static final int ROWKEY_HEADER_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN;
+    
+    public static final byte BYTE_ZERO = 0;
+    public static final byte BYTE_ONE = 1;
 
     // row value delimiter
     public static final byte ROWVALUE_DELIMITER_BYTE = 7;
     public static final String ROWVALUE_DELIMITER_STRING = String.valueOf((char) 7);
     public static final byte[] ROWVALUE_DELIMITER_BYTES = { 7 };
 
+    public static final int ROWKEY_BUFFER_SIZE = 1024 * 1024; // 1 MB
     public static final int ROWVALUE_BUFFER_SIZE = 1024 * 1024; // 1 MB
 
     // marker class

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
index 1b896a0..3506845 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
@@ -53,12 +53,12 @@ public class RowKeyDecoder {
     public long decode(byte[] bytes) throws IOException {
         this.values.clear();
 
-        long cuboidId = rowKeySplitter.split(bytes, bytes.length);
+        long cuboidId = rowKeySplitter.split(bytes);
         initCuboid(cuboidId);
 
         SplittedBytes[] splits = rowKeySplitter.getSplitBuffers();
 
-        int offset = 1; // skip cuboid id part
+        int offset = 2; // skip shard and cuboid id part
 
         for (int i = 0; i < this.cuboid.getColumns().size(); i++) {
             TblColRef col = this.cuboid.getColumns().get(i);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
index 7f8bbd3..0676df6 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -24,56 +24,33 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.TblColRef;
 
-/**
- * @author George Song (ysong1)
- */
 public class RowKeyEncoder extends AbstractRowKeyEncoder {
 
     private int bytesLength;
     protected int headerLength;
     private RowKeyColumnIO colIO;
+    CubeSegment cubeSeg;
 
     protected RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) {
         super(cuboid);
+        this.cubeSeg = cubeSeg;
         colIO = new RowKeyColumnIO(cubeSeg);
-        bytesLength = headerLength = RowConstants.ROWKEY_CUBOIDID_LEN; // header
+        bytesLength = headerLength = RowConstants.ROWKEY_HEADER_LEN; // include shard and cuboidid 
         for (TblColRef column : cuboid.getColumns()) {
             bytesLength += colIO.getColumnLength(column);
         }
     }
 
-    public RowKeyColumnIO getColumnIO() {
-        return colIO;
-    }
-
-    public int getColumnOffset(TblColRef col) {
-        int offset = RowConstants.ROWKEY_CUBOIDID_LEN;
-
-        for (TblColRef dimCol : cuboid.getColumns()) {
-            if (col.equals(dimCol))
-                return offset;
-            offset += colIO.getColumnLength(dimCol);
-        }
-
-        throw new IllegalArgumentException("Column " + col + " not found on cuboid " + cuboid);
-    }
-
     public int getColumnLength(TblColRef col) {
         return colIO.getColumnLength(col);
     }
 
-    public int getRowKeyLength() {
-        return bytesLength;
-    }
-
-    public int getHeaderLength() {
-        return headerLength;
-    }
-
     @Override
     public byte[] encode(Map<TblColRef, String> valueMap) {
         List<byte[]> valueList = new ArrayList<byte[]>();
@@ -95,7 +72,8 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
     @Override
     public byte[] encode(byte[][] values) {
         byte[] bytes = new byte[this.bytesLength];
-        int offset = fillHeader(bytes, values);
+        int bodyOffset = RowConstants.ROWKEY_HEADER_LEN;
+        int offset = bodyOffset;
 
         for (int i = 0; i < cuboid.getColumns().size(); i++) {
             TblColRef column = cuboid.getColumns().get(i);
@@ -107,18 +85,34 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
                 fillColumnValue(column, colLength, value, value.length, bytes, offset);
             }
             offset += colLength;
-
         }
+
+        //fill shard and cuboid
+        fillHeader(bytes);
+
         return bytes;
     }
 
-    protected int fillHeader(byte[] bytes, byte[][] values) {
+    protected int fillHeader(byte[] bytes) {
         int offset = 0;
+
+        if (encodeShard) {
+            short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
+            short shardOffset = ShardingHash.getShard(bytes, RowConstants.ROWKEY_HEADER_LEN, bytes.length - RowConstants.ROWKEY_HEADER_LEN, cuboidShardNum);
+            short finalShard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards());
+            BytesUtil.writeShort(finalShard, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+        } else {
+            BytesUtil.writeShort((short) 0, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+        }
+        offset += RowConstants.ROWKEY_SHARDID_LEN;
+
         System.arraycopy(cuboid.getBytes(), 0, bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
         offset += RowConstants.ROWKEY_CUBOIDID_LEN;
+
         if (this.headerLength != offset) {
             throw new IllegalStateException("Expected header length is " + headerLength + ". But the offset is " + offset);
         }
+        
         return offset;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
index 31ea9e2..5eefa54 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
@@ -19,9 +19,9 @@ public class GTBuilder implements Closeable {
         this.info = info;
 
         if (append) {
-            storeWriter = store.append(shard);
+            storeWriter = store.append();
         } else {
-            storeWriter = store.rebuild(shard);
+            storeWriter = store.rebuild();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
index d4fe3fb..e3d3640 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
@@ -34,13 +34,11 @@ public class GTInfo {
     ImmutableBitSet colBlocksAll;
     int rowBlockSize; // 0: disable row block
 
-    // sharding
-    int nShards; // 0: no sharding
-
     // must create from builder
     private GTInfo() {
     }
 
+
     public String getTableName() {
         return tableName;
     }
@@ -56,15 +54,11 @@ public class GTInfo {
     public ImmutableBitSet getPrimaryKey() {
         return primaryKey;
     }
-    
+
     public ImmutableBitSet getAllColumns() {
         return colAll;
     }
 
-    public boolean isShardingEnabled() {
-        return nShards > 0;
-    }
-
     public boolean isRowBlockEnabled() {
         return rowBlockSize > 0;
     }
@@ -119,7 +113,7 @@ public class GTInfo {
 
     public void validateColRef(TblColRef ref) {
         TblColRef expected = colRef(ref.getColumnDesc().getZeroBasedIndex());
-        if (expected.equals(ref) == false)
+        if (!expected.equals(ref))
             throw new IllegalArgumentException();
     }
 
@@ -162,11 +156,11 @@ public class GTInfo {
         for (int i = 0; i < colBlocks.length; i++) {
             merge = merge.or(colBlocks[i]);
         }
-        if (merge.equals(colAll) == false)
+        if (!merge.equals(colAll))
             throw new IllegalStateException();
 
         // primary key must be the first column block
-        if (primaryKey.equals(colBlocks[0]) == false)
+        if (!primaryKey.equals(colBlocks[0]))
             throw new IllegalStateException();
 
         // drop empty column block
@@ -177,7 +171,7 @@ public class GTInfo {
             if (cb.isEmpty())
                 it.remove();
         }
-        colBlocks = (ImmutableBitSet[]) list.toArray(new ImmutableBitSet[list.size()]);
+        colBlocks = list.toArray(new ImmutableBitSet[list.size()]);
     }
 
     public static class Builder {
@@ -228,12 +222,6 @@ public class GTInfo {
         }
 
         /** optional */
-        public Builder enableSharding(int nShards) {
-            info.nShards = nShards;
-            return this;
-        }
-
-        /** optional */
         public Builder setColumnPreferIndex(ImmutableBitSet colPreferIndex) {
             info.colPreferIndex = colPreferIndex;
             return this;
@@ -256,8 +244,12 @@ public class GTInfo {
             return KryoUtils.serialize(info);
         }
     }
-    
+
     public static GTInfo deserialize(byte[] bytes) {
         return KryoUtils.deserialize(bytes, GTInfo.class);
     }
+
+    public IGTCodeSystem getCodeSystem() {
+        return codeSystem;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index dbfdf57..0f4eb3d 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -2,7 +2,7 @@ package org.apache.kylin.gridtable;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.BitSet;
+import java.util.List;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -159,7 +159,7 @@ public class GTRecord implements Comparable<GTRecord> {
             return false;
         for (int i = 0; i < maskForEqualHashComp.trueBitCount(); i++) {
             int c = maskForEqualHashComp.trueBitAt(i);
-            if (this.cols[c].equals(o.cols[c]) == false) {
+            if (!this.cols[c].equals(o.cols[c])) {
                 return false;
             }
         }
@@ -228,19 +228,6 @@ public class GTRecord implements Comparable<GTRecord> {
         buf.setLength(pos);
     }
 
-    /** write data to given buffer, like serialize, UNLIKE other export this will put a prefix indicating null or not*/
-    public void exportAllColumns(ByteBuffer buf) {
-        for (int i = 0; i < info.colAll.trueBitCount(); i++) {
-            int c = info.colAll.trueBitAt(i);
-            if (cols[c] == null || cols[c].array() == null) {
-                buf.put((byte) 0);
-            } else {
-                buf.put((byte) 1);
-                buf.put(cols[c].array(), cols[c].offset(), cols[c].length());
-            }
-        }
-    }
-
     /** write data to given buffer, like serialize */
     public void exportColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
         for (int i = 0; i < selectedCols.trueBitCount(); i++) {
@@ -261,34 +248,11 @@ public class GTRecord implements Comparable<GTRecord> {
     }
 
     /** change pointers to point to data in given buffer, UNLIKE deserialize */
-    public void loadPrimaryKey(ByteBuffer buf) {
-        loadColumns(info.primaryKey, buf);
-    }
-
-    /** change pointers to point to data in given buffer, UNLIKE deserialize */
     public void loadCellBlock(int c, ByteBuffer buf) {
         loadColumns(info.colBlocks[c], buf);
     }
 
     /** change pointers to point to data in given buffer, UNLIKE deserialize */
-    public void loadAllColumns(ByteBuffer buf) {
-        int pos = buf.position();
-        for (int i = 0; i < info.colAll.trueBitCount(); i++) {
-            int c = info.colAll.trueBitAt(i);
-
-            byte exist = buf.get();
-            pos++;
-
-            if (exist == 1) {
-                int len = info.codeSystem.codeLength(c, buf);
-                cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
-                pos += len;
-                buf.position(pos);
-            }
-        }
-    }
-
-    /** change pointers to point to data in given buffer, UNLIKE deserialize */
     public void loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
         int pos = buf.position();
         for (int i = 0; i < selectedCols.trueBitCount(); i++) {
@@ -300,30 +264,19 @@ public class GTRecord implements Comparable<GTRecord> {
         }
     }
 
-    /** similar to export(primaryKey) but will stop at the first null value */
-    public static ByteArray exportScanKey(GTRecord rec) {
-        if (rec == null)
-            return null;
-
-        GTInfo info = rec.getInfo();
-
-        BitSet selectedColumns = new BitSet();
-        int len = 0;
-        for (int i = 0; i < info.primaryKey.trueBitCount(); i++) {
-            int c = info.primaryKey.trueBitAt(i);
-            if (rec.cols[c].array() == null) {
-                break;
-            }
-            selectedColumns.set(c);
-            len += rec.cols[c].length();
+    /** change pointers to point to data in given buffer, UNLIKE deserialize.
+     *  Unlike loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf), this
+     *  method lets the caller define the specific columns (in order) to load
+     */
+    public void loadColumns(List<Integer> selectedCols, ByteBuffer buf) {
+        int pos = buf.position();
+        for (int i = 0; i < selectedCols.size(); i++) {
+            int c = selectedCols.get(i);
+            int len = info.codeSystem.codeLength(c, buf);
+            cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
+            pos += len;
+            buf.position(pos);
         }
-
-        if (selectedColumns.cardinality() == 0)
-            return null;
-
-        ByteArray buf = ByteArray.allocate(len);
-        rec.exportColumns(new ImmutableBitSet(selectedColumns), buf);
-        return buf;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
index 197fde4..eefe88e 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
@@ -7,42 +7,26 @@ public class GTScanRange {
 
     final public GTRecord pkStart; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded
     final public GTRecord pkEnd; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded
-    final public List<GTRecord> hbaseFuzzyKeys; // partial matching primary keys
+    final public List<GTRecord> fuzzyKeys; // partial matching primary keys
 
     public GTScanRange(GTRecord pkStart, GTRecord pkEnd) {
         this(pkStart, pkEnd, null);
     }
 
-    public GTScanRange(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> hbaseFuzzyKeys) {
+    public GTScanRange(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys) {
         GTInfo info = pkStart.info;
         assert info == pkEnd.info;
 
-        validateRangeKey(pkStart);
-        validateRangeKey(pkEnd);
-
         this.pkStart = pkStart;
         this.pkEnd = pkEnd;
-        this.hbaseFuzzyKeys = hbaseFuzzyKeys == null ? Collections.<GTRecord> emptyList() : hbaseFuzzyKeys;
-    }
-
-    private void validateRangeKey(GTRecord pk) {
-        pk.maskForEqualHashComp(pk.info.primaryKey);
-        boolean afterNull = false;
-        for (int i = 0; i < pk.info.primaryKey.trueBitCount(); i++) {
-            int c = pk.info.primaryKey.trueBitAt(i);
-            if (afterNull) {
-                pk.cols[c].set(null, 0, 0);
-            } else {
-                afterNull = pk.cols[c].array() == null;
-            }
-        }
+        this.fuzzyKeys = fuzzyKeys == null ? Collections.<GTRecord> emptyList() : fuzzyKeys;
     }
 
     @Override
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + ((hbaseFuzzyKeys == null) ? 0 : hbaseFuzzyKeys.hashCode());
+        result = prime * result + ((fuzzyKeys == null) ? 0 : fuzzyKeys.hashCode());
         result = prime * result + ((pkEnd == null) ? 0 : pkEnd.hashCode());
         result = prime * result + ((pkStart == null) ? 0 : pkStart.hashCode());
         return result;
@@ -57,10 +41,10 @@ public class GTScanRange {
         if (getClass() != obj.getClass())
             return false;
         GTScanRange other = (GTScanRange) obj;
-        if (hbaseFuzzyKeys == null) {
-            if (other.hbaseFuzzyKeys != null)
+        if (fuzzyKeys == null) {
+            if (other.fuzzyKeys != null)
                 return false;
-        } else if (!hbaseFuzzyKeys.equals(other.hbaseFuzzyKeys))
+        } else if (!fuzzyKeys.equals(other.fuzzyKeys))
             return false;
         if (pkEnd == null) {
             if (other.pkEnd != null)

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index c09ecf0..d860090 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -1,6 +1,7 @@
 package org.apache.kylin.gridtable;
 
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -11,35 +12,50 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.common.FuzzyValueCombination;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class GTScanRangePlanner {
 
+    private static final Logger logger = LoggerFactory.getLogger(GTScanRangePlanner.class);
+
     private static final int MAX_HBASE_FUZZY_KEYS = 100;
 
     final private GTInfo info;
-    final private ComparatorEx<ByteArray> byteUnknownIsSmaller;
-    final private ComparatorEx<ByteArray> byteUnknownIsBigger;
-    final private ComparatorEx<GTRecord> recordUnknownIsSmaller;
-    final private ComparatorEx<GTRecord> recordUnknownIsBigger;
+    final private Pair<ByteArray, ByteArray> segmentStartAndEnd;
+    final private TblColRef partitionColRef;
+
+    final private RecordComparator rangeStartComparator;
+    final private RecordComparator rangeEndComparator;
+    final private RecordComparator rangeStartEndComparator;
 
-    public GTScanRangePlanner(GTInfo info) {
+    public GTScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> segmentStartAndEnd, TblColRef partitionColRef) {
         this.info = info;
+        this.segmentStartAndEnd = segmentStartAndEnd;
+        this.partitionColRef = partitionColRef;
 
         IGTComparator comp = info.codeSystem.getComparator();
-        this.byteUnknownIsSmaller = byteComparatorTreatsUnknownSmaller(comp);
-        this.byteUnknownIsBigger = byteComparatorTreatsUnknownBigger(comp);
-        this.recordUnknownIsSmaller = recordComparatorTreatsUnknownSmaller(comp);
-        this.recordUnknownIsBigger = recordComparatorTreatsUnknownBigger(comp);
+
+        // compares a start-key GTRecord to another start-key GTRecord
+        this.rangeStartComparator = getRangeStartComparator(comp);
+        // compares a stop-key GTRecord to another stop-key GTRecord
+        this.rangeEndComparator = getRangeEndComparator(comp);
+        // compares a start-key GTRecord to a stop-key GTRecord
+        this.rangeStartEndComparator = getRangeStartEndComparator(comp);
     }
 
     // return empty list meaning filter is always false
@@ -57,7 +73,8 @@ public class GTScanRangePlanner {
         List<GTScanRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size());
         for (Collection<ColumnRange> andDimRanges : orAndDimRanges) {
             GTScanRange scanRange = newScanRange(andDimRanges);
-            scanRanges.add(scanRange);
+            if (scanRange != null)
+                scanRanges.add(scanRange);
         }
 
         List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges);
@@ -69,28 +86,64 @@ public class GTScanRangePlanner {
     private GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) {
         GTRecord pkStart = new GTRecord(info);
         GTRecord pkEnd = new GTRecord(info);
-        List<GTRecord> hbaseFuzzyKeys = Lists.newArrayList();
+        Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap();
+
+        List<GTRecord> fuzzyKeys;
 
         for (ColumnRange range : andDimRanges) {
+
+            if (partitionColRef != null && range.column.equals(partitionColRef)) {
+
+                if (rangeStartEndComparator.comparator.compare(segmentStartAndEnd.getFirst(), range.end) <= 0 //
+                        && rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) < 0) {
+                    //segment range is [Closed,Open)
+                } else {
+                    return null;
+                }
+            }
+
             int col = range.column.getColumnDesc().getZeroBasedIndex();
-            if (info.primaryKey.get(col) == false)
+            if (!info.primaryKey.get(col))
                 continue;
 
             pkStart.set(col, range.begin);
             pkEnd.set(col, range.end);
 
-            if (range.equals != null) {
-                ImmutableBitSet fuzzyMask = new ImmutableBitSet(col);
-                for (ByteArray v : range.equals) {
-                    GTRecord fuzzy = new GTRecord(info);
-                    fuzzy.set(col, v);
-                    fuzzy.maskForEqualHashComp(fuzzyMask);
-                    hbaseFuzzyKeys.add(fuzzy);
-                }
+            if (range.valueSet != null && !range.valueSet.isEmpty()) {
+                fuzzyValues.put(col, range.valueSet);
             }
         }
 
-        return new GTScanRange(pkStart, pkEnd, hbaseFuzzyKeys);
+        fuzzyKeys = buildFuzzyKeys(fuzzyValues);
+
+        return new GTScanRange(pkStart, pkEnd, fuzzyKeys);
+    }
+
+    private List<GTRecord> buildFuzzyKeys(Map<Integer, Set<ByteArray>> fuzzyValueSet) {
+        ArrayList<GTRecord> result = Lists.newArrayList();
+
+        if (fuzzyValueSet.isEmpty())
+            return result;
+
+        // for debugging/profiling purposes
+        if (BackdoorToggles.getDisableFuzzyKey()) {
+            logger.info("The execution of this query will not use fuzzy key");
+            return result;
+        }
+
+        List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, MAX_HBASE_FUZZY_KEYS);
+
+        for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) {
+            GTRecord fuzzy = new GTRecord(info);
+            BitSet bitSet = new BitSet(info.getColumnCount());
+            for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) {
+                bitSet.set(entry.getKey());
+                fuzzy.set(entry.getKey(), entry.getValue());
+            }
+            fuzzy.maskForEqualHashComp(new ImmutableBitSet(bitSet));
+            result.add(fuzzy);
+        }
+        return result;
     }
 
     private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
@@ -194,7 +247,7 @@ public class GTScanRangePlanner {
         Collections.sort(ranges, new Comparator<GTScanRange>() {
             @Override
             public int compare(GTScanRange a, GTScanRange b) {
-                return recordUnknownIsSmaller.compare(a.pkStart, b.pkStart);
+                return rangeStartComparator.compare(a.pkStart, b.pkStart);
             }
         });
 
@@ -202,13 +255,12 @@ public class GTScanRangePlanner {
         List<GTScanRange> mergedRanges = new ArrayList<GTScanRange>();
         int mergeBeginIndex = 0;
         GTRecord mergeEnd = ranges.get(0).pkEnd;
-        for (int index = 0; index < ranges.size(); index++) {
+        for (int index = 1; index < ranges.size(); index++) {
             GTScanRange range = ranges.get(index);
 
             // if overlap, swallow it
-            if (recordUnknownIsSmaller.min(range.pkStart, mergeEnd) == range.pkStart //
-                    || recordUnknownIsBigger.max(mergeEnd, range.pkStart) == mergeEnd) {
-                mergeEnd = recordUnknownIsBigger.max(mergeEnd, range.pkEnd);
+            if (rangeStartEndComparator.compare(range.pkStart, mergeEnd) <= 0) {
+                mergeEnd = rangeEndComparator.max(mergeEnd, range.pkEnd);
                 continue;
             }
 
@@ -218,7 +270,7 @@ public class GTScanRangePlanner {
 
             // start new split
             mergeBeginIndex = index;
-            mergeEnd = recordUnknownIsBigger.max(mergeEnd, range.pkEnd);
+            mergeEnd = range.pkEnd;
         }
 
         // don't miss the last range
@@ -239,9 +291,9 @@ public class GTScanRangePlanner {
 
         boolean hasNonFuzzyRange = false;
         for (GTScanRange range : ranges) {
-            hasNonFuzzyRange = hasNonFuzzyRange || range.hbaseFuzzyKeys.isEmpty();
-            newFuzzyKeys.addAll(range.hbaseFuzzyKeys);
-            end = recordUnknownIsBigger.max(end, range.pkEnd);
+            hasNonFuzzyRange = hasNonFuzzyRange || range.fuzzyKeys.isEmpty();
+            newFuzzyKeys.addAll(range.fuzzyKeys);
+            end = rangeEndComparator.max(end, range.pkEnd);
         }
 
         // if any range is non-fuzzy, then all fuzzy keys must be cleared
@@ -269,7 +321,7 @@ public class GTScanRangePlanner {
         private TblColRef column;
         private ByteArray begin = ByteArray.EMPTY;
         private ByteArray end = ByteArray.EMPTY;
-        private Set<ByteArray> equals;
+        private Set<ByteArray> valueSet;
 
         public ColumnRange(TblColRef column, Set<ByteArray> values, FilterOperatorEnum op) {
             this.column = column;
@@ -277,16 +329,16 @@ public class GTScanRangePlanner {
             switch (op) {
             case EQ:
             case IN:
-                equals = new HashSet<ByteArray>(values);
+                valueSet = new HashSet<ByteArray>(values);
                 refreshBeginEndFromEquals();
                 break;
             case LT:
             case LTE:
-                end = byteUnknownIsBigger.max(values);
+                end = rangeEndComparator.comparator.max(values);
                 break;
             case GT:
             case GTE:
-                begin = byteUnknownIsSmaller.min(values);
+                begin = rangeStartComparator.comparator.min(values);
                 break;
             case NEQ:
             case NOTIN:
@@ -303,16 +355,16 @@ public class GTScanRangePlanner {
             this.column = column;
             this.begin = beginValue;
             this.end = endValue;
-            this.equals = equalValues;
+            this.valueSet = equalValues;
         }
 
         private void refreshBeginEndFromEquals() {
-            if (equals.isEmpty()) {
+            if (valueSet.isEmpty()) {
                 begin = ByteArray.EMPTY;
                 end = ByteArray.EMPTY;
             } else {
-                begin = byteUnknownIsSmaller.min(equals);
-                end = byteUnknownIsBigger.max(equals);
+                begin = rangeStartComparator.comparator.min(valueSet);
+                end = rangeEndComparator.comparator.max(valueSet);
             }
         }
 
@@ -321,8 +373,8 @@ public class GTScanRangePlanner {
         }
 
         public boolean satisfyNone() {
-            if (equals != null) {
-                return equals.isEmpty();
+            if (valueSet != null) {
+                return valueSet.isEmpty();
             } else if (begin.array() != null && end.array() != null) {
                 return info.codeSystem.getComparator().compare(begin, end) > 0;
             } else {
@@ -338,36 +390,36 @@ public class GTScanRangePlanner {
             }
 
             if (this.satisfyAll()) {
-                copy(another.column, another.begin, another.end, another.equals);
+                copy(another.column, another.begin, another.end, another.valueSet);
                 return;
             }
 
-            if (this.equals != null && another.equals != null) {
-                this.equals.retainAll(another.equals);
+            if (this.valueSet != null && another.valueSet != null) {
+                this.valueSet.retainAll(another.valueSet);
                 refreshBeginEndFromEquals();
                 return;
             }
 
-            if (this.equals != null) {
-                this.equals = filter(this.equals, another.begin, another.end);
+            if (this.valueSet != null) {
+                this.valueSet = filter(this.valueSet, another.begin, another.end);
                 refreshBeginEndFromEquals();
                 return;
             }
 
-            if (another.equals != null) {
-                this.equals = filter(another.equals, this.begin, this.end);
+            if (another.valueSet != null) {
+                this.valueSet = filter(another.valueSet, this.begin, this.end);
                 refreshBeginEndFromEquals();
                 return;
             }
 
-            this.begin = byteUnknownIsSmaller.max(this.begin, another.begin);
-            this.end = byteUnknownIsBigger.min(this.end, another.end);
+            this.begin = rangeStartComparator.comparator.max(this.begin, another.begin);
+            this.end = rangeEndComparator.comparator.min(this.end, another.end);
         }
 
         private Set<ByteArray> filter(Set<ByteArray> equalValues, ByteArray beginValue, ByteArray endValue) {
             Set<ByteArray> result = Sets.newHashSetWithExpectedSize(equalValues.size());
             for (ByteArray v : equalValues) {
-                if (byteUnknownIsSmaller.compare(beginValue, v) <= 0 && byteUnknownIsBigger.compare(v, endValue) <= 0) {
+                if (rangeStartEndComparator.comparator.compare(beginValue, v) <= 0 && rangeStartEndComparator.comparator.compare(v, endValue) <= 0) {
                     result.add(v);
                 }
             }
@@ -375,10 +427,10 @@ public class GTScanRangePlanner {
         }
 
         public String toString() {
-            if (equals == null) {
+            if (valueSet == null) {
                 return column.getName() + " between " + begin + " and " + end;
             } else {
-                return column.getName() + " in " + equals;
+                return column.getName() + " in " + valueSet;
             }
         }
     }
@@ -424,40 +476,55 @@ public class GTScanRangePlanner {
         }
     }
 
-    public static ComparatorEx<ByteArray> byteComparatorTreatsUnknownSmaller(final IGTComparator comp) {
-        return new ComparatorEx<ByteArray>() {
+    public static RecordComparator getRangeStartComparator(final IGTComparator comp) {
+        return new RecordComparator(new ComparatorEx<ByteArray>() {
             @Override
             public int compare(ByteArray a, ByteArray b) {
-                if (a.array() == null)
-                    return -1;
-                else if (b.array() == null)
+                if (a.array() == null) {
+                    if (b.array() == null) {
+                        return 0;
+                    } else {
+                        return -1;
+                    }
+                } else if (b.array() == null) {
                     return 1;
-                else
+                } else {
                     return comp.compare(a, b);
+                }
             }
-        };
+        });
     }
 
-    public static ComparatorEx<ByteArray> byteComparatorTreatsUnknownBigger(final IGTComparator comp) {
-        return new ComparatorEx<ByteArray>() {
+    public static RecordComparator getRangeEndComparator(final IGTComparator comp) {
+        return new RecordComparator(new ComparatorEx<ByteArray>() {
             @Override
             public int compare(ByteArray a, ByteArray b) {
-                if (a.array() == null)
-                    return 1;
-                else if (b.array() == null)
+                if (a.array() == null) {
+                    if (b.array() == null) {
+                        return 0;
+                    } else {
+                        return 1;
+                    }
+                } else if (b.array() == null) {
                     return -1;
-                else
+                } else {
                     return comp.compare(a, b);
+                }
             }
-        };
-    }
-
-    public static ComparatorEx<GTRecord> recordComparatorTreatsUnknownSmaller(IGTComparator comp) {
-        return new RecordComparator(byteComparatorTreatsUnknownSmaller(comp));
+        });
     }
 
-    public static ComparatorEx<GTRecord> recordComparatorTreatsUnknownBigger(IGTComparator comp) {
-        return new RecordComparator(byteComparatorTreatsUnknownBigger(comp));
+    public static RecordComparator getRangeStartEndComparator(final IGTComparator comp) {
+        return new AsymmetricRecordComparator(new ComparatorEx<ByteArray>() {
+            @Override
+            public int compare(ByteArray a, ByteArray b) {
+                if (a.array() == null || b.array() == null) {
+                    return -1;
+                } else {
+                    return comp.compare(a, b);
+                }
+            }
+        });
     }
 
     private static class RecordComparator extends ComparatorEx<GTRecord> {
@@ -473,7 +540,7 @@ public class GTScanRangePlanner {
             assert a.maskForEqualHashComp() == b.maskForEqualHashComp();
             ImmutableBitSet mask = a.maskForEqualHashComp();
 
-            int comp = 0;
+            int comp;
             for (int i = 0; i < mask.trueBitCount(); i++) {
                 int c = mask.trueBitAt(i);
                 comp = comparator.compare(a.cols[c], b.cols[c]);
@@ -483,4 +550,35 @@ public class GTScanRangePlanner {
             return 0; // equals
         }
     }
+
+    /**
+     * asymmetric means compare(a,b) and compare(b,a) are not guaranteed to have opposite signs,
+     * so the min, max and between functions are not supported
+     */
+    private static class AsymmetricRecordComparator extends RecordComparator {
+
+        AsymmetricRecordComparator(ComparatorEx<ByteArray> byteComparator) {
+            super(byteComparator);
+        }
+
+        public GTRecord min(Collection<GTRecord> v) {
+            throw new UnsupportedOperationException();
+        }
+
+        public GTRecord max(Collection<GTRecord> v) {
+            throw new UnsupportedOperationException();
+        }
+
+        public GTRecord min(GTRecord a, GTRecord b) {
+            throw new UnsupportedOperationException();
+        }
+
+        public GTRecord max(GTRecord a, GTRecord b) {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean between(GTRecord v, GTRecord start, GTRecord end) {
+            throw new UnsupportedOperationException();
+        }
+    }
 }