Posted to commits@kylin.apache.org by ma...@apache.org on 2016/04/13 05:39:53 UTC

[4/8] kylin git commit: refactor

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
deleted file mode 100644
index d50baad..0000000
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.impl.threadpool;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kylin.job.BaseTestExecutable;
-import org.apache.kylin.job.ErrorTestExecutable;
-import org.apache.kylin.job.FailedTestExecutable;
-import org.apache.kylin.job.SelfStopExecutable;
-import org.apache.kylin.job.SucceedTestExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.junit.Test;
-
-/**
- */
-public class DefaultSchedulerTest extends BaseSchedulerTest {
-
-    @Test
-    public void testSingleTaskJob() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SucceedTestExecutable();
-        job.addTask(task1);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-    }
-
-    @Test
-    public void testSucceed() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SucceedTestExecutable();
-        BaseTestExecutable task2 = new SucceedTestExecutable();
-        job.addTask(task1);
-        job.addTask(task2);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
-    }
-
-    @Test
-    public void testSucceedAndFailed() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SucceedTestExecutable();
-        BaseTestExecutable task2 = new FailedTestExecutable();
-        job.addTask(task1);
-        job.addTask(task2);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
-    }
-
-    @Test
-    public void testSucceedAndError() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new ErrorTestExecutable();
-        BaseTestExecutable task2 = new SucceedTestExecutable();
-        job.addTask(task1);
-        job.addTask(task2);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
-        assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
-    }
-
-    @Test
-    public void testDiscard() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SelfStopExecutable();
-        job.addTask(task1);
-        jobService.addJob(job);
-        waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
-        jobService.discardJob(job.getId());
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
-        Thread.sleep(5000);
-        System.out.println(job);
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Test
-    public void testSchedulerPool() throws InterruptedException {
-        ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1);
-        final CountDownLatch countDownLatch = new CountDownLatch(3);
-        ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                countDownLatch.countDown();
-            }
-        }, 5, 5, TimeUnit.SECONDS);
-        assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS));
-        assertTrue("future should still running", future.cancel(true));
-
-        final CountDownLatch countDownLatch2 = new CountDownLatch(3);
-        ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                countDownLatch2.countDown();
-                throw new RuntimeException();
-            }
-        }, 5, 5, TimeUnit.SECONDS);
-        assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS));
-        assertFalse("future2 should has been stopped", future2.cancel(true));
-
-        final CountDownLatch countDownLatch3 = new CountDownLatch(3);
-        ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    countDownLatch3.countDown();
-                    throw new RuntimeException();
-                } catch (Exception e) {
-                }
-            }
-        }, 5, 5, TimeUnit.SECONDS);
-        assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS));
-        assertTrue("future3 should still running", future3.cancel(true));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
index 0ae7e6a..18da37a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.metadata.filter.UDF;
 
-import org.apache.kylin.dimension.DimensionEncoding;
 import org.apache.kylin.metadata.filter.function.Functions;
 import org.apache.kylin.metadata.model.TblColRef;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java b/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java
new file mode 100644
index 0000000..3bb4540
--- /dev/null
+++ b/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java
@@ -0,0 +1,189 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * 
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  * 
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ */
+public class StorageMockUtils {
+    public static TupleInfo newTupleInfo(List<TblColRef> groups, List<FunctionDesc> aggregations) {
+        TupleInfo info = new TupleInfo();
+        int idx = 0;
+
+        for (TblColRef col : groups) {
+            info.setField(col.getName(), col, idx++);
+        }
+
+        TableDesc sourceTable = groups.get(0).getColumnDesc().getTable();
+        for (FunctionDesc func : aggregations) {
+            TblColRef col = new TblColRef(func.newFakeRewriteColumn(sourceTable));
+            info.setField(col.getName(), col, idx++);
+        }
+
+        return info;
+    }
+
+    public static List<TblColRef> buildGroups() {
+        List<TblColRef> groups = new ArrayList<TblColRef>();
+
+        TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
+        ColumnDesc c1 = ColumnDesc.mockup(t1, 2, "CAL_DT", "date");
+        TblColRef cf1 = new TblColRef(c1);
+        groups.add(cf1);
+
+        TableDesc t2 = TableDesc.mockup("DEFAULT.TEST_CATEGORY_GROUPINGS");
+        ColumnDesc c2 = ColumnDesc.mockup(t2, 14, "META_CATEG_NAME", "string");
+        TblColRef cf2 = new TblColRef(c2);
+        groups.add(cf2);
+
+        return groups;
+    }
+
+    public static List<FunctionDesc> buildAggregations1() {
+        List<FunctionDesc> functions = new ArrayList<FunctionDesc>();
+
+        TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
+        TblColRef priceCol = new TblColRef(ColumnDesc.mockup(t1, 7, "PRICE", "decimal(19,4)"));
+
+        FunctionDesc f1 = new FunctionDesc();
+        f1.setExpression("SUM");
+        ParameterDesc p1 = new ParameterDesc();
+        p1.setType("column");
+        p1.setValue("PRICE");
+        p1.setColRefs(ImmutableList.of(priceCol));
+        f1.setParameter(p1);
+        f1.setReturnType("decimal(19,4)");
+        functions.add(f1);
+
+
+        return functions;
+    }
+
+    public static List<FunctionDesc> buildAggregations() {
+        List<FunctionDesc> functions = new ArrayList<FunctionDesc>();
+
+        TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
+        TblColRef priceCol = new TblColRef(ColumnDesc.mockup(t1, 7, "PRICE", "decimal(19,4)"));
+        TblColRef sellerCol = new TblColRef(ColumnDesc.mockup(t1, 9, "SELLER_ID", "bigint"));
+
+        FunctionDesc f1 = new FunctionDesc();
+        f1.setExpression("SUM");
+        ParameterDesc p1 = new ParameterDesc();
+        p1.setType("column");
+        p1.setValue("PRICE");
+        p1.setColRefs(ImmutableList.of(priceCol));
+        f1.setParameter(p1);
+        f1.setReturnType("decimal(19,4)");
+        functions.add(f1);
+
+        FunctionDesc f2 = new FunctionDesc();
+        f2.setExpression("COUNT_DISTINCT");
+        ParameterDesc p2 = new ParameterDesc();
+        p2.setType("column");
+        p2.setValue("SELLER_ID");
+        p2.setColRefs(ImmutableList.of(sellerCol));
+        f2.setParameter(p2);
+        f2.setReturnType("hllc(10)");
+        functions.add(f2);
+
+        return functions;
+    }
+
+    public static CompareTupleFilter buildTs2010Filter(TblColRef column) {
+        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+        ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+        compareFilter.addChild(columnFilter1);
+        ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2010-01-01");
+        compareFilter.addChild(constantFilter1);
+        return compareFilter;
+    }
+
+    public static CompareTupleFilter buildTs2011Filter(TblColRef column) {
+        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+        ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+        compareFilter.addChild(columnFilter1);
+        ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2011-01-01");
+        compareFilter.addChild(constantFilter1);
+        return compareFilter;
+    }
+
+    public static CompareTupleFilter buildFilter1(TblColRef column) {
+        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.LTE);
+        ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+        compareFilter.addChild(columnFilter1);
+        ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23");
+        compareFilter.addChild(constantFilter1);
+        return compareFilter;
+    }
+
+    public static CompareTupleFilter buildFilter2(TblColRef column) {
+        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
+        ColumnTupleFilter columnFilter2 = new ColumnTupleFilter(column);
+        compareFilter.addChild(columnFilter2);
+        ConstantTupleFilter constantFilter2 = new ConstantTupleFilter("ClothinShoes & Accessories");
+        compareFilter.addChild(constantFilter2);
+        return compareFilter;
+    }
+
+    public static CompareTupleFilter buildFilter3(TblColRef column) {
+        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
+        ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+        compareFilter.addChild(columnFilter1);
+        ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23");
+        compareFilter.addChild(constantFilter1);
+        return compareFilter;
+    }
+
+
+    public static TupleFilter buildAndFilter(List<TblColRef> columns) {
+        CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
+        CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
+        LogicalTupleFilter andFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+        andFilter.addChild(compareFilter1);
+        andFilter.addChild(compareFilter2);
+        return andFilter;
+    }
+
+    public static TupleFilter buildOrFilter(List<TblColRef> columns) {
+        CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
+        CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
+        LogicalTupleFilter logicFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR);
+        logicFilter.addChild(compareFilter1);
+        logicFilter.addChild(compareFilter2);
+        return logicFilter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
deleted file mode 100644
index 5f8f08f..0000000
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.cache;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- */
-public class StorageMockUtils {
-    public static TupleInfo newTupleInfo(List<TblColRef> groups, List<FunctionDesc> aggregations) {
-        TupleInfo info = new TupleInfo();
-        int idx = 0;
-
-        for (TblColRef col : groups) {
-            info.setField(col.getName(), col, idx++);
-        }
-
-        TableDesc sourceTable = groups.get(0).getColumnDesc().getTable();
-        for (FunctionDesc func : aggregations) {
-            TblColRef col = new TblColRef(func.newFakeRewriteColumn(sourceTable));
-            info.setField(col.getName(), col, idx++);
-        }
-
-        return info;
-    }
-
-    public static List<TblColRef> buildGroups() {
-        List<TblColRef> groups = new ArrayList<TblColRef>();
-
-        TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
-        ColumnDesc c1 = ColumnDesc.mockup(t1, 2, "CAL_DT", "date");
-        TblColRef cf1 = new TblColRef(c1);
-        groups.add(cf1);
-
-        TableDesc t2 = TableDesc.mockup("DEFAULT.TEST_CATEGORY_GROUPINGS");
-        ColumnDesc c2 = ColumnDesc.mockup(t2, 14, "META_CATEG_NAME", "string");
-        TblColRef cf2 = new TblColRef(c2);
-        groups.add(cf2);
-
-        return groups;
-    }
-
-    public static List<FunctionDesc> buildAggregations() {
-        List<FunctionDesc> functions = new ArrayList<FunctionDesc>();
-
-        TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
-        TblColRef priceCol = new TblColRef(ColumnDesc.mockup(t1, 7, "PRICE", "decimal(19,4)"));
-        TblColRef sellerCol = new TblColRef(ColumnDesc.mockup(t1, 9, "SELLER_ID", "bigint"));
-
-        FunctionDesc f1 = new FunctionDesc();
-        f1.setExpression("SUM");
-        ParameterDesc p1 = new ParameterDesc();
-        p1.setType("column");
-        p1.setValue("PRICE");
-        p1.setColRefs(ImmutableList.of(priceCol));
-        f1.setParameter(p1);
-        f1.setReturnType("decimal(19,4)");
-        functions.add(f1);
-
-        FunctionDesc f2 = new FunctionDesc();
-        f2.setExpression("COUNT_DISTINCT");
-        ParameterDesc p2 = new ParameterDesc();
-        p2.setType("column");
-        p2.setValue("SELLER_ID");
-        p2.setColRefs(ImmutableList.of(sellerCol));
-        f2.setParameter(p2);
-        f2.setReturnType("hllc(10)");
-        functions.add(f2);
-
-        return functions;
-    }
-
-    public static CompareTupleFilter buildTs2010Filter(TblColRef column) {
-        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
-        ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
-        compareFilter.addChild(columnFilter1);
-        ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2010-01-01");
-        compareFilter.addChild(constantFilter1);
-        return compareFilter;
-    }
-
-    public static CompareTupleFilter buildTs2011Filter(TblColRef column) {
-        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
-        ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
-        compareFilter.addChild(columnFilter1);
-        ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2011-01-01");
-        compareFilter.addChild(constantFilter1);
-        return compareFilter;
-    }
-
-    public static CompareTupleFilter buildFilter1(TblColRef column) {
-        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.LTE);
-        ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
-        compareFilter.addChild(columnFilter1);
-        ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23");
-        compareFilter.addChild(constantFilter1);
-        return compareFilter;
-    }
-
-    public static CompareTupleFilter buildFilter2(TblColRef column) {
-        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
-        ColumnTupleFilter columnFilter2 = new ColumnTupleFilter(column);
-        compareFilter.addChild(columnFilter2);
-        ConstantTupleFilter constantFilter2 = new ConstantTupleFilter("ClothinShoes & Accessories");
-        compareFilter.addChild(constantFilter2);
-        return compareFilter;
-    }
-
-    public static TupleFilter buildAndFilter(List<TblColRef> columns) {
-        CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
-        CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
-        LogicalTupleFilter andFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
-        andFilter.addChild(compareFilter1);
-        andFilter.addChild(compareFilter2);
-        return andFilter;
-    }
-
-    public static TupleFilter buildOrFilter(List<TblColRef> columns) {
-        CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
-        CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
-        LogicalTupleFilter logicFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR);
-        logicFilter.addChild(compareFilter1);
-        logicFilter.addChild(compareFilter2);
-        return logicFilter;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 60815c7..1a3efb7 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -124,6 +124,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-job</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-storage-hbase</artifactId>
             <type>test-jar</type>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
index 4291d91..809cfb7 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
@@ -63,8 +63,8 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
         CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
 
         cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
-        flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
-        dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
+        flatTable = LOCALMETA_TEST_DATA + "/data/flatten_data_for_without_slr_left_join.csv";
+        dictionaryMap = ITInMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
     }
 
     @AfterClass
@@ -84,7 +84,7 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
 
         {
             Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, new NoopWriter()));
-            InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+            ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
             future.get();
         }
     }
@@ -101,7 +101,7 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
 
         @Override
         public void close() {
-            
+
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
new file mode 100644
index 0000000..ab9ac63
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
@@ -0,0 +1,163 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * 
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  * 
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.dimension.Dictionary;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
+
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(ITDoggedCubeBuilderTest.class);
+
+    private static final int INPUT_ROWS = 10000;
+    private static final int SPLIT_ROWS = 5000;
+    private static final int THREADS = 4;
+
+    private static CubeInstance cube;
+    private static String flatTable;
+    private static Map<TblColRef, Dictionary<String>> dictionaryMap;
+
+    @BeforeClass
+    public static void before() throws IOException {
+        staticCreateTestMetadata();
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+        flatTable = LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/data/flatten_data_for_without_slr_left_join.csv";
+        dictionaryMap = ITInMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
+    }
+
+    @AfterClass
+    public static void after() throws Exception {
+        staticCleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws Exception {
+
+        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        long randSeed = System.currentTimeMillis();
+
+        DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        doggedBuilder.setConcurrentThreads(THREADS);
+        doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
+        FileRecordWriter doggedResult = new FileRecordWriter();
+
+        {
+            Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
+            ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+            future.get();
+            doggedResult.close();
+        }
+
+        InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        inmemBuilder.setConcurrentThreads(THREADS);
+        FileRecordWriter inmemResult = new FileRecordWriter();
+
+        {
+            Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
+            ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+            future.get();
+            inmemResult.close();
+        }
+
+        fileCompare(doggedResult.file, inmemResult.file);
+        doggedResult.file.delete();
+        inmemResult.file.delete();
+    }
+
+    private void fileCompare(File file, File file2) throws IOException {
+        BufferedReader r1 = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
+        BufferedReader r2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2), "UTF-8"));
+
+        String line1, line2;
+        do {
+            line1 = r1.readLine();
+            line2 = r2.readLine();
+
+            assertEquals(line1, line2);
+
+        } while (line1 != null || line2 != null);
+
+        r1.close();
+        r2.close();
+    }
+
+    class FileRecordWriter implements ICuboidWriter {
+
+        File file;
+        PrintWriter writer;
+
+        FileRecordWriter() throws IOException {
+            file = File.createTempFile("DoggedCubeBuilderTest_", ".data");
+            writer = new PrintWriter(file, "UTF-8");
+        }
+
+        @Override
+        public void write(long cuboidId, GTRecord record) throws IOException {
+            writer.print(cuboidId);
+            writer.print(", ");
+            writer.print(record.toString());
+            writer.println();
+        }
+
+        @Override
+        public void flush() {
+
+        }
+
+        @Override
+        public void close() {
+            writer.close();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
new file mode 100644
index 0000000..ad02f2a
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -0,0 +1,271 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * 
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  * 
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.dimension.Dictionary;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ */
+public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(ITInMemCubeBuilderTest.class);
+
+    private CubeInstance cube;
+    private String flatTable;
+    private Map<TblColRef, Dictionary<String>> dictionaryMap;
+
+    private int nInpRows;
+    private int nThreads;
+
+    @Before
+    public void before() throws IOException {
+        createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testKylinCube() throws Exception {
+        testBuild("test_kylin_cube_without_slr_left_join_empty", //
+                LOCALMETA_TEST_DATA + "/data/flatten_data_for_without_slr_left_join.csv", 70000, 4);
+    }
+
+    @Test
+    public void testSSBCube() throws Exception {
+        testBuild("ssb", //
+                LOCALMETA_TEST_DATA + "/data/kylin_intermediate_ssb_19920101000000_19920201000000.csv", 1000, 1);
+    }
+
+    public void testBuild(String cubeName, String flatTable, int nInpRows, int nThreads) throws Exception {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+        this.nInpRows = nInpRows;
+        this.nThreads = nThreads;
+
+        this.cube = cubeManager.getCube(cubeName);
+        this.flatTable = flatTable;
+        this.dictionaryMap = getDictionaryMap(cube, flatTable);
+
+        testBuildInner();
+    }
+
+    private void testBuildInner() throws Exception {
+
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        cubeBuilder.setConcurrentThreads(nThreads);
+
+        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+        try {
+            // round 1
+            {
+                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+                feedData(cube, flatTable, queue, nInpRows);
+                future.get();
+            }
+
+            // round 2, zero input
+            {
+                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+                feedData(cube, flatTable, queue, 0);
+                future.get();
+            }
+
+            // round 3
+            {
+                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+                feedData(cube, flatTable, queue, nInpRows);
+                future.get();
+            }
+
+        } catch (Exception e) {
+            logger.error("stream build failed", e);
+            throw new IOException("Failed to build cube ", e);
+        }
+    }
+
+    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
+        feedData(cube, flatTable, queue, count, 0);
+    }
+
+    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
+        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
+        int nColumns = flatTableDesc.getColumnList().size();
+
+        @SuppressWarnings("unchecked")
+        Set<String>[] distinctSets = new Set[nColumns];
+        for (int i = 0; i < nColumns; i++)
+            distinctSets[i] = new TreeSet<String>();
+
+        // get distinct values on each column
+        List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8");
+        for (String line : lines) {
+            String[] row = line.trim().split(",");
+            assert row.length == nColumns;
+            for (int i = 0; i < nColumns; i++)
+                distinctSets[i].add(row[i]);
+        }
+
+        List<String[]> distincts = new ArrayList<String[]>();
+        for (int i = 0; i < nColumns; i++) {
+            distincts.add((String[]) distinctSets[i].toArray(new String[distinctSets[i].size()]));
+        }
+
+        Random rand = new Random();
+        if (randSeed != 0)
+            rand.setSeed(randSeed);
+
+        // output with random data
+        for (; count > 0; count--) {
+            ArrayList<String> row = new ArrayList<String>(nColumns);
+            for (int i = 0; i < nColumns; i++) {
+                String[] candidates = distincts.get(i);
+                row.add(candidates[rand.nextInt(candidates.length)]);
+            }
+            queue.put(row);
+        }
+        queue.put(new ArrayList<String>(0));
+    }
+
+    static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
+        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+        CubeDesc desc = cube.getDescriptor();
+        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
+        int nColumns = flatTableDesc.getColumnList().size();
+
+        List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
+        for (int c = 0; c < columns.size(); c++) {
+            TblColRef col = columns.get(c);
+            if (desc.getRowkey().isUseDictionary(col)) {
+                logger.info("Building dictionary for " + col);
+                List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
+                Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
+                result.put(col, dict);
+            }
+        }
+
+        for (int measureIdx = 0; measureIdx < cube.getDescriptor().getMeasures().size(); measureIdx++) {
+            MeasureDesc measureDesc = cube.getDescriptor().getMeasures().get(measureIdx);
+            FunctionDesc func = measureDesc.getFunction();
+            List<TblColRef> dictCols = func.getMeasureType().getColumnsNeedDictionary(func);
+            if (dictCols.isEmpty())
+                continue;
+
+            int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx];
+            List<TblColRef> paramCols = func.getParameter().getColRefs();
+            for (int i = 0; i < paramCols.size(); i++) {
+                TblColRef col = paramCols.get(i);
+                if (dictCols.contains(col)) {
+                    int colIdxOnFlat = flatTableIdx[i];
+                    logger.info("Building dictionary for " + col);
+                    List<byte[]> valueList = readValueList(flatTable, nColumns, colIdxOnFlat);
+                    Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
+
+                    result.put(col, dict);
+                }
+            }
+        }
+
+        return result;
+    }
+
+    private static List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
+        List<byte[]> result = Lists.newArrayList();
+        List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8");
+        for (String line : lines) {
+            String[] row = line.trim().split(",");
+            if (row.length != nColumns) {
+                throw new IllegalStateException();
+            }
+            if (row[c] != null) {
+                result.add(Bytes.toBytes(row[c]));
+            }
+        }
+        return result;
+    }
+
+    class ConsoleGTRecordWriter implements ICuboidWriter {
+
+        boolean verbose = false;
+
+        @Override
+        public void write(long cuboidId, GTRecord record) throws IOException {
+            if (verbose)
+                System.out.println(record.toString());
+        }
+
+        @Override
+        public void flush() {
+            if (verbose) {
+                System.out.println("flush");
+            }
+        }
+
+        @Override
+        public void close() {
+            if (verbose) {
+                System.out.println("close");
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java b/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
new file mode 100644
index 0000000..ad1ddd3
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
@@ -0,0 +1,154 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * 
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  * 
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.job.impl.threadpool;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.job.BaseTestExecutable;
+import org.apache.kylin.job.ErrorTestExecutable;
+import org.apache.kylin.job.FailedTestExecutable;
+import org.apache.kylin.job.SelfStopExecutable;
+import org.apache.kylin.job.SucceedTestExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ */
+public class ITDefaultSchedulerTest extends BaseSchedulerTest {
+
+    @Test
+    public void testSingleTaskJob() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SucceedTestExecutable();
+        job.addTask(task1);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+    }
+
+    @Test
+    public void testSucceed() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SucceedTestExecutable();
+        BaseTestExecutable task2 = new SucceedTestExecutable();
+        job.addTask(task1);
+        job.addTask(task2);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
+    }
+
+    @Test
+    public void testSucceedAndFailed() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SucceedTestExecutable();
+        BaseTestExecutable task2 = new FailedTestExecutable();
+        job.addTask(task1);
+        job.addTask(task2);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
+    }
+
+    @Test
+    public void testSucceedAndError() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new ErrorTestExecutable();
+        BaseTestExecutable task2 = new SucceedTestExecutable();
+        job.addTask(task1);
+        job.addTask(task2);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
+    }
+
+    @Test
+    public void testDiscard() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SelfStopExecutable();
+        job.addTask(task1);
+        jobService.addJob(job);
+        waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
+        jobService.discardJob(job.getId());
+        waitForJobFinish(job.getId());
+        Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
+        Thread.sleep(5000);
+        System.out.println(job);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testSchedulerPool() throws InterruptedException {
+        ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1);
+        final CountDownLatch countDownLatch = new CountDownLatch(3);
+        ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                countDownLatch.countDown();
+            }
+        }, 5, 5, TimeUnit.SECONDS);
+        assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS));
+        assertTrue("future should still running", future.cancel(true));
+
+        final CountDownLatch countDownLatch2 = new CountDownLatch(3);
+        ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                countDownLatch2.countDown();
+                throw new RuntimeException();
+            }
+        }, 5, 5, TimeUnit.SECONDS);
+        assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS));
+        assertFalse("future2 should has been stopped", future2.cancel(true));
+
+        final CountDownLatch countDownLatch3 = new CountDownLatch(3);
+        ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    countDownLatch3.countDown();
+                    throw new RuntimeException();
+                } catch (Exception e) {
+                }
+            }
+        }, 5, 5, TimeUnit.SECONDS);
+        assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS));
+        assertTrue("future3 should still running", future3.cancel(true));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index c945485..e1cbe1f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -99,12 +99,12 @@ public class BuildCubeWithEngine {
             logger.info("Will not use fast build mode");
         }
 
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty(KylinConfig.KYLIN_CONF,HBaseMetadataTestCase.SANDBOX_TEST_DATA);
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }
 
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
 
         try {
             //check hdfs permission

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
index 5ab5e83..d862dbf 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
@@ -80,7 +80,7 @@ public class BuildCubeWithSpark {
     public static void beforeClass() throws Exception {
         logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
         ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }
@@ -89,7 +89,7 @@ public class BuildCubeWithSpark {
 
     @Before
     public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
 
         DeployUtil.initCliWorkDir();
         DeployUtil.deployMetadata();
@@ -117,7 +117,7 @@ public class BuildCubeWithSpark {
     @Test
     public void test() throws Exception {
         final CubeSegment segment = createSegment();
-        String confPath = new File(AbstractKylinTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
+        String confPath = new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
         KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar();
         String coprocessor = KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar();
         logger.info("confPath location:" + confPath);

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index eeff999..99da26f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -67,11 +67,11 @@ public class BuildCubeWithStream {
     public static void beforeClass() throws Exception {
         logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
         ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
     }
 
     public void before() throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
index 4b8ce24..643b122 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
@@ -92,7 +92,7 @@ public class BuildIIWithEngine {
     public static void beforeClass() throws Exception {
         logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
         ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }
@@ -100,7 +100,7 @@ public class BuildIIWithEngine {
 
     @Before
     public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
 
         //DeployUtil.initCliWorkDir();
         //        DeployUtil.deployMetadata();

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
index 9b7cd14..ace1a2f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
@@ -109,7 +109,7 @@ public class BuildIIWithStream {
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
     }
 
     public void before() throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index ec3e60f..46aa68d 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -19,11 +19,9 @@
 package org.apache.kylin.query;
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileReader;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
@@ -38,22 +36,18 @@ import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.logging.LogManager;
 
-import com.google.common.base.Throwables;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionContext;
 import org.apache.kylin.common.KylinConfig;
 import org.dbunit.Assertion;
 import org.dbunit.database.DatabaseConfig;
 import org.dbunit.database.DatabaseConnection;
 import org.dbunit.database.IDatabaseConnection;
 import org.dbunit.dataset.DataSetException;
-import org.dbunit.dataset.DefaultTable;
 import org.dbunit.dataset.ITable;
 import org.dbunit.dataset.SortedTable;
 import org.dbunit.dataset.datatype.DataType;
@@ -63,8 +57,6 @@ import org.dbunit.ext.h2.H2DataTypeFactory;
 import org.junit.Assert;
 
 import com.google.common.io.Files;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  */
@@ -287,7 +279,6 @@ public class KylinTestBase {
         return ret;
     }
 
-
     protected void execQueryUsingH2(String queryFolder, boolean needSort) throws Exception {
         printInfo("---------- Running H2 queries: " + queryFolder);
 
@@ -349,7 +340,6 @@ public class KylinTestBase {
             h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory());
             ITable h2Table = executeQuery(h2Conn, queryName, sql, needSort);
 
-
             try {
                 // compare the result
                 Assert.assertEquals(h2Table.getRowCount(), kylinTable.getRowCount());
@@ -427,7 +417,6 @@ public class KylinTestBase {
             h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory());
             ITable h2Table = executeDynamicQuery(h2Conn, queryName, sql, parameters, needSort);
 
-
             // compare the result
             Assertion.assertEquals(h2Table, kylinTable);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
index 15e435e..136342d 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
@@ -38,7 +38,7 @@ import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.storage.IStorageQuery;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.StorageFactory;
-import org.apache.kylin.storage.cache.StorageMockUtils;
+import org.apache.kylin.storage.StorageMockUtils;
 import org.apache.kylin.storage.exception.ScanOutOfLimitException;
 import org.junit.After;
 import org.junit.AfterClass;

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/source-hive/pom.xml
----------------------------------------------------------------------
diff --git a/source-hive/pom.xml b/source-hive/pom.xml
index eb36eed..796b9c1 100644
--- a/source-hive/pom.xml
+++ b/source-hive/pom.xml
@@ -44,6 +44,13 @@
 
         <!-- Env & Test -->
         <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
index 70c11b3..83c50c0 100644
--- a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
+++ b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
@@ -24,18 +24,16 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-/**
- * Created by dongli on 2/22/16.
- */
 public class HiveCmdBuilderTest {
 
     @Before
     public void setup() {
-        System.setProperty("KYLIN_CONF", "../examples/test_case_data/localmeta");
+        System.setProperty("KYLIN_CONF", LocalFileMetadataTestCase.LOCALMETA_TEST_DATA);
     }
 
     @After
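
The same idea is applied to the local metadata used by unit tests: the literal "../examples/test_case_data/localmeta" moves behind LocalFileMetadataTestCase.LOCALMETA_TEST_DATA, which the new test-jar dependency in source-hive/pom.xml makes visible to this module. A minimal setup/teardown sketch in that style (the property name and the constant come from the diff; the clearProperty call is an addition shown only for test hygiene):

    import org.apache.kylin.common.util.LocalFileMetadataTestCase;
    import org.junit.After;
    import org.junit.Before;

    public class LocalMetaSetupExample {

        @Before
        public void setup() {
            // Point Kylin at the checked-in local test metadata rather than a hardcoded relative path.
            System.setProperty("KYLIN_CONF", LocalFileMetadataTestCase.LOCALMETA_TEST_DATA);
        }

        @After
        public void cleanup() {
            // Undo the property so other tests running in the same JVM are unaffected.
            System.clearProperty("KYLIN_CONF");
        }
    }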

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 6bbb0b7..1d3da28 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.storage.hbase.cube.v2;
 
 import java.io.IOException;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
@@ -56,6 +57,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -250,20 +252,21 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
     }
 
     @Override
-    public IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException {
+    public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
 
         final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior();
+
         logger.debug("New scanner for current segment {} will use {} as endpoint's behavior", cubeSeg, toggle);
 
         short cuboidBaseShard = cubeSeg.getCuboidBaseShard(this.cuboid.getId());
         short shardNum = cubeSeg.getCuboidShardNum(this.cuboid.getId());
         int totalShards = cubeSeg.getTotalShards();
 
-        final List<ByteString> scanRequestByteStrings = Lists.newArrayList();
-        final List<ByteString> rawScanByteStrings = Lists.newArrayList();
+        ByteString scanRequestByteString = null;
+        ByteString rawScanByteString = null;
 
         // primary key (also the 0th column block) is always selected
-        final ImmutableBitSet selectedColBlocks = scanRequests.get(0).getSelectedColBlocks().set(0);
+        final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
 
         // globally shared connection, does not require close
         final HConnection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
@@ -274,65 +277,89 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             hbaseColumnsToGTIntList.add(IntList.newBuilder().addAllInts(list).build());
         }
 
-        for (GTScanRequest req : scanRequests) {
-            ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
-            GTScanRequest.serializer.serialize(req, buffer);
-            buffer.flip();
-            scanRequestByteStrings.add(HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit()));
+        // TODO: the raw scans could be constructed on the region server side to reduce network traffic
+        List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
+        int rawScanBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
+        while (true) {
+            try {
+                ByteBuffer rawScanBuffer = ByteBuffer.allocate(rawScanBufferSize);
+                BytesUtil.writeVInt(rawScans.size(), rawScanBuffer);
+                for (RawScan rs : rawScans) {
+                    RawScan.serializer.serialize(rs, rawScanBuffer);
+                }
+                rawScanBuffer.flip();
+                rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit());
+                break;
+            } catch (BufferOverflowException boe) {
+                logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize);
+                rawScanBufferSize *= 4;
+            }
+        }
+        scanRequest.setGTScanRanges(Lists.<GTScanRange> newArrayList()); // the serialized raw scans already carry the ranges to the coprocessor, so clear them here to avoid sending them twice
 
-            RawScan rawScan = preparedHBaseScan(req.getPkStart(), req.getPkEnd(), req.getFuzzyKeys(), selectedColBlocks);
+        int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
+        while (true) {
+            try {
+                ByteBuffer buffer = ByteBuffer.allocate(scanRequestBufferSize);
+                GTScanRequest.serializer.serialize(scanRequest, buffer);
+                buffer.flip();
+                scanRequestByteString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit());
+                break;
+            } catch (BufferOverflowException boe) {
+                logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize);
+                scanRequestBufferSize *= 4;
+            }
+        }
 
-            ByteBuffer rawScanBuffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
-            RawScan.serializer.serialize(rawScan, rawScanBuffer);
-            rawScanBuffer.flip();
-            rawScanByteStrings.add(HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit()));
+        logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
 
-            logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", buffer.limit() - buffer.position(), rawScanBuffer.limit() - rawScanBuffer.position());
 
-            logger.info("The scan {} for segment {} is as below, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(req)), cubeSeg);
-            logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
+        logger.info("The scan {} for segment {} is as below, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg);
+        for (RawScan rs : rawScans) {
+            logScan(rs, cubeSeg.getStorageLocationIdentifier());
         }
 
-        logger.debug("Submitting rpc to {} shards starting from shard {}, scan requests count {}", new Object[] { shardNum, cuboidBaseShard, scanRequests.size() });
+        logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", new Object[] { shardNum, cuboidBaseShard, rawScans.size() });
 
         final AtomicInteger totalScannedCount = new AtomicInteger(0);
-        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(scanRequests.size() * shardNum);
+        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);
+        final String currentThreadName = Thread.currentThread().getName();
 
         for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
+            final ByteString finalScanRequestByteString = scanRequestByteString;
+            final ByteString finalRawScanByteString = rawScanByteString;
             executorService.submit(new Runnable() {
                 @Override
                 public void run() {
-                    for (int i = 0; i < scanRequests.size(); ++i) {
-                        CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
-                        builder.setGtScanRequest(scanRequestByteStrings.get(i)).setHbaseRawScan(rawScanByteStrings.get(i));
-                        for (IntList intList : hbaseColumnsToGTIntList) {
-                            builder.addHbaseColumnsToGT(intList);
-                        }
-                        builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
-                        builder.setBehavior(toggle);
+                    CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
+                    builder.setGtScanRequest(finalScanRequestByteString).setHbaseRawScan(finalRawScanByteString);
+                    for (IntList intList : hbaseColumnsToGTIntList) {
+                        builder.addHbaseColumnsToGT(intList);
+                    }
+                    builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
+                    builder.setBehavior(toggle);
+
+                    Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
+                    try {
+                        results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond());
+                    } catch (Throwable throwable) {
+                        throw new RuntimeException("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + "Error when visiting cubes by endpoint", throwable);
+                    }
 
-                        Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
+                    for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
+                        totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
+                        logger.info("<sub-thread for GTScanRequest " +  Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + getStatsString(result));
                         try {
-                            results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond());
-                        } catch (Throwable throwable) {
-                            throw new RuntimeException("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequests.get(i))) + "> " + "Error when visiting cubes by endpoint", throwable);
-                        }
-
-                        for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
-                            totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
-                            logger.info("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequests.get(i))) + "> " + getStatsString(result));
-                            try {
-                                epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
-                            } catch (IOException | DataFormatException e) {
-                                throw new RuntimeException("Error when decompressing", e);
-                            }
+                            epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
+                        } catch (IOException | DataFormatException e) {
+                            throw new RuntimeException("Error when decompressing", e);
                         }
                     }
                 }
             });
         }
 
-        return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequests.get(0).getColumns(), totalScannedCount.get());
+        return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get());
     }
 
     private String getStatsString(Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result) {
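
The new serialization code in CubeHBaseEndpointRPC replaces the fixed-size buffer with a grow-and-retry loop: serialize into a buffer of the default size and, on BufferOverflowException, quadruple the buffer and try again. A standalone sketch of that pattern, assuming only a hypothetical writer interface (the Kylin code uses GTScanRequest.serializer and RawScan.serializer for this step):

    import java.nio.BufferOverflowException;
    import java.nio.ByteBuffer;

    public final class GrowingBufferSerializer {

        /** Hypothetical stand-in for a serializer that writes one object into a ByteBuffer. */
        public interface BufferWriter {
            void writeTo(ByteBuffer buffer);
        }

        /** Serialize with an initial buffer size, quadrupling and retrying on overflow. */
        public static ByteBuffer serialize(BufferWriter writer, int initialSize) {
            int size = initialSize;
            while (true) {
                try {
                    ByteBuffer buffer = ByteBuffer.allocate(size);
                    writer.writeTo(buffer);
                    buffer.flip(); // expose the written bytes for reading from position 0
                    return buffer;
                } catch (BufferOverflowException boe) {
                    size *= 4; // payload did not fit; grow and retry
                }
            }
        }
    }

Quadrupling keeps the number of retries logarithmic in the payload size, at the cost of some over-allocation on the final attempt.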

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 8563a5e..49e8593 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.storage.hbase.cube.v2;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -43,16 +42,15 @@ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.cube.model.HBaseMappingDesc;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.GTScanRange;
+import org.apache.kylin.gridtable.IGTStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public abstract class CubeHBaseRPC {
+public abstract class CubeHBaseRPC implements IGTStorage {
 
     public static final Logger logger = LoggerFactory.getLogger(CubeHBaseRPC.class);
 
@@ -71,8 +69,6 @@ public abstract class CubeHBaseRPC {
         this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);
     }
 
-    abstract IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException;
-
     public static Scan buildScan(RawScan rawScan) {
         Scan scan = new Scan();
         scan.setCaching(rawScan.hbaseCaching);
@@ -96,7 +92,7 @@ public abstract class CubeHBaseRPC {
         return scan;
     }
 
-    protected RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
+    private RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
         final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks);
 
         LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
@@ -123,46 +119,12 @@ public abstract class CubeHBaseRPC {
         return new RawScan(start, end, selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize);
     }
 
-    protected List<RawScan> preparedHBaseScans(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
-        final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks);
-        List<RawScan> ret = Lists.newArrayList();
-
-        LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
-        byte[] start = encoder.createBuf();
-        byte[] end = encoder.createBuf();
-        List<byte[]> startKeys;
-        List<byte[]> endKeys;
-
-        encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE);
-        encoder.encode(pkStart, pkStart.getInfo().getPrimaryKey(), start);
-        startKeys = encoder.getRowKeysDifferentShards(start);
-
-        encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE);
-        encoder.encode(pkEnd, pkEnd.getInfo().getPrimaryKey(), end);
-        endKeys = encoder.getRowKeysDifferentShards(end);
-        endKeys = Lists.transform(endKeys, new Function<byte[], byte[]>() {
-            @Override
-            public byte[] apply(byte[] input) {
-                byte[] shardEnd = new byte[input.length + 1];//append extra 0 to the end key to make it inclusive while scanning
-                System.arraycopy(input, 0, shardEnd, 0, input.length);
-                return shardEnd;
-            }
-        });
-
-        Preconditions.checkState(startKeys.size() == endKeys.size());
-        List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys);
-
-        KylinConfig config = cubeSeg.getCubeDesc().getConfig();
-        int hbaseCaching = config.getHBaseScanCacheRows();
-        int hbaseMaxResultSize = config.getHBaseScanMaxResultSize();
-        if (isMemoryHungry(selectedColBlocks))
-            hbaseCaching /= 10;
-
-        for (short i = 0; i < startKeys.size(); ++i) {
-            ret.add(new RawScan(startKeys.get(i), endKeys.get(i), selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize));
+    protected List<RawScan> preparedHBaseScans(List<GTScanRange> ranges, ImmutableBitSet selectedColBlocks) {
+        List<RawScan> allRawScans = Lists.newArrayList();
+        for (GTScanRange range : ranges) {
+            allRawScans.add(preparedHBaseScan(range.pkStart, range.pkEnd, range.fuzzyKeys, selectedColBlocks));
         }
-        return ret;
-
+        return allRawScans;
     }
 
     private boolean isMemoryHungry(ImmutableBitSet selectedColBlocks) {