Posted to commits@kylin.apache.org by ma...@apache.org on 2016/04/13 05:39:50 UTC

[1/8] kylin git commit: minor bug fix in removing ext filter from project

Repository: kylin
Updated Branches:
  refs/heads/master aaf3b870c -> 59cb57ca6


minor bug fix in removing ext filter from project


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b6c893db
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b6c893db
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b6c893db

Branch: refs/heads/master
Commit: b6c893dbeefead2523fc686055736526d4f3ca3e
Parents: b2a3861
Author: Hongbin Ma <ma...@apache.org>
Authored: Tue Apr 12 16:01:45 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Apr 13 11:11:15 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/metadata/MetadataManager.java | 12 ++++--------
 .../apache/kylin/metadata/project/ProjectInstance.java  |  9 +++++----
 2 files changed, 9 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/b6c893db/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
index aa249fd..3391ef4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
@@ -143,7 +143,7 @@ public class MetadataManager {
         return Lists.newArrayList(srcTableMap.values());
     }
 
-    public List<ExternalFilterDesc> listAllExternalFilters(){
+    public List<ExternalFilterDesc> listAllExternalFilters() {
         return Lists.newArrayList(extFilterMap.values());
     }
 
@@ -217,27 +217,23 @@ public class MetadataManager {
     }
 
     public void saveExternalFilter(ExternalFilterDesc desc) throws IOException {
-        if(desc.getUuid() == null){
+        if (desc.getUuid() == null) {
             throw new IllegalArgumentException("UUID not set.");
         }
         String path = desc.getResourcePath();
-        getStore().putResource(path,desc,EXTERNAL_FILTER_DESC_SERIALIZER);
+        getStore().putResource(path, desc, EXTERNAL_FILTER_DESC_SERIALIZER);
         desc = reloadExternalFilterAt(path);
-        extFilterMap.put(desc.getName(),desc);
+        extFilterMap.put(desc.getName(), desc);
 
     }
 
     public void removeExternalFilter(String name) throws IOException {
-        if(name !=null ){
-            name = name.toLowerCase();
-        }
         String path = ExternalFilterDesc.concatResourcePath(name);
         getStore().deleteResource(path);
         extFilterMap.remove(name);
 
     }
 
-
     private void init(KylinConfig config) throws IOException {
         this.config = config;
         this.srcTableMap = new CaseInsensitiveStringCache<>(config, Broadcaster.TYPE.TABLE);

http://git-wip-us.apache.org/repos/asf/kylin/blob/b6c893db/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 9567da3..74f843f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -210,9 +210,13 @@ public class ProjectInstance extends RootPersistentEntity {
     public void removeTable(String tableName) {
         tables.remove(tableName.toUpperCase());
     }
+    
+    public void addExtFilter(String extFilterName){
+        this.getExtFilters().add(extFilterName);
+    }
 
     public void removeExtFilter(String filterName) {
-        extFilters.remove(filterName.toUpperCase());
+        extFilters.remove(filterName);
     }
 
     public int getTablesCount() {
@@ -227,9 +231,6 @@ public class ProjectInstance extends RootPersistentEntity {
         return tables;
     }
 
-    public void addExtFilter(String extFilterName){
-        this.getExtFilters().add(extFilterName.toLowerCase());
-    }
 
     public Set<String> getExtFilters(){
         return extFilters;
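
For illustration only (not part of the commit): the bug was that ProjectInstance.addExtFilter stored the filter name lower-cased while removeExtFilter looked it up upper-cased, so the project kept a stale entry. A minimal, self-contained sketch of that mismatch using a plain HashSet and a hypothetical filter name:

    import java.util.HashSet;
    import java.util.Set;

    public class ExtFilterNameSketch {
        public static void main(String[] args) {
            Set<String> extFilters = new HashSet<>();

            // Old behavior: add lower-cased the name, remove upper-cased it,
            // so the entry was never found and lingered in the project.
            extFilters.add("MyFilter".toLowerCase());
            extFilters.remove("MyFilter".toUpperCase());
            System.out.println(extFilters); // [myfilter] -- stale entry

            // Fixed behavior: both sides use the name exactly as given.
            extFilters.clear();
            extFilters.add("MyFilter");
            extFilters.remove("MyFilter");
            System.out.println(extFilters); // []
        }
    }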


[5/8] kylin git commit: refactor

Posted by ma...@apache.org.
refactor


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b26b2489
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b26b2489
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b26b2489

Branch: refs/heads/master
Commit: b26b2489baddcfd148eaf8f17330878bbf349048
Parents: aaf3b87
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Apr 11 17:33:07 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Apr 13 11:11:15 2016 +0800

----------------------------------------------------------------------
 .../kylin/job/DeployLocalMetaToRemoteTest.java  |   4 +-
 .../java/org/apache/kylin/job/DeployUtil.java   |   4 +-
 .../org/apache/kylin/common/util/ByteArray.java |  11 +-
 .../kylin/common/util/Log4jConfigurer.java      |   2 +
 .../common/util/AbstractKylinTestCase.java      |  14 +-
 .../common/util/HBaseMetadataTestCase.java      |   4 +-
 .../common/util/LocalFileMetadataTestCase.java  |   8 +-
 .../org/apache/kylin/cube/cuboid/Cuboid.java    |  18 ++
 .../apache/kylin/cube/kv/LazyRowKeyEncoder.java |  25 --
 .../org/apache/kylin/cube/model/CubeDesc.java   |   5 +
 .../org/apache/kylin/gridtable/GTRecord.java    |  10 +-
 .../org/apache/kylin/gridtable/GTScanRange.java |   2 -
 .../kylin/gridtable/GTScanRangePlanner.java     | 294 ++++++++++++++++---
 .../apache/kylin/gridtable/GTScanRequest.java   |  68 +++--
 .../org/apache/kylin/gridtable/IGTStorage.java  |  27 ++
 .../apache/kylin/gridtable/ScannerWorker.java   |  72 +++++
 .../kylin/cube/AggregationGroupRuleTest.java    |  19 +-
 .../apache/kylin/cube/RowKeyAttrRuleTest.java   |   5 +-
 .../cube/inmemcubing/DoggedCubeBuilderTest.java | 160 ----------
 .../cube/inmemcubing/InMemCubeBuilderTest.java  | 268 -----------------
 .../kylin/gridtable/DictGridTableTest.java      |  58 ++--
 .../impl/threadpool/DefaultSchedulerTest.java   | 151 ----------
 .../filter/UDF/MassInValueProviderFactory.java  |   1 -
 .../apache/kylin/storage/StorageMockUtils.java  | 189 ++++++++++++
 .../kylin/storage/cache/StorageMockUtils.java   | 157 ----------
 kylin-it/pom.xml                                |   7 +
 .../ITDoggedCubeBuilderStressTest.java          |   8 +-
 .../inmemcubing/ITDoggedCubeBuilderTest.java    | 163 ++++++++++
 .../inmemcubing/ITInMemCubeBuilderTest.java     | 271 +++++++++++++++++
 .../impl/threadpool/ITDefaultSchedulerTest.java | 154 ++++++++++
 .../kylin/provision/BuildCubeWithEngine.java    |   4 +-
 .../kylin/provision/BuildCubeWithSpark.java     |   6 +-
 .../kylin/provision/BuildCubeWithStream.java    |   4 +-
 .../kylin/provision/BuildIIWithEngine.java      |   4 +-
 .../kylin/provision/BuildIIWithStream.java      |   2 +-
 .../org/apache/kylin/query/KylinTestBase.java   |  11 -
 .../kylin/storage/hbase/ITStorageTest.java      |   2 +-
 source-hive/pom.xml                             |   7 +
 .../kylin/source/hive/HiveCmdBuilderTest.java   |   6 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     | 111 ++++---
 .../storage/hbase/cube/v2/CubeHBaseRPC.java     |  56 +---
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java |  94 ++++--
 .../hbase/cube/v2/CubeSegmentScanner.java       | 215 +-------------
 .../storage/hbase/cube/v2/CubeStorageQuery.java |  30 +-
 .../kylin/storage/hbase/cube/v2/RawScan.java    |  10 +
 .../coprocessor/endpoint/CubeVisitService.java  |  88 ++++--
 .../hbase/steps/SandboxMetastoreCLI.java        |   2 +-
 47 files changed, 1539 insertions(+), 1292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java b/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
index 1267ab7..8494607 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
@@ -45,7 +45,7 @@ public class DeployLocalMetaToRemoteTest {
     public static void beforeClass() throws Exception {
         logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
         ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }
@@ -53,7 +53,7 @@ public class DeployLocalMetaToRemoteTest {
 
     @Before
     public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
 
         DeployUtil.initCliWorkDir();
         DeployUtil.deployMetadata();

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 2f51475..513f546 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -34,7 +34,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.ResourceTool;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeUpdate;
@@ -66,7 +66,7 @@ public class DeployUtil {
     public static void deployMetadata() throws IOException {
         // install metadata to hbase
         ResourceTool.reset(config());
-        ResourceTool.copy(KylinConfig.createInstanceFromUri(AbstractKylinTestCase.LOCALMETA_TEST_DATA), config());
+        ResourceTool.copy(KylinConfig.createInstanceFromUri(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA), config());
 
         // update cube desc signature.
         for (CubeInstance cube : CubeManager.getInstance(config()).listAllCubes()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java b/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java
index 5e35257..d850f8e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java
@@ -100,7 +100,12 @@ public class ByteArray implements Comparable<ByteArray>, Serializable {
     }
 
     public ByteArray copy() {
-        ByteArray copy = new ByteArray(length);
+        ByteArray copy;
+        if (data != null) {
+            copy = new ByteArray(length);
+        } else {
+            copy = new ByteArray(null);
+        }
         copy.copyFrom(this);
         return copy;
     }
@@ -116,7 +121,9 @@ public class ByteArray implements Comparable<ByteArray>, Serializable {
     }
 
     public void copyFrom(ByteArray other) {
-        System.arraycopy(other.array(), other.offset, data, offset, other.length);
+        if (other.data != null) {
+            System.arraycopy(other.array(), other.offset, data, offset, other.length);
+        }
         this.length = other.length;
     }
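
For context: copy() used to allocate a buffer of the stated length and copyFrom() unconditionally called System.arraycopy, which threw a NullPointerException when the source ByteArray wrapped a null array (a null backing array is used elsewhere, e.g. in GTScanRange keys, to mean "unbounded"). A small usage sketch, not from the commit, assuming Kylin's core-common ByteArray is on the classpath:

    import org.apache.kylin.common.util.ByteArray;

    public class ByteArrayCopySketch {
        public static void main(String[] args) {
            // A ByteArray wrapping a null buffer (used as an "unbounded" marker).
            ByteArray unbounded = new ByteArray(null);

            // With this change, copy() preserves the null backing array instead of
            // allocating a buffer and hitting System.arraycopy with a null source.
            ByteArray copy = unbounded.copy();
            System.out.println(copy.array() == null); // expected: true
        }
    }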
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-common/src/main/java/org/apache/kylin/common/util/Log4jConfigurer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Log4jConfigurer.java b/core-common/src/main/java/org/apache/kylin/common/util/Log4jConfigurer.java
index fe0c55a..696aaff 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Log4jConfigurer.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Log4jConfigurer.java
@@ -21,6 +21,7 @@ package org.apache.kylin.common.util;
 import java.util.Enumeration;
 
 import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
@@ -35,6 +36,7 @@ public class Log4jConfigurer {
     public static void initLogger() {
         if (!INITIALIZED && !isConfigured()) {
             org.apache.log4j.BasicConfigurator.configure(new ConsoleAppender(new PatternLayout(DEFAULT_PATTERN_LAYOUT)));
+            LogManager.getRootLogger().setLevel(Level.DEBUG);
         }
         INITIALIZED = true;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
index d517930..3a4b7bb 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
@@ -28,16 +28,14 @@ import org.apache.kylin.common.KylinConfig;
  */
 public abstract class AbstractKylinTestCase {
 
-    public static final String LOCALMETA_TEST_DATA = "../examples/test_case_data/localmeta";
 
-    public static final String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox";
-
-    public static final String[] SERVICES_WITH_CACHE = {//
-    "org.apache.kylin.cube.CubeManager",//
+    public static final String[] SERVICES_WITH_CACHE = { //
+            "org.apache.kylin.cube.CubeManager", //
             "org.apache.kylin.cube.CubeDescManager", //
-            "org.apache.kylin.invertedindex.IIDescManager",//
-            "org.apache.kylin.invertedindex.IIManager",//
-            "org.apache.kylin.storage.hybrid.HybridManager", "org.apache.kylin.metadata.realization.RealizationRegistry", //
+            "org.apache.kylin.invertedindex.IIDescManager", //
+            "org.apache.kylin.invertedindex.IIManager", //
+            "org.apache.kylin.storage.hybrid.HybridManager", //
+            "org.apache.kylin.metadata.realization.RealizationRegistry", //
             "org.apache.kylin.metadata.project.ProjectManager", //
             "org.apache.kylin.metadata.MetadataManager" //
     };

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
index e0be1c2..fdf066b 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
@@ -26,9 +26,11 @@ import org.apache.kylin.common.KylinConfig;
  */
 public class HBaseMetadataTestCase extends AbstractKylinTestCase {
 
+    public static String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox";
+    
     static {
         try {
-            ClassUtil.addClasspath(new File("../examples/test_case_data/sandbox/").getAbsolutePath());
+            ClassUtil.addClasspath(new File(SANDBOX_TEST_DATA).getAbsolutePath());
         } catch (Exception e) {
             e.printStackTrace();
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
index 70c849c..e2f2d8b 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
@@ -27,9 +27,15 @@ import org.apache.kylin.common.persistence.ResourceStore;
 
 public class LocalFileMetadataTestCase extends AbstractKylinTestCase {
 
+    public static String LOCALMETA_TEST_DATA = "../examples/test_case_data/localmeta";
+
     @Override
     public void createTestMetadata() {
-        staticCreateTestMetadata();
+        staticCreateTestMetadata(getLocalMetaTestData());
+    }
+
+    protected String getLocalMetaTestData() {
+        return LOCALMETA_TEST_DATA;
     }
 
     public static void staticCreateTestMetadata() {
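
With LOCALMETA_TEST_DATA moved down into LocalFileMetadataTestCase (and SANDBOX_TEST_DATA into HBaseMetadataTestCase), the new protected getLocalMetaTestData() hook lets a subclass redirect its test metadata without touching the static helpers. A hypothetical example, not from the commit; the override path is illustrative only:

    import org.apache.kylin.common.util.LocalFileMetadataTestCase;

    public class CustomMetaTestCase extends LocalFileMetadataTestCase {
        // createTestMetadata() now calls this hook, so overriding it is enough
        // to point the test at a different local metadata directory.
        @Override
        protected String getLocalMetaTestData() {
            return "../examples/test_case_data/localmeta_custom"; // hypothetical path
        }
    }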

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index 7f8d2b8..e8a75d7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang.StringUtils;
@@ -33,6 +34,7 @@ import org.apache.kylin.cube.model.AggregationGroup;
 import org.apache.kylin.cube.model.AggregationGroup.HierarchyMask;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.RowKeyColDesc;
+import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.base.Function;
@@ -52,6 +54,20 @@ public class Cuboid implements Comparable<Cuboid> {
         }
     };
 
+    public static Cuboid identifyCuboid(CubeDesc cubeDesc, Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
+        for (FunctionDesc metric : metrics) {
+            if (metric.getMeasureType().onlyAggrInBaseCuboid())
+                return Cuboid.getBaseCuboid(cubeDesc);
+        }
+
+        long cuboidID = 0;
+        for (TblColRef column : dimensions) {
+            int index = cubeDesc.getRowkey().getColumnBitIndex(column);
+            cuboidID |= 1L << index;
+        }
+        return Cuboid.findById(cubeDesc, cuboidID);
+    }
+
     public static Cuboid findById(CubeDesc cube, byte[] cuboidID) {
         return findById(cube, Bytes.toLong(cuboidID));
     }
@@ -397,6 +413,8 @@ public class Cuboid implements Comparable<Cuboid> {
         return cuboidToGridTableMapping;
     }
 
+   
+
     public static String getDisplayName(long cuboidID, int dimensionCount) {
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < dimensionCount; ++i) {
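
The new identifyCuboid() maps a dimension set to a cuboid ID by OR-ing 1L << bitIndex for each dimension (falling back to the base cuboid when any metric only aggregates there), where the bit index counts from the tail of the rowkey; the inverse mapping is the CubeDesc.getColumnByBitIndex added further down. A self-contained sketch of the bit arithmetic, not from the commit, with hypothetical column names standing in for RowKeyDesc.getColumnBitIndex:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class CuboidIdSketch {
        // Mirrors the layout above: bit index 0 is the last rowkey column,
        // so the first rowkey column owns the highest bit of the cuboid ID.
        static long cuboidId(String[] rowKeyCols, Set<String> dimensions) {
            long id = 0;
            for (int pos = 0; pos < rowKeyCols.length; pos++) {
                int bitIndex = rowKeyCols.length - 1 - pos;
                if (dimensions.contains(rowKeyCols[pos]))
                    id |= 1L << bitIndex;
            }
            return id;
        }

        public static void main(String[] args) {
            String[] cols = { "CAL_DT", "LSTG_FORMAT_NAME", "SELLER_ID" }; // hypothetical rowkey order
            System.out.println(cuboidId(cols, new HashSet<>(Arrays.asList("CAL_DT"))));              // 4 (binary 100)
            System.out.println(cuboidId(cols, new HashSet<>(Arrays.asList("CAL_DT", "SELLER_ID")))); // 5 (binary 101)
        }
    }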

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
index c93f65e..9c3d037 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
@@ -18,15 +18,9 @@
 
 package org.apache.kylin.cube.kv;
 
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 
-import java.util.Arrays;
-import java.util.List;
-
 /**
  * A LazyRowKeyEncoder will not try to calculate shard
  * It works for both enableSharding or non-enableSharding scenario
@@ -45,23 +39,4 @@ public class LazyRowKeyEncoder extends RowKeyEncoder {
         }
     }
 
-
-    //for non-sharding cases it will only return one byte[] with not shard at beginning
-    public List<byte[]> getRowKeysDifferentShards(byte[] halfCookedKey) {
-        final short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
-
-        if (!enableSharding) {
-            return Lists.newArrayList(halfCookedKey);//not shard to append at head, so it is already well cooked
-        } else {
-            List<byte[]> ret = Lists.newArrayList();
-            for (short i = 0; i < cuboidShardNum; ++i) {
-                short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards());
-                byte[] cookedKey = Arrays.copyOf(halfCookedKey, halfCookedKey.length);
-                BytesUtil.writeShort(shard, cookedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
-                ret.add(cookedKey);
-            }
-            return ret;
-        }
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 240cf52..65ba0a5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -900,6 +900,11 @@ public class CubeDesc extends RootPersistentEntity {
         }
     }
 
+    public TblColRef getColumnByBitIndex(int bitIndex) {
+        RowKeyColDesc[] rowKeyColumns = this.getRowkey().getRowKeyColumns();
+        return rowKeyColumns[rowKeyColumns.length - 1 - bitIndex].getColRef();
+    }
+
     public boolean hasMemoryHungryMeasures() {
         for (MeasureDesc measure : measures) {
             if (measure.getFunction().getMeasureType().isMemoryHungry()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index d09ab9a..bccd0c5 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -47,7 +47,6 @@ public class GTRecord implements Comparable<GTRecord> {
         }
         this.info = info;
         this.maskForEqualHashComp = maskForEqualHashComp;
-
     }
 
     public GTRecord(GTInfo info) {
@@ -58,6 +57,15 @@ public class GTRecord implements Comparable<GTRecord> {
         this(info, info.colAll, cols);
     }
 
+    public GTRecord(GTRecord other) {
+        this.info = other.info;
+        this.maskForEqualHashComp = other.maskForEqualHashComp;
+        this.cols = new ByteArray[info.getColumnCount()];
+        for (int i = 0; i < other.cols.length; i++) {
+            this.cols[i] = other.cols[i].copy();
+        } 
+    }
+
     public GTInfo getInfo() {
         return info;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
index 56f55f6..e1b38dc 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
@@ -29,8 +29,6 @@ public class GTScanRange {
     final public GTRecord pkEnd; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded
     final public List<GTRecord> fuzzyKeys; // partial matching primary keys
 
-
-
     public GTScanRange(GTRecord pkStart, GTRecord pkEnd) {
         this(pkStart, pkEnd, null);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index 2307aaf..3f9bac0 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -18,69 +18,166 @@
 
 package org.apache.kylin.gridtable;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.FuzzyValueCombination;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 public class GTScanRangePlanner {
 
     private static final Logger logger = LoggerFactory.getLogger(GTScanRangePlanner.class);
 
+    private static final int MAX_SCAN_RANGES = 200;
     private static final int MAX_HBASE_FUZZY_KEYS = 100;
 
-    final private GTInfo info;
-    final private Pair<ByteArray, ByteArray> segmentStartAndEnd;
-    final private TblColRef partitionColRef;
+    protected int maxScanRanges;
+
+    //non-GT
+    protected CubeSegment cubeSegment;
+    protected CubeDesc cubeDesc;
+    protected Cuboid cuboid;
+    protected TupleFilter filter;
+    protected Set<TblColRef> dimensions;
+    protected Set<TblColRef> groupbyDims;
+    protected Set<TblColRef> filterDims;
+    protected Collection<FunctionDesc> metrics;
+
+    //GT 
+    protected TupleFilter gtFilter;
+    protected GTInfo gtInfo;
+    protected Pair<ByteArray, ByteArray> gtStartAndEnd;
+    protected TblColRef gtPartitionCol;
+    protected ImmutableBitSet gtDimensions;
+    protected ImmutableBitSet gtAggrGroups;
+    protected ImmutableBitSet gtAggrMetrics;
+    protected String[] gtAggrFuncs;
+    final protected RecordComparator rangeStartComparator;
+    final protected RecordComparator rangeEndComparator;
+    final protected RecordComparator rangeStartEndComparator;
+
+    public GTScanRangePlanner(CubeSegment cubeSegment, Cuboid cuboid, TupleFilter filter, Set<TblColRef> dimensions, Set<TblColRef> groupbyDims, //
+            Collection<FunctionDesc> metrics) {
+
+        this.maxScanRanges = MAX_SCAN_RANGES;
+
+        this.cubeSegment = cubeSegment;
+        this.cubeDesc = cubeSegment.getCubeDesc();
+        this.cuboid = cuboid;
+        this.dimensions = dimensions;
+        this.groupbyDims = groupbyDims;
+        this.filter = filter;
+        this.metrics = metrics;
+        this.filterDims = Sets.newHashSet();
+        TupleFilter.collectColumns(filter, this.filterDims);
+
+        this.gtInfo = CubeGridTable.newGTInfo(cubeSegment, cuboid.getId());
+        CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+
+        IGTComparator comp = gtInfo.codeSystem.getComparator();
+        //start key GTRecord compare to start key GTRecord
+        this.rangeStartComparator = getRangeStartComparator(comp);
+        //stop key GTRecord compare to stop key GTRecord
+        this.rangeEndComparator = getRangeEndComparator(comp);
+        //start key GTRecord compare to stop key GTRecord
+        this.rangeStartEndComparator = getRangeStartEndComparator(comp);
+
+        //replace the constant values in filter to dictionary codes 
+        this.gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getCuboidDimensionsInGTOrder(), this.groupbyDims);
 
-    final private RecordComparator rangeStartComparator;
-    final private RecordComparator rangeEndComparator;
-    final private RecordComparator rangeStartEndComparator;
+        this.gtDimensions = makeGridTableColumns(mapping, dimensions);
+        this.gtAggrGroups = makeGridTableColumns(mapping, replaceDerivedColumns(groupbyDims, cubeSegment.getCubeDesc()));
+        this.gtAggrMetrics = makeGridTableColumns(mapping, metrics);
+        this.gtAggrFuncs = makeAggrFuncs(mapping, metrics);
+
+        if (cubeSegment.getCubeDesc().getModel().getPartitionDesc().isPartitioned()) {
+            int index = mapping.getIndexOf(cubeSegment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef());
+            if (index >= 0) {
+                this.gtStartAndEnd = getSegmentStartAndEnd(index);
+                this.gtPartitionCol = gtInfo.colRef(index);
+            }
+        }
+
+    }
 
     /**
+     * constrcut GTScanRangePlanner with incomplete information. only be used for UT  
      * @param info
-     * @param segmentStartAndEnd in GT encoding
-     * @param partitionColRef the TblColRef in GT
+     * @param gtStartAndEnd
+     * @param gtPartitionCol
+     * @param gtFilter
      */
-    public GTScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> segmentStartAndEnd, TblColRef partitionColRef) {
-
-        this.info = info;
-        this.segmentStartAndEnd = segmentStartAndEnd;
-        this.partitionColRef = partitionColRef;
+    public GTScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) {
 
-        IGTComparator comp = info.codeSystem.getComparator();
+        this.maxScanRanges = MAX_SCAN_RANGES;
+        this.gtInfo = info;
 
+        IGTComparator comp = gtInfo.codeSystem.getComparator();
         //start key GTRecord compare to start key GTRecord
         this.rangeStartComparator = getRangeStartComparator(comp);
         //stop key GTRecord compare to stop key GTRecord
         this.rangeEndComparator = getRangeEndComparator(comp);
         //start key GTRecord compare to stop key GTRecord
         this.rangeStartEndComparator = getRangeStartEndComparator(comp);
-    }
 
-    // return empty list meaning filter is always false
-    public List<GTScanRange> planScanRanges(TupleFilter filter) {
-        return planScanRanges(filter, Integer.MAX_VALUE);
+
+        this.gtFilter = gtFilter;
+        this.gtStartAndEnd = gtStartAndEnd;
+        this.gtPartitionCol = gtPartitionCol;
     }
+    
 
-    // return empty list meaning filter is always false
-    public List<GTScanRange> planScanRanges(TupleFilter filter, int maxRanges) {
+    public GTScanRequest planScanRequest(boolean allowPreAggregate) {
+        GTScanRequest scanRequest;
+        List<GTScanRange> scanRanges = this.planScanRanges();
+        if (scanRanges != null && scanRanges.size() != 0) {
+            scanRequest = new GTScanRequest(gtInfo, scanRanges, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate, cubeSegment.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
+        } else {
+            scanRequest = null;
+        }
+        return scanRequest;
+    }
 
-        TupleFilter flatFilter = flattenToOrAndFilter(filter);
+    /**
+     * Overwrite this method to provide smarter storage visit plans
+     * @return
+     */
+    public List<GTScanRange> planScanRanges() {
+        TupleFilter flatFilter = flattenToOrAndFilter(gtFilter);
 
         List<Collection<ColumnRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter);
 
@@ -92,11 +189,108 @@ public class GTScanRangePlanner {
         }
 
         List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges);
-        mergedRanges = mergeTooManyRanges(mergedRanges, maxRanges);
+        mergedRanges = mergeTooManyRanges(mergedRanges, maxScanRanges);
 
         return mergedRanges;
     }
 
+    private Pair<ByteArray, ByteArray> getSegmentStartAndEnd(int index) {
+        ByteArray start;
+        if (cubeSegment.getDateRangeStart() != Long.MIN_VALUE) {
+            start = encodeTime(cubeSegment.getDateRangeStart(), index, 1);
+        } else {
+            start = new ByteArray();
+        }
+
+        ByteArray end;
+        if (cubeSegment.getDateRangeEnd() != Long.MAX_VALUE) {
+            end = encodeTime(cubeSegment.getDateRangeEnd(), index, -1);
+        } else {
+            end = new ByteArray();
+        }
+        return Pair.newPair(start, end);
+
+    }
+
+    private ByteArray encodeTime(long ts, int index, int roundingFlag) {
+        String value;
+        DataType partitionColType = gtInfo.getColumnType(index);
+        if (partitionColType.isDate()) {
+            value = DateFormat.formatToDateStr(ts);
+        } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
+            value = DateFormat.formatToTimeWithoutMilliStr(ts);
+        } else if (partitionColType.isStringFamily()) {
+            String partitionDateFormat = cubeSegment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
+            if (StringUtils.isEmpty(partitionDateFormat))
+                partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
+            value = DateFormat.formatToDateStr(ts, partitionDateFormat);
+        } else {
+            throw new RuntimeException("Type " + partitionColType + " is not valid partition column type");
+        }
+
+        ByteBuffer buffer = ByteBuffer.allocate(gtInfo.getMaxColumnLength());
+        gtInfo.getCodeSystem().encodeColumnValue(index, value, roundingFlag, buffer);
+
+        return ByteArray.copyOf(buffer.array(), 0, buffer.position());
+    }
+
+    private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) {
+        Set<TblColRef> ret = Sets.newHashSet();
+        for (TblColRef col : input) {
+            if (cubeDesc.hasHostColumn(col)) {
+                for (TblColRef host : cubeDesc.getHostInfo(col).columns) {
+                    ret.add(host);
+                }
+            } else {
+                ret.add(col);
+            }
+        }
+        return ret;
+    }
+
+    private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Set<TblColRef> dimensions) {
+        BitSet result = new BitSet();
+        for (TblColRef dim : dimensions) {
+            int idx = mapping.getIndexOf(dim);
+            if (idx >= 0)
+                result.set(idx);
+        }
+        return new ImmutableBitSet(result);
+    }
+
+    private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
+        BitSet result = new BitSet();
+        for (FunctionDesc metric : metrics) {
+            int idx = mapping.getIndexOf(metric);
+            if (idx < 0)
+                throw new IllegalStateException(metric + " not found in " + mapping);
+            result.set(idx);
+        }
+        return new ImmutableBitSet(result);
+    }
+
+    private String[] makeAggrFuncs(final CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
+
+        //metrics are represented in ImmutableBitSet, which loses order information
+        //sort the aggrFuns to align with metrics natural order 
+        List<FunctionDesc> metricList = Lists.newArrayList(metrics);
+        Collections.sort(metricList, new Comparator<FunctionDesc>() {
+            @Override
+            public int compare(FunctionDesc o1, FunctionDesc o2) {
+                int a = mapping.getIndexOf(o1);
+                int b = mapping.getIndexOf(o2);
+                return a - b;
+            }
+        });
+
+        String[] result = new String[metricList.size()];
+        int i = 0;
+        for (FunctionDesc metric : metricList) {
+            result[i++] = metric.getExpression();
+        }
+        return result;
+    }
+
     private String makeReadable(ByteArray byteArray) {
         if (byteArray == null) {
             return null;
@@ -105,29 +299,29 @@ public class GTScanRangePlanner {
         }
     }
 
-    private GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) {
-        GTRecord pkStart = new GTRecord(info);
-        GTRecord pkEnd = new GTRecord(info);
+    protected GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) {
+        GTRecord pkStart = new GTRecord(gtInfo);
+        GTRecord pkEnd = new GTRecord(gtInfo);
         Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap();
 
         List<GTRecord> fuzzyKeys;
 
         for (ColumnRange range : andDimRanges) {
-            if (partitionColRef != null && range.column.equals(partitionColRef)) {
-                if (rangeStartEndComparator.comparator.compare(segmentStartAndEnd.getFirst(), range.end) <= 0 //
-                        && (rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) < 0 //
-                        || rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) == 0 //
-                        && (range.op == FilterOperatorEnum.EQ || range.op == FilterOperatorEnum.LTE || range.op == FilterOperatorEnum.GTE || range.op == FilterOperatorEnum.IN))) {
+            if (gtPartitionCol != null && range.column.equals(gtPartitionCol)) {
+                if (rangeStartEndComparator.comparator.compare(gtStartAndEnd.getFirst(), range.end) <= 0 //
+                        && (rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond()) < 0 //
+                                || rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond()) == 0 //
+                                        && (range.op == FilterOperatorEnum.EQ || range.op == FilterOperatorEnum.LTE || range.op == FilterOperatorEnum.GTE || range.op == FilterOperatorEnum.IN))) {
                     //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded, so use <= when has equals in condition. 
                 } else {
-                    logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}",//
-                            new Object[] { partitionColRef, makeReadable(segmentStartAndEnd.getFirst()), makeReadable(segmentStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end) });
+                    logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}", //
+                            new Object[] { gtPartitionCol, makeReadable(gtStartAndEnd.getFirst()), makeReadable(gtStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end) });
                     return null;
                 }
             }
 
             int col = range.column.getColumnDesc().getZeroBasedIndex();
-            if (!info.primaryKey.get(col))
+            if (!gtInfo.primaryKey.get(col))
                 continue;
 
             pkStart.set(col, range.begin);
@@ -139,7 +333,6 @@ public class GTScanRangePlanner {
         }
 
         fuzzyKeys = buildFuzzyKeys(fuzzyValues);
-
         return new GTScanRange(pkStart, pkEnd, fuzzyKeys);
     }
 
@@ -154,17 +347,16 @@ public class GTScanRangePlanner {
             logger.info("The execution of this query will not use fuzzy key");
             return result;
         }
-        
 
         List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, MAX_HBASE_FUZZY_KEYS);
 
         for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) {
 
-            BitSet bitSet = new BitSet(info.getColumnCount());
+            BitSet bitSet = new BitSet(gtInfo.getColumnCount());
             for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) {
                 bitSet.set(entry.getKey());
             }
-            GTRecord fuzzy = new GTRecord(info, new ImmutableBitSet(bitSet));
+            GTRecord fuzzy = new GTRecord(gtInfo, new ImmutableBitSet(bitSet));
             for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) {
                 fuzzy.set(entry.getKey(), entry.getValue());
             }
@@ -174,7 +366,7 @@ public class GTScanRangePlanner {
         return result;
     }
 
-    private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
+    protected TupleFilter flattenToOrAndFilter(TupleFilter filter) {
         if (filter == null)
             return null;
 
@@ -193,7 +385,7 @@ public class GTScanRangePlanner {
         return flatFilter;
     }
 
-    private List<Collection<ColumnRange>> translateToOrAndDimRanges(TupleFilter flatFilter) {
+    protected List<Collection<ColumnRange>> translateToOrAndDimRanges(TupleFilter flatFilter) {
         List<Collection<ColumnRange>> result = Lists.newArrayList();
 
         if (flatFilter == null) {
@@ -272,7 +464,7 @@ public class GTScanRangePlanner {
         return orAndRanges;
     }
 
-    private List<GTScanRange> mergeOverlapRanges(List<GTScanRange> ranges) {
+    protected List<GTScanRange> mergeOverlapRanges(List<GTScanRange> ranges) {
         if (ranges.size() <= 1) {
             return ranges;
         }
@@ -339,7 +531,7 @@ public class GTScanRangePlanner {
         return new GTScanRange(start, end, newFuzzyKeys);
     }
 
-    private List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) {
+    protected List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) {
         if (ranges.size() <= maxRanges) {
             return ranges;
         }
@@ -351,7 +543,15 @@ public class GTScanRangePlanner {
         return result;
     }
 
-    private class ColumnRange {
+    public int getMaxScanRanges() {
+        return maxScanRanges;
+    }
+
+    public void setMaxScanRanges(int maxScanRanges) {
+        this.maxScanRanges = maxScanRanges;
+    }
+
+    protected class ColumnRange {
         private TblColRef column;
         private ByteArray begin = ByteArray.EMPTY;
         private ByteArray end = ByteArray.EMPTY;
@@ -412,7 +612,7 @@ public class GTScanRangePlanner {
             if (valueSet != null) {
                 return valueSet.isEmpty();
             } else if (begin.array() != null && end.array() != null) {
-                return info.codeSystem.getComparator().compare(begin, end) > 0;
+                return gtInfo.codeSystem.getComparator().compare(begin, end) > 0;
             } else {
                 return false;
             }
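
After this refactor the planner is constructed directly from a CubeSegment plus query context and produces a whole GTScanRequest (or null when the filter can never match this segment), instead of only planning GTScanRanges from pre-built GT-level inputs. A sketch of the intended call pattern, not taken from the commit, assuming the caller has already resolved the segment, cuboid, filter, dimensions, group-by columns and metrics; ScannerWorker is the new class added further down in this commit:

    import java.util.Collection;
    import java.util.Iterator;
    import java.util.Set;

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.cube.cuboid.Cuboid;
    import org.apache.kylin.gridtable.GTRecord;
    import org.apache.kylin.gridtable.GTScanRangePlanner;
    import org.apache.kylin.gridtable.GTScanRequest;
    import org.apache.kylin.gridtable.ScannerWorker;
    import org.apache.kylin.metadata.filter.TupleFilter;
    import org.apache.kylin.metadata.model.FunctionDesc;
    import org.apache.kylin.metadata.model.TblColRef;

    public class PlannerUsageSketch {
        Iterator<GTRecord> scanSegment(CubeSegment segment, Cuboid cuboid, TupleFilter filter,
                Set<TblColRef> dimensions, Set<TblColRef> groupBy, Collection<FunctionDesc> metrics) {
            // The planner now derives the GT filter, column bitsets and segment boundaries itself.
            GTScanRangePlanner planner = new GTScanRangePlanner(segment, cuboid, filter, dimensions, groupBy, metrics);

            // Null means the planned scan ranges are empty, i.e. the filter is always false
            // for this segment; ScannerWorker treats a null request as "skip the segment".
            GTScanRequest request = planner.planScanRequest(true);
            return new ScannerWorker(segment, cuboid, request).iterator();
        }
    }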

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 1edfb36..c4abb57 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -37,7 +37,7 @@ import com.google.common.collect.Sets;
 public class GTScanRequest {
 
     private GTInfo info;
-    private GTScanRange range;
+    private List<GTScanRange> ranges;
     private ImmutableBitSet columns;
     private transient ImmutableBitSet selectedColBlocks;
 
@@ -53,18 +53,26 @@ public class GTScanRequest {
     private boolean allowPreAggregation = true;
     private double aggrCacheGB = 0; // no limit
 
-    public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet columns, TupleFilter filterPushDown) {
+    public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet columns, TupleFilter filterPushDown) {
         this.info = info;
-        this.range = range == null ? new GTScanRange(new GTRecord(info), new GTRecord(info)) : range;
+        if (ranges == null) {
+            this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info)));
+        } else {
+            this.ranges = ranges;
+        }
         this.columns = columns;
         this.filterPushDown = filterPushDown;
         validate(info);
     }
 
-    public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
+    public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
             ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowPreAggregation, double aggrCacheGB) {
         this.info = info;
-        this.range = range == null ? new GTScanRange(new GTRecord(info), new GTRecord(info)) : range;
+        if (ranges == null) {
+            this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info)));
+        } else {
+            this.ranges = ranges;
+        }
         this.columns = dimensions;
         this.filterPushDown = filterPushDown;
 
@@ -137,7 +145,7 @@ public class GTScanRequest {
     /**
      * doFilter,doAggr,doMemCheck are only for profiling use.
      * in normal cases they are all true.
-     * 
+     * <p/>
      * Refer to CoprocessorBehavior for explanation
      */
     public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr) throws IOException {
@@ -195,16 +203,12 @@ public class GTScanRequest {
         return info;
     }
 
-    public GTRecord getPkStart() {
-        return range.pkStart;
+    public List<GTScanRange> getGTScanRanges() {
+        return ranges;
     }
 
-    public GTRecord getPkEnd() {
-        return range.pkEnd;
-    }
-
-    public List<GTRecord> getFuzzyKeys() {
-        return range.fuzzyKeys;
+    public void setGTScanRanges(List<GTScanRange> ranges) {
+        this.ranges = ranges;
     }
 
     public ImmutableBitSet getSelectedColBlocks() {
@@ -241,7 +245,7 @@ public class GTScanRequest {
 
     @Override
     public String toString() {
-        return "GTScanRequest [range=" + range + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]";
+        return "GTScanRequest [range=" + ranges + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]";
     }
 
     public static final BytesSerializer<GTScanRequest> serializer = new BytesSerializer<GTScanRequest>() {
@@ -249,11 +253,14 @@ public class GTScanRequest {
         public void serialize(GTScanRequest value, ByteBuffer out) {
             GTInfo.serializer.serialize(value.info, out);
 
-            serializeGTRecord(value.range.pkStart, out);
-            serializeGTRecord(value.range.pkEnd, out);
-            BytesUtil.writeVInt(value.range.fuzzyKeys.size(), out);
-            for (GTRecord f : value.range.fuzzyKeys) {
-                serializeGTRecord(f, out);
+            BytesUtil.writeVInt(value.ranges.size(), out);
+            for (GTScanRange range : value.ranges) {
+                serializeGTRecord(range.pkStart, out);
+                serializeGTRecord(range.pkEnd, out);
+                BytesUtil.writeVInt(range.fuzzyKeys.size(), out);
+                for (GTRecord f : range.fuzzyKeys) {
+                    serializeGTRecord(f, out);
+                }
             }
 
             ImmutableBitSet.serializer.serialize(value.columns, out);
@@ -270,14 +277,19 @@ public class GTScanRequest {
         public GTScanRequest deserialize(ByteBuffer in) {
             GTInfo sInfo = GTInfo.serializer.deserialize(in);
 
-            GTRecord sPkStart = deserializeGTRecord(in, sInfo);
-            GTRecord sPkEnd = deserializeGTRecord(in, sInfo);
-            List<GTRecord> sFuzzyKeys = Lists.newArrayList();
-            int sFuzzyKeySize = BytesUtil.readVInt(in);
-            for (int i = 0; i < sFuzzyKeySize; i++) {
-                sFuzzyKeys.add(deserializeGTRecord(in, sInfo));
+            List<GTScanRange> sRanges = Lists.newArrayList();
+            int sRangesCount = BytesUtil.readVInt(in);
+            for (int rangeIdx = 0; rangeIdx < sRangesCount; rangeIdx++) {
+                GTRecord sPkStart = deserializeGTRecord(in, sInfo);
+                GTRecord sPkEnd = deserializeGTRecord(in, sInfo);
+                List<GTRecord> sFuzzyKeys = Lists.newArrayList();
+                int sFuzzyKeySize = BytesUtil.readVInt(in);
+                for (int i = 0; i < sFuzzyKeySize; i++) {
+                    sFuzzyKeys.add(deserializeGTRecord(in, sInfo));
+                }
+                GTScanRange sRange = new GTScanRange(sPkStart, sPkEnd, sFuzzyKeys);
+                sRanges.add(sRange);
             }
-            GTScanRange sRange = new GTScanRange(sPkStart, sPkEnd, sFuzzyKeys);
 
             ImmutableBitSet sColumns = ImmutableBitSet.serializer.deserialize(in);
             TupleFilter sGTFilter = GTUtil.deserializeGTFilter(BytesUtil.readByteArray(in), sInfo);
@@ -288,7 +300,7 @@ public class GTScanRequest {
             boolean sAllowPreAggr = (BytesUtil.readVInt(in) == 1);
             double sAggrCacheGB = in.getDouble();
 
-            return new GTScanRequest(sInfo, sRange, sColumns, sAggGroupBy, sAggrMetrics, sAggrMetricFuncs, sGTFilter, sAllowPreAggr, sAggrCacheGB);
+            return new GTScanRequest(sInfo, sRanges, sColumns, sAggGroupBy, sAggrMetrics, sAggrMetricFuncs, sGTFilter, sAllowPreAggr, sAggrCacheGB);
         }
 
         private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) {
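
The serializer now writes a range count followed by each range (start key, end key, fuzzy-key count, fuzzy keys) instead of exactly one range. A minimal, self-contained illustration of that count-prefixed layout, not from the commit; plain ints stand in for BytesUtil.writeVInt and for GTRecord serialization:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class CountPrefixedListSketch {
        public static void main(String[] args) {
            List<byte[]> ranges = Arrays.asList(new byte[] { 1, 2 }, new byte[] { 3 });

            ByteBuffer out = ByteBuffer.allocate(64);
            out.putInt(ranges.size());            // number of ranges comes first
            for (byte[] r : ranges) {
                out.putInt(r.length);             // each range is length-prefixed
                out.put(r);
            }
            out.flip();

            List<byte[]> back = new ArrayList<>();
            int count = out.getInt();
            for (int i = 0; i < count; i++) {
                byte[] r = new byte[out.getInt()];
                out.get(r);
                back.add(r);
            }
            System.out.println(back.size());      // expected: 2
        }
    }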

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStorage.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStorage.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStorage.java
new file mode 100644
index 0000000..ff95743
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStorage.java
@@ -0,0 +1,27 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * 
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  * 
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.gridtable;
+
+import java.io.IOException;
+
+public interface IGTStorage {
+    IGTScanner getGTScanner(final GTScanRequest scanRequests) throws IOException;
+}
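
IGTStorage is the new single-method abstraction that ScannerWorker (added next) instantiates reflectively, choosing between the endpoint and scan RPC implementations. A hypothetical in-memory implementation, not part of the commit, that ignores the request and returns an empty scanner, the same fallback ScannerWorker uses for skipped segments; the class name EmptyGTStorage is made up for illustration:

    package org.apache.kylin.gridtable;

    import java.io.IOException;

    // Hypothetical test double: every request yields an empty scanner.
    public class EmptyGTStorage implements IGTStorage {
        @Override
        public IGTScanner getGTScanner(GTScanRequest scanRequest) throws IOException {
            return new EmptyGTScanner(0);
        }
    }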

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
new file mode 100644
index 0000000..75fa94b
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
@@ -0,0 +1,72 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * 
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  * 
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.gridtable;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Iterator;
+
+import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ScannerWorker {
+
+    private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class);
+    private IGTScanner internal = null;
+
+    public ScannerWorker(CubeSegment cubeSeg, Cuboid cuboid, GTScanRequest scanRequest) {
+        if (scanRequest == null) {
+            logger.info("Segment {} will be skipped", cubeSeg);
+            internal = new EmptyGTScanner(0);
+            return;
+        }
+
+        final GTInfo info = scanRequest.getInfo();
+
+        try {
+            IGTStorage rpc;
+            if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) {
+                rpc = (IGTStorage) Class.forName("org.apache.kylin.storage.hbase.cube.v2.CubeHBaseScanRPC").getConstructor(CubeSegment.class, Cuboid.class, GTInfo.class).newInstance(cubeSeg, cuboid, info); // for local debug
+            } else {
+                rpc = (IGTStorage) Class.forName("org.apache.kylin.storage.hbase.cube.v2.CubeHBaseEndpointRPC").getConstructor(CubeSegment.class, Cuboid.class, GTInfo.class).newInstance(cubeSeg, cuboid, info); // default behavior
+            }
+            internal = rpc.getGTScanner(scanRequest);
+        } catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) {
+            throw new RuntimeException("error", e);
+        }
+    }
+
+    public Iterator<GTRecord> iterator() {
+        return internal.iterator();
+    }
+
+    public void close() throws IOException {
+        internal.close();
+    }
+
+    public int getScannedRowCount() {
+        return internal.getScannedRowCount();
+    }
+
+}
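
Calling code drives the worker through iterator(), close() and getScannedRowCount(). A hedged usage sketch, assuming cubeSeg, cuboid and scanRequest have already been prepared by the query engine and the usual imports are in place:

    ScannerWorker worker = new ScannerWorker(cubeSeg, cuboid, scanRequest);
    try {
        Iterator<GTRecord> it = worker.iterator();
        while (it.hasNext()) {
            GTRecord record = it.next();
            // aggregate or convert the record here
        }
    } finally {
        worker.close();
    }
    logger.info("Scanned {} rows", worker.getScannedRowCount());

Note that the backend RPC is resolved via Class.forName, so core-cube carries no compile-time dependency on the storage-hbase module; the "scan" backdoor toggle switches to the simpler scan RPC for local debugging, as the inline comments indicate.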

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java
index 53952c4..6c3d544 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.validation.IValidatorRule;
 import org.apache.kylin.cube.model.validation.ValidateContext;
@@ -39,7 +40,7 @@ public class AggregationGroupRuleTest {
     public void testGoodDesc() throws IOException {
         AggregationGroupRule rule = getAggregationGroupRule();
 
-        for (File f : new File("../examples/test_case_data/localmeta/cube_desc/").listFiles()) {
+        for (File f : new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/").listFiles()) {
             CubeDesc desc = JsonUtil.readValue(new FileInputStream(f), CubeDesc.class);
             ValidateContext vContext = new ValidateContext();
             rule.validate(desc, vContext);
@@ -57,7 +58,7 @@ public class AggregationGroupRuleTest {
             }
         };
 
-        for (File f : new File("../examples/test_case_data/localmeta/cube_desc/").listFiles()) {
+        for (File f : new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/").listFiles()) {
             System.out.println(f.getName());
             CubeDesc desc = JsonUtil.readValue(new FileInputStream(f), CubeDesc.class);
             ValidateContext vContext = new ValidateContext();
@@ -72,9 +73,9 @@ public class AggregationGroupRuleTest {
     public void testGoodDesc2() throws IOException {
 
         ValidateContext vContext = new ValidateContext();
-        CubeDesc desc = JsonUtil.readValue(new FileInputStream("../examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
-        desc.getAggregationGroups().get(0).getSelectRule().joint_dims = new String[][] {//
-        new String[] { "lstg_format_name", "lstg_site_id", "slr_segment_cd", "CATEG_LVL2_NAME" } };
+        CubeDesc desc = JsonUtil.readValue(new FileInputStream(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
+        desc.getAggregationGroups().get(0).getSelectRule().joint_dims = new String[][] { //
+                new String[] { "lstg_format_name", "lstg_site_id", "slr_segment_cd", "CATEG_LVL2_NAME" } };
 
         IValidatorRule<CubeDesc> rule = getAggregationGroupRule();
         rule.validate(desc, vContext);
@@ -86,7 +87,7 @@ public class AggregationGroupRuleTest {
     public void testBadDesc1() throws IOException {
 
         ValidateContext vContext = new ValidateContext();
-        CubeDesc desc = JsonUtil.readValue(new FileInputStream("../examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
+        CubeDesc desc = JsonUtil.readValue(new FileInputStream(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
         String[] temp = Arrays.asList(desc.getAggregationGroups().get(0).getIncludes()).subList(0, 3).toArray(new String[3]);
 
         desc.getAggregationGroups().get(0).setIncludes(temp);
@@ -103,9 +104,9 @@ public class AggregationGroupRuleTest {
     public void testBadDesc2() throws IOException {
 
         ValidateContext vContext = new ValidateContext();
-        CubeDesc desc = JsonUtil.readValue(new FileInputStream("../examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
-        desc.getAggregationGroups().get(0).getSelectRule().joint_dims = new String[][] {//
-        new String[] { "lstg_format_name", "lstg_site_id", "slr_segment_cd", "META_CATEG_NAME", "CATEG_LVL2_NAME" } };
+        CubeDesc desc = JsonUtil.readValue(new FileInputStream(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
+        desc.getAggregationGroups().get(0).getSelectRule().joint_dims = new String[][] { //
+                new String[] { "lstg_format_name", "lstg_site_id", "slr_segment_cd", "META_CATEG_NAME", "CATEG_LVL2_NAME" } };
 
         IValidatorRule<CubeDesc> rule = getAggregationGroupRule();
         rule.validate(desc, vContext);
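
The recurring edit in this test (and in RowKeyAttrRuleTest below) replaces the hard-coded "../examples/test_case_data/localmeta" literal with the shared LocalFileMetadataTestCase.LOCALMETA_TEST_DATA constant, which presumably resolves to the same directory, so the location is defined in one place. The resulting pattern, roughly:

    // constant value assumed to match the old literal path
    File cubeDescDir = new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/");
    for (File f : cubeDescDir.listFiles()) {
        CubeDesc desc = JsonUtil.readValue(new FileInputStream(f), CubeDesc.class);
        // ... validate desc against the rule under test
    }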

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/test/java/org/apache/kylin/cube/RowKeyAttrRuleTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/RowKeyAttrRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/RowKeyAttrRuleTest.java
index b07d360..636102c 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/RowKeyAttrRuleTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/RowKeyAttrRuleTest.java
@@ -25,6 +25,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.validation.IValidatorRule;
 import org.apache.kylin.cube.model.validation.ValidateContext;
@@ -35,7 +36,7 @@ public class RowKeyAttrRuleTest {
 
     @Test
     public void testGoodDesc() throws IOException {
-        for (File f : new File("../examples/test_case_data/localmeta/cube_desc/").listFiles()) {
+        for (File f : new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/").listFiles()) {
             CubeDesc desc = JsonUtil.readValue(new FileInputStream(f), CubeDesc.class);
             ValidateContext vContext = new ValidateContext();
             IValidatorRule<CubeDesc> rule = new RowKeyAttrRule();
@@ -48,7 +49,7 @@ public class RowKeyAttrRuleTest {
     @Test
     public void testBadDesc() throws IOException {
         ValidateContext vContext = new ValidateContext();
-        CubeDesc desc = JsonUtil.readValue(new FileInputStream("../examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
+        CubeDesc desc = JsonUtil.readValue(new FileInputStream(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
         desc.getRowkey().getRowKeyColumns()[2].setColumn("");
         IValidatorRule<CubeDesc> rule = new RowKeyAttrRule();
         rule.validate(desc, vContext);

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
deleted file mode 100644
index 80e3df1..0000000
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.kylin.cube.inmemcubing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.dimension.Dictionary;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
-
-    @SuppressWarnings("unused")
-    private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilderTest.class);
-
-    private static final int INPUT_ROWS = 10000;
-    private static final int SPLIT_ROWS = 5000;
-    private static final int THREADS = 4;
-
-    private static CubeInstance cube;
-    private static String flatTable;
-    private static Map<TblColRef, Dictionary<String>> dictionaryMap;
-
-    @BeforeClass
-    public static void before() throws IOException {
-        staticCreateTestMetadata();
-
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-
-        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
-        flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
-        dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
-    }
-
-    @AfterClass
-    public static void after() throws Exception {
-        staticCleanupTestMetadata();
-    }
-
-    @Test
-    public void test() throws Exception {
-
-        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        long randSeed = System.currentTimeMillis();
-
-        DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
-        doggedBuilder.setConcurrentThreads(THREADS);
-        doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
-        FileRecordWriter doggedResult = new FileRecordWriter();
-
-        {
-            Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
-            InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
-            future.get();
-            doggedResult.close();
-        }
-
-        InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
-        inmemBuilder.setConcurrentThreads(THREADS);
-        FileRecordWriter inmemResult = new FileRecordWriter();
-
-        {
-            Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
-            InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
-            future.get();
-            inmemResult.close();
-        }
-
-        fileCompare(doggedResult.file, inmemResult.file);
-        doggedResult.file.delete();
-        inmemResult.file.delete();
-    }
-
-    private void fileCompare(File file, File file2) throws IOException {
-        BufferedReader r1 = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
-        BufferedReader r2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2), "UTF-8"));
-
-        String line1, line2;
-        do {
-            line1 = r1.readLine();
-            line2 = r2.readLine();
-
-            assertEquals(line1, line2);
-
-        } while (line1 != null || line2 != null);
-
-        r1.close();
-        r2.close();
-    }
-
-    class FileRecordWriter implements ICuboidWriter {
-
-        File file;
-        PrintWriter writer;
-
-        FileRecordWriter() throws IOException {
-            file = File.createTempFile("DoggedCubeBuilderTest_", ".data");
-            writer = new PrintWriter(file, "UTF-8");
-        }
-
-        @Override
-        public void write(long cuboidId, GTRecord record) throws IOException {
-            writer.print(cuboidId);
-            writer.print(", ");
-            writer.print(record.toString());
-            writer.println();
-        }
-
-        @Override
-        public void flush() {
-
-        }
-
-        @Override
-        public void close() {
-            writer.close();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
deleted file mode 100644
index 88573c6..0000000
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.kylin.cube.inmemcubing;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
-import org.apache.kylin.dimension.Dictionary;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- */
-public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
-
-    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderTest.class);
-
-    private CubeInstance cube;
-    private String flatTable;
-    private Map<TblColRef, Dictionary<String>> dictionaryMap;
-
-    private int nInpRows;
-    private int nThreads;
-
-    @Before
-    public void before() throws IOException {
-        createTestMetadata();
-    }
-
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
-    }
-
-    @Test
-    public void testKylinCube() throws Exception {
-        testBuild("test_kylin_cube_without_slr_left_join_empty", //
-                "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv", 70000, 4);
-    }
-
-    @Test
-    public void testSSBCube() throws Exception {
-        testBuild("ssb", //
-                "../examples/test_case_data/localmeta/data/kylin_intermediate_ssb_19920101000000_19920201000000.csv", 1000, 1);
-    }
-
-    public void testBuild(String cubeName, String flatTable, int nInpRows, int nThreads) throws Exception {
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-
-        this.nInpRows = nInpRows;
-        this.nThreads = nThreads;
-
-        this.cube = cubeManager.getCube(cubeName);
-        this.flatTable = flatTable;
-        this.dictionaryMap = getDictionaryMap(cube, flatTable);
-
-        testBuildInner();
-    }
-
-    private void testBuildInner() throws Exception {
-
-        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
-        //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
-        cubeBuilder.setConcurrentThreads(nThreads);
-
-        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-
-        try {
-            // round 1
-            {
-                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
-                feedData(cube, flatTable, queue, nInpRows);
-                future.get();
-            }
-
-            // round 2, zero input
-            {
-                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
-                feedData(cube, flatTable, queue, 0);
-                future.get();
-            }
-
-            // round 3
-            {
-                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
-                feedData(cube, flatTable, queue, nInpRows);
-                future.get();
-            }
-
-        } catch (Exception e) {
-            logger.error("stream build failed", e);
-            throw new IOException("Failed to build cube ", e);
-        }
-    }
-
-    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
-        feedData(cube, flatTable, queue, count, 0);
-    }
-
-    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
-        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
-        int nColumns = flatTableDesc.getColumnList().size();
-
-        @SuppressWarnings("unchecked")
-        Set<String>[] distinctSets = new Set[nColumns];
-        for (int i = 0; i < nColumns; i++)
-            distinctSets[i] = new TreeSet<String>();
-
-        // get distinct values on each column
-        List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8");
-        for (String line : lines) {
-            String[] row = line.trim().split(",");
-            assert row.length == nColumns;
-            for (int i = 0; i < nColumns; i++)
-                distinctSets[i].add(row[i]);
-        }
-
-        List<String[]> distincts = new ArrayList<String[]>();
-        for (int i = 0; i < nColumns; i++) {
-            distincts.add((String[]) distinctSets[i].toArray(new String[distinctSets[i].size()]));
-        }
-
-        Random rand = new Random();
-        if (randSeed != 0)
-            rand.setSeed(randSeed);
-
-        // output with random data
-        for (; count > 0; count--) {
-            ArrayList<String> row = new ArrayList<String>(nColumns);
-            for (int i = 0; i < nColumns; i++) {
-                String[] candidates = distincts.get(i);
-                row.add(candidates[rand.nextInt(candidates.length)]);
-            }
-            queue.put(row);
-        }
-        queue.put(new ArrayList<String>(0));
-    }
-
-    static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
-        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
-        CubeDesc desc = cube.getDescriptor();
-        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
-        int nColumns = flatTableDesc.getColumnList().size();
-
-        List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
-        for (int c = 0; c < columns.size(); c++) {
-            TblColRef col = columns.get(c);
-            if (desc.getRowkey().isUseDictionary(col)) {
-                logger.info("Building dictionary for " + col);
-                List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
-                Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
-                result.put(col, dict);
-            }
-        }
-
-        for (int measureIdx = 0; measureIdx < cube.getDescriptor().getMeasures().size(); measureIdx++) {
-            MeasureDesc measureDesc = cube.getDescriptor().getMeasures().get(measureIdx);
-            FunctionDesc func = measureDesc.getFunction();
-            List<TblColRef> dictCols = func.getMeasureType().getColumnsNeedDictionary(func);
-            if (dictCols.isEmpty())
-                continue;
-
-            int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx];
-            List<TblColRef> paramCols = func.getParameter().getColRefs();
-            for (int i = 0; i < paramCols.size(); i++) {
-                TblColRef col = paramCols.get(i);
-                if (dictCols.contains(col)) {
-                    int colIdxOnFlat = flatTableIdx[i];
-                    logger.info("Building dictionary for " + col);
-                    List<byte[]> valueList = readValueList(flatTable, nColumns, colIdxOnFlat);
-                    Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
-
-                    result.put(col, dict);
-                }
-            }
-        }
-
-        return result;
-    }
-
-    private static List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
-        List<byte[]> result = Lists.newArrayList();
-        List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8");
-        for (String line : lines) {
-            String[] row = line.trim().split(",");
-            if (row.length != nColumns) {
-                throw new IllegalStateException();
-            }
-            if (row[c] != null) {
-                result.add(Bytes.toBytes(row[c]));
-            }
-        }
-        return result;
-    }
-
-    class ConsoleGTRecordWriter implements ICuboidWriter {
-
-        boolean verbose = false;
-
-        @Override
-        public void write(long cuboidId, GTRecord record) throws IOException {
-            if (verbose)
-                System.out.println(record.toString());
-        }
-
-        @Override
-        public void flush() {
-            if (verbose) {
-                System.out.println("flush");
-            }
-        }
-
-        @Override
-        public void close() {
-            if (verbose) {
-                System.out.println("close");
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index b1b5ee9..b411e96 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -101,11 +101,11 @@ public class DictGridTableTest {
         ByteArray segmentEnd = enc(info, 0, "2015-01-15");
         assertEquals(segmentStart, segmentStartX);
 
-        GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0));
 
         {
             LogicalTupleFilter filter = and(timeComp0, ageComp1);
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());//scan range are [close,close]
             assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
             assertEquals(1, r.get(0).fuzzyKeys.size());
@@ -113,29 +113,34 @@ public class DictGridTableTest {
         }
         {
             LogicalTupleFilter filter = and(timeComp2, ageComp1);
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());
         }
         {
             LogicalTupleFilter filter = and(timeComp4, ageComp1);
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
         }
         {
             LogicalTupleFilter filter = and(timeComp5, ageComp1);
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());
         }
         {
             LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1), and(timeComp6, ageComp1));
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString());
             assertEquals("[[10], [1421193600000, 10]]", r.get(0).fuzzyKeys.toString());
         }
         {
             LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString());
             assertEquals(0, r.get(0).fuzzyKeys.size());
@@ -143,20 +148,23 @@ public class DictGridTableTest {
         {
             //skip FALSE filter
             LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());
         }
         {
             //TRUE or FALSE filter
             LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[null, null]-[null, null]", r.get(0).toString());
         }
         {
             //TRUE or other filter
             LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[null, null]-[null, null]", r.get(0).toString());
         }
@@ -165,11 +173,11 @@ public class DictGridTableTest {
     @Test
     public void verifySegmentSkipping2() {
         ByteArray segmentEnd = enc(info, 0, "2015-01-15");
-        GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0));
 
         {
             LogicalTupleFilter filter = and(timeComp0, ageComp1);
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0),filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());//scan range are [close,close]
             assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
             assertEquals(1, r.get(0).fuzzyKeys.size());
@@ -178,7 +186,8 @@ public class DictGridTableTest {
 
         {
             LogicalTupleFilter filter = and(timeComp5, ageComp1);
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0),filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());//scan range are [close,close]
         }
     }
@@ -186,12 +195,12 @@ public class DictGridTableTest {
     @Test
     public void verifyScanRangePlanner() {
 
-        GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null);
 
         // flatten or-and & hbase fuzzy value
         {
             LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null,filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
             assertEquals("[[10], [20]]", r.get(0).fuzzyKeys.toString());
@@ -200,33 +209,38 @@ public class DictGridTableTest {
         // pre-evaluate ever false
         {
             LogicalTupleFilter filter = and(timeComp1, timeComp2);
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null,filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());
         }
 
         // pre-evaluate ever true
         {
             LogicalTupleFilter filter = or(timeComp1, ageComp4);
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null,filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals("[[null, null]-[null, null]]", r.toString());
         }
 
         // merge overlap range
         {
             LogicalTupleFilter filter = or(timeComp1, timeComp3);
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null,filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals("[[null, null]-[null, null]]", r.toString());
         }
 
         // merge too many ranges
         {
             LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2), and(timeComp4, ageComp3));
-            List<GTScanRange> r = planner.planScanRanges(filter);
+            GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null,filter);
+            List<GTScanRange> r = planner.planScanRanges();
             assertEquals(3, r.size());
             assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
             assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
             assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
-            List<GTScanRange> r2 = planner.planScanRanges(filter, 2);
+            planner.setMaxScanRanges(2);
+            List<GTScanRange> r2 = planner.planScanRanges();
             assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString());
         }
     }
@@ -269,7 +283,7 @@ public class DictGridTableTest {
         GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter, true, 0);
 
         // note the unEvaluatable column 1 in filter is added to group by
-        assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
+        assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
 
         doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]", "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]", "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]");
     }
@@ -284,7 +298,7 @@ public class DictGridTableTest {
 
         GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter, true, 0);
         // note the evaluatable column 1 in filter is added to returned columns but not in group by
-        assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
+        assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
 
         doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]");
     }
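
The test changes above track an API change in GTScanRangePlanner: the filter is now bound at construction time, planScanRanges() takes no arguments, and the range-merge cap moved to a setter. Distilled usage under the new shape, with names taken from the test:

    GTScanRangePlanner planner = new GTScanRangePlanner(
            info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
    List<GTScanRange> ranges = planner.planScanRanges();

    // optionally cap the number of ranges before planning again
    planner.setMaxScanRanges(2);
    List<GTScanRange> merged = planner.planScanRanges();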


[2/8] kylin git commit: KYLIN-1579 revise logging

Posted by ma...@apache.org.
KYLIN-1579 revise logging


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b2a38612
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b2a38612
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b2a38612

Branch: refs/heads/master
Commit: b2a386126e075ac6cc2d15d0c1a42d720abed06a
Parents: 71d3f30
Author: Hongbin Ma <ma...@apache.org>
Authored: Tue Apr 12 15:27:45 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Apr 13 11:11:15 2016 +0800

----------------------------------------------------------------------
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     | 23 +++++++++++++-------
 1 file changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
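
Distilled, the diff below factors the repeated "<sub-thread for GTScanRequest ...>" prefix into a logHeader local and defers the timeout failure until every region's stats have been logged, so healthy shards are still appended. A rough sketch, not the verbatim patch (decompress(result) abbreviates the CompressionUtils/zero-copy call):

    String logHeader = "<sub-thread for GTScanRequest "
            + Integer.toHexString(System.identityHashCode(scanRequest)) + "> ";

    boolean abnormalFinish = false;
    for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
        logger.info(logHeader + getStatsString(result));
        if (result.getValue().getStats().getNormalComplete() != 1) {
            abnormalFinish = true;                 // remember the failure ...
        } else {
            epResultItr.append(decompress(result)); // ... but keep appending good shards
        }
    }
    if (abnormalFinish) {
        throw new RuntimeException(logHeader
                + "The coprocessor thread stopped itself due to scan timeout, failing current query...");
    }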


http://git-wip-us.apache.org/repos/asf/kylin/blob/b2a38612/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 38041b3..81d5baa 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -342,28 +342,35 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                     builder.setBehavior(toggle);
                     builder.setStartTime(System.currentTimeMillis());
                     builder.setTimeout(epResultItr.getTimeout());
+                    String logHeader = "<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> ";
 
                     Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
                     try {
                         results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond());
                     } catch (Throwable throwable) {
-                        throw new RuntimeException("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + "Error when visiting cubes by endpoint", throwable);
+                        throw new RuntimeException(logHeader + "Error when visiting cubes by endpoint", throwable);
                     }
 
+                    boolean abnormalFinish = false;
                     for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
                         totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
-                        logger.info("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + getStatsString(result));
+                        logger.info(logHeader + getStatsString(result));
 
                         if (result.getValue().getStats().getNormalComplete() != 1) {
-                            throw new RuntimeException("The coprocessor thread stopped itself due to scan timeout.");
+                            abnormalFinish = true;
                         }
-
-                        try {
-                            epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
-                        } catch (IOException | DataFormatException e) {
-                            throw new RuntimeException("Error when decompressing", e);
+                        else {
+                            try {
+                                epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
+                            } catch (IOException | DataFormatException e) {
+                                throw new RuntimeException(logHeader + "Error when decompressing", e);
+                            }
                         }
                     }
+
+                    if (abnormalFinish) {
+                        throw new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout, failing current query...");
+                    }
                 }
             });
         }


[7/8] kylin git commit: KYLIN-1579 IT preparation classes like BuildCubeWithEngine should exit with status code upon build exception

Posted by ma...@apache.org.
KYLIN-1579 IT preparation classes like BuildCubeWithEngine should exit with status code upon build exception


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/71d3f303
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/71d3f303
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/71d3f303

Branch: refs/heads/master
Commit: 71d3f303058afa66608440ae3db91e32e921740c
Parents: 11be1e3
Author: Hongbin Ma <ma...@apache.org>
Authored: Tue Apr 12 11:35:54 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Apr 13 11:11:15 2016 +0800

----------------------------------------------------------------------
 .../kylin/provision/BuildCubeWithEngine.java    | 26 ++++++++++--------
 .../kylin/provision/BuildCubeWithStream.java    | 28 ++++++++++++--------
 .../kylin/provision/BuildIIWithStream.java      | 22 ++++++++-------
 .../coprocessor/endpoint/CubeVisitService.java  |  4 +--
 4 files changed, 47 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
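
All three provisioning classes below get the same treatment: main() is wrapped so that a build failure is logged and the JVM terminates explicitly with status 1 (previously only the success path called System.exit, so a failure could leave the process without a clear exit status for CI to detect). The shared shape, shown for BuildCubeWithEngine:

    public static void main(String[] args) {
        try {
            beforeClass();
            BuildCubeWithEngine build = new BuildCubeWithEngine();
            build.before();
            build.build();
            logger.info("Build is done");
            afterClass();
            logger.info("Going to exit");
            System.exit(0);
        } catch (Exception e) {
            logger.error("error", e);
            System.exit(1);   // non-zero status signals the failed build
        }
    }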


http://git-wip-us.apache.org/repos/asf/kylin/blob/71d3f303/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index e1cbe1f..71bb34f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
@@ -76,15 +75,20 @@ public class BuildCubeWithEngine {
     private static final Log logger = LogFactory.getLog(BuildCubeWithEngine.class);
 
     public static void main(String[] args) throws Exception {
-        beforeClass();
-
-        BuildCubeWithEngine buildCubeWithEngine = new BuildCubeWithEngine();
-        buildCubeWithEngine.before();
-        buildCubeWithEngine.build();
-        logger.info("Build is done");
-        afterClass();
-        logger.info("Going to exit");
-        System.exit(0);
+        try {
+            beforeClass();
+
+            BuildCubeWithEngine buildCubeWithEngine = new BuildCubeWithEngine();
+            buildCubeWithEngine.before();
+            buildCubeWithEngine.build();
+            logger.info("Build is done");
+            afterClass();
+            logger.info("Going to exit");
+            System.exit(0);
+        } catch (Exception e) {
+            logger.error("error", e);
+            System.exit(1);
+        }
     }
 
     public static void beforeClass() throws Exception {
@@ -99,7 +103,7 @@ public class BuildCubeWithEngine {
             logger.info("Will not use fast build mode");
         }
 
-        System.setProperty(KylinConfig.KYLIN_CONF,HBaseMetadataTestCase.SANDBOX_TEST_DATA);
+        System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/71d3f303/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 99da26f..d8a9c21 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -23,7 +23,6 @@ import java.util.UUID;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
@@ -53,21 +52,28 @@ public class BuildCubeWithStream {
     private KylinConfig kylinConfig;
 
     public static void main(String[] args) throws Exception {
-        beforeClass();
-        BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
-        buildCubeWithStream.before();
-        buildCubeWithStream.build();
-        logger.info("Build is done");
-        afterClass();
-        logger.info("Going to exit");
-        System.exit(0);
-        
+
+        try {
+            beforeClass();
+
+            BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
+            buildCubeWithStream.before();
+            buildCubeWithStream.build();
+            logger.info("Build is done");
+            afterClass();
+            logger.info("Going to exit");
+            System.exit(0);
+        } catch (Exception e) {
+            logger.error("error", e);
+            System.exit(1);
+        }
+
     }
 
     public static void beforeClass() throws Exception {
         logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
         ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF,HBaseMetadataTestCase.SANDBOX_TEST_DATA);
+        System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/71d3f303/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
index ace1a2f..6d077e7 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
@@ -93,14 +92,19 @@ public class BuildIIWithStream {
     private KylinConfig kylinConfig;
 
     public static void main(String[] args) throws Exception {
-        beforeClass();
-        BuildIIWithStream buildCubeWithEngine = new BuildIIWithStream();
-        buildCubeWithEngine.before();
-        buildCubeWithEngine.build();
-        logger.info("Build is done");
-        afterClass();
-        logger.info("Going to exit");
-        System.exit(0);
+        try {
+            beforeClass();
+            BuildIIWithStream buildCubeWithEngine = new BuildIIWithStream();
+            buildCubeWithEngine.before();
+            buildCubeWithEngine.build();
+            logger.info("Build is done");
+            afterClass();
+            logger.info("Going to exit");
+            System.exit(0);
+        } catch (Exception e) {
+            logger.error("error", e);
+            System.exit(1);
+        }
     }
 
     public static void beforeClass() throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/71d3f303/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 5158b33..2f1bb9b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -227,7 +227,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             }
 
             final MutableBoolean normalComplete = new MutableBoolean(true);
-            final long startTime = request.getStartTime();
+            final long startTime = this.serviceStartTime;//request.getStartTime();
             final long timeout = (long) (request.getTimeout() * 0.95);
 
             final CellListIterator cellListIterator = new CellListIterator() {
@@ -294,7 +294,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 allRows = new byte[0];
             }
             compressedAllRows = CompressionUtils.compress(allRows);
-            
+
             appendProfileInfo(sb, "compress done");
 
             OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();


[6/8] kylin git commit: KYLIN-1578 Coprocessor thread voluntarily stops itself when it reaches timeout

Posted by ma...@apache.org.
KYLIN-1578 Coprocessor thread voluntarily stops itself when it reaches timeout


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/11be1e38
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/11be1e38
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/11be1e38

Branch: refs/heads/master
Commit: 11be1e3826cdea8db8df8975ebdff5cf1d93444f
Parents: b26b248
Author: Hongbin Ma <ma...@apache.org>
Authored: Tue Apr 12 09:47:52 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Apr 13 11:11:15 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/gridtable/GTScanRequest.java   |   2 +
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  16 +-
 .../coprocessor/endpoint/CubeVisitService.java  |  58 ++-
 .../endpoint/generated/CubeVisitProtos.java     | 436 +++++++++++++++++--
 .../endpoint/protobuf/CubeVisit.proto           |   3 +
 5 files changed, 463 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
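
The mechanism spans both sides of the endpoint call: the client stamps the request with its submit time and queue timeout, and the coprocessor compares its own elapsed time against roughly 95% of that budget, flips a normalComplete flag instead of running past the deadline, and the client fails the query when the flag comes back unset (see the abnormalFinish handling in the earlier commit). In outline; field and message names are as in the diff, while the deadline check itself is paraphrased since that hunk falls outside this excerpt:

    // client side (CubeHBaseEndpointRPC): tell the server how long it may run
    builder.setStartTime(System.currentTimeMillis());
    builder.setTimeout(epResultItr.getTimeout());

    // server side (CubeVisitService), paraphrased: stop scanning once ~95% of
    // the budget is spent and report it via normalComplete in the stats
    final long timeout = (long) (request.getTimeout() * 0.95);
    if (System.currentTimeMillis() - startTime > timeout) {
        normalComplete.setValue(false);  // the client turns this into a query failure
        // stop producing further cells
    }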


http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index c4abb57..5681057 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
@@ -187,6 +188,7 @@ public class GTScanRequest {
             }
         }
         System.out.println("Meaningless byte is " + meaninglessByte);
+        IOUtils.closeQuietly(scanner);
         return scanned;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 1d3da28..38041b3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -162,6 +162,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                 throw new RuntimeException("error when waiting queue", e);
             }
         }
+
+        public long getTimeout() {
+            return timeout;
+        }
     }
 
     static class EndpointResultsAsGTScanner implements IGTScanner {
@@ -313,7 +317,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
         logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
 
-
         logger.info("The scan {} for segment {} is as below, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg);
         for (RawScan rs : rawScans) {
             logScan(rs, cubeSeg.getStorageLocationIdentifier());
@@ -323,7 +326,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
         final AtomicInteger totalScannedCount = new AtomicInteger(0);
         final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);
-        final String currentThreadName = Thread.currentThread().getName();
 
         for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
             final ByteString finalScanRequestByteString = scanRequestByteString;
@@ -338,6 +340,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                     }
                     builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
                     builder.setBehavior(toggle);
+                    builder.setStartTime(System.currentTimeMillis());
+                    builder.setTimeout(epResultItr.getTimeout());
 
                     Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
                     try {
@@ -348,7 +352,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
                     for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
                         totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
-                        logger.info("<sub-thread for GTScanRequest " +  Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + getStatsString(result));
+                        logger.info("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + getStatsString(result));
+
+                        if (result.getValue().getStats().getNormalComplete() != 1) {
+                            throw new RuntimeException("The coprocessor thread stopped itself due to scan timeout.");
+                        }
+
                         try {
                             epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
                         } catch (IOException | DataFormatException e) {
@@ -371,6 +380,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()).append("(ms). ");
         sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:").append(stats.getFreeSwapSpaceSize()).append(".");
         sb.append("Etc message: ").append(stats.getEtcMsg()).append(".");
+        sb.append("Normal Complete: ").append(stats.getNormalComplete() == 1).append(".");
         return sb.toString();
 
     }
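
Taken together, the endpoint-RPC changes add a simple timeout handshake: the client stamps each CubeVisitRequest with when it started and how long it is willing to wait, and after the call it inspects Stats.normalComplete to fail fast if any region gave up early. A condensed sketch of that client-side contract (method and variable names are illustrative; the protobuf accessors are the ones introduced by this commit):

    // Stamp the deadline information onto an outgoing request.
    static CubeVisitProtos.CubeVisitRequest.Builder stampDeadline(
            CubeVisitProtos.CubeVisitRequest.Builder builder, long timeoutMs) {
        return builder
                .setStartTime(System.currentTimeMillis())  // when the query thread fires the RPC
                .setTimeout(timeoutMs);                    // how long the client-side iterator will block
    }

    // Reject a partial result produced by a coprocessor that hit its deadline.
    static void checkNormalComplete(CubeVisitProtos.CubeVisitResponse.Stats stats) {
        if (stats.getNormalComplete() != 1) {
            throw new RuntimeException("The coprocessor thread stopped itself due to scan timeout.");
        }
    }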

http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 9e8e251..5158b33 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -28,6 +28,7 @@ import java.util.List;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -152,17 +153,17 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
     }
 
     private void appendProfileInfo(StringBuilder sb, String info) {
-        sb.append(System.currentTimeMillis() - this.serviceStartTime);
         if (info != null) {
-            sb.append(":").append(info);
+            sb.append(info);
         }
+        sb.append("@" + (System.currentTimeMillis() - this.serviceStartTime));
         sb.append(",");
     }
 
     @Override
     public void visitCube(RpcController controller, CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
 
-        RegionScanner innerScanner = null;
+        List<RegionScanner> regionScanners = Lists.newArrayList();
         HRegion region = null;
 
         StringBuilder sb = new StringBuilder();
@@ -182,6 +183,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior());
             final List<RawScan> hbaseRawScans = deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
 
+            appendProfileInfo(sb, "start latency: " + (this.serviceStartTime - request.getStartTime()));
+
             MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() {
                 @Override
                 public DimensionEncoding getDimEnc(TblColRef col) {
@@ -190,6 +193,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             });
 
             final List<InnerScannerAsIterator> cellListsForeachRawScan = Lists.newArrayList();
+
             for (RawScan hbaseRawScan : hbaseRawScans) {
                 if (request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN > 0) {
                     //if has shard, fill region shard to raw scan start/end
@@ -197,20 +201,23 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 }
 
                 Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
-                innerScanner = region.getScanner(scan);
+                RegionScanner innerScanner = region.getScanner(scan);
+                regionScanners.add(innerScanner);
 
                 InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner);
                 cellListsForeachRawScan.add(cellListIterator);
             }
-            
+
             final Iterator<List<Cell>> allCellLists = Iterators.concat(cellListsForeachRawScan.iterator());
 
             if (behavior.ordinal() < CoprocessorBehavior.SCAN.ordinal()) {
                 //this is only for CoprocessorBehavior.RAW_SCAN case to profile hbase scan speed
                 List<Cell> temp = Lists.newArrayList();
                 int counter = 0;
-                while (innerScanner.nextRaw(temp)) {
-                    counter++;
+                for (RegionScanner innerScanner : regionScanners) {
+                    while (innerScanner.nextRaw(temp)) {
+                        counter++;
+                    }
                 }
                 appendProfileInfo(sb, "scanned " + counter);
             }
@@ -219,7 +226,14 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 scanReq.setAggrCacheGB(0); // disable mem check if so told
             }
 
-            IGTStore store = new HBaseReadonlyStore(new CellListIterator() {
+            final MutableBoolean normalComplete = new MutableBoolean(true);
+            final long startTime = request.getStartTime();
+            final long timeout = (long) (request.getTimeout() * 0.95);
+
+            final CellListIterator cellListIterator = new CellListIterator() {
+
+                int counter = 0;
+
                 @Override
                 public void close() throws IOException {
                     for (CellListIterator closeable : cellListsForeachRawScan) {
@@ -229,6 +243,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
                 @Override
                 public boolean hasNext() {
+                    if (counter++ % 1000 == 1) {
+                        if (System.currentTimeMillis() - startTime > timeout) {
+                            normalComplete.setValue(false);
+                            return false;
+                        }
+                    }
                     return allCellLists.hasNext();
                 }
 
@@ -241,7 +261,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 public void remove() {
                     throw new UnsupportedOperationException();
                 }
-            }, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());
+            };
+
+            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());
 
             IGTScanner rawScanner = store.scan(scanReq);
             IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, //
@@ -260,13 +282,19 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 outputStream.write(buffer.array(), buffer.arrayOffset() - buffer.position(), buffer.remaining());
                 finalRowCount++;
             }
+            finalScanner.close();
 
             appendProfileInfo(sb, "agg done");
 
             //outputStream.close() is not necessary
-            allRows = outputStream.toByteArray();
-            byte[] compressedAllRows = CompressionUtils.compress(allRows);
-
+            byte[] compressedAllRows;
+            if (normalComplete.booleanValue()) {
+                allRows = outputStream.toByteArray();
+            } else {
+                allRows = new byte[0];
+            }
+            compressedAllRows = CompressionUtils.compress(allRows);
+            
             appendProfileInfo(sb, "compress done");
 
             OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
@@ -289,7 +317,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                             setFreeSwapSpaceSize(freeSwapSpaceSize).//
                             setHostname(InetAddress.getLocalHost().getHostName()).// 
                             setEtcMsg(sb.toString()).//
-                            build())
+                            setNormalComplete(normalComplete.booleanValue() ? 1 : 0).build())
                     .//
                     build());
 
@@ -297,7 +325,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             logger.error(ioe.toString());
             ResponseConverter.setControllerException(controller, ioe);
         } finally {
-            IOUtils.closeQuietly(innerScanner);
+            for (RegionScanner innerScanner : regionScanners) {
+                IOUtils.closeQuietly(innerScanner);
+            }
             if (region != null) {
                 try {
                     region.closeRegionOperation();
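
The server-side half of the handshake lives in the CellListIterator: rather than throwing inside the region server, the coprocessor checks the clock every ~1000 rows, stops scanning once roughly 95% of the client's timeout has elapsed, and reports the early exit through Stats.normalComplete plus an empty row payload. A simplified, self-contained sketch of that guard, assuming the wrapped iterator and flag are passed in (the real code embeds this in the anonymous CellListIterator above):

    import java.util.Iterator;
    import java.util.List;
    import org.apache.commons.lang.mutable.MutableBoolean;
    import org.apache.hadoop.hbase.Cell;

    static Iterator<List<Cell>> withDeadline(final Iterator<List<Cell>> cells,
            final long startTime, final long timeoutMs, final MutableBoolean normalComplete) {
        final long budget = (long) (timeoutMs * 0.95);   // keep ~5% headroom for compression and the RPC reply
        return new Iterator<List<Cell>>() {
            int counter = 0;

            @Override
            public boolean hasNext() {
                // Look at the clock only every ~1000 rows so the check stays cheap.
                if (counter++ % 1000 == 1 && System.currentTimeMillis() - startTime > budget) {
                    normalComplete.setValue(false);      // surfaced to the client via Stats.normalComplete
                    return false;                        // stop early rather than throw in the region server
                }
                return cells.hasNext();
            }

            @Override
            public List<Cell> next() {
                return cells.next();
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }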

http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
index 6e3e2bb..53393e8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
@@ -1,21 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 // Generated by the protocol buffer compiler.  DO NOT EDIT!
 // source: storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
 
@@ -98,6 +80,42 @@ public final class CubeVisitProtos {
      */
     org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntListOrBuilder getHbaseColumnsToGTOrBuilder(
         int index);
+
+    // required int64 startTime = 6;
+    /**
+     * <code>required int64 startTime = 6;</code>
+     *
+     * <pre>
+     *when client start the request
+     * </pre>
+     */
+    boolean hasStartTime();
+    /**
+     * <code>required int64 startTime = 6;</code>
+     *
+     * <pre>
+     *when client start the request
+     * </pre>
+     */
+    long getStartTime();
+
+    // required int64 timeout = 7;
+    /**
+     * <code>required int64 timeout = 7;</code>
+     *
+     * <pre>
+     *how long client will wait
+     * </pre>
+     */
+    boolean hasTimeout();
+    /**
+     * <code>required int64 timeout = 7;</code>
+     *
+     * <pre>
+     *how long client will wait
+     * </pre>
+     */
+    long getTimeout();
   }
   /**
    * Protobuf type {@code CubeVisitRequest}
@@ -178,6 +196,16 @@ public final class CubeVisitProtos {
               hbaseColumnsToGT_.add(input.readMessage(org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.PARSER, extensionRegistry));
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000010;
+              startTime_ = input.readInt64();
+              break;
+            }
+            case 56: {
+              bitField0_ |= 0x00000020;
+              timeout_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -852,12 +880,62 @@ public final class CubeVisitProtos {
       return hbaseColumnsToGT_.get(index);
     }
 
+    // required int64 startTime = 6;
+    public static final int STARTTIME_FIELD_NUMBER = 6;
+    private long startTime_;
+    /**
+     * <code>required int64 startTime = 6;</code>
+     *
+     * <pre>
+     *when client start the request
+     * </pre>
+     */
+    public boolean hasStartTime() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>required int64 startTime = 6;</code>
+     *
+     * <pre>
+     *when client start the request
+     * </pre>
+     */
+    public long getStartTime() {
+      return startTime_;
+    }
+
+    // required int64 timeout = 7;
+    public static final int TIMEOUT_FIELD_NUMBER = 7;
+    private long timeout_;
+    /**
+     * <code>required int64 timeout = 7;</code>
+     *
+     * <pre>
+     *how long client will wait
+     * </pre>
+     */
+    public boolean hasTimeout() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>required int64 timeout = 7;</code>
+     *
+     * <pre>
+     *how long client will wait
+     * </pre>
+     */
+    public long getTimeout() {
+      return timeout_;
+    }
+
     private void initFields() {
       behavior_ = "";
       gtScanRequest_ = com.google.protobuf.ByteString.EMPTY;
       hbaseRawScan_ = com.google.protobuf.ByteString.EMPTY;
       rowkeyPreambleSize_ = 0;
       hbaseColumnsToGT_ = java.util.Collections.emptyList();
+      startTime_ = 0L;
+      timeout_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -880,6 +958,14 @@ public final class CubeVisitProtos {
         memoizedIsInitialized = 0;
         return false;
       }
+      if (!hasStartTime()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasTimeout()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -902,6 +988,12 @@ public final class CubeVisitProtos {
       for (int i = 0; i < hbaseColumnsToGT_.size(); i++) {
         output.writeMessage(5, hbaseColumnsToGT_.get(i));
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeInt64(6, startTime_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeInt64(7, timeout_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -931,6 +1023,14 @@ public final class CubeVisitProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(5, hbaseColumnsToGT_.get(i));
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(6, startTime_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(7, timeout_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -976,6 +1076,16 @@ public final class CubeVisitProtos {
       }
       result = result && getHbaseColumnsToGTList()
           .equals(other.getHbaseColumnsToGTList());
+      result = result && (hasStartTime() == other.hasStartTime());
+      if (hasStartTime()) {
+        result = result && (getStartTime()
+            == other.getStartTime());
+      }
+      result = result && (hasTimeout() == other.hasTimeout());
+      if (hasTimeout()) {
+        result = result && (getTimeout()
+            == other.getTimeout());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1009,6 +1119,14 @@ public final class CubeVisitProtos {
         hash = (37 * hash) + HBASECOLUMNSTOGT_FIELD_NUMBER;
         hash = (53 * hash) + getHbaseColumnsToGTList().hashCode();
       }
+      if (hasStartTime()) {
+        hash = (37 * hash) + STARTTIME_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getStartTime());
+      }
+      if (hasTimeout()) {
+        hash = (37 * hash) + TIMEOUT_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getTimeout());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1133,6 +1251,10 @@ public final class CubeVisitProtos {
         } else {
           hbaseColumnsToGTBuilder_.clear();
         }
+        startTime_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000020);
+        timeout_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -1186,6 +1308,14 @@ public final class CubeVisitProtos {
         } else {
           result.hbaseColumnsToGT_ = hbaseColumnsToGTBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.startTime_ = startTime_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.timeout_ = timeout_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1242,6 +1372,12 @@ public final class CubeVisitProtos {
             }
           }
         }
+        if (other.hasStartTime()) {
+          setStartTime(other.getStartTime());
+        }
+        if (other.hasTimeout()) {
+          setTimeout(other.getTimeout());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1263,6 +1399,14 @@ public final class CubeVisitProtos {
           
           return false;
         }
+        if (!hasStartTime()) {
+          
+          return false;
+        }
+        if (!hasTimeout()) {
+          
+          return false;
+        }
         return true;
       }
 
@@ -1704,6 +1848,104 @@ public final class CubeVisitProtos {
         return hbaseColumnsToGTBuilder_;
       }
 
+      // required int64 startTime = 6;
+      private long startTime_ ;
+      /**
+       * <code>required int64 startTime = 6;</code>
+       *
+       * <pre>
+       *when client start the request
+       * </pre>
+       */
+      public boolean hasStartTime() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>required int64 startTime = 6;</code>
+       *
+       * <pre>
+       *when client start the request
+       * </pre>
+       */
+      public long getStartTime() {
+        return startTime_;
+      }
+      /**
+       * <code>required int64 startTime = 6;</code>
+       *
+       * <pre>
+       *when client start the request
+       * </pre>
+       */
+      public Builder setStartTime(long value) {
+        bitField0_ |= 0x00000020;
+        startTime_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required int64 startTime = 6;</code>
+       *
+       * <pre>
+       *when client start the request
+       * </pre>
+       */
+      public Builder clearStartTime() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        startTime_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // required int64 timeout = 7;
+      private long timeout_ ;
+      /**
+       * <code>required int64 timeout = 7;</code>
+       *
+       * <pre>
+       *how long client will wait
+       * </pre>
+       */
+      public boolean hasTimeout() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>required int64 timeout = 7;</code>
+       *
+       * <pre>
+       *how long client will wait
+       * </pre>
+       */
+      public long getTimeout() {
+        return timeout_;
+      }
+      /**
+       * <code>required int64 timeout = 7;</code>
+       *
+       * <pre>
+       *how long client will wait
+       * </pre>
+       */
+      public Builder setTimeout(long value) {
+        bitField0_ |= 0x00000040;
+        timeout_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required int64 timeout = 7;</code>
+       *
+       * <pre>
+       *how long client will wait
+       * </pre>
+       */
+      public Builder clearTimeout() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        timeout_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:CubeVisitRequest)
     }
 
@@ -1952,6 +2194,24 @@ public final class CubeVisitProtos {
        */
       com.google.protobuf.ByteString
           getEtcMsgBytes();
+
+      // optional int32 normalComplete = 10;
+      /**
+       * <code>optional int32 normalComplete = 10;</code>
+       *
+       * <pre>
+       *when time outs, normalComplete will be false
+       * </pre>
+       */
+      boolean hasNormalComplete();
+      /**
+       * <code>optional int32 normalComplete = 10;</code>
+       *
+       * <pre>
+       *when time outs, normalComplete will be false
+       * </pre>
+       */
+      int getNormalComplete();
     }
     /**
      * Protobuf type {@code CubeVisitResponse.Stats}
@@ -2049,6 +2309,11 @@ public final class CubeVisitProtos {
                 etcMsg_ = input.readBytes();
                 break;
               }
+              case 80: {
+                bitField0_ |= 0x00000200;
+                normalComplete_ = input.readInt32();
+                break;
+              }
             }
           }
         } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2287,6 +2552,30 @@ public final class CubeVisitProtos {
         }
       }
 
+      // optional int32 normalComplete = 10;
+      public static final int NORMALCOMPLETE_FIELD_NUMBER = 10;
+      private int normalComplete_;
+      /**
+       * <code>optional int32 normalComplete = 10;</code>
+       *
+       * <pre>
+       *when time outs, normalComplete will be false
+       * </pre>
+       */
+      public boolean hasNormalComplete() {
+        return ((bitField0_ & 0x00000200) == 0x00000200);
+      }
+      /**
+       * <code>optional int32 normalComplete = 10;</code>
+       *
+       * <pre>
+       *when time outs, normalComplete will be false
+       * </pre>
+       */
+      public int getNormalComplete() {
+        return normalComplete_;
+      }
+
       private void initFields() {
         serviceStartTime_ = 0L;
         serviceEndTime_ = 0L;
@@ -2297,6 +2586,7 @@ public final class CubeVisitProtos {
         freeSwapSpaceSize_ = 0D;
         hostname_ = "";
         etcMsg_ = "";
+        normalComplete_ = 0;
       }
       private byte memoizedIsInitialized = -1;
       public final boolean isInitialized() {
@@ -2337,6 +2627,9 @@ public final class CubeVisitProtos {
         if (((bitField0_ & 0x00000100) == 0x00000100)) {
           output.writeBytes(9, getEtcMsgBytes());
         }
+        if (((bitField0_ & 0x00000200) == 0x00000200)) {
+          output.writeInt32(10, normalComplete_);
+        }
         getUnknownFields().writeTo(output);
       }
 
@@ -2382,6 +2675,10 @@ public final class CubeVisitProtos {
           size += com.google.protobuf.CodedOutputStream
             .computeBytesSize(9, getEtcMsgBytes());
         }
+        if (((bitField0_ & 0x00000200) == 0x00000200)) {
+          size += com.google.protobuf.CodedOutputStream
+            .computeInt32Size(10, normalComplete_);
+        }
         size += getUnknownFields().getSerializedSize();
         memoizedSerializedSize = size;
         return size;
@@ -2447,6 +2744,11 @@ public final class CubeVisitProtos {
           result = result && getEtcMsg()
               .equals(other.getEtcMsg());
         }
+        result = result && (hasNormalComplete() == other.hasNormalComplete());
+        if (hasNormalComplete()) {
+          result = result && (getNormalComplete()
+              == other.getNormalComplete());
+        }
         result = result &&
             getUnknownFields().equals(other.getUnknownFields());
         return result;
@@ -2499,6 +2801,10 @@ public final class CubeVisitProtos {
           hash = (37 * hash) + ETCMSG_FIELD_NUMBER;
           hash = (53 * hash) + getEtcMsg().hashCode();
         }
+        if (hasNormalComplete()) {
+          hash = (37 * hash) + NORMALCOMPLETE_FIELD_NUMBER;
+          hash = (53 * hash) + getNormalComplete();
+        }
         hash = (29 * hash) + getUnknownFields().hashCode();
         memoizedHashCode = hash;
         return hash;
@@ -2626,6 +2932,8 @@ public final class CubeVisitProtos {
           bitField0_ = (bitField0_ & ~0x00000080);
           etcMsg_ = "";
           bitField0_ = (bitField0_ & ~0x00000100);
+          normalComplete_ = 0;
+          bitField0_ = (bitField0_ & ~0x00000200);
           return this;
         }
 
@@ -2690,6 +2998,10 @@ public final class CubeVisitProtos {
             to_bitField0_ |= 0x00000100;
           }
           result.etcMsg_ = etcMsg_;
+          if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
+            to_bitField0_ |= 0x00000200;
+          }
+          result.normalComplete_ = normalComplete_;
           result.bitField0_ = to_bitField0_;
           onBuilt();
           return result;
@@ -2737,6 +3049,9 @@ public final class CubeVisitProtos {
             etcMsg_ = other.etcMsg_;
             onChanged();
           }
+          if (other.hasNormalComplete()) {
+            setNormalComplete(other.getNormalComplete());
+          }
           this.mergeUnknownFields(other.getUnknownFields());
           return this;
         }
@@ -3143,6 +3458,55 @@ public final class CubeVisitProtos {
           return this;
         }
 
+        // optional int32 normalComplete = 10;
+        private int normalComplete_ ;
+        /**
+         * <code>optional int32 normalComplete = 10;</code>
+         *
+         * <pre>
+         *when time outs, normalComplete will be false
+         * </pre>
+         */
+        public boolean hasNormalComplete() {
+          return ((bitField0_ & 0x00000200) == 0x00000200);
+        }
+        /**
+         * <code>optional int32 normalComplete = 10;</code>
+         *
+         * <pre>
+         *when time outs, normalComplete will be false
+         * </pre>
+         */
+        public int getNormalComplete() {
+          return normalComplete_;
+        }
+        /**
+         * <code>optional int32 normalComplete = 10;</code>
+         *
+         * <pre>
+         *when time outs, normalComplete will be false
+         * </pre>
+         */
+        public Builder setNormalComplete(int value) {
+          bitField0_ |= 0x00000200;
+          normalComplete_ = value;
+          onChanged();
+          return this;
+        }
+        /**
+         * <code>optional int32 normalComplete = 10;</code>
+         *
+         * <pre>
+         *when time outs, normalComplete will be false
+         * </pre>
+         */
+        public Builder clearNormalComplete() {
+          bitField0_ = (bitField0_ & ~0x00000200);
+          normalComplete_ = 0;
+          onChanged();
+          return this;
+        }
+
         // @@protoc_insertion_point(builder_scope:CubeVisitResponse.Stats)
       }
 
@@ -3936,24 +4300,26 @@ public final class CubeVisitProtos {
     java.lang.String[] descriptorData = {
       "\npstorage-hbase/src/main/java/org/apache" +
       "/kylin/storage/hbase/cube/v2/coprocessor" +
-      "/endpoint/protobuf/CubeVisit.proto\"\273\001\n\020C" +
+      "/endpoint/protobuf/CubeVisit.proto\"\337\001\n\020C" +
       "ubeVisitRequest\022\020\n\010behavior\030\001 \002(\t\022\025\n\rgtS" +
       "canRequest\030\002 \002(\014\022\024\n\014hbaseRawScan\030\003 \002(\014\022\032" +
       "\n\022rowkeyPreambleSize\030\004 \002(\005\0223\n\020hbaseColum" +
-      "nsToGT\030\005 \003(\0132\031.CubeVisitRequest.IntList\032" +
-      "\027\n\007IntList\022\014\n\004ints\030\001 \003(\005\"\271\002\n\021CubeVisitRe" +
-      "sponse\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030" +
-      "\002 \002(\0132\030.CubeVisitResponse.Stats\032\342\001\n\005Stat",
-      "s\022\030\n\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEn" +
-      "dTime\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\005\022\032\n\022" +
-      "aggregatedRowCount\030\004 \001(\005\022\025\n\rsystemCpuLoa" +
-      "d\030\005 \001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022" +
-      "\031\n\021freeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010" +
-      " \001(\t\022\016\n\006etcMsg\030\t \001(\t2F\n\020CubeVisitService" +
-      "\0222\n\tvisitCube\022\021.CubeVisitRequest\032\022.CubeV" +
-      "isitResponseB`\nEorg.apache.kylin.storage" +
-      ".hbase.cube.v2.coprocessor.endpoint.gene" +
-      "ratedB\017CubeVisitProtosH\001\210\001\001\240\001\001"
+      "nsToGT\030\005 \003(\0132\031.CubeVisitRequest.IntList\022" +
+      "\021\n\tstartTime\030\006 \002(\003\022\017\n\007timeout\030\007 \002(\003\032\027\n\007I" +
+      "ntList\022\014\n\004ints\030\001 \003(\005\"\321\002\n\021CubeVisitRespon" +
+      "se\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030\002 \002(",
+      "\0132\030.CubeVisitResponse.Stats\032\372\001\n\005Stats\022\030\n" +
+      "\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTim" +
+      "e\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\005\022\032\n\022aggr" +
+      "egatedRowCount\030\004 \001(\005\022\025\n\rsystemCpuLoad\030\005 " +
+      "\001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022\031\n\021f" +
+      "reeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 \001(\t" +
+      "\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n \001(\005" +
+      "2F\n\020CubeVisitService\0222\n\tvisitCube\022\021.Cube" +
+      "VisitRequest\032\022.CubeVisitResponseB`\nEorg." +
+      "apache.kylin.storage.hbase.cube.v2.copro",
+      "cessor.endpoint.generatedB\017CubeVisitProt" +
+      "osH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3965,7 +4331,7 @@ public final class CubeVisitProtos {
           internal_static_CubeVisitRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_CubeVisitRequest_descriptor,
-              new java.lang.String[] { "Behavior", "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", });
+              new java.lang.String[] { "Behavior", "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "StartTime", "Timeout", });
           internal_static_CubeVisitRequest_IntList_descriptor =
             internal_static_CubeVisitRequest_descriptor.getNestedTypes().get(0);
           internal_static_CubeVisitRequest_IntList_fieldAccessorTable = new
@@ -3983,7 +4349,7 @@ public final class CubeVisitProtos {
           internal_static_CubeVisitResponse_Stats_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_CubeVisitResponse_Stats_descriptor,
-              new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", "FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", });
+              new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", "FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", "NormalComplete", });
           return null;
         }
       };
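
One consequence worth noting: startTime and timeout are declared as required fields, so a request built by an older client (or any caller that forgets to stamp them) no longer passes isInitialized() and is rejected at build time. A quick illustration using the generated builder (the behavior string and timeout value are placeholders):

    CubeVisitProtos.CubeVisitRequest.Builder b = CubeVisitProtos.CubeVisitRequest.newBuilder()
            .setBehavior("SCAN")                                   // placeholder behavior name
            .setGtScanRequest(com.google.protobuf.ByteString.EMPTY)
            .setHbaseRawScan(com.google.protobuf.ByteString.EMPTY)
            .setRowkeyPreambleSize(0);

    boolean before = b.isInitialized();          // false: the two new required fields are missing

    b.setStartTime(System.currentTimeMillis())
     .setTimeout(5 * 60 * 1000L);                // e.g. a five-minute client-side wait

    boolean after = b.isInitialized();           // true once both fields are present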

http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
index 5b66a56..ecaad35 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
@@ -35,6 +35,8 @@ message CubeVisitRequest {
     required bytes hbaseRawScan = 3;
     required int32 rowkeyPreambleSize = 4;
     repeated IntList hbaseColumnsToGT = 5;
+    required int64 startTime = 6;//when client start the request
+    required int64 timeout = 7;//how long client will wait
     message IntList {
         repeated int32 ints = 1;
     }
@@ -51,6 +53,7 @@ message CubeVisitResponse {
         optional double freeSwapSpaceSize = 7;
         optional string hostname = 8;
         optional string etcMsg = 9;
+        optional int32 normalComplete =10;//when time outs, normalComplete will be false
     }
     required bytes compressedRows = 1;
     required Stats stats = 2;


[3/8] kylin git commit: refactor

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index ef53cb7..938145b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -19,30 +19,35 @@
 package org.apache.kylin.storage.hbase.cube.v2;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
-import javax.annotation.Nullable;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.dimension.DimensionEncoding;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTStore;
+import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
@@ -84,53 +89,77 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
         }
     }
 
-    public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) {
+    public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, final GTInfo fullGTInfo) {
         super(cubeSeg, cuboid, fullGTInfo);
+        MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() {
+            @Override
+            public DimensionEncoding getDimEnc(TblColRef col) {
+                return fullGTInfo.getCodeSystem().getDimEnc(col.getColumnDesc().getZeroBasedIndex());
+            }
+        });
     }
 
     @Override
-    public IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException {
-        final List<IGTScanner> scanners = Lists.newArrayList();
-        for (GTScanRequest request : scanRequests) {
-            scanners.add(getGTScanner(request));
-        }
+    public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
+        final IGTScanner scanner = getGTScannerInternal(scanRequest);
 
         return new IGTScanner() {
             @Override
             public GTInfo getInfo() {
-                return scanners.get(0).getInfo();
+                return scanner.getInfo();
             }
 
             @Override
             public int getScannedRowCount() {
                 int sum = 0;
-                for (IGTScanner s : scanners) {
-                    sum += s.getScannedRowCount();
-                }
+                sum += scanner.getScannedRowCount();
                 return sum;
             }
 
             @Override
             public void close() throws IOException {
-                for (IGTScanner s : scanners) {
-                    s.close();
-                }
+                scanner.close();
             }
 
             @Override
             public Iterator<GTRecord> iterator() {
-                return Iterators.concat(Iterators.transform(scanners.iterator(), new Function<IGTScanner, Iterator<GTRecord>>() {
-                    @Nullable
-                    @Override
-                    public Iterator<GTRecord> apply(IGTScanner input) {
-                        return input.iterator();
-                    }
-                }));
+                return scanner.iterator();
             }
         };
     }
 
-    private IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
+    //for non-sharding cases it will only return one byte[] with not shard at beginning
+    private List<byte[]> getRowKeysDifferentShards(byte[] halfCookedKey) {
+        final short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
+
+        if (!cubeSeg.isEnableSharding()) {
+            return Lists.newArrayList(halfCookedKey);//not shard to append at head, so it is already well cooked
+        } else {
+            List<byte[]> ret = Lists.newArrayList();
+            for (short i = 0; i < cuboidShardNum; ++i) {
+                short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards());
+                byte[] cookedKey = Arrays.copyOf(halfCookedKey, halfCookedKey.length);
+                BytesUtil.writeShort(shard, cookedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+                ret.add(cookedKey);
+            }
+            return ret;
+        }
+    }
+
+    private List<RawScan> spawnRawScansForAllShards(RawScan rawScan) {
+        List<RawScan> ret = Lists.newArrayList();
+        List<byte[]> startKeys = getRowKeysDifferentShards(rawScan.startKey);
+        List<byte[]> endKeys = getRowKeysDifferentShards(rawScan.endKey);
+        for (int i = 0; i < startKeys.size(); i++) {
+            RawScan temp = new RawScan(rawScan);
+            temp.startKey = startKeys.get(i);
+            temp.endKey = endKeys.get(i);
+            ret.add(temp);
+        }
+        return ret;
+    }
+
+    private IGTScanner getGTScannerInternal(final GTScanRequest scanRequest) throws IOException {
 
         // primary key (also the 0th column block) is always selected
         final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
@@ -138,22 +167,23 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
         HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
         final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
 
-        List<RawScan> rawScans = preparedHBaseScans(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks);
+        List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
         List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks);
 
         final List<ResultScanner> scanners = Lists.newArrayList();
         final List<Iterator<Result>> resultIterators = Lists.newArrayList();
 
         for (RawScan rawScan : rawScans) {
+            for (RawScan rawScanWithShard : spawnRawScansForAllShards(rawScan)) {
+                logScan(rawScanWithShard, cubeSeg.getStorageLocationIdentifier());
+                Scan hbaseScan = buildScan(rawScanWithShard);
 
-            logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
-            Scan hbaseScan = buildScan(rawScan);
-
-            final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
-            final Iterator<Result> iterator = scanner.iterator();
+                final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
+                final Iterator<Result> iterator = scanner.iterator();
 
-            scanners.add(scanner);
-            resultIterators.add(iterator);
+                scanners.add(scanner);
+                resultIterators.add(iterator);
+            }
         }
 
         final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator());
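
The main addition in this file is the per-shard fan-out: a "half cooked" row key (everything but the shard prefix) is copied once per shard of the cuboid, and each copy gets its shard id written into the leading ROWKEY_SHARDID_LEN bytes, yielding one RawScan per shard. A compact restatement of that key rewrite, assuming ShardingHash.normalize(base, offset, total) wraps (base + offset) modulo the total shard count (the helper below is illustrative, not the committed method):

    static List<byte[]> fanOutKeys(byte[] halfCookedKey, short baseShard, short cuboidShardNum, short totalShards) {
        List<byte[]> cookedKeys = Lists.newArrayList();
        for (short i = 0; i < cuboidShardNum; i++) {
            short shard = (short) ((baseShard + i) % totalShards);                    // stand-in for ShardingHash.normalize
            byte[] cooked = Arrays.copyOf(halfCookedKey, halfCookedKey.length);
            BytesUtil.writeShort(shard, cooked, 0, RowConstants.ROWKEY_SHARDID_LEN);  // shard id occupies the key prefix
            cookedKeys.add(cooked);
        }
        return cookedKeys;
    }

    // e.g. baseShard=3, cuboidShardNum=2, totalShards=10 produces keys whose first
    // two bytes encode shards 3 and 4; non-sharded segments skip this and keep one key.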

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
index 6bbb9c0..9ed914a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -19,204 +19,48 @@
 package org.apache.kylin.storage.hbase.cube.v2;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CubeGridTable;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
-import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.BuildInFunctionTransformer;
-import org.apache.kylin.dimension.DimensionEncoding;
-import org.apache.kylin.gridtable.EmptyGTScanner;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.GTScanRangePlanner;
 import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.GTUtil;
 import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.gridtable.ScannerWorker;
 import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
 import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 public class CubeSegmentScanner implements IGTScanner {
 
     private static final Logger logger = LoggerFactory.getLogger(CubeSegmentScanner.class);
 
-    private static final int MAX_SCAN_RANGES = 200;
-
     final CubeSegment cubeSeg;
-    final GTInfo info;
-    final List<GTScanRequest> scanRequests;
-    final Scanner scanner;
+    final ScannerWorker scanner;
     final Cuboid cuboid;
 
+    final GTScanRequest scanRequest;
+
     public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
             Collection<FunctionDesc> metrics, TupleFilter filter, boolean allowPreAggregate) {
         this.cuboid = cuboid;
         this.cubeSeg = cubeSeg;
-        this.info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId());
-
-        CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
 
         // translate FunctionTupleFilter to IN clause
         ITupleFilterTransformer translator = new BuildInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
         filter = translator.transform(filter);
 
-        //replace the constant values in filter to dictionary codes 
-        TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, mapping.getCuboidDimensionsInGTOrder(), groups);
-
-        ImmutableBitSet gtDimensions = makeGridTableColumns(mapping, dimensions);
-        ImmutableBitSet gtAggrGroups = makeGridTableColumns(mapping, replaceDerivedColumns(groups, cubeSeg.getCubeDesc()));
-        ImmutableBitSet gtAggrMetrics = makeGridTableColumns(mapping, metrics);
-        String[] gtAggrFuncs = makeAggrFuncs(mapping, metrics);
-
-        GTScanRangePlanner scanRangePlanner;
-        if (cubeSeg.getCubeDesc().getModel().getPartitionDesc().isPartitioned()) {
-            TblColRef tblColRef = cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
-            TblColRef partitionColOfGT = null;
-            Pair<ByteArray, ByteArray> segmentStartAndEnd = null;
-            int index = mapping.getIndexOf(tblColRef);
-            if (index >= 0) {
-                segmentStartAndEnd = getSegmentStartAndEnd(index);
-                partitionColOfGT = info.colRef(index);
-            }
-            scanRangePlanner = new GTScanRangePlanner(info, segmentStartAndEnd, partitionColOfGT);
-        } else {
-            scanRangePlanner = new GTScanRangePlanner(info, null, null);
-        }
-        List<GTScanRange> scanRanges = scanRangePlanner.planScanRanges(gtFilter, MAX_SCAN_RANGES);
-
-        scanRequests = Lists.newArrayListWithCapacity(scanRanges.size());
-
-        KylinConfig config = cubeSeg.getCubeInstance().getConfig();
-        for (GTScanRange range : scanRanges) {
-            GTScanRequest req = new GTScanRequest(info, range, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate, config.getQueryCoprocessorMemGB());
-            scanRequests.add(req);
-        }
-
-        scanner = new Scanner();
-    }
-
-    private Pair<ByteArray, ByteArray> getSegmentStartAndEnd(int index) {
-        ByteArray start;
-        if (cubeSeg.getDateRangeStart() != Long.MIN_VALUE) {
-            start = encodeTime(cubeSeg.getDateRangeStart(), index, 1);
-        } else {
-            start = new ByteArray();
-        }
-
-        ByteArray end;
-        if (cubeSeg.getDateRangeEnd() != Long.MAX_VALUE) {
-            end = encodeTime(cubeSeg.getDateRangeEnd(), index, -1);
-        } else {
-            end = new ByteArray();
-        }
-        return Pair.newPair(start, end);
-
-    }
-
-    private ByteArray encodeTime(long ts, int index, int roundingFlag) {
-        String value;
-        DataType partitionColType = info.getColumnType(index);
-        if (partitionColType.isDate()) {
-            value = DateFormat.formatToDateStr(ts);
-        } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
-            value = DateFormat.formatToTimeWithoutMilliStr(ts);
-        } else if (partitionColType.isStringFamily()) {
-            String partitionDateFormat = cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
-            if (StringUtils.isEmpty(partitionDateFormat))
-                partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
-            value = DateFormat.formatToDateStr(ts, partitionDateFormat);
-        } else {
-            throw new RuntimeException("Type " + partitionColType + " is not valid partition column type");
-        }
-
-        ByteBuffer buffer = ByteBuffer.allocate(info.getMaxColumnLength());
-        info.getCodeSystem().encodeColumnValue(index, value, roundingFlag, buffer);
-
-        return ByteArray.copyOf(buffer.array(), 0, buffer.position());
-    }
-
-    private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) {
-        Set<TblColRef> ret = Sets.newHashSet();
-        for (TblColRef col : input) {
-            if (cubeDesc.hasHostColumn(col)) {
-                for (TblColRef host : cubeDesc.getHostInfo(col).columns) {
-                    ret.add(host);
-                }
-            } else {
-                ret.add(col);
-            }
-        }
-        return ret;
-    }
-
-    private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Set<TblColRef> dimensions) {
-        BitSet result = new BitSet();
-        for (TblColRef dim : dimensions) {
-            int idx = mapping.getIndexOf(dim);
-            if (idx >= 0)
-                result.set(idx);
-        }
-        return new ImmutableBitSet(result);
-    }
-
-    private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
-        BitSet result = new BitSet();
-        for (FunctionDesc metric : metrics) {
-            int idx = mapping.getIndexOf(metric);
-            if (idx < 0)
-                throw new IllegalStateException(metric + " not found in " + mapping);
-            result.set(idx);
-        }
-        return new ImmutableBitSet(result);
-    }
-
-    private String[] makeAggrFuncs(final CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
-
-        //metrics are represented in ImmutableBitSet, which loses order information
-        //sort the aggrFuns to align with metrics natural order 
-        List<FunctionDesc> metricList = Lists.newArrayList(metrics);
-        Collections.sort(metricList, new Comparator<FunctionDesc>() {
-            @Override
-            public int compare(FunctionDesc o1, FunctionDesc o2) {
-                int a = mapping.getIndexOf(o1);
-                int b = mapping.getIndexOf(o2);
-                return a - b;
-            }
-        });
-
-        String[] result = new String[metricList.size()];
-        int i = 0;
-        for (FunctionDesc metric : metricList) {
-            result[i++] = metric.getExpression();
-        }
-        return result;
+        GTScanRangePlanner scanRangePlanner = new GTScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, metrics);
+        scanRequest = scanRangePlanner.planScanRequest(allowPreAggregate);
+        scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest);
     }
 
     @Override
@@ -231,7 +75,7 @@ public class CubeSegmentScanner implements IGTScanner {
 
     @Override
     public GTInfo getInfo() {
-        return info;
+        return scanRequest == null ? null : scanRequest.getInfo();
     }
 
     @Override
@@ -239,47 +83,4 @@ public class CubeSegmentScanner implements IGTScanner {
         return scanner.getScannedRowCount();
     }
 
-    private class Scanner {
-        IGTScanner internal = null;
-
-        public Scanner() {
-            CubeHBaseRPC rpc;
-            if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) {
-                MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() {
-                    @Override
-                    public DimensionEncoding getDimEnc(TblColRef col) {
-                        return info.getCodeSystem().getDimEnc(col.getColumnDesc().getZeroBasedIndex());
-                    }
-                });
-                rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info); // for local debug
-            } else {
-                rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info); // default behavior
-            }
-
-            try {
-                if (scanRequests.size() == 0) {
-                    logger.info("Segment {} will be skipped", cubeSeg);
-                    internal = new EmptyGTScanner(0);
-                } else {
-                    internal = rpc.getGTScanner(scanRequests);
-                }
-            } catch (IOException e) {
-                throw new RuntimeException("error", e);
-            }
-        }
-
-        public Iterator<GTRecord> iterator() {
-            return internal.iterator();
-        }
-
-        public void close() throws IOException {
-            internal.close();
-        }
-
-        public int getScannedRowCount() {
-            return internal.getScannedRowCount();
-        }
-
-    }
-
 }
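
After this refactor the scanner setup that used to span ~200 lines of CubeSegmentScanner collapses into a handful of delegations: filter translation stays here, range planning moves into GTScanRangePlanner (which now returns a single GTScanRequest), and the RPC choice is pushed behind the new ScannerWorker, presumably absorbing the endpoint-vs-raw-scan switch that the deleted inner Scanner class used to make. Condensed from the hunk above:

    // Inside the CubeSegmentScanner constructor after the refactor (condensed):
    ITupleFilterTransformer translator = new BuildInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
    filter = translator.transform(filter);                                    // FunctionTupleFilter -> IN clause, as before

    GTScanRangePlanner planner = new GTScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, metrics);
    GTScanRequest scanRequest = planner.planScanRequest(allowPreAggregate);   // one request instead of a list

    ScannerWorker scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest);  // hides which RPC implementation runs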

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index 863dd67..e0e6d83 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -79,26 +79,26 @@ public class CubeStorageQuery implements IStorageQuery {
         Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
         buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
 
-        // all dimensions = groups + filter dimensions
-        Set<TblColRef> filterDims = Sets.newHashSet(dimensions);
-        filterDims.removeAll(groups);
+        // all dimensions = groups + other(like filter) dimensions
+        Set<TblColRef> otherDims = Sets.newHashSet(dimensions);
+        otherDims.removeAll(groups);
 
         // expand derived (xxxD means contains host columns only, derived columns were translated)
         Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
         Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
-        Set<TblColRef> filterDimsD = expandDerived(filterDims, derivedPostAggregation);
-        filterDimsD.removeAll(groupsD);
+        Set<TblColRef> otherDimsD = expandDerived(otherDims, derivedPostAggregation);
+        otherDimsD.removeAll(groupsD);
 
         // identify cuboid
         Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
         dimensionsD.addAll(groupsD);
-        dimensionsD.addAll(filterDimsD);
-        Cuboid cuboid = identifyCuboid(dimensionsD, metrics);
+        dimensionsD.addAll(otherDimsD);
+        Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc,dimensionsD, metrics);
         context.setCuboid(cuboid);
 
         // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
         Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
-        boolean isExactAggregation = isExactAggregation(cuboid, groups, filterDimsD, singleValuesD, derivedPostAggregation);
+        boolean isExactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation);
         context.setExactAggregation(isExactAggregation);
 
         // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
@@ -169,19 +169,7 @@ public class CubeStorageQuery implements IStorageQuery {
         return expanded;
     }
 
-    private Cuboid identifyCuboid(Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
-        for (FunctionDesc metric : metrics) {
-            if (metric.getMeasureType().onlyAggrInBaseCuboid())
-                return Cuboid.getBaseCuboid(cubeDesc);
-        }
-
-        long cuboidID = 0;
-        for (TblColRef column : dimensions) {
-            int index = cubeDesc.getRowkey().getColumnBitIndex(column);
-            cuboidID |= 1L << index;
-        }
-        return Cuboid.findById(cubeDesc, cuboidID);
-    }
+   
 
     @SuppressWarnings("unchecked")
     private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
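
For context on the hunk above: the deleted private identifyCuboid helper is replaced by a call to Cuboid.identifyCuboid(cubeDesc, dimensionsD, metrics), but the Cuboid side of the change is not part of this diff. A minimal sketch of what that static method would look like, assuming the deleted logic was promoted verbatim (an assumption, not shown in the commit):

    // Assumed sketch only: the removed private helper hoisted onto Cuboid as a static method.
    // The real signature in Cuboid may differ; this mirrors the call site above.
    public static Cuboid identifyCuboid(CubeDesc cubeDesc, Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
        for (FunctionDesc metric : metrics) {
            if (metric.getMeasureType().onlyAggrInBaseCuboid())
                return getBaseCuboid(cubeDesc);   // such measures can only be answered from the base cuboid
        }

        long cuboidID = 0;
        for (TblColRef column : dimensions) {
            int index = cubeDesc.getRowkey().getColumnBitIndex(column);
            cuboidID |= 1L << index;              // set the rowkey bit of each dimension column
        }
        return findById(cubeDesc, cuboidID);
    }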

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
index c2ffdba..c2eacc1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
@@ -47,6 +47,16 @@ public class RawScan {
         this.hbaseMaxResultSize = hbaseMaxResultSize;
     }
 
+    public RawScan(RawScan other) {
+
+        this.startKey = other.startKey;
+        this.endKey = other.endKey;
+        this.hbaseColumns = other.hbaseColumns;
+        this.fuzzyKeys = other.fuzzyKeys;
+        this.hbaseCaching = other.hbaseCaching;
+        this.hbaseMaxResultSize = other.hbaseMaxResultSize;
+    }
+
     public String getStartKeyAsString() {
         return BytesUtil.toHex(this.startKey);
     }
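
The new copy constructor lets callers clone a deserialized RawScan before tailoring it per region or per shard. Note the copy is shallow: startKey/endKey/hbaseColumns/fuzzyKeys still reference the same objects as the source, so in-place writes to the key arrays (such as Bytes.putBytes in updateRawScanByCurrentRegion below) stay visible through both instances. A hedged usage sketch, with illustrative variable names that are not from the commit:

    // Sketch only: derive a working copy from an already-deserialized template RawScan.
    RawScan workingCopy = new RawScan(template);       // 'template' is a previously deserialized RawScan
    Scan scan = CubeHBaseRPC.buildScan(workingCopy);   // build the HBase Scan from the copy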

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 1e7b1b5..9e8e251 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.io.IOUtils;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.CompressionUtils;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.dimension.DimensionEncoding;
@@ -58,6 +60,7 @@ import org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryI
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.protobuf.HBaseZeroCopyByteString;
 import com.google.protobuf.RpcCallback;
@@ -138,8 +141,21 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
         Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength);
     }
 
-    private void appendProfileInfo(StringBuilder sb) {
+    private List<RawScan> deserializeRawScans(ByteBuffer in) {
+        int rawScanCount = BytesUtil.readVInt(in);
+        List<RawScan> ret = Lists.newArrayList();
+        for (int i = 0; i < rawScanCount; i++) {
+            RawScan temp = RawScan.serializer.deserialize(in);
+            ret.add(temp);
+        }
+        return ret;
+    }
+
+    private void appendProfileInfo(StringBuilder sb, String info) {
         sb.append(System.currentTimeMillis() - this.serviceStartTime);
+        if (info != null) {
+            sb.append(":").append(info);
+        }
         sb.append(",");
     }
 
@@ -159,7 +175,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             region.startRegionOperation();
 
             final GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
-            final RawScan hbaseRawScan = RawScan.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
+            List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
+            for (IntList intList : request.getHbaseColumnsToGTList()) {
+                hbaseColumnsToGT.add(intList.getIntsList());
+            }
+            CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior());
+            final List<RawScan> hbaseRawScans = deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
 
             MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() {
                 @Override
@@ -168,40 +189,61 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 }
             });
 
-            List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
-            for (IntList intList : request.getHbaseColumnsToGTList()) {
-                hbaseColumnsToGT.add(intList.getIntsList());
-            }
-
-            if (request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN > 0) {
-                //if has shard, fill region shard to raw scan start/end
-                updateRawScanByCurrentRegion(hbaseRawScan, region, request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN);
-            }
-
-            Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
+            final List<InnerScannerAsIterator> cellListsForeachRawScan = Lists.newArrayList();
+            for (RawScan hbaseRawScan : hbaseRawScans) {
+                if (request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN > 0) {
+                    //if has shard, fill region shard to raw scan start/end
+                    updateRawScanByCurrentRegion(hbaseRawScan, region, request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN);
+                }
 
-            appendProfileInfo(sb);
+                Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
+                innerScanner = region.getScanner(scan);
 
-            innerScanner = region.getScanner(scan);
-            CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior());
+                InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner);
+                cellListsForeachRawScan.add(cellListIterator);
+            }
+            
+            final Iterator<List<Cell>> allCellLists = Iterators.concat(cellListsForeachRawScan.iterator());
 
             if (behavior.ordinal() < CoprocessorBehavior.SCAN.ordinal()) {
+                //this is only for CoprocessorBehavior.RAW_SCAN case to profile hbase scan speed
                 List<Cell> temp = Lists.newArrayList();
                 int counter = 0;
                 while (innerScanner.nextRaw(temp)) {
                     counter++;
                 }
-                sb.append("Scanned " + counter + " rows in " + (System.currentTimeMillis() - serviceStartTime) + ",");
+                appendProfileInfo(sb, "scanned " + counter);
             }
 
-            InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner);
             if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
                 scanReq.setAggrCacheGB(0); // disable mem check if so told
             }
 
-            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());
-            IGTScanner rawScanner = store.scan(scanReq);
+            IGTStore store = new HBaseReadonlyStore(new CellListIterator() {
+                @Override
+                public void close() throws IOException {
+                    for (CellListIterator closeable : cellListsForeachRawScan) {
+                        closeable.close();
+                    }
+                }
+
+                @Override
+                public boolean hasNext() {
+                    return allCellLists.hasNext();
+                }
+
+                @Override
+                public List<Cell> next() {
+                    return allCellLists.next();
+                }
 
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            }, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());
+
+            IGTScanner rawScanner = store.scan(scanReq);
             IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, //
                     behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(), //
                     behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal());
@@ -219,20 +261,20 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 finalRowCount++;
             }
 
-            appendProfileInfo(sb);
+            appendProfileInfo(sb, "agg done");
 
             //outputStream.close() is not necessary
             allRows = outputStream.toByteArray();
             byte[] compressedAllRows = CompressionUtils.compress(allRows);
 
-            appendProfileInfo(sb);
+            appendProfileInfo(sb, "compress done");
 
             OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
             double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad();
             double freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize();
             double freeSwapSpaceSize = operatingSystemMXBean.getFreeSwapSpaceSize();
 
-            appendProfileInfo(sb);
+            appendProfileInfo(sb, "server stats done");
 
             CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
             done.run(responseBuilder.//
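
The anonymous CellListIterator introduced above concatenates the per-RawScan region scanners into a single stream and fans close() out to every underlying scanner. The same idea as a small named helper, sketched purely for readability; it is not part of the commit and assumes CellListIterator extends Iterator<List<Cell>> and Closeable, as its use in the patch implies:

    // Sketch only: a named equivalent of the anonymous CellListIterator in the patch above.
    private static class ConcatCellListIterator implements CellListIterator {
        private final List<? extends CellListIterator> parts;
        private final Iterator<List<Cell>> concatenated;

        ConcatCellListIterator(List<? extends CellListIterator> parts) {
            this.parts = parts;
            this.concatenated = Iterators.concat(parts.iterator());   // same trick as the patch
        }

        @Override
        public boolean hasNext() {
            return concatenated.hasNext();
        }

        @Override
        public List<Cell> next() {
            return concatenated.next();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void close() throws IOException {
            for (CellListIterator part : parts) {
                part.close();   // close every region scanner, not just the last one opened
            }
        }
    }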

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java
index b12451a..fa1687b 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java
@@ -42,7 +42,7 @@ public class SandboxMetastoreCLI {
     public static void main(String[] args) throws Exception {
         logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
         ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }
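
The hard-coded sandbox path gives way to the shared constant from HBaseMetadataTestCase, so this CLI and the test base class can no longer drift apart. Presumably the constant resolves to the same path that was inlined before; its definition is not part of this diff, so the following is an assumption:

    // Assumed definition in HBaseMetadataTestCase (not shown in this commit):
    public static final String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox";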


[8/8] kylin git commit: update kylin metadata version from 1.5.1 to 1.5.2

Posted by ma...@apache.org.
update kylin metadata version from 1.5.1 to 1.5.2


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/59cb57ca
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/59cb57ca
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/59cb57ca

Branch: refs/heads/master
Commit: 59cb57ca6e1a545628d0057d5980c4fe7e789536
Parents: b6c893d
Author: Hongbin Ma <ma...@apache.org>
Authored: Wed Apr 13 11:39:42 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Apr 13 11:39:42 2016 +0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/kylin/common/KylinVersion.java        | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/59cb57ca/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java b/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
index d55f969..0700968 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
@@ -59,7 +59,7 @@ public class KylinVersion {
     /**
      * Require MANUAL updating kylin version per ANY upgrading.
      */
-    private static final KylinVersion CURRENT_KYLIN_VERSION = new KylinVersion("1.5.1");
+    private static final KylinVersion CURRENT_KYLIN_VERSION = new KylinVersion("1.5.2");
 
     private static final Set<KylinVersion> SIGNATURE_INCOMPATIBLE_REVISIONS = new HashSet<KylinVersion>();
 


[4/8] kylin git commit: refactor

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
deleted file mode 100644
index d50baad..0000000
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.impl.threadpool;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kylin.job.BaseTestExecutable;
-import org.apache.kylin.job.ErrorTestExecutable;
-import org.apache.kylin.job.FailedTestExecutable;
-import org.apache.kylin.job.SelfStopExecutable;
-import org.apache.kylin.job.SucceedTestExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.junit.Test;
-
-/**
- */
-public class DefaultSchedulerTest extends BaseSchedulerTest {
-
-    @Test
-    public void testSingleTaskJob() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SucceedTestExecutable();
-        job.addTask(task1);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-    }
-
-    @Test
-    public void testSucceed() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SucceedTestExecutable();
-        BaseTestExecutable task2 = new SucceedTestExecutable();
-        job.addTask(task1);
-        job.addTask(task2);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
-    }
-
-    @Test
-    public void testSucceedAndFailed() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SucceedTestExecutable();
-        BaseTestExecutable task2 = new FailedTestExecutable();
-        job.addTask(task1);
-        job.addTask(task2);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
-    }
-
-    @Test
-    public void testSucceedAndError() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new ErrorTestExecutable();
-        BaseTestExecutable task2 = new SucceedTestExecutable();
-        job.addTask(task1);
-        job.addTask(task2);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
-        assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
-    }
-
-    @Test
-    public void testDiscard() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SelfStopExecutable();
-        job.addTask(task1);
-        jobService.addJob(job);
-        waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
-        jobService.discardJob(job.getId());
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
-        Thread.sleep(5000);
-        System.out.println(job);
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Test
-    public void testSchedulerPool() throws InterruptedException {
-        ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1);
-        final CountDownLatch countDownLatch = new CountDownLatch(3);
-        ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                countDownLatch.countDown();
-            }
-        }, 5, 5, TimeUnit.SECONDS);
-        assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS));
-        assertTrue("future should still running", future.cancel(true));
-
-        final CountDownLatch countDownLatch2 = new CountDownLatch(3);
-        ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                countDownLatch2.countDown();
-                throw new RuntimeException();
-            }
-        }, 5, 5, TimeUnit.SECONDS);
-        assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS));
-        assertFalse("future2 should has been stopped", future2.cancel(true));
-
-        final CountDownLatch countDownLatch3 = new CountDownLatch(3);
-        ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    countDownLatch3.countDown();
-                    throw new RuntimeException();
-                } catch (Exception e) {
-                }
-            }
-        }, 5, 5, TimeUnit.SECONDS);
-        assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS));
-        assertTrue("future3 should still running", future3.cancel(true));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
index 0ae7e6a..18da37a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.metadata.filter.UDF;
 
-import org.apache.kylin.dimension.DimensionEncoding;
 import org.apache.kylin.metadata.filter.function.Functions;
 import org.apache.kylin.metadata.model.TblColRef;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java b/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java
new file mode 100644
index 0000000..3bb4540
--- /dev/null
+++ b/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java
@@ -0,0 +1,189 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * 
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  * 
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ */
+public class StorageMockUtils {
+    public static TupleInfo newTupleInfo(List<TblColRef> groups, List<FunctionDesc> aggregations) {
+        TupleInfo info = new TupleInfo();
+        int idx = 0;
+
+        for (TblColRef col : groups) {
+            info.setField(col.getName(), col, idx++);
+        }
+
+        TableDesc sourceTable = groups.get(0).getColumnDesc().getTable();
+        for (FunctionDesc func : aggregations) {
+            TblColRef col = new TblColRef(func.newFakeRewriteColumn(sourceTable));
+            info.setField(col.getName(), col, idx++);
+        }
+
+        return info;
+    }
+
+    public static List<TblColRef> buildGroups() {
+        List<TblColRef> groups = new ArrayList<TblColRef>();
+
+        TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
+        ColumnDesc c1 = ColumnDesc.mockup(t1, 2, "CAL_DT", "date");
+        TblColRef cf1 = new TblColRef(c1);
+        groups.add(cf1);
+
+        TableDesc t2 = TableDesc.mockup("DEFAULT.TEST_CATEGORY_GROUPINGS");
+        ColumnDesc c2 = ColumnDesc.mockup(t2, 14, "META_CATEG_NAME", "string");
+        TblColRef cf2 = new TblColRef(c2);
+        groups.add(cf2);
+
+        return groups;
+    }
+
+    public static List<FunctionDesc> buildAggregations1() {
+        List<FunctionDesc> functions = new ArrayList<FunctionDesc>();
+
+        TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
+        TblColRef priceCol = new TblColRef(ColumnDesc.mockup(t1, 7, "PRICE", "decimal(19,4)"));
+
+        FunctionDesc f1 = new FunctionDesc();
+        f1.setExpression("SUM");
+        ParameterDesc p1 = new ParameterDesc();
+        p1.setType("column");
+        p1.setValue("PRICE");
+        p1.setColRefs(ImmutableList.of(priceCol));
+        f1.setParameter(p1);
+        f1.setReturnType("decimal(19,4)");
+        functions.add(f1);
+
+
+        return functions;
+    }
+
+    public static List<FunctionDesc> buildAggregations() {
+        List<FunctionDesc> functions = new ArrayList<FunctionDesc>();
+
+        TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
+        TblColRef priceCol = new TblColRef(ColumnDesc.mockup(t1, 7, "PRICE", "decimal(19,4)"));
+        TblColRef sellerCol = new TblColRef(ColumnDesc.mockup(t1, 9, "SELLER_ID", "bigint"));
+
+        FunctionDesc f1 = new FunctionDesc();
+        f1.setExpression("SUM");
+        ParameterDesc p1 = new ParameterDesc();
+        p1.setType("column");
+        p1.setValue("PRICE");
+        p1.setColRefs(ImmutableList.of(priceCol));
+        f1.setParameter(p1);
+        f1.setReturnType("decimal(19,4)");
+        functions.add(f1);
+
+        FunctionDesc f2 = new FunctionDesc();
+        f2.setExpression("COUNT_DISTINCT");
+        ParameterDesc p2 = new ParameterDesc();
+        p2.setType("column");
+        p2.setValue("SELLER_ID");
+        p2.setColRefs(ImmutableList.of(sellerCol));
+        f2.setParameter(p2);
+        f2.setReturnType("hllc(10)");
+        functions.add(f2);
+
+        return functions;
+    }
+
+    public static CompareTupleFilter buildTs2010Filter(TblColRef column) {
+        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+        ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+        compareFilter.addChild(columnFilter1);
+        ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2010-01-01");
+        compareFilter.addChild(constantFilter1);
+        return compareFilter;
+    }
+
+    public static CompareTupleFilter buildTs2011Filter(TblColRef column) {
+        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+        ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+        compareFilter.addChild(columnFilter1);
+        ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2011-01-01");
+        compareFilter.addChild(constantFilter1);
+        return compareFilter;
+    }
+
+    public static CompareTupleFilter buildFilter1(TblColRef column) {
+        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.LTE);
+        ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+        compareFilter.addChild(columnFilter1);
+        ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23");
+        compareFilter.addChild(constantFilter1);
+        return compareFilter;
+    }
+
+    public static CompareTupleFilter buildFilter2(TblColRef column) {
+        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
+        ColumnTupleFilter columnFilter2 = new ColumnTupleFilter(column);
+        compareFilter.addChild(columnFilter2);
+        ConstantTupleFilter constantFilter2 = new ConstantTupleFilter("ClothinShoes & Accessories");
+        compareFilter.addChild(constantFilter2);
+        return compareFilter;
+    }
+
+    public static CompareTupleFilter buildFilter3(TblColRef column) {
+        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
+        ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+        compareFilter.addChild(columnFilter1);
+        ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23");
+        compareFilter.addChild(constantFilter1);
+        return compareFilter;
+    }
+
+
+    public static TupleFilter buildAndFilter(List<TblColRef> columns) {
+        CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
+        CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
+        LogicalTupleFilter andFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+        andFilter.addChild(compareFilter1);
+        andFilter.addChild(compareFilter2);
+        return andFilter;
+    }
+
+    public static TupleFilter buildOrFilter(List<TblColRef> columns) {
+        CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
+        CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
+        LogicalTupleFilter logicFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR);
+        logicFilter.addChild(compareFilter1);
+        logicFilter.addChild(compareFilter2);
+        return logicFilter;
+    }
+}
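
For orientation on the relocated helper class: a storage test would typically assemble its mock query inputs roughly as below. This is a hedged sketch; the concrete IStorageQuery wiring differs per test and is not shown here:

    // Sketch only: combining the StorageMockUtils builders into query inputs.
    List<TblColRef> groups = StorageMockUtils.buildGroups();                // CAL_DT, META_CATEG_NAME
    List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations(); // SUM(PRICE), COUNT_DISTINCT(SELLER_ID)
    TupleFilter filter = StorageMockUtils.buildAndFilter(groups);           // buildFilter1 AND buildFilter2
    TupleInfo returnTupleInfo = StorageMockUtils.newTupleInfo(groups, aggregations);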

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
deleted file mode 100644
index 5f8f08f..0000000
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.cache;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- */
-public class StorageMockUtils {
-    public static TupleInfo newTupleInfo(List<TblColRef> groups, List<FunctionDesc> aggregations) {
-        TupleInfo info = new TupleInfo();
-        int idx = 0;
-
-        for (TblColRef col : groups) {
-            info.setField(col.getName(), col, idx++);
-        }
-
-        TableDesc sourceTable = groups.get(0).getColumnDesc().getTable();
-        for (FunctionDesc func : aggregations) {
-            TblColRef col = new TblColRef(func.newFakeRewriteColumn(sourceTable));
-            info.setField(col.getName(), col, idx++);
-        }
-
-        return info;
-    }
-
-    public static List<TblColRef> buildGroups() {
-        List<TblColRef> groups = new ArrayList<TblColRef>();
-
-        TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
-        ColumnDesc c1 = ColumnDesc.mockup(t1, 2, "CAL_DT", "date");
-        TblColRef cf1 = new TblColRef(c1);
-        groups.add(cf1);
-
-        TableDesc t2 = TableDesc.mockup("DEFAULT.TEST_CATEGORY_GROUPINGS");
-        ColumnDesc c2 = ColumnDesc.mockup(t2, 14, "META_CATEG_NAME", "string");
-        TblColRef cf2 = new TblColRef(c2);
-        groups.add(cf2);
-
-        return groups;
-    }
-
-    public static List<FunctionDesc> buildAggregations() {
-        List<FunctionDesc> functions = new ArrayList<FunctionDesc>();
-
-        TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
-        TblColRef priceCol = new TblColRef(ColumnDesc.mockup(t1, 7, "PRICE", "decimal(19,4)"));
-        TblColRef sellerCol = new TblColRef(ColumnDesc.mockup(t1, 9, "SELLER_ID", "bigint"));
-
-        FunctionDesc f1 = new FunctionDesc();
-        f1.setExpression("SUM");
-        ParameterDesc p1 = new ParameterDesc();
-        p1.setType("column");
-        p1.setValue("PRICE");
-        p1.setColRefs(ImmutableList.of(priceCol));
-        f1.setParameter(p1);
-        f1.setReturnType("decimal(19,4)");
-        functions.add(f1);
-
-        FunctionDesc f2 = new FunctionDesc();
-        f2.setExpression("COUNT_DISTINCT");
-        ParameterDesc p2 = new ParameterDesc();
-        p2.setType("column");
-        p2.setValue("SELLER_ID");
-        p2.setColRefs(ImmutableList.of(sellerCol));
-        f2.setParameter(p2);
-        f2.setReturnType("hllc(10)");
-        functions.add(f2);
-
-        return functions;
-    }
-
-    public static CompareTupleFilter buildTs2010Filter(TblColRef column) {
-        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
-        ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
-        compareFilter.addChild(columnFilter1);
-        ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2010-01-01");
-        compareFilter.addChild(constantFilter1);
-        return compareFilter;
-    }
-
-    public static CompareTupleFilter buildTs2011Filter(TblColRef column) {
-        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
-        ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
-        compareFilter.addChild(columnFilter1);
-        ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2011-01-01");
-        compareFilter.addChild(constantFilter1);
-        return compareFilter;
-    }
-
-    public static CompareTupleFilter buildFilter1(TblColRef column) {
-        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.LTE);
-        ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
-        compareFilter.addChild(columnFilter1);
-        ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23");
-        compareFilter.addChild(constantFilter1);
-        return compareFilter;
-    }
-
-    public static CompareTupleFilter buildFilter2(TblColRef column) {
-        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
-        ColumnTupleFilter columnFilter2 = new ColumnTupleFilter(column);
-        compareFilter.addChild(columnFilter2);
-        ConstantTupleFilter constantFilter2 = new ConstantTupleFilter("ClothinShoes & Accessories");
-        compareFilter.addChild(constantFilter2);
-        return compareFilter;
-    }
-
-    public static TupleFilter buildAndFilter(List<TblColRef> columns) {
-        CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
-        CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
-        LogicalTupleFilter andFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
-        andFilter.addChild(compareFilter1);
-        andFilter.addChild(compareFilter2);
-        return andFilter;
-    }
-
-    public static TupleFilter buildOrFilter(List<TblColRef> columns) {
-        CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
-        CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
-        LogicalTupleFilter logicFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR);
-        logicFilter.addChild(compareFilter1);
-        logicFilter.addChild(compareFilter2);
-        return logicFilter;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 60815c7..1a3efb7 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -124,6 +124,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-job</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-storage-hbase</artifactId>
             <type>test-jar</type>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
index 4291d91..809cfb7 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
@@ -63,8 +63,8 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
         CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
 
         cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
-        flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
-        dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
+        flatTable = LOCALMETA_TEST_DATA + "/data/flatten_data_for_without_slr_left_join.csv";
+        dictionaryMap = ITInMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
     }
 
     @AfterClass
@@ -84,7 +84,7 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
 
         {
             Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, new NoopWriter()));
-            InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+            ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
             future.get();
         }
     }
@@ -101,7 +101,7 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
 
         @Override
         public void close() {
-            
+
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
new file mode 100644
index 0000000..ab9ac63
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
@@ -0,0 +1,163 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * 
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  * 
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.dimension.Dictionary;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
+
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(ITDoggedCubeBuilderTest.class);
+
+    private static final int INPUT_ROWS = 10000;
+    private static final int SPLIT_ROWS = 5000;
+    private static final int THREADS = 4;
+
+    private static CubeInstance cube;
+    private static String flatTable;
+    private static Map<TblColRef, Dictionary<String>> dictionaryMap;
+
+    @BeforeClass
+    public static void before() throws IOException {
+        staticCreateTestMetadata();
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+        flatTable = LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/data/flatten_data_for_without_slr_left_join.csv";
+        dictionaryMap = ITInMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
+    }
+
+    @AfterClass
+    public static void after() throws Exception {
+        staticCleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws Exception {
+
+        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        long randSeed = System.currentTimeMillis();
+
+        DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        doggedBuilder.setConcurrentThreads(THREADS);
+        doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
+        FileRecordWriter doggedResult = new FileRecordWriter();
+
+        {
+            Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
+            ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+            future.get();
+            doggedResult.close();
+        }
+
+        InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        inmemBuilder.setConcurrentThreads(THREADS);
+        FileRecordWriter inmemResult = new FileRecordWriter();
+
+        {
+            Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
+            ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+            future.get();
+            inmemResult.close();
+        }
+
+        fileCompare(doggedResult.file, inmemResult.file);
+        doggedResult.file.delete();
+        inmemResult.file.delete();
+    }
+
+    private void fileCompare(File file, File file2) throws IOException {
+        BufferedReader r1 = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
+        BufferedReader r2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2), "UTF-8"));
+
+        String line1, line2;
+        do {
+            line1 = r1.readLine();
+            line2 = r2.readLine();
+
+            assertEquals(line1, line2);
+
+        } while (line1 != null || line2 != null);
+
+        r1.close();
+        r2.close();
+    }
+
+    class FileRecordWriter implements ICuboidWriter {
+
+        File file;
+        PrintWriter writer;
+
+        FileRecordWriter() throws IOException {
+            file = File.createTempFile("DoggedCubeBuilderTest_", ".data");
+            writer = new PrintWriter(file, "UTF-8");
+        }
+
+        @Override
+        public void write(long cuboidId, GTRecord record) throws IOException {
+            writer.print(cuboidId);
+            writer.print(", ");
+            writer.print(record.toString());
+            writer.println();
+        }
+
+        @Override
+        public void flush() {
+
+        }
+
+        @Override
+        public void close() {
+            writer.close();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
new file mode 100644
index 0000000..ad02f2a
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -0,0 +1,271 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * 
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  * 
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.dimension.Dictionary;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ */
+public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(ITInMemCubeBuilderTest.class);
+
+    private CubeInstance cube;
+    private String flatTable;
+    private Map<TblColRef, Dictionary<String>> dictionaryMap;
+
+    private int nInpRows;
+    private int nThreads;
+
+    @Before
+    public void before() throws IOException {
+        createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testKylinCube() throws Exception {
+        testBuild("test_kylin_cube_without_slr_left_join_empty", //
+                LOCALMETA_TEST_DATA + "/data/flatten_data_for_without_slr_left_join.csv", 70000, 4);
+    }
+
+    @Test
+    public void testSSBCube() throws Exception {
+        testBuild("ssb", //
+                LOCALMETA_TEST_DATA + "/data/kylin_intermediate_ssb_19920101000000_19920201000000.csv", 1000, 1);
+    }
+
+    public void testBuild(String cubeName, String flatTable, int nInpRows, int nThreads) throws Exception {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+        this.nInpRows = nInpRows;
+        this.nThreads = nThreads;
+
+        this.cube = cubeManager.getCube(cubeName);
+        this.flatTable = flatTable;
+        this.dictionaryMap = getDictionaryMap(cube, flatTable);
+
+        testBuildInner();
+    }
+
+    private void testBuildInner() throws Exception {
+
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        cubeBuilder.setConcurrentThreads(nThreads);
+
+        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+        try {
+            // round 1
+            {
+                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+                feedData(cube, flatTable, queue, nInpRows);
+                future.get();
+            }
+
+            // round 2, zero input
+            {
+                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+                feedData(cube, flatTable, queue, 0);
+                future.get();
+            }
+
+            // round 3
+            {
+                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+                feedData(cube, flatTable, queue, nInpRows);
+                future.get();
+            }
+
+        } catch (Exception e) {
+            logger.error("stream build failed", e);
+            throw new IOException("Failed to build cube ", e);
+        }
+    }
+
+    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
+        feedData(cube, flatTable, queue, count, 0);
+    }
+
+    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
+        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
+        int nColumns = flatTableDesc.getColumnList().size();
+
+        @SuppressWarnings("unchecked")
+        Set<String>[] distinctSets = new Set[nColumns];
+        for (int i = 0; i < nColumns; i++)
+            distinctSets[i] = new TreeSet<String>();
+
+        // get distinct values on each column
+        List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8");
+        for (String line : lines) {
+            String[] row = line.trim().split(",");
+            assert row.length == nColumns;
+            for (int i = 0; i < nColumns; i++)
+                distinctSets[i].add(row[i]);
+        }
+
+        List<String[]> distincts = new ArrayList<String[]>();
+        for (int i = 0; i < nColumns; i++) {
+            distincts.add((String[]) distinctSets[i].toArray(new String[distinctSets[i].size()]));
+        }
+
+        Random rand = new Random();
+        if (randSeed != 0)
+            rand.setSeed(randSeed);
+
+        // output with random data
+        for (; count > 0; count--) {
+            ArrayList<String> row = new ArrayList<String>(nColumns);
+            for (int i = 0; i < nColumns; i++) {
+                String[] candidates = distincts.get(i);
+                row.add(candidates[rand.nextInt(candidates.length)]);
+            }
+            queue.put(row);
+        }
+        queue.put(new ArrayList<String>(0));
+    }
+
+    static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
+        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+        CubeDesc desc = cube.getDescriptor();
+        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
+        int nColumns = flatTableDesc.getColumnList().size();
+
+        List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
+        for (int c = 0; c < columns.size(); c++) {
+            TblColRef col = columns.get(c);
+            if (desc.getRowkey().isUseDictionary(col)) {
+                logger.info("Building dictionary for " + col);
+                List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
+                Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
+                result.put(col, dict);
+            }
+        }
+
+        for (int measureIdx = 0; measureIdx < cube.getDescriptor().getMeasures().size(); measureIdx++) {
+            MeasureDesc measureDesc = cube.getDescriptor().getMeasures().get(measureIdx);
+            FunctionDesc func = measureDesc.getFunction();
+            List<TblColRef> dictCols = func.getMeasureType().getColumnsNeedDictionary(func);
+            if (dictCols.isEmpty())
+                continue;
+
+            int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx];
+            List<TblColRef> paramCols = func.getParameter().getColRefs();
+            for (int i = 0; i < paramCols.size(); i++) {
+                TblColRef col = paramCols.get(i);
+                if (dictCols.contains(col)) {
+                    int colIdxOnFlat = flatTableIdx[i];
+                    logger.info("Building dictionary for " + col);
+                    List<byte[]> valueList = readValueList(flatTable, nColumns, colIdxOnFlat);
+                    Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
+
+                    result.put(col, dict);
+                }
+            }
+        }
+
+        return result;
+    }
+
+    private static List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
+        List<byte[]> result = Lists.newArrayList();
+        List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8");
+        for (String line : lines) {
+            String[] row = line.trim().split(",");
+            if (row.length != nColumns) {
+                throw new IllegalStateException("Expected " + nColumns + " columns but found " + row.length + " in line: " + line);
+            }
+            if (row[c] != null) {
+                result.add(Bytes.toBytes(row[c]));
+            }
+        }
+        return result;
+    }
+
+    class ConsoleGTRecordWriter implements ICuboidWriter {
+
+        boolean verbose = false;
+
+        @Override
+        public void write(long cuboidId, GTRecord record) throws IOException {
+            if (verbose)
+                System.out.println(record.toString());
+        }
+
+        @Override
+        public void flush() {
+            if (verbose) {
+                System.out.println("flush");
+            }
+        }
+
+        @Override
+        public void close() {
+            if (verbose) {
+                System.out.println("close");
+            }
+        }
+    }
+}
\ No newline at end of file
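
The feedData helper above samples each column's distinct values from the flat table and then
streams randomly composed rows into a blocking queue, finishing with an empty row. A minimal
standalone sketch of the same idea, independent of the Kylin classes (the RandomRowFeeder name
and the plain-file handling are illustrative only, not part of this commit):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;

public class RandomRowFeeder {

    // Reads a comma-separated flat table, then puts `count` random rows onto the queue,
    // each cell drawn from the distinct values seen in that column. An empty row is
    // appended at the end as the end-of-stream sentinel.
    static void feed(String flatTable, ArrayBlockingQueue<List<String>> queue,
                     int count, long seed) throws IOException, InterruptedException {
        List<String> lines = Files.readAllLines(Paths.get(flatTable), StandardCharsets.UTF_8);
        int nColumns = lines.get(0).split(",").length;

        List<TreeSet<String>> distinct = new ArrayList<>();
        for (int i = 0; i < nColumns; i++)
            distinct.add(new TreeSet<String>());
        for (String line : lines) {
            String[] row = line.trim().split(",");
            if (row.length != nColumns)
                continue; // skip blank or malformed lines
            for (int i = 0; i < nColumns; i++)
                distinct.get(i).add(row[i]);
        }

        List<String[]> candidates = new ArrayList<>();
        for (TreeSet<String> set : distinct)
            candidates.add(set.toArray(new String[set.size()]));

        Random rand = seed == 0 ? new Random() : new Random(seed);
        for (; count > 0; count--) {
            List<String> row = new ArrayList<>(nColumns);
            for (String[] values : candidates)
                row.add(values[rand.nextInt(values.length)]);
            queue.put(row);
        }
        queue.put(new ArrayList<String>(0)); // sentinel: signals end of input
    }
}

Passing a fixed seed, as the seeded feedData overload allows, keeps a test run reproducible while
still exercising arbitrary combinations of the real column values.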

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java b/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
new file mode 100644
index 0000000..ad1ddd3
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
@@ -0,0 +1,154 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * 
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  * 
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.job.impl.threadpool;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.job.BaseTestExecutable;
+import org.apache.kylin.job.ErrorTestExecutable;
+import org.apache.kylin.job.FailedTestExecutable;
+import org.apache.kylin.job.SelfStopExecutable;
+import org.apache.kylin.job.SucceedTestExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Verifies how the default scheduler drives chained jobs through succeeded, failed,
+ *  errored and discarded states. */
+public class ITDefaultSchedulerTest extends BaseSchedulerTest {
+
+    @Test
+    public void testSingleTaskJob() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SucceedTestExecutable();
+        job.addTask(task1);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+    }
+
+    @Test
+    public void testSucceed() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SucceedTestExecutable();
+        BaseTestExecutable task2 = new SucceedTestExecutable();
+        job.addTask(task1);
+        job.addTask(task2);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
+    }
+
+    @Test
+    public void testSucceedAndFailed() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SucceedTestExecutable();
+        BaseTestExecutable task2 = new FailedTestExecutable();
+        job.addTask(task1);
+        job.addTask(task2);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
+    }
+
+    @Test
+    public void testSucceedAndError() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new ErrorTestExecutable();
+        BaseTestExecutable task2 = new SucceedTestExecutable();
+        job.addTask(task1);
+        job.addTask(task2);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
+    }
+
+    @Test
+    public void testDiscard() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SelfStopExecutable();
+        job.addTask(task1);
+        jobService.addJob(job);
+        waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
+        jobService.discardJob(job.getId());
+        waitForJobFinish(job.getId());
+        Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
+        Thread.sleep(5000);
+        System.out.println(job);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testSchedulerPool() throws InterruptedException {
+        ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1);
+        final CountDownLatch countDownLatch = new CountDownLatch(3);
+        ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                countDownLatch.countDown();
+            }
+        }, 5, 5, TimeUnit.SECONDS);
+        assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS));
+        assertTrue("future should still running", future.cancel(true));
+
+        final CountDownLatch countDownLatch2 = new CountDownLatch(3);
+        ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                countDownLatch2.countDown();
+                throw new RuntimeException();
+            }
+        }, 5, 5, TimeUnit.SECONDS);
+        assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS));
+        assertFalse("future2 should has been stopped", future2.cancel(true));
+
+        final CountDownLatch countDownLatch3 = new CountDownLatch(3);
+        ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    countDownLatch3.countDown();
+                    throw new RuntimeException();
+                } catch (Exception e) { // swallowed on purpose: an escaping exception would cancel the schedule
+                }
+            }
+        }, 5, 5, TimeUnit.SECONDS);
+        assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS));
+        assertTrue("future3 should still running", future3.cancel(true));
+    }
+}
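
The testSchedulerPool case above pins down the ScheduledExecutorService behaviour the job fetcher
relies on: a task that lets an exception escape is silently descheduled by scheduleAtFixedRate
(its future reports done and cancel() returns false), while a task that catches its own exceptions
keeps firing. A small illustrative sketch of the defensive pattern, with placeholder names that
are not part of Kylin:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SafeScheduling {

    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

        // Without the try/catch below, the first RuntimeException would cancel
        // every subsequent execution of this periodic task.
        pool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    doPeriodicWork();
                } catch (Throwable t) {
                    // swallow and log so the fixed-rate schedule stays alive
                    System.err.println("round failed, will retry next round: " + t);
                }
            }
        }, 0, 5, TimeUnit.SECONDS);
        // a real caller would eventually call pool.shutdown()
    }

    private static void doPeriodicWork() {
        // stand-in for the real work, e.g. polling job status
        throw new RuntimeException("simulated failure");
    }
}

This is exactly why the third scheduled task in the test wraps its body in try/catch: swallowing
the failure is what keeps the schedule running, so countDownLatch3 still reaches zero.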

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index c945485..e1cbe1f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -99,12 +99,12 @@ public class BuildCubeWithEngine {
             logger.info("Will not use fast build mode");
         }
 
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }
 
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
 
         try {
             //check hdfs permission

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
index 5ab5e83..d862dbf 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
@@ -80,7 +80,7 @@ public class BuildCubeWithSpark {
     public static void beforeClass() throws Exception {
         logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
         ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }
@@ -89,7 +89,7 @@ public class BuildCubeWithSpark {
 
     @Before
     public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
 
         DeployUtil.initCliWorkDir();
         DeployUtil.deployMetadata();
@@ -117,7 +117,7 @@ public class BuildCubeWithSpark {
     @Test
     public void test() throws Exception {
         final CubeSegment segment = createSegment();
-        String confPath = new File(AbstractKylinTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
+        String confPath = new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
         KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar();
         String coprocessor = KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar();
         logger.info("confPath location:" + confPath);

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index eeff999..99da26f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -67,11 +67,11 @@ public class BuildCubeWithStream {
     public static void beforeClass() throws Exception {
         logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
         ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
     }
 
     public void before() throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
index 4b8ce24..643b122 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
@@ -92,7 +92,7 @@ public class BuildIIWithEngine {
     public static void beforeClass() throws Exception {
         logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
         ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }
@@ -100,7 +100,7 @@ public class BuildIIWithEngine {
 
     @Before
     public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
 
         //DeployUtil.initCliWorkDir();
         //        DeployUtil.deployMetadata();

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
index 9b7cd14..ace1a2f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
@@ -109,7 +109,7 @@ public class BuildIIWithStream {
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
     }
 
     public void before() throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index ec3e60f..46aa68d 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -19,11 +19,9 @@
 package org.apache.kylin.query;
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileReader;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
@@ -38,22 +36,18 @@ import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.logging.LogManager;
 
-import com.google.common.base.Throwables;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionContext;
 import org.apache.kylin.common.KylinConfig;
 import org.dbunit.Assertion;
 import org.dbunit.database.DatabaseConfig;
 import org.dbunit.database.DatabaseConnection;
 import org.dbunit.database.IDatabaseConnection;
 import org.dbunit.dataset.DataSetException;
-import org.dbunit.dataset.DefaultTable;
 import org.dbunit.dataset.ITable;
 import org.dbunit.dataset.SortedTable;
 import org.dbunit.dataset.datatype.DataType;
@@ -63,8 +57,6 @@ import org.dbunit.ext.h2.H2DataTypeFactory;
 import org.junit.Assert;
 
 import com.google.common.io.Files;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  */
@@ -287,7 +279,6 @@ public class KylinTestBase {
         return ret;
     }
 
-
     protected void execQueryUsingH2(String queryFolder, boolean needSort) throws Exception {
         printInfo("---------- Running H2 queries: " + queryFolder);
 
@@ -349,7 +340,6 @@ public class KylinTestBase {
             h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory());
             ITable h2Table = executeQuery(h2Conn, queryName, sql, needSort);
 
-
             try {
                 // compare the result
                 Assert.assertEquals(h2Table.getRowCount(), kylinTable.getRowCount());
@@ -427,7 +417,6 @@ public class KylinTestBase {
             h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory());
             ITable h2Table = executeDynamicQuery(h2Conn, queryName, sql, parameters, needSort);
 
-
             // compare the result
             Assertion.assertEquals(h2Table, kylinTable);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
index 15e435e..136342d 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
@@ -38,7 +38,7 @@ import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.storage.IStorageQuery;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.StorageFactory;
-import org.apache.kylin.storage.cache.StorageMockUtils;
+import org.apache.kylin.storage.StorageMockUtils;
 import org.apache.kylin.storage.exception.ScanOutOfLimitException;
 import org.junit.After;
 import org.junit.AfterClass;

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/source-hive/pom.xml
----------------------------------------------------------------------
diff --git a/source-hive/pom.xml b/source-hive/pom.xml
index eb36eed..796b9c1 100644
--- a/source-hive/pom.xml
+++ b/source-hive/pom.xml
@@ -44,6 +44,13 @@
 
         <!-- Env & Test -->
         <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
index 70c11b3..83c50c0 100644
--- a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
+++ b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
@@ -24,18 +24,16 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-/**
- * Created by dongli on 2/22/16.
- */
 public class HiveCmdBuilderTest {
 
     @Before
     public void setup() {
-        System.setProperty("KYLIN_CONF", "../examples/test_case_data/localmeta");
+        System.setProperty("KYLIN_CONF", LocalFileMetadataTestCase.LOCALMETA_TEST_DATA);
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 6bbb0b7..1d3da28 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.storage.hbase.cube.v2;
 
 import java.io.IOException;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
@@ -56,6 +57,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -250,20 +252,21 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
     }
 
     @Override
-    public IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException {
+    public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
 
         final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior();
+
         logger.debug("New scanner for current segment {} will use {} as endpoint's behavior", cubeSeg, toggle);
 
         short cuboidBaseShard = cubeSeg.getCuboidBaseShard(this.cuboid.getId());
         short shardNum = cubeSeg.getCuboidShardNum(this.cuboid.getId());
         int totalShards = cubeSeg.getTotalShards();
 
-        final List<ByteString> scanRequestByteStrings = Lists.newArrayList();
-        final List<ByteString> rawScanByteStrings = Lists.newArrayList();
+        ByteString scanRequestByteString = null;
+        ByteString rawScanByteString = null;
 
         // primary key (also the 0th column block) is always selected
-        final ImmutableBitSet selectedColBlocks = scanRequests.get(0).getSelectedColBlocks().set(0);
+        final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
 
         // globally shared connection, does not require close
         final HConnection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
@@ -274,65 +277,89 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             hbaseColumnsToGTIntList.add(IntList.newBuilder().addAllInts(list).build());
         }
 
-        for (GTScanRequest req : scanRequests) {
-            ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
-            GTScanRequest.serializer.serialize(req, buffer);
-            buffer.flip();
-            scanRequestByteStrings.add(HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit()));
+        //TODO: raw scans could be constructed on the region side to reduce traffic
+        List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
+        int rawScanBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
+        while (true) {
+            try {
+                ByteBuffer rawScanBuffer = ByteBuffer.allocate(rawScanBufferSize);
+                BytesUtil.writeVInt(rawScans.size(), rawScanBuffer);
+                for (RawScan rs : rawScans) {
+                    RawScan.serializer.serialize(rs, rawScanBuffer);
+                }
+                rawScanBuffer.flip();
+                rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit());
+                break;
+            } catch (BufferOverflowException boe) {
+                logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize);
+                rawScanBufferSize *= 4;
+            }
+        }
+        scanRequest.setGTScanRanges(Lists.<GTScanRange> newArrayList()); // raw scans are sent to the coprocessor separately, so there is no need to send the scan ranges again
 
-            RawScan rawScan = preparedHBaseScan(req.getPkStart(), req.getPkEnd(), req.getFuzzyKeys(), selectedColBlocks);
+        int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
+        while (true) {
+            try {
+                ByteBuffer buffer = ByteBuffer.allocate(scanRequestBufferSize);
+                GTScanRequest.serializer.serialize(scanRequest, buffer);
+                buffer.flip();
+                scanRequestByteString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit());
+                break;
+            } catch (BufferOverflowException boe) {
+                logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize);
+                scanRequestBufferSize *= 4;
+            }
+        }
 
-            ByteBuffer rawScanBuffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
-            RawScan.serializer.serialize(rawScan, rawScanBuffer);
-            rawScanBuffer.flip();
-            rawScanByteStrings.add(HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit()));
+        logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
 
-            logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", buffer.limit() - buffer.position(), rawScanBuffer.limit() - rawScanBuffer.position());
 
-            logger.info("The scan {} for segment {} is as below, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(req)), cubeSeg);
-            logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
+        logger.info("The scan {} for segment {} is as below, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg);
+        for (RawScan rs : rawScans) {
+            logScan(rs, cubeSeg.getStorageLocationIdentifier());
         }
 
-        logger.debug("Submitting rpc to {} shards starting from shard {}, scan requests count {}", new Object[] { shardNum, cuboidBaseShard, scanRequests.size() });
+        logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", new Object[] { shardNum, cuboidBaseShard, rawScans.size() });
 
         final AtomicInteger totalScannedCount = new AtomicInteger(0);
-        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(scanRequests.size() * shardNum);
+        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);
+        final String currentThreadName = Thread.currentThread().getName();
 
         for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
+            final ByteString finalScanRequestByteString = scanRequestByteString;
+            final ByteString finalRawScanByteString = rawScanByteString;
             executorService.submit(new Runnable() {
                 @Override
                 public void run() {
-                    for (int i = 0; i < scanRequests.size(); ++i) {
-                        CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
-                        builder.setGtScanRequest(scanRequestByteStrings.get(i)).setHbaseRawScan(rawScanByteStrings.get(i));
-                        for (IntList intList : hbaseColumnsToGTIntList) {
-                            builder.addHbaseColumnsToGT(intList);
-                        }
-                        builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
-                        builder.setBehavior(toggle);
+                    CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
+                    builder.setGtScanRequest(finalScanRequestByteString).setHbaseRawScan(finalRawScanByteString);
+                    for (IntList intList : hbaseColumnsToGTIntList) {
+                        builder.addHbaseColumnsToGT(intList);
+                    }
+                    builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
+                    builder.setBehavior(toggle);
+
+                    Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
+                    try {
+                        results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond());
+                    } catch (Throwable throwable) {
+                        throw new RuntimeException("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + "Error when visiting cubes by endpoint", throwable);
+                    }
 
-                        Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
+                    for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
+                        totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
+                        logger.info("<sub-thread for GTScanRequest " +  Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + getStatsString(result));
                         try {
-                            results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond());
-                        } catch (Throwable throwable) {
-                            throw new RuntimeException("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequests.get(i))) + "> " + "Error when visiting cubes by endpoint", throwable);
-                        }
-
-                        for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
-                            totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
-                            logger.info("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequests.get(i))) + "> " + getStatsString(result));
-                            try {
-                                epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
-                            } catch (IOException | DataFormatException e) {
-                                throw new RuntimeException("Error when decompressing", e);
-                            }
+                            epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
+                        } catch (IOException | DataFormatException e) {
+                            throw new RuntimeException("Error when decompressing", e);
                         }
                     }
                 }
             });
         }
 
-        return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequests.get(0).getColumns(), totalScannedCount.get());
+        return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get());
     }
 
     private String getStatsString(Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result) {
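
The rewritten getGTScanner above no longer assumes BytesSerializer.SERIALIZE_BUFFER_SIZE can hold
an arbitrarily large scan request; each serialization is retried with a buffer four times larger
whenever a BufferOverflowException escapes. A generic sketch of that retry loop, with an
illustrative Serializer interface standing in for GTScanRequest.serializer and RawScan.serializer:

import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

public class GrowingBufferSerialization {

    /** Illustrative stand-in for a serializer that writes a value into a ByteBuffer. */
    interface Serializer<T> {
        void serialize(T value, ByteBuffer out);
    }

    // Allocates a buffer of initialSize and quadruples it until the value fits,
    // mirroring the retry loops added in CubeHBaseEndpointRPC.
    static <T> ByteBuffer serializeWithRetry(Serializer<T> serializer, T value, int initialSize) {
        int size = initialSize;
        while (true) {
            try {
                ByteBuffer buffer = ByteBuffer.allocate(size);
                serializer.serialize(value, buffer);
                buffer.flip(); // ready for reading: position = 0, limit = bytes written
                return buffer;
            } catch (BufferOverflowException boe) {
                size *= 4; // too small: grow and try again
            }
        }
    }
}

Growing by a factor of four rather than two keeps the number of retries small for very large scan
requests, at the cost of some temporary over-allocation.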

http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 8563a5e..49e8593 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.storage.hbase.cube.v2;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -43,16 +42,15 @@ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.cube.model.HBaseMappingDesc;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.GTScanRange;
+import org.apache.kylin.gridtable.IGTStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public abstract class CubeHBaseRPC {
+public abstract class CubeHBaseRPC implements IGTStorage {
 
     public static final Logger logger = LoggerFactory.getLogger(CubeHBaseRPC.class);
 
@@ -71,8 +69,6 @@ public abstract class CubeHBaseRPC {
         this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);
     }
 
-    abstract IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException;
-
     public static Scan buildScan(RawScan rawScan) {
         Scan scan = new Scan();
         scan.setCaching(rawScan.hbaseCaching);
@@ -96,7 +92,7 @@ public abstract class CubeHBaseRPC {
         return scan;
     }
 
-    protected RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
+    private RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
         final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks);
 
         LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
@@ -123,46 +119,12 @@ public abstract class CubeHBaseRPC {
         return new RawScan(start, end, selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize);
     }
 
-    protected List<RawScan> preparedHBaseScans(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
-        final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks);
-        List<RawScan> ret = Lists.newArrayList();
-
-        LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
-        byte[] start = encoder.createBuf();
-        byte[] end = encoder.createBuf();
-        List<byte[]> startKeys;
-        List<byte[]> endKeys;
-
-        encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE);
-        encoder.encode(pkStart, pkStart.getInfo().getPrimaryKey(), start);
-        startKeys = encoder.getRowKeysDifferentShards(start);
-
-        encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE);
-        encoder.encode(pkEnd, pkEnd.getInfo().getPrimaryKey(), end);
-        endKeys = encoder.getRowKeysDifferentShards(end);
-        endKeys = Lists.transform(endKeys, new Function<byte[], byte[]>() {
-            @Override
-            public byte[] apply(byte[] input) {
-                byte[] shardEnd = new byte[input.length + 1];//append extra 0 to the end key to make it inclusive while scanning
-                System.arraycopy(input, 0, shardEnd, 0, input.length);
-                return shardEnd;
-            }
-        });
-
-        Preconditions.checkState(startKeys.size() == endKeys.size());
-        List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys);
-
-        KylinConfig config = cubeSeg.getCubeDesc().getConfig();
-        int hbaseCaching = config.getHBaseScanCacheRows();
-        int hbaseMaxResultSize = config.getHBaseScanMaxResultSize();
-        if (isMemoryHungry(selectedColBlocks))
-            hbaseCaching /= 10;
-
-        for (short i = 0; i < startKeys.size(); ++i) {
-            ret.add(new RawScan(startKeys.get(i), endKeys.get(i), selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize));
+    protected List<RawScan> preparedHBaseScans(List<GTScanRange> ranges, ImmutableBitSet selectedColBlocks) {
+        List<RawScan> allRawScans = Lists.newArrayList();
+        for (GTScanRange range : ranges) {
+            allRawScans.add(preparedHBaseScan(range.pkStart, range.pkEnd, range.fuzzyKeys, selectedColBlocks));
         }
-        return ret;
-
+        return allRawScans;
     }
 
     private boolean isMemoryHungry(ImmutableBitSet selectedColBlocks) {