You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/10/25 14:12:18 UTC

[5/7] incubator-kylin git commit: KYLIN-942 support parallel scan for grid table

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 2b31e70..c81dd63 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -2,6 +2,7 @@ package org.apache.kylin.gridtable;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -88,7 +89,7 @@ public class GTScanRequest {
     }
 
     private void validateFilterPushDown(GTInfo info) {
-        if (hasFilterPushDown() == false)
+        if (!hasFilterPushDown())
             return;
 
         Set<TblColRef> filterColumns = Sets.newHashSet();
@@ -102,7 +103,7 @@ public class GTScanRequest {
         }
 
         // un-evaluatable filter must be removed
-        if (TupleFilter.isEvaluableRecursively(filterPushDown) == false) {
+        if (!TupleFilter.isEvaluableRecursively(filterPushDown)) {
             Set<TblColRef> unevaluableColumns = Sets.newHashSet();
             filterPushDown = GTUtil.convertFilterUnevaluatable(filterPushDown, info, unevaluableColumns);
 
@@ -147,6 +148,10 @@ public class GTScanRequest {
         return range.pkEnd;
     }
 
+    public List<GTRecord> getFuzzyKeys() {
+        return range.fuzzyKeys;
+    }
+
     public ImmutableBitSet getSelectedColBlocks() {
         return selectedColBlocks;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
index bbd82c8..de9a5ce 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
@@ -33,10 +33,6 @@ public class GTUtil {
         return convertFilter(rootFilter, info, null, false, unevaluatableColumnCollector);
     }
 
-    public static TupleFilter convertFilterConstants(TupleFilter rootFilter, GTInfo info) {
-        return convertFilter(rootFilter, info, null, true, null);
-    }
-
     public static TupleFilter convertFilterColumnsAndConstants(TupleFilter rootFilter, GTInfo info, //
             List<TblColRef> colMapping, Set<TblColRef> unevaluatableColumnCollector) {
         return convertFilter(rootFilter, info, colMapping, true, unevaluatableColumnCollector);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java
index f812b8f..8f81654 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java
@@ -14,17 +14,14 @@ public class GridTable implements Closeable {
     }
 
     public GTBuilder rebuild() throws IOException {
-        assert info.isShardingEnabled() == false;
         return rebuild(-1);
     }
 
     public GTBuilder rebuild(int shard) throws IOException {
-        assert shard < info.nShards;
         return new GTBuilder(info, shard, store);
     }
 
     public GTBuilder append() throws IOException {
-        assert info.isShardingEnabled() == false;
         return append(-1);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java
index 5282544..f4c44f8 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java
@@ -6,9 +6,9 @@ public interface IGTStore {
 
     GTInfo getInfo();
 
-    IGTWriter rebuild(int shard) throws IOException;
+    IGTWriter rebuild() throws IOException;
 
-    IGTWriter append(int shard) throws IOException;
+    IGTWriter append() throws IOException;
 
     IGTScanner scan(GTScanRequest scanRequest) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java b/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java
index d7074e4..9675aa1 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java
@@ -22,9 +22,6 @@ public class GTSimpleMemStore implements IGTStore {
     public GTSimpleMemStore(GTInfo info) {
         this.info = info;
         this.rowList = new ArrayList<byte[]>();
-
-        if (info.isShardingEnabled())
-            throw new UnsupportedOperationException();
     }
 
     @Override
@@ -41,13 +38,13 @@ public class GTSimpleMemStore implements IGTStore {
     }
 
     @Override
-    public IGTWriter rebuild(int shard) {
+    public IGTWriter rebuild() {
         rowList.clear();
         return new Writer();
     }
 
     @Override
-    public IGTWriter append(int shard) {
+    public IGTWriter append() {
         return new Writer();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java b/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
index 9a7970c..98f1eef 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
@@ -28,10 +28,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-/**
- * @author George Song (ysong1)
- * 
- */
 public class RowKeySplitterTest extends LocalFileMetadataTestCase {
 
     @Before
@@ -49,23 +45,23 @@ public class RowKeySplitterTest extends LocalFileMetadataTestCase {
     public void testWithSlr() throws Exception {
         CubeInstance cube = CubeManager.getInstance(getTestConfig()).getCube("TEST_KYLIN_CUBE_WITH_SLR_READY");
 
-        RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 10, 20);
+        RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 11, 20);
         // base cuboid rowkey
-        byte[] input = { 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
-        rowKeySplitter.split(input, input.length);
+        byte[] input = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+        rowKeySplitter.split(input);
 
-        assertEquals(10, rowKeySplitter.getBufferSize());
+        assertEquals(11, rowKeySplitter.getBufferSize());
     }
 
     @Test
     public void testWithoutSlr() throws Exception {
         CubeInstance cube = CubeManager.getInstance(getTestConfig()).getCube("TEST_KYLIN_CUBE_WITHOUT_SLR_READY");
 
-        RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 10, 20);
+        RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 11, 20);
         // base cuboid rowkey
-        byte[] input = { 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
-        rowKeySplitter.split(input, input.length);
+        byte[] input = { 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
+        rowKeySplitter.split(input);
 
-        assertEquals(9, rowKeySplitter.getBufferSize());
+        assertEquals(10, rowKeySplitter.getBufferSize());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
index 3704e03..d6b1718 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
@@ -34,10 +34,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-/**
- * @author George Song (ysong1)
- * 
- */
 public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
 
     @Before
@@ -57,7 +53,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
 
         RowKeyDecoder rowKeyDecoder = new RowKeyDecoder(cube.getFirstSegment());
 
-        byte[] key = { 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
+        byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 };
 
         rowKeyDecoder.decode(key);
         List<String> values = rowKeyDecoder.getValues();
@@ -70,7 +66,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
 
         RowKeyDecoder rowKeyDecoder = new RowKeyDecoder(cube.getFirstSegment());
 
-        byte[] key = { 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+        byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
 
         rowKeyDecoder.decode(key);
         List<String> values = rowKeyDecoder.getValues();
@@ -97,7 +93,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase {
         AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
 
         byte[] encodedKey = rowKeyEncoder.encode(data);
-        assertEquals(30, encodedKey.length);
+        assertEquals(22 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
 
         RowKeyDecoder rowKeyDecoder = new RowKeyDecoder(cube.getFirstSegment());
         rowKeyDecoder.decode(encodedKey);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
index c50b8c9..45c8108 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
@@ -35,10 +35,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-/**
- * @author George Song (ysong1)
- * 
- */
 public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
 
     @Before
@@ -74,9 +70,11 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
         AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
 
         byte[] encodedKey = rowKeyEncoder.encode(data);
-        assertEquals(30, encodedKey.length);
-        byte[] cuboidId = Arrays.copyOfRange(encodedKey, 0, 8);
-        byte[] rest = Arrays.copyOfRange(encodedKey, 8, encodedKey.length);
+        assertEquals(22 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+        byte[] shard = Arrays.copyOfRange(encodedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        byte[] cuboidId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN);
+        byte[] rest = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+        assertEquals(0, Bytes.toShort(shard));
         assertEquals(255, Bytes.toLong(cuboidId));
         assertArrayEquals(new byte[] { 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 }, rest);
     }
@@ -104,10 +102,12 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
         AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
 
         byte[] encodedKey = rowKeyEncoder.encode(data);
-        assertEquals(48, encodedKey.length);
-        byte[] sellerId = Arrays.copyOfRange(encodedKey, 8, 26);
-        byte[] cuboidId = Arrays.copyOfRange(encodedKey, 0, 8);
-        byte[] rest = Arrays.copyOfRange(encodedKey, 26, encodedKey.length);
+        assertEquals(40 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+        byte[] shard = Arrays.copyOfRange(encodedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        byte[] sellerId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_HEADER_LEN, 18 + RowConstants.ROWKEY_HEADER_LEN);
+        byte[] cuboidId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN);
+        byte[] rest = Arrays.copyOfRange(encodedKey, 18 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+        assertEquals(0, Bytes.toShort(shard));
         assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
         assertEquals(511, Bytes.toLong(cuboidId));
         assertArrayEquals(new byte[] { 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 }, rest);
@@ -136,10 +136,12 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase {
         AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid);
 
         byte[] encodedKey = rowKeyEncoder.encode(data);
-        assertEquals(48, encodedKey.length);
-        byte[] sellerId = Arrays.copyOfRange(encodedKey, 8, 26);
-        byte[] cuboidId = Arrays.copyOfRange(encodedKey, 0, 8);
-        byte[] rest = Arrays.copyOfRange(encodedKey, 26, encodedKey.length);
+        assertEquals(40 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+        byte[] shard = Arrays.copyOfRange(encodedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        byte[] cuboidId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN);
+        byte[] sellerId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_HEADER_LEN, 18 + RowConstants.ROWKEY_HEADER_LEN);
+        byte[] rest = Arrays.copyOfRange(encodedKey, 18 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length);
+        assertEquals(0, Bytes.toShort(shard));
         assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
         assertEquals(511, Bytes.toLong(cuboidId));
         assertArrayEquals(new byte[] { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 }, rest);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index 684f0ef..91e7e18 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -29,6 +29,7 @@ import java.util.Map;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.gridtable.CubeCodeSystem;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.NumberDictionaryBuilder;
@@ -48,6 +49,7 @@ import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -55,39 +57,120 @@ import com.google.common.collect.Maps;
 
 public class DictGridTableTest {
 
+    private GridTable table;
+    private GTInfo info;
+    private CompareTupleFilter timeComp0;
+    private CompareTupleFilter timeComp1;
+    private CompareTupleFilter timeComp2;
+    private CompareTupleFilter timeComp3;
+    private CompareTupleFilter timeComp4;
+    private CompareTupleFilter timeComp5;
+    private CompareTupleFilter timeComp6;
+    private CompareTupleFilter ageComp1;
+    private CompareTupleFilter ageComp2;
+    private CompareTupleFilter ageComp3;
+    private CompareTupleFilter ageComp4;
+
+    @Before
+    public void setup() throws IOException {
+        table = newTestTable();
+        info = table.getInfo();
+
+        timeComp0 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-14"));
+        timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
+        timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
+        timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
+        timeComp5 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-15"));
+        timeComp6 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-14"));
+        ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
+        ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
+        ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
+        ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
+    }
+
+    @Test
+    public void verifySegmentSkipping() {
+
+        ByteArray segmentStart = enc(info, 0, "2015-01-14");
+        ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when the partition col is dict encoded, the time format is flexible
+        ByteArray segmentEnd = enc(info, 0, "2015-01-15");
+        assertEquals(segmentStart, segmentStartX);
+
+        GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0));
+
+        {
+            LogicalTupleFilter filter = and(timeComp0, ageComp1);
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals(1, r.size());//scan ranges are [closed, closed]
+            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
+            assertEquals(1, r.get(0).fuzzyKeys.size());
+            assertEquals("[[10]]", r.get(0).fuzzyKeys.toString());
+        }
+        {
+            LogicalTupleFilter filter = and(timeComp2, ageComp1);
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals(0, r.size());
+        }
+        {
+            LogicalTupleFilter filter = and(timeComp4, ageComp1);
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals(0, r.size());
+        }
+        {
+            LogicalTupleFilter filter = and(timeComp5, ageComp1);
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals(0, r.size());
+        }
+        {
+            LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1), and(timeComp6, ageComp1));
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals(1, r.size());
+            assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString());
+            assertEquals("[[10], [1421193600000, 10]]", r.get(0).fuzzyKeys.toString());
+        }
+        {
+            LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals(1, r.size());
+            assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString());
+            assertEquals(0, r.get(0).fuzzyKeys.size());
+        }
+    }
+
     @Test
-    public void test() throws IOException {
-        GridTable table = newTestTable();
-        verifyScanRangePlanner(table);
-        verifyFirstRow(table);
-        verifyScanWithUnevaluatableFilter(table);
-        verifyScanWithEvaluatableFilter(table);
-        verifyConvertFilterConstants1(table);
-        verifyConvertFilterConstants2(table);
-        verifyConvertFilterConstants3(table);
-        verifyConvertFilterConstants4(table);
+    public void verifySegmentSkipping2() {
+        ByteArray segmentEnd = enc(info, 0, "2015-01-15");
+        GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0));
+
+        {
+            LogicalTupleFilter filter = and(timeComp0, ageComp1);
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals(1, r.size());//scan ranges are [closed, closed]
+            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
+            assertEquals(1, r.get(0).fuzzyKeys.size());
+            assertEquals("[[10]]", r.get(0).fuzzyKeys.toString());
+        }
+
+        {
+            LogicalTupleFilter filter = and(timeComp5, ageComp1);
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals(0, r.size());//scan ranges are [closed, closed]
+        }
     }
 
-    private void verifyScanRangePlanner(GridTable table) {
-        GTInfo info = table.getInfo();
-        GTScanRangePlanner planner = new GTScanRangePlanner(info);
+    @Test
+    public void verifyScanRangePlanner() {
 
-        CompareTupleFilter timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
-        CompareTupleFilter timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
-        CompareTupleFilter timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
-        CompareTupleFilter timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
-        CompareTupleFilter ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
-        CompareTupleFilter ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
-        CompareTupleFilter ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
-        CompareTupleFilter ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
+        GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null);
 
         // flatten or-and & hbase fuzzy value
         {
             LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
             List<GTScanRange> r = planner.planScanRanges(filter);
             assertEquals(1, r.size());
-            assertEquals("[1421193600000, 10]-[null, null]", r.get(0).toString());
-            assertEquals("[[10], [20]]", r.get(0).hbaseFuzzyKeys.toString());
+            assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
+            assertEquals("[[10], [20]]", r.get(0).fuzzyKeys.toString());
         }
 
         // pre-evaluate ever false
@@ -124,11 +207,13 @@ public class DictGridTableTest {
         }
     }
 
-    private void verifyFirstRow(GridTable table) throws IOException {
+    @Test
+    public void verifyFirstRow() throws IOException {
         doScanAndVerify(table, new GTScanRequest(table.getInfo()), "[1421193600000, 30, Yang, 10, 10.5]");
     }
 
-    private void verifyScanWithUnevaluatableFilter(GridTable table) throws IOException {
+    @Test
+    public void verifyScanWithUnevaluatableFilter() throws IOException {
         GTInfo info = table.getInfo();
 
         CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
@@ -144,7 +229,8 @@ public class DictGridTableTest {
         doScanAndVerify(table, req, "[1421280000000, 20, null, 20, null]");
     }
 
-    private void verifyScanWithEvaluatableFilter(GridTable table) throws IOException {
+    @Test
+    public void verifyScanWithEvaluatableFilter() throws IOException {
         GTInfo info = table.getInfo();
 
         CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
@@ -159,7 +245,8 @@ public class DictGridTableTest {
         doScanAndVerify(table, req, "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]");
     }
 
-    private void verifyConvertFilterConstants1(GridTable table) {
+    @Test
+    public void verifyConvertFilterConstants1() {
         GTInfo info = table.getInfo();
 
         TableDesc extTable = TableDesc.mockup("ext");
@@ -178,7 +265,8 @@ public class DictGridTableTest {
         assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]", newFilter.toString());
     }
 
-    private void verifyConvertFilterConstants2(GridTable table) {
+    @Test
+    public void verifyConvertFilterConstants2() {
         GTInfo info = table.getInfo();
 
         TableDesc extTable = TableDesc.mockup("ext");
@@ -198,7 +286,8 @@ public class DictGridTableTest {
         assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]", newFilter.toString());
     }
 
-    private void verifyConvertFilterConstants3(GridTable table) {
+    @Test
+    public void verifyConvertFilterConstants3() {
         GTInfo info = table.getInfo();
 
         TableDesc extTable = TableDesc.mockup("ext");
@@ -218,7 +307,8 @@ public class DictGridTableTest {
         assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], []]", newFilter.toString());
     }
 
-    private void verifyConvertFilterConstants4(GridTable table) {
+    @Test
+    public void verifyConvertFilterConstants4() {
         GTInfo info = table.getInfo();
 
         TableDesc extTable = TableDesc.mockup("ext");
@@ -252,7 +342,7 @@ public class DictGridTableTest {
         scanner.close();
     }
 
-    private Object enc(GTInfo info, int col, String value) {
+    private ByteArray enc(GTInfo info, int col, String value) {
         ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
         info.codeSystem.encodeColumnValue(col, value, buf);
         return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
index 8b8fb87..919ede6 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -18,6 +18,9 @@
 
 package org.apache.kylin.engine;
 
+import static org.apache.kylin.metadata.model.IEngineAware.ID_MR_V1;
+import static org.apache.kylin.metadata.model.IEngineAware.ID_MR_V2;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -25,30 +28,29 @@ import org.apache.kylin.common.util.ImplementationSwitch;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IEngineAware;
-import static org.apache.kylin.metadata.model.IEngineAware.*;
 
 public class EngineFactory {
-    
-    private static ImplementationSwitch batchEngines;
-    private static ImplementationSwitch streamingEngines;
+
+    private static ImplementationSwitch<IBatchCubingEngine> batchEngines;
+    private static ImplementationSwitch<IStreamingCubingEngine> streamingEngines;
     static {
         Map<Integer, String> impls = new HashMap<>();
         impls.put(ID_MR_V1, "org.apache.kylin.engine.mr.MRBatchCubingEngine");
         impls.put(ID_MR_V2, "org.apache.kylin.engine.mr.MRBatchCubingEngine2");
-        batchEngines = new ImplementationSwitch(impls);
-        
+        batchEngines = new ImplementationSwitch<IBatchCubingEngine>(impls, IBatchCubingEngine.class);
+
         impls.clear();
-        streamingEngines = new ImplementationSwitch(impls); // TODO
+        streamingEngines = new ImplementationSwitch<IStreamingCubingEngine>(impls, IStreamingCubingEngine.class); // TODO
     }
-    
+
     public static IBatchCubingEngine batchEngine(IEngineAware aware) {
-        return batchEngines.get(aware.getEngineType(), IBatchCubingEngine.class);
+        return batchEngines.get(aware.getEngineType());
     }
-    
+
     public static IStreamingCubingEngine streamingEngine(IEngineAware aware) {
-        return streamingEngines.get(aware.getEngineType(), IStreamingCubingEngine.class);
+        return streamingEngines.get(aware.getEngineType());
     }
-    
+
     /** Build a new cube segment, typically its time range appends to the end of current cube. */
     public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
         return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter);
@@ -58,7 +60,7 @@ public class EngineFactory {
     public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
         return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter);
     }
-    
+
     public static Runnable createStreamingCubingBuilder(CubeSegment seg) {
         return streamingEngine(seg).createStreamingCubingBuilder(seg);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
index c5bd3e0..e456ac1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -115,6 +115,17 @@ public abstract class TupleFilter {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Flattens this filter into OR-AND form: (A AND B AND ..) OR (C AND D AND ..) OR ..
+     * The flattened filter contains ONLY AND and OR nodes; no NOT will exist.
+     * This helps to decide scan ranges.
+     * 
+     * Note that the flattened filter is ONLY used for determining scan ranges.
+     * The filter that is later pushed down to the storage level is still the ORIGINAL
+     * filter, since the flattened filter would be too "fat" to evaluate.
+     * 
+     * @return the flattened filter
+     */
     public TupleFilter flatFilter() {
         return flattenInternal(this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
index a9d785b..7404136 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
@@ -33,7 +33,7 @@ import org.apache.kylin.common.util.BytesUtil;
  */
 public class TupleFilterSerializer {
 
-    public static interface Decorator {
+    public interface Decorator {
         TupleFilter onSerialize(TupleFilter filter);
     }
 
@@ -69,20 +69,20 @@ public class TupleFilterSerializer {
 
         if (filter.hasChildren()) {
             // serialize filter+true
-            serializeFilter(1, filter, decorator, buffer, cs);
+            serializeFilter(1, filter, buffer, cs);
             // serialize children
             for (TupleFilter child : filter.getChildren()) {
                 internalSerialize(child, decorator, buffer, cs);
             }
             // serialize none
-            serializeFilter(-1, filter, decorator, buffer, cs);
+            serializeFilter(-1, filter, buffer, cs);
         } else {
             // serialize filter+false
-            serializeFilter(0, filter, decorator, buffer, cs);
+            serializeFilter(0, filter, buffer, cs);
         }
     }
 
-    private static void serializeFilter(int flag, TupleFilter filter, Decorator decorator, ByteBuffer buffer, IFilterCodeSystem<?> cs) {
+    private static void serializeFilter(int flag, TupleFilter filter, ByteBuffer buffer, IFilterCodeSystem<?> cs) {
         if (flag < 0) {
             BytesUtil.writeVInt(-1, buffer);
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
index a771f5e..54068f6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
@@ -18,25 +18,26 @@
 
 package org.apache.kylin.source;
 
+import static org.apache.kylin.metadata.model.ISourceAware.ID_HIVE;
+
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.kylin.common.util.ImplementationSwitch;
 import org.apache.kylin.metadata.model.ISourceAware;
-import static org.apache.kylin.metadata.model.ISourceAware.*;
 import org.apache.kylin.metadata.model.TableDesc;
 
 public class SourceFactory {
 
-    private static ImplementationSwitch sources;
+    private static ImplementationSwitch<ISource> sources;
     static {
         Map<Integer, String> impls = new HashMap<>();
         impls.put(ID_HIVE, "org.apache.kylin.source.hive.HiveSource");
-        sources = new ImplementationSwitch(impls);
+        sources = new ImplementationSwitch<ISource>(impls, ISource.class);
     }
 
     public static ISource tableSource(ISourceAware aware) {
-        return sources.get(aware.getSourceType(), ISource.class);
+        return sources.get(aware.getSourceType());
     }
 
     public static ReadableTable createReadableTable(TableDesc table) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
index b26dfdb..271583c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
@@ -18,7 +18,8 @@
 
 package org.apache.kylin.storage;
 
-import static org.apache.kylin.metadata.model.IStorageAware.*;
+import static org.apache.kylin.metadata.model.IStorageAware.ID_HBASE;
+import static org.apache.kylin.metadata.model.IStorageAware.ID_HYBRID;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -31,22 +32,22 @@ import org.apache.kylin.metadata.realization.IRealization;
  */
 public class StorageFactory {
 
-    private static ImplementationSwitch storages;
+    private static ImplementationSwitch<IStorage> storages;
     static {
         Map<Integer, String> impls = new HashMap<>();
         impls.put(ID_HBASE, "org.apache.kylin.storage.hbase.HBaseStorage");
         impls.put(ID_HYBRID, "org.apache.kylin.storage.hybrid.HybridStorage");
-        storages = new ImplementationSwitch(impls);
+        storages = new ImplementationSwitch<IStorage>(impls, IStorage.class);
     }
-    
+
     public static IStorage storage(IStorageAware aware) {
-        return storages.get(aware.getStorageType(), IStorage.class);
+        return storages.get(aware.getStorageType());
     }
-    
+
     public static IStorageQuery createQuery(IRealization realization) {
         return storage(realization).createQuery(realization);
     }
-    
+
     public static <T> T createEngineAdapter(IStorageAware aware, Class<T> engineInterface) {
         return storage(aware).adaptToBuildEngine(engineInterface);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index 0c30a3c..4049fcd 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -163,7 +163,7 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
 
     @Override
     public int getCost(SQLDigest digest) {
-        cost = 100;
+        cost = Integer.MAX_VALUE;
         for (IRealization realization : this.getRealizations()) {
             if (realization.isCapable(digest))
                 cost = Math.min(cost, realization.getCost(digest));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
index fbc6d19..1e05eb8 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
@@ -32,23 +32,21 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-/**
- * @author yangli9
- * 
- */
 public class FuzzyValueCombination {
 
-    private static class Dim {
+    private static class Dim<E> {
         TblColRef col;
-        Set<String> values;
+        Set<E> values;
     }
 
-    private static final Set<String> SINGLE_NULL_SET = Sets.newHashSet();
+    private static final Set SINGLE_NULL_SET = Sets.newHashSet();
+
     static {
         SINGLE_NULL_SET.add(null);
     }
 
-    public static List<Map<TblColRef, String>> calculate(Map<TblColRef, Set<String>> fuzzyValues, long cap) {
+    public static <E> List<Map<TblColRef, E>> calculate(Map<TblColRef, Set<E>> fuzzyValues, long cap) {
+        Collections.emptyMap();
         Dim[] dims = toDims(fuzzyValues);
         // If a query has many IN clause and each IN clause has many values, then it will easily generate 
         // thousands of fuzzy keys. When there are lots of fuzzy keys, the scan performance is bottle necked 
@@ -61,9 +59,9 @@ public class FuzzyValueCombination {
     }
 
     @SuppressWarnings("unchecked")
-    private static List<Map<TblColRef, String>> combination(Dim[] dims) {
+    private static <E> List<Map<TblColRef, E>> combination(Dim[] dims) {
 
-        List<Map<TblColRef, String>> result = Lists.newArrayList();
+        List<Map<TblColRef, E>> result = Lists.newArrayList();
 
         int emptyDims = 0;
         for (Dim dim : dims) {
@@ -76,8 +74,8 @@ public class FuzzyValueCombination {
             return result;
         }
 
-        Map<TblColRef, String> r = Maps.newHashMap();
-        Iterator<String>[] iters = new Iterator[dims.length];
+        Map<TblColRef, E> r = Maps.newHashMap();
+        Iterator<E>[] iters = new Iterator[dims.length];
         int level = 0;
         while (true) {
             Dim dim = dims[level];
@@ -85,7 +83,7 @@ public class FuzzyValueCombination {
                 iters[level] = dim.values.iterator();
             }
 
-            Iterator<String> it = iters[level];
+            Iterator<E> it = iters[level];
             if (it.hasNext() == false) {
                 if (level == 0)
                     break;
@@ -97,7 +95,7 @@ public class FuzzyValueCombination {
 
             r.put(dim.col, it.next());
             if (level == dims.length - 1) {
-                result.add(new HashMap<TblColRef, String>(r));
+                result.add(new HashMap<TblColRef, E>(r));
             } else {
                 level++;
             }
@@ -105,10 +103,10 @@ public class FuzzyValueCombination {
         return result;
     }
 
-    private static Dim[] toDims(Map<TblColRef, Set<String>> fuzzyValues) {
+    private static <E> Dim[] toDims(Map<TblColRef, Set<E>> fuzzyValues) {
         Dim[] dims = new Dim[fuzzyValues.size()];
         int i = 0;
-        for (Entry<TblColRef, Set<String>> entry : fuzzyValues.entrySet()) {
+        for (Entry<TblColRef, Set<E>> entry : fuzzyValues.entrySet()) {
             dims[i] = new Dim();
             dims[i].col = entry.getKey();
             dims[i].values = entry.getValue();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
index 47553ad..bdcd257 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
@@ -119,7 +119,8 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
         }
 
         AbstractRowKeyEncoder encoder = AbstractRowKeyEncoder.createInstance(cubeSeg, cuboid);
-
+        encoder.setEncodeShard(false);//will enumerate all possible shards when scanning
+        
         encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE);
 
         this.startKey = encoder.encode(startValues);
@@ -133,7 +134,8 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
         // restore encoder defaults for later reuse (note
         // AbstractRowKeyEncoder.createInstance() caches instances)
         encoder.setBlankByte(AbstractRowKeyEncoder.DEFAULT_BLANK_BYTE);
-
+        
+        encoder.setEncodeShard(true);
         // always fuzzy match cuboid ID to lock on the selected cuboid
         this.fuzzyKeys = buildFuzzyKeys(fuzzyValues);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/dev-support/test_all.sh
----------------------------------------------------------------------
diff --git a/dev-support/test_all.sh b/dev-support/test_all.sh
new file mode 100644
index 0000000..6a7b887
--- /dev/null
+++ b/dev-support/test_all.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+dir=$(dirname ${0})
+cd ${dir}
+cd ..
+
+mvn clean install -DskipTests | tee mci.log
+mvn test -Dtest=org.apache.kylin.job.BuildCubeWithEngineTest -DfailIfNoTests=false -P sandbox | tee BuildCubeWithEngineTest.log
+mvn test -Dtest=org.apache.kylin.job.BuildIIWithStreamTest -DfailIfNoTests=false -P sandbox | tee BuildIIWithStreamTest.log
+mvn test -Dtest=org.apache.kylin.job.BuildCubeWithStreamTest -DfailIfNoTests=false -P sandbox | tee BuildCubeWithStreamTest.log
+mvn verify -fae -P sandbox | tee mvnverify.log
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
new file mode 100644
index 0000000..d09e4ec
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class CuboidShardUtil {
+    protected static final Logger logger = LoggerFactory.getLogger(CuboidShardUtil.class);
+
+    public static void saveCuboidShards(CubeSegment segment, Map<Long, Short> cuboidShards, int totalShards) throws IOException {
+        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        Map<Long, Short> filered = Maps.newHashMap();
+        for (Map.Entry<Long, Short> entry : cuboidShards.entrySet()) {
+            if (entry.getValue() <= 1) {
+                logger.info("Cuboid {} has {} shards, skip saving it as an optimization", entry.getKey(), entry.getValue());
+            } else {
+                logger.info("Cuboid {} has {} shards, saving it", entry.getKey(), entry.getValue());
+                filered.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        segment.setCuboidShardNums(filered);
+        segment.setTotalShards(totalShards);
+
+        CubeUpdate cubeBuilder = new CubeUpdate(segment.getCubeInstance());
+        cubeBuilder.setToUpdateSegs(segment);
+        cubeManager.updateCube(cubeBuilder);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
new file mode 100644
index 0000000..10c82c3
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.kv.RowConstants;
+
+public class CuboidStatsUtil {
+
+   public static void writeCuboidStatistics(Configuration conf, Path outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
+        Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+        SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class));
+
+        List<Long> allCuboids = new ArrayList<Long>();
+        allCuboids.addAll(cuboidHLLMap.keySet());
+        Collections.sort(allCuboids);
+
+        // persist the sample percentage with key 0
+        writer.append(new LongWritable(0l), new BytesWritable(Bytes.toBytes(samplingPercentage)));
+        ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+        try {
+            for (long i : allCuboids) {
+                valueBuf.clear();
+                cuboidHLLMap.get(i).writeRegisters(valueBuf);
+                valueBuf.flip();
+                writer.append(new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()));
+            }
+        } finally {
+            writer.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 85312ff..568dd77 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -47,6 +47,7 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.KylinReducer;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CuboidStatsUtil;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.collect.Lists;
@@ -144,7 +145,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
         //output the hll info;
         if (collectStatistics) {
             writeMapperAndCuboidStatistics(context); // for human check
-            writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), cuboidHLLMap, SAMPING_PERCENTAGE); // for CreateHTableJob
+            CuboidStatsUtil.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), cuboidHLLMap, SAMPING_PERCENTAGE); // for CreateHTableJob
         }
     }
 
@@ -202,27 +203,4 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
 
     }
 
-    public static void writeCuboidStatistics(Configuration conf, Path outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
-        Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
-        SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class));
-
-        List<Long> allCuboids = new ArrayList<Long>();
-        allCuboids.addAll(cuboidHLLMap.keySet());
-        Collections.sort(allCuboids);
-
-        // persist the sample percentage with key 0
-        writer.append(new LongWritable(0l), new BytesWritable(Bytes.toBytes(samplingPercentage)));
-        ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-        try {
-            for (long i : allCuboids) {
-                valueBuf.clear();
-                cuboidHLLMap.get(i).writeRegisters(valueBuf);
-                valueBuf.flip();
-                writer.append(new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()));
-            }
-        } finally {
-            writer.close();
-        }
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index 7bb2e16..4c743fb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -7,7 +7,8 @@ import java.util.BitSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
@@ -37,12 +38,13 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
     private ByteArrayWritable outputValue = new ByteArrayWritable();
     private long cuboidRowCount = 0;
 
+    //for shard
+
     public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
         this.mapContext = mapContext;
         this.cubeDesc = cubeDesc;
         this.cubeSegment = cubeSegment;
         this.measureCount = cubeDesc.getMeasures().size();
-
     }
 
     @Override
@@ -59,12 +61,20 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
         }
 
         cuboidRowCount++;
-        int offSet = RowConstants.ROWKEY_CUBOIDID_LEN;
+        int header = RowConstants.ROWKEY_HEADER_LEN;
+        int offSet = header;
         for (int x = 0; x < dimensions; x++) {
             System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length());
             offSet += record.get(x).length();
         }
 
+        //fill shard
+        short cuboidShardNum = cubeSegment.getCuboidShardNum(cuboidId);
+        short shardOffset = ShardingHash.getShard(keyBuf, header, offSet - header, cuboidShardNum);
+        short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId);
+        short finalShard = ShardingHash.normalize(cuboidShardBase, shardOffset, cubeSegment.getTotalShards());
+        BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN);
+
         //output measures
         valueBuf.clear();
         record.exportColumns(measureColumnsIndex, valueBuf);
@@ -89,7 +99,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
     }
 
     private void initVariables(Long cuboidId) {
-        bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
+        bytesLength = RowConstants.ROWKEY_HEADER_LEN;
         Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
         for (TblColRef column : cuboid.getColumns()) {
             bytesLength += cubeSegment.getColumnLength(column);
@@ -102,6 +112,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
             measureColumnsIndex[i] = dimensions + i;
         }
 
-        System.arraycopy(Bytes.toBytes(cuboidId), 0, keyBuf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+        //write cuboid id first
+        BytesUtil.writeLong(cuboidId, keyBuf, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index 4598673..9b25c97 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -124,11 +124,16 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
 
         Preconditions.checkState(key.offset() == 0);
 
-        long cuboidID = rowKeySplitter.split(key.array(), key.length());
+        long cuboidID = rowKeySplitter.split(key.array());
+        short shard = rowKeySplitter.getLastSplittedShard();
         Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
 
         SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
         int bufOffset = 0;
+
+        BytesUtil.writeShort(shard, newKeyBuf, bufOffset, RowConstants.ROWKEY_SHARDID_LEN);
+        bufOffset += RowConstants.ROWKEY_SHARDID_LEN;
+        
         BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
         bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 45f0d32..6301f3d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -103,7 +103,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
         rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
     }
-    
+
     private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
 
     public static CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
@@ -111,7 +111,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         String jobID = extractJobIDFromPath(filePath);
         return findSegmentWithUuid(jobID, cube);
     }
-    
+
     private static String extractJobIDFromPath(String path) {
         Matcher matcher = JOB_NAME_PATTERN.matcher(path);
         // check the first occurrence
@@ -134,11 +134,14 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
     @Override
     public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        long cuboidID = rowKeySplitter.split(key.getBytes(), key.getBytes().length);
+        long cuboidID = rowKeySplitter.split(key.getBytes());
+        short shard = rowKeySplitter.getLastSplittedShard();
         Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
 
         SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
         int bufOffset = 0;
+        BytesUtil.writeShort(shard, newKeyBuf, bufOffset, RowConstants.ROWKEY_SHARDID_LEN);
+        bufOffset += RowConstants.ROWKEY_SHARDID_LEN;
         BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
         bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 5e935eb..67c4416 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -47,6 +47,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CuboidStatsUtil;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
@@ -126,7 +127,7 @@ public class MergeStatisticsStep extends AbstractExecutable {
                 }
             }
             averageSamplingPercentage = averageSamplingPercentage / this.getMergingSegmentIds().size();
-            FactDistinctColumnsReducer.writeCuboidStatistics(conf, new Path(getMergedStatisticsPath()), cuboidHLLMap, averageSamplingPercentage);
+            CuboidStatsUtil.writeCuboidStatistics(conf, new Path(getMergedStatisticsPath()), cuboidHLLMap, averageSamplingPercentage);
             Path statisticsFilePath = new Path(getMergedStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
             FileSystem fs = statisticsFilePath.getFileSystem(conf);
             FSDataInputStream is = fs.open(statisticsFilePath);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidJob.java
index 40c4dd7..dc8fb3f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidJob.java
@@ -20,11 +20,6 @@ package org.apache.kylin.engine.mr.steps;
 
 import org.apache.hadoop.util.ToolRunner;
 
-/**
- * @author George Song (ysong1)
- * 
- */
-
 public class NDCuboidJob extends CuboidJob {
 
     public NDCuboidJob() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index c47d090..2180dd6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -23,6 +23,8 @@ import java.util.Collection;
 
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -30,6 +32,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@ -49,6 +52,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     private Text outputKey = new Text();
     private String cubeName;
     private String segmentName;
+    private CubeSegment cubeSegment;
     private CubeDesc cubeDesc;
     private CuboidScheduler cuboidScheduler;
 
@@ -68,7 +72,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
-        CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
         cubeDesc = cube.getDescriptor();
 
         // initialize CubiodScheduler
@@ -80,16 +84,21 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
         int offset = 0;
 
+        //shard id will be filled after other contents
+        offset += RowConstants.ROWKEY_SHARDID_LEN;
+
         // cuboid id
         System.arraycopy(childCuboid.getBytes(), 0, keyBuf, offset, childCuboid.getBytes().length);
-        offset += childCuboid.getBytes().length;
+        offset += RowConstants.ROWKEY_CUBOIDID_LEN;
+
+        int bodyOffset = offset;
 
         // rowkey columns
         long mask = Long.highestOneBit(parentCuboid.getId());
         long parentCuboidId = parentCuboid.getId();
         long childCuboidId = childCuboid.getId();
         long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
-        int index = 1; // skip cuboidId
+        int index = 2; // skip shard and cuboidId
         for (int i = 0; i < parentCuboidIdActualLength; i++) {
             if ((mask & parentCuboidId) > 0) {// if the this bit position equals
                                               // 1
@@ -103,12 +112,18 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
             mask = mask >> 1;
         }
 
+        //fill shard
+        short cuboidShardNum = cubeSegment.getCuboidShardNum(childCuboidId);
+        short shardOffset = ShardingHash.getShard(keyBuf, bodyOffset, offset - bodyOffset, cuboidShardNum);
+        short finalShard = ShardingHash.normalize(cubeSegment.getCuboidBaseShard(childCuboidId), shardOffset, cubeSegment.getTotalShards());
+        BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN);
+
         return offset;
     }
 
     @Override
     public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        long cuboidId = rowKeySplitter.split(key.getBytes(), key.getLength());
+        long cuboidId = rowKeySplitter.split(key.getBytes());
         Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId);
 
         Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
index 165bc13..5f2f100 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
@@ -9,6 +9,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.CuboidStatsUtil;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
@@ -28,7 +29,7 @@ public class FactDistinctColumnsReducerTest {
 
         System.out.println(outputPath);
         Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
-        FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100);
+        CuboidStatsUtil.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100);
         FileSystem.getLocal(conf).delete(outputPath, true);
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
index efcb2ba..9e1fc2d 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
 import org.apache.hadoop.mrunit.types.Pair;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.junit.After;
 import org.junit.Before;
@@ -73,7 +74,7 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
         mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
         mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
 
-        byte[] key = { 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+        byte[] key = { 0,0,0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
         byte[] value = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 };
         Pair<Text, Text> input1 = new Pair<Text, Text>(new Text(key), new Text(value));
 
@@ -83,7 +84,7 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
 
         assertEquals(4, result.size());
 
-        byte[] resultKey = { 0, 0, 0, 0, 0, 0, 1, 127, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+        byte[] resultKey = { 0,0,0, 0, 0, 0, 0, 0, 1, 127, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
         byte[] resultValue = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 };
         Pair<Text, Text> output1 = new Pair<Text, Text>(new Text(resultKey), new Text(resultValue));
 
@@ -103,7 +104,7 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
         System.out.println(Bytes.toLong(new byte[] { 0, 0, 0, 0, 0, 0, 1, -1 }));
         for (int i = 0; i < result.size(); i++) {
             byte[] bytes = new byte[result.get(i).getFirst().getLength()];
-            System.arraycopy(result.get(i).getFirst().getBytes(), 0, bytes, 0, result.get(i).getFirst().getLength());
+            System.arraycopy(result.get(i).getFirst().getBytes(), RowConstants.ROWKEY_SHARDID_LEN, bytes, 0, result.get(i).getFirst().getLength()-RowConstants.ROWKEY_SHARDID_LEN);
             System.out.println(Bytes.toLong(bytes));
             keySet[i] = Bytes.toLong(bytes);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/test/resources/data/8d_cuboid/part-r-00000
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/resources/data/8d_cuboid/part-r-00000 b/engine-mr/src/test/resources/data/8d_cuboid/part-r-00000
old mode 100644
new mode 100755
index d277125..c412e3a
Binary files a/engine-mr/src/test/resources/data/8d_cuboid/part-r-00000 and b/engine-mr/src/test/resources/data/8d_cuboid/part-r-00000 differ

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/test/resources/data/base_cuboid/part-r-00000
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/resources/data/base_cuboid/part-r-00000 b/engine-mr/src/test/resources/data/base_cuboid/part-r-00000
old mode 100644
new mode 100755
index ed53ffb..9ade717
Binary files a/engine-mr/src/test/resources/data/base_cuboid/part-r-00000 and b/engine-mr/src/test/resources/data/base_cuboid/part-r-00000 differ

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 7296fec..70b62c0 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -298,7 +298,7 @@ public class SparkCubing extends AbstractSparkApplication {
     private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId, final byte[][] splitKeys) throws Exception {
         CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
         CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
         List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
         final Map<TblColRef, Integer> columnLengthMap = Maps.newHashMap();
         for (TblColRef tblColRef : baseCuboidColumn) {
@@ -341,7 +341,7 @@ public class SparkCubing extends AbstractSparkApplication {
                 LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue();
                 System.out.println("load properties finished");
                 AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap);
-                final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeDesc, columnLengthMap));
+                final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeSegment, columnLengthMap));
                 Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter));
                 try {
                     while (listIterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
index c687b78..986e45e 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
@@ -17,18 +17,21 @@
 */
 package org.apache.kylin.engine.spark.cube;
 
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Map;
+
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.model.TblColRef;
-import scala.Tuple2;
 
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.Map;
+import scala.Tuple2;
 
 /**
  */
@@ -36,13 +39,13 @@ public final class DefaultTupleConverter implements TupleConverter {
 
     private final static ThreadLocal<ByteBuffer> valueBuf = new ThreadLocal<>();
     private final static ThreadLocal<int[]> measureColumnsIndex = new ThreadLocal<>();
-    private final CubeDesc cubeDesc;
+    private final CubeSegment segment;
     private final int measureCount;
     private final Map<TblColRef, Integer> columnLengthMap;
 
-    public DefaultTupleConverter(CubeDesc cubeDesc, Map<TblColRef, Integer> columnLengthMap) {
-        this.cubeDesc = cubeDesc;
-        this.measureCount = cubeDesc.getMeasures().size();
+    public DefaultTupleConverter(CubeSegment segment, Map<TblColRef, Integer> columnLengthMap) {
+        this.segment = segment;
+        this.measureCount = segment.getCubeDesc().getMeasures().size();
         this.columnLengthMap = columnLengthMap;
     }
 
@@ -59,16 +62,16 @@ public final class DefaultTupleConverter implements TupleConverter {
         }
         return measureColumnsIndex.get();
     }
-    
+
     @Override
     public final Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record) {
-        int bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
-        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+        int bytesLength = RowConstants.ROWKEY_HEADER_LEN;
+        Cuboid cuboid = Cuboid.findById(segment.getCubeDesc(), cuboidId);
         for (TblColRef column : cuboid.getColumns()) {
             bytesLength += columnLengthMap.get(column);
         }
 
-        final int dimensions = BitSet.valueOf(new long[]{cuboidId}).cardinality();
+        final int dimensions = BitSet.valueOf(new long[] { cuboidId }).cardinality();
         int[] measureColumnsIndex = getMeasureColumnsIndex();
         for (int i = 0; i < measureCount; i++) {
             measureColumnsIndex[i] = dimensions + i;
@@ -76,13 +79,21 @@ public final class DefaultTupleConverter implements TupleConverter {
 
         byte[] key = new byte[bytesLength];
         System.arraycopy(Bytes.toBytes(cuboidId), 0, key, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
-        int offSet = RowConstants.ROWKEY_CUBOIDID_LEN;
+        int header = RowConstants.ROWKEY_HEADER_LEN;
+        int offSet = header;
         for (int x = 0; x < dimensions; x++) {
             final ByteArray byteArray = record.get(x);
             System.arraycopy(byteArray.array(), byteArray.offset(), key, offSet, byteArray.length());
             offSet += byteArray.length();
         }
 
+        //fill shard
+        short cuboidShardNum = segment.getCuboidShardNum(cuboidId);
+        short shardOffset = ShardingHash.getShard(key, header, offSet - header, cuboidShardNum);
+        short cuboidShardBase = segment.getCuboidBaseShard(cuboidId);
+        short finalShard = ShardingHash.normalize(cuboidShardBase, shardOffset, segment.getTotalShards());
+        BytesUtil.writeShort(finalShard, key, 0, RowConstants.ROWKEY_SHARDID_LEN);
+
         ByteBuffer valueBuf = getValueBuf();
         valueBuf.clear();
         record.exportColumns(measureColumnsIndex, valueBuf);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingHash.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingHash.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingHash.java
deleted file mode 100644
index f16e9fe..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingHash.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.invertedindex.index;
-
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
-public class ShardingHash {
-
-    static HashFunction hashFunc = Hashing.murmur3_128();
-
-    public static long hashInt(int integer) {
-        return hashFunc.newHasher().putInt(integer).hash().asLong();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
index 817bf01..2521fbf 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import org.apache.commons.lang.ObjectUtils;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.metadata.measure.LongMutable;
 
@@ -153,7 +154,7 @@ public class TableRecord implements Cloneable {
 
     public short getShard() {
         int timestampID = rawRecord.getValueID(info.getTimestampColumn());
-        return (short) (Math.abs(ShardingHash.hashInt(timestampID)) % info.getDescriptor().getSharding());
+        return ShardingHash.getShard(timestampID, info.getDescriptor().getSharding());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 661672e..7a2313c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,7 @@
         <commons-cli.version>1.2</commons-cli.version>
         <commons-lang.version>2.6</commons-lang.version>
         <commons-lang3.version>3.1</commons-lang3.version>
+        <commons-math3.version>3.3</commons-math3.version>
         <commons-io.version>2.4</commons-io.version>
         <commons-configuration.version>1.9</commons-configuration.version>
         <commons-daemon.version>1.0.15</commons-daemon.version>
@@ -331,6 +332,12 @@
                 <version>${commons-lang3.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-math3</artifactId>
+                <version>${commons-math3.version}</version>
+            </dependency>
+
+            <dependency>
                 <groupId>commons-io</groupId>
                 <artifactId>commons-io</artifactId>
                 <version>${commons-io.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
index d80763c..1cd55d4 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
@@ -49,13 +49,10 @@ public abstract class RoutingRule {
 
     public static void applyRules(List<IRealization> realizations, OLAPContext olapContext) {
         for (RoutingRule rule : rules) {
-            logger.info("Initial realizations order:");
-            logger.info(getPrintableText(realizations));
-            logger.info("Applying rule " + rule);
-
+            logger.info("Realizations order before: " + getPrintableText(realizations));
+            logger.info("Applying rule : " + rule);
             rule.apply(realizations, olapContext);
-
-            logger.info(getPrintableText(realizations));
+            logger.info("Realizations order after: " + getPrintableText(realizations));
             logger.info("===================================================");
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/query/src/test/resources/query/debug/query78.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/debug/query78.sql b/query/src/test/resources/query/debug/query78.sql
new file mode 100644
index 0000000..299f1a4
--- /dev/null
+++ b/query/src/test/resources/query/debug/query78.sql
@@ -0,0 +1,22 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select count(*) as c,sum(PRICE) as GMV, LSTG_FORMAT_NAME as FORMAT_NAME
+from test_kylin_fact
+where (LSTG_FORMAT_NAME in ('ABIN')) or  (LSTG_FORMAT_NAME>='FP-GTC' and LSTG_FORMAT_NAME<='Others')
+group by LSTG_FORMAT_NAME

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/query/src/test/resources/query/sql/query01.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query01.sql b/query/src/test/resources/query/sql/query01.sql
index 5a53058..9ed1db3 100644
--- a/query/src/test/resources/query/sql/query01.sql
+++ b/query/src/test/resources/query/sql/query01.sql
@@ -16,5 +16,5 @@
 -- limitations under the License.
 --
 
-select LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact 
- group by LSTG_FORMAT_NAME 
+select LSTG_FORMAT_NAME,slr_segment_cd ,sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact 
+ group by LSTG_FORMAT_NAME ,slr_segment_cd