You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/09/12 15:54:21 UTC

[1/2] kylin git commit: KYLIN-1922 refactors

Repository: kylin
Updated Branches:
  refs/heads/master 6db4b1723 -> 4e8ed97d1


KYLIN-1922 refactors


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4e8ed97d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4e8ed97d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4e8ed97d

Branch: refs/heads/master
Commit: 4e8ed97d12c53c19d09f736be3baaa9112dcf413
Parents: a201c5b
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Sep 12 23:52:43 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Mon Sep 12 23:53:48 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/cube/CubeInstance.java     |  10 +-
 .../org/apache/kylin/cube/model/CubeDesc.java   |   3 +-
 .../apache/kylin/gridtable/GTScanRequest.java   |   4 +-
 .../metadata/realization/IRealization.java      |   1 +
 .../apache/kylin/storage/StorageContext.java    |  34 ++++-
 .../storage/gtrecord/CubeScanRangePlanner.java  |   6 +-
 .../gtrecord/GTCubeStorageQueryBase.java        |  10 +-
 .../gtrecord/SequentialCubeTupleIterator.java   |   7 +-
 .../gtrecord/StorageResponseGTScatter.java      | 117 +++++++++++++++
 .../kylin/storage/hybrid/HybridInstance.java    |   5 +
 .../apache/kylin/query/ITCombinationTest.java   |   6 +-
 .../apache/kylin/query/ITKylinQueryTest.java    | 150 +++++++------------
 .../org/apache/kylin/query/KylinTestBase.java   |  56 +++++++
 .../src/test/resources/query/sql/query45.sql    |  23 +++
 .../test/resources/query/sql_limit/query01.sql  |  21 +++
 .../test/resources/query/sql_limit/query02.sql  |  24 +++
 .../query/sql_optimize/enable-limit01.sql       |  19 ---
 .../resources/query/sql_timeout/query02.sql     |  19 +++
 .../rules/RemoveBlackoutRealizationsRule.java   |  11 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |   3 +-
 .../hbase/cube/v2/ExpectedSizeIterator.java     |   2 +-
 .../storage/hbase/cube/v2/GTBlobScatter.java    | 150 -------------------
 .../coprocessor/endpoint/CubeVisitService.java  |   1 +
 23 files changed, 393 insertions(+), 289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 151e142..851b016 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -35,17 +35,17 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.CapabilityResult;
-import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonManagedReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 
@@ -305,7 +305,6 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
 
         return result;
     }
-    
 
     public CubeSegment getSegment(String name, SegmentStatusEnum status) {
         for (CubeSegment segment : segments) {
@@ -404,6 +403,11 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         return endTime;
     }
 
+    @Override
+    public boolean supportsLimitPushDown() {
+        return getDescriptor().supportsLimitPushDown();
+    }
+
     public int getRowKeyColumnCount() {
         return getDescriptor().getRowkey().getRowKeyColumns().length;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 2d9945a..e6b3d3f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -992,8 +992,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
     }
 
     public boolean supportsLimitPushDown() {
-        //currently only ID_SHARDED_HBASE supports limit push down
-        return getStorageType() == IStorageAware.ID_SHARDED_HBASE;
+        return getStorageType() != IStorageAware.ID_HBASE && getStorageType() != IStorageAware.ID_HYBRID;
     }
 
     public int getStorageType() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 4f68806..dc90ed6 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -42,8 +42,10 @@ import com.google.common.collect.Sets;
 public class GTScanRequest {
 
     private static final Logger logger = LoggerFactory.getLogger(GTScanRequest.class);
+    
     //it's not necessary to increase the checkInterval to very large because the check cost is not high
-    public static final int terminateCheckInterval = 1000;
+    //changing it might break org.apache.kylin.query.ITKylinQueryTest.testTimeoutQuery()
+    public static final int terminateCheckInterval = 100;
 
     private GTInfo info;
     private List<GTScanRange> ranges;

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
index fda05ce..040cdc5 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
@@ -57,4 +57,5 @@ public interface IRealization extends IStorageAware {
 
     public long getDateRangeEnd();
 
+    public boolean supportsLimitPushDown();
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index acb4960..cc39918 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -20,7 +20,11 @@ package org.apache.kylin.storage;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Range;
 
@@ -28,6 +32,7 @@ import com.google.common.collect.Range;
  * @author xjiang
  */
 public class StorageContext {
+    private static final Logger logger = LoggerFactory.getLogger(StorageContext.class);
 
     public static final int DEFAULT_THRESHOLD = 1000000;
 
@@ -35,6 +40,7 @@ public class StorageContext {
     private int threshold;
     private int limit;
     private int offset;
+    private int finalPushDownLimit;
     private boolean hasSort;
     private boolean acceptPartialResult;
 
@@ -62,6 +68,7 @@ public class StorageContext {
 
         this.acceptPartialResult = false;
         this.partialResultReturned = false;
+        this.finalPushDownLimit = Integer.MAX_VALUE;
     }
 
     public String getConnUrl() {
@@ -104,10 +111,33 @@ public class StorageContext {
         return this.enableLimit;
     }
 
-    public int getStoragePushDownLimit() {
+    private int getStoragePushDownLimit() {
         return this.isLimitEnabled() ? this.getOffset() + this.getLimit() : Integer.MAX_VALUE;
     }
-    
+
+    public int getFinalPushDownLimit() {
+        return finalPushDownLimit;
+    }
+
+    public void setFinalPushDownLimit(IRealization realization) {
+
+        //decide the final limit push down
+        int tempPushDownLimit = this.getStoragePushDownLimit();
+        if (tempPushDownLimit == Integer.MAX_VALUE) {
+            return;
+        }
+        
+        int pushDownLimitMax = KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax();
+        if (!realization.supportsLimitPushDown()) {
+            logger.info("Not enabling limit push down because cube storage type not supported");
+        } else if (tempPushDownLimit > pushDownLimitMax) {
+            logger.info("Not enabling limit push down because the limit(including offset) {} is larger than kylin.query.pushdown.limit.max {}", //
+                    tempPushDownLimit, pushDownLimitMax);
+        } else {
+            this.finalPushDownLimit = tempPushDownLimit;
+        }
+    }
+
     public void markSort() {
         this.hasSort = true;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 9f505f3..b011f40 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -152,9 +152,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
                     setAllowStorageAggregation(context.isNeedStorageAggregation()).setAggCacheMemThreshold(cubeSegment.getCubeInstance().getConfig().getQueryCoprocessorMemGB()).//
                     setStorageScanRowNumThreshold(context.getThreshold());
 
-            if (cubeDesc.supportsLimitPushDown()) {
-                builder.setStoragePushDownLimit(context.getStoragePushDownLimit());
-            }
+            if (context.getFinalPushDownLimit() != Integer.MAX_VALUE)
+                builder.setStoragePushDownLimit(context.getFinalPushDownLimit());
+
             scanRequest = builder.createGTScanRequest();
         } else {
             scanRequest = null;

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index f0c2494..31663d0 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -113,9 +113,13 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
         TupleFilter filterD = translateDerived(filter, groupsD);
 
+        //set whether to aggr at storage
         context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD));
+        // set limit push down
         enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, sqlDigest.aggregations, context);
-        setThresholdIfNecessary(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
+        context.setFinalPushDownLimit(cubeInstance);
+        // set cautious threshold to prevent out of memory
+        setThresholdIfNecessary(dimensionsD, metrics, context);
 
         List<CubeSegmentScanner> scanners = Lists.newArrayList();
         for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
@@ -135,7 +139,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         if (scanners.isEmpty())
             return ITupleIterator.EMPTY_TUPLE_ITERATOR;
 
-        return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context, cubeDesc.supportsLimitPushDown());
+        return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
     }
 
     protected boolean skipZeroInputSegment(CubeSegment cubeSegment) {
@@ -398,7 +402,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
     private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Collection<FunctionDesc> functionDescs, StorageContext context) {
         boolean possible = true;
 
-        boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
+        boolean goodFilter = filter == null || TupleFilter.isEvaluableRecursively(filter);
         if (!goodFilter) {
             possible = false;
             logger.info("Storage limit push down is impossible because the filter is unevaluatable");

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index 7059473..bef0e88 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -25,7 +25,6 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -54,7 +53,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
     private int scanCountDelta;
 
     public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
-            Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context, boolean supportLimitPushDown) {
+            Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
         this.context = context;
         this.scanners = scanners;
 
@@ -63,8 +62,8 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
             segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context));
         }
 
-        this.storagePushDownLimit = context.getStoragePushDownLimit();
-        if (!supportLimitPushDown || storagePushDownLimit > KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax()) {
+        this.storagePushDownLimit = context.getFinalPushDownLimit();
+        if (storagePushDownLimit == Integer.MAX_VALUE) {
             //normal case
             tupleIterator = Iterators.concat(segmentCubeTupleIterators.iterator());
         } else {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
new file mode 100644
index 0000000..fe1afd3
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import javax.annotation.Nullable;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+/**
+ * scatter the blob returned from region server to a iterable of gtrecords
+ */
+public class StorageResponseGTScatter implements IGTScanner {
+
+    private static final Logger logger = LoggerFactory.getLogger(StorageResponseGTScatter.class);
+
+    private GTInfo info;
+    private Iterator<byte[]> blocks;
+    private ImmutableBitSet columns;
+    private long totalScannedCount;
+    private int storagePushDownLimit = -1;
+
+    public StorageResponseGTScatter(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, long totalScannedCount, int storagePushDownLimit) {
+        this.info = info;
+        this.blocks = blocks;
+        this.columns = columns;
+        this.totalScannedCount = totalScannedCount;
+        this.storagePushDownLimit = storagePushDownLimit;
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    @Override
+    public long getScannedRowCount() {
+        return totalScannedCount;
+    }
+
+    @Override
+    public void close() throws IOException {
+        //do nothing
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new EndpointResponseGTScatterFunc());
+        if (storagePushDownLimit != Integer.MAX_VALUE) {
+            return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator();
+        } else {
+            return Iterators.concat(shardSubsets);
+        }
+    }
+
+    class EndpointResponseGTScatterFunc implements Function<byte[], Iterator<GTRecord>> {
+        @Nullable
+        @Override
+        public Iterator<GTRecord> apply(@Nullable final byte[] input) {
+
+            return new Iterator<GTRecord>() {
+                private ByteBuffer inputBuffer = null;
+                //rotate between two buffer GTRecord to support SortedIteratorMergerWithLimit, which will peek one more GTRecord
+                private GTRecord firstRecord = null;
+
+                @Override
+                public boolean hasNext() {
+                    if (inputBuffer == null) {
+                        inputBuffer = ByteBuffer.wrap(input);
+                        firstRecord = new GTRecord(info);
+                    }
+
+                    return inputBuffer.position() < inputBuffer.limit();
+                }
+
+                @Override
+                public GTRecord next() {
+                    firstRecord.loadColumns(columns, inputBuffer);
+                    return firstRecord;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index a0262e3..9b3a0fc 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -257,6 +257,11 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
     }
 
     @Override
+    public boolean supportsLimitPushDown() {
+        return false;
+    }
+
+    @Override
     public List<TblColRef> getAllDimensions() {
         init();
         return allDimensions;

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
index cbd4e44..f4667af 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
@@ -85,10 +85,10 @@ public class ITCombinationTest extends ITKylinQueryTest {
             // unset
         }
 
-        RemoveBlackoutRealizationsRule.blackouts.clear();
+        RemoveBlackoutRealizationsRule.blackList.clear();
         if (excludeViewCubes) {
-            RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]");
-            RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]");
+            RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]");
+            RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]");
         }
 
         if ("v1".equalsIgnoreCase(queryEngine))

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index c1c9767..b9895e8 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -21,31 +21,22 @@ package org.apache.kylin.query;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
-import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
+import org.apache.kylin.gridtable.GTScanTimeoutException;
 import org.apache.kylin.gridtable.StorageSideBehavior;
-import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.enumerator.OLAPQuery;
-import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.routing.Candidate;
 import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule;
-import org.apache.kylin.query.schema.OLAPSchemaFactory;
 import org.apache.kylin.storage.hbase.HBaseStorage;
-import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.dbunit.database.DatabaseConnection;
 import org.dbunit.database.IDatabaseConnection;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -73,8 +64,8 @@ public class ITKylinQueryTest extends KylinTestBase {
 
         setupAll();
 
-        RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]");
-        RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]");
+        RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]");
+        RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]");
     }
 
     @AfterClass
@@ -84,105 +75,69 @@ public class ITKylinQueryTest extends KylinTestBase {
         clean();
     }
 
-    protected static void setupAll() throws Exception {
-        //setup env
-        HBaseMetadataTestCase.staticCreateTestMetadata();
-        config = KylinConfig.getInstanceFromEnv();
-
-        //setup cube conn
-        File olapTmp = OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, config);
-        Properties props = new Properties();
-        props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "10001");
-        cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath(), props);
-
-        //setup h2
-        h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++), "sa", "");
-        // Load H2 Tables (inner join)
-        H2Database h2DB = new H2Database(h2Connection, config);
-        h2DB.loadAllTables();
-
-    }
-
-    protected static void clean() {
-        if (cubeConnection != null)
-            closeConnection(cubeConnection);
-        if (h2Connection != null)
-            closeConnection(h2Connection);
-
-        ObserverEnabler.forceCoprocessorUnset();
-        HBaseMetadataTestCase.staticCleanupTestMetadata();
-        RemoveBlackoutRealizationsRule.blackouts.clear();
-
-    }
-
     protected String getQueryFolderPrefix() {
         return "";
     }
 
+    protected Throwable findRoot(Throwable throwable) {
+        while (true) {
+            if (throwable.getCause() != null) {
+                throwable = throwable.getCause();
+            } else {
+                break;
+            }
+        }
+        return throwable;
+    }
+
     @Test
     public void testTimeoutQuery() throws Exception {
         if (HBaseStorage.overwriteStorageQuery != null) {
             //v1 engine does not suit
             return;
         }
-
-        thrown.expect(SQLException.class);
-
-        //should not break at table duplicate check, should fail at model duplicate check
-        thrown.expect(new BaseMatcher<Throwable>() {
-            @Override
-            public boolean matches(Object item) {
-
-                //find the "root"
-                Throwable throwable = (Throwable) item;
-                while (true) {
-                    if (throwable.getCause() != null) {
-                        throwable = throwable.getCause();
-                    } else {
-                        break;
-                    }
-                }
-
-                if (throwable instanceof GTScanSelfTerminatedException) {
-                    return true;
-                }
-                return false;
-            }
-
-            @Override
-            public void describeTo(Description description) {
-            }
-        });
-
-        runTimetoutQueries();
-
-    }
-
-    protected void runTimetoutQueries() throws Exception {
         try {
 
             Map<String, String> toggles = Maps.newHashMap();
             toggles.put(BackdoorToggles.DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR, StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString());//delay 10ms for every scan
             BackdoorToggles.setToggles(toggles);
 
-            KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "0.03");//set timeout to 9s
+            KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "0.01");//set timeout to 3s
 
             //these two cubes has RAW measure, will disturb limit push down
-            RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_without_slr_left_join_empty]");
-            RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]");
+            RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_without_slr_left_join_empty]");
+            RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]");
 
-            execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_timeout", null, true);
+            runTimeoutQueries();
         } finally {
 
             //these two cubes has RAW measure, will disturb limit push down
-            RemoveBlackoutRealizationsRule.blackouts.remove("CUBE[name=test_kylin_cube_without_slr_left_join_empty]");
-            RemoveBlackoutRealizationsRule.blackouts.remove("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]");
+            RemoveBlackoutRealizationsRule.blackList.remove("CUBE[name=test_kylin_cube_without_slr_left_join_empty]");
+            RemoveBlackoutRealizationsRule.blackList.remove("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]");
 
             KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "1");//set timeout to 9s 
             BackdoorToggles.cleanToggles();
         }
     }
 
+    protected void runTimeoutQueries() throws Exception {
+        List<File> sqlFiles = getFilesFromFolder(new File(getQueryFolderPrefix() + "src/test/resources/query/sql_timeout"), ".sql");
+        for (File sqlFile : sqlFiles) {
+            try {
+                runSQL(sqlFile, false, false);
+            } catch (SQLException e) {
+
+                System.out.println(e.getMessage());
+
+                if (findRoot(e) instanceof GTScanSelfTerminatedException) {
+                    //expected
+                    continue;
+                }
+            }
+            throw new RuntimeException("Expecting GTScanTimeoutException");
+        }
+    }
+
     //don't try to ignore this test, try to clean your "temp" folder
     @Test
     public void testTempQuery() throws Exception {
@@ -346,11 +301,25 @@ public class ITKylinQueryTest extends KylinTestBase {
         execAndCompDynamicQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_dynamic", null, true);
     }
 
-    @Ignore("simple query will be supported by ii")
     @Test
     public void testLimitEnabled() throws Exception {
-        runSqlFile(getQueryFolderPrefix() + "src/test/resources/query/sql_optimize/enable-limit01.sql");
-        assertLimitWasEnabled();
+        if (HBaseStorage.overwriteStorageQuery == null) {//v1 query engine will not work
+
+            try {
+                //other cubes have strange aggregation groups
+                RemoveBlackoutRealizationsRule.whiteList.add("CUBE[name=test_kylin_cube_with_slr_empty]");
+
+                List<File> sqlFiles = getFilesFromFolder(new File(getQueryFolderPrefix() + "src/test/resources/query/sql_limit"), ".sql");
+                for (File sqlFile : sqlFiles) {
+                    runSQL(sqlFile, false, false);
+                    assertTrue(checkLimitEnabled());
+                    assertTrue(checkFinalPushDownLimit());
+                }
+
+            } finally {
+                RemoveBlackoutRealizationsRule.whiteList.remove("CUBE[name=test_kylin_cube_with_slr_empty]");
+            }
+        }
     }
 
     @Test
@@ -377,13 +346,4 @@ public class ITKylinQueryTest extends KylinTestBase {
         this.batchExecuteQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_window");
     }
 
-    private void assertLimitWasEnabled() {
-        OLAPContext context = getFirstOLAPContext();
-        assertTrue(context.storageContext.isLimitEnabled());
-    }
-
-    private OLAPContext getFirstOLAPContext() {
-        return OLAPContext.getThreadLocalContexts().iterator().next();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index 2ad1105..294750e 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.sql.Connection;
+import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
@@ -38,12 +39,20 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.logging.LogManager;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.query.enumerator.OLAPQuery;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule;
+import org.apache.kylin.query.schema.OLAPSchemaFactory;
+import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.dbunit.Assertion;
 import org.dbunit.database.DatabaseConfig;
 import org.dbunit.database.DatabaseConnection;
@@ -568,4 +577,51 @@ public class KylinTestBase {
         printInfo(sb.toString());
         return count;
     }
+
+    protected static void setupAll() throws Exception {
+        //setup env
+        HBaseMetadataTestCase.staticCreateTestMetadata();
+        config = KylinConfig.getInstanceFromEnv();
+
+        //setup cube conn
+        File olapTmp = OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, config);
+        Properties props = new Properties();
+        props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "10001");
+        cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath(), props);
+
+        //setup h2
+        h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++), "sa", "");
+        // Load H2 Tables (inner join)
+        H2Database h2DB = new H2Database(h2Connection, config);
+        h2DB.loadAllTables();
+
+    }
+
+    protected static void clean() {
+        if (cubeConnection != null)
+            closeConnection(cubeConnection);
+        if (h2Connection != null)
+            closeConnection(h2Connection);
+
+        ObserverEnabler.forceCoprocessorUnset();
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
+        RemoveBlackoutRealizationsRule.blackList.clear();
+
+    }
+
+    protected boolean checkLimitEnabled() {
+        OLAPContext context = getFirstOLAPContext();
+        return (context.storageContext.isLimitEnabled());
+    }
+
+    protected boolean checkFinalPushDownLimit() {
+        OLAPContext context = getFirstOLAPContext();
+        return (context.storageContext.getFinalPushDownLimit() != Integer.MAX_VALUE);
+
+    }
+
+    private OLAPContext getFirstOLAPContext() {
+        return OLAPContext.getThreadLocalContexts().iterator().next();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/resources/query/sql/query45.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql/query45.sql b/kylin-it/src/test/resources/query/sql/query45.sql
new file mode 100644
index 0000000..0c78657
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql/query45.sql
@@ -0,0 +1,23 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+
+
+select seller_id, sum(price) from test_kylin_fact
+  where lstg_format_name='FP-GTC' 
+  group by seller_id limit 20

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/resources/query/sql_limit/query01.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_limit/query01.sql b/kylin-it/src/test/resources/query/sql_limit/query01.sql
new file mode 100644
index 0000000..fca8175
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_limit/query01.sql
@@ -0,0 +1,21 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select * from test_kylin_fact
+  where lstg_format_name='FP-GTC' 
+ limit 20

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/resources/query/sql_limit/query02.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_limit/query02.sql b/kylin-it/src/test/resources/query/sql_limit/query02.sql
new file mode 100644
index 0000000..53f7bd7
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_limit/query02.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+
+
+select seller_id, sum(price) from test_kylin_fact
+  where lstg_format_name='FP-GTC' 
+  group by seller_id limit 20
+ 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/resources/query/sql_optimize/enable-limit01.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_optimize/enable-limit01.sql b/kylin-it/src/test/resources/query/sql_optimize/enable-limit01.sql
deleted file mode 100644
index 4a62d92..0000000
--- a/kylin-it/src/test/resources/query/sql_optimize/enable-limit01.sql
+++ /dev/null
@@ -1,19 +0,0 @@
---
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements.  See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership.  The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License.  You may obtain a copy of the License at
---
---     http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
---
-
-select * from test_kylin_fact limit 10

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/resources/query/sql_timeout/query02.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_timeout/query02.sql b/kylin-it/src/test/resources/query/sql_timeout/query02.sql
new file mode 100644
index 0000000..2f187a4
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_timeout/query02.sql
@@ -0,0 +1,19 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select seller_id,lstg_format_name,sum(price) from test_kylin_fact group by seller_id,lstg_format_name
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveBlackoutRealizationsRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveBlackoutRealizationsRule.java b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveBlackoutRealizationsRule.java
index 9c3d7c9..f299d17 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveBlackoutRealizationsRule.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveBlackoutRealizationsRule.java
@@ -31,15 +31,22 @@ import com.google.common.collect.Sets;
  * for IT use, exclude some cubes 
  */
 public class RemoveBlackoutRealizationsRule extends RoutingRule {
-    public static Set<String> blackouts = Sets.newHashSet();
+    public static Set<String> blackList = Sets.newHashSet();
+    public static Set<String> whiteList = Sets.newHashSet();
 
     @Override
     public void apply(List<Candidate> candidates) {
         for (Iterator<Candidate> iterator = candidates.iterator(); iterator.hasNext();) {
             Candidate candidate = iterator.next();
 
-            if (blackouts.contains(candidate.getRealization().getCanonicalName())) {
+            if (blackList.contains(candidate.getRealization().getCanonicalName())) {
                 iterator.remove();
+                continue;
+            }
+
+            if (!whiteList.isEmpty() && !whiteList.contains(candidate.getRealization().getCanonicalName())) {
+                iterator.remove();
+                continue;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 573951b..c7de287 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -45,6 +45,7 @@ import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
 import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest;
@@ -222,7 +223,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             });
         }
 
-        return new GTBlobScatter(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit());
+        return new StorageResponseGTScatter(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit());
     }
 
     private ByteString serializeGTScanReq(GTScanRequest scanRequest) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index f4729a3..c27e5fc 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -85,7 +85,7 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
             byte[] ret = null;
 
             while (ret == null && coprocException == null && timeoutTS > System.currentTimeMillis()) {
-                ret = queue.poll(5000, TimeUnit.MILLISECONDS);
+                ret = queue.poll(10000, TimeUnit.MILLISECONDS);
             }
 
             if (coprocException != null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
deleted file mode 100644
index 631510e..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *  
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.cube.v2;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import javax.annotation.Nullable;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.storage.gtrecord.SortedIteratorMergerWithLimit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-
-/**
- * scatter the blob returned from region server to a iterable of gtrecords
- */
-class GTBlobScatter implements IGTScanner {
-
-    private static final Logger logger = LoggerFactory.getLogger(GTBlobScatter.class);
-
-    private GTInfo info;
-    private Iterator<byte[]> blocks;
-    private ImmutableBitSet columns;
-    private long totalScannedCount;
-    private int storagePushDownLimit = -1;
-
-    public GTBlobScatter(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, long totalScannedCount, int storagePushDownLimit) {
-        this.info = info;
-        this.blocks = blocks;
-        this.columns = columns;
-        this.totalScannedCount = totalScannedCount;
-        this.storagePushDownLimit = storagePushDownLimit;
-    }
-
-    @Override
-    public GTInfo getInfo() {
-        return info;
-    }
-
-    @Override
-    public long getScannedRowCount() {
-        return totalScannedCount;
-    }
-
-    @Override
-    public void close() throws IOException {
-        //do nothing
-    }
-
-    @Override
-    public Iterator<GTRecord> iterator() {
-        Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new GTBlobScatterFunc());
-        if (storagePushDownLimit <= KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax()) {
-            return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator();
-        } else {
-            return Iterators.concat(shardSubsets);
-        }
-    }
-
-    class GTBlobScatterFunc implements Function<byte[], Iterator<GTRecord>> {
-        @Nullable
-        @Override
-        public Iterator<GTRecord> apply(@Nullable final byte[] input) {
-
-            return new Iterator<GTRecord>() {
-                private ByteBuffer inputBuffer = null;
-                //rotate between two buffer GTRecord to support SortedIteratorMergerWithLimit, which will peek one more GTRecord
-                private GTRecord firstRecord = null;
-                private GTRecord secondRecord = null;
-                private GTRecord thirdRecord = null;
-                private GTRecord fourthRecord = null;
-                private int counter = 0;
-
-                @Override
-                public boolean hasNext() {
-                    if (inputBuffer == null) {
-                        inputBuffer = ByteBuffer.wrap(input);
-                        firstRecord = new GTRecord(info);
-                        secondRecord = new GTRecord(info);
-                        thirdRecord = new GTRecord(info);
-                        fourthRecord = new GTRecord(info);
-                    }
-
-                    return inputBuffer.position() < inputBuffer.limit();
-                }
-
-                @Override
-                public GTRecord next() {
-                    firstRecord.loadColumns(columns, inputBuffer);
-                    //logger.info("A GTRecord: " + System.identityHashCode(this) + " " + firstRecord + " " + System.identityHashCode(firstRecord));
-                    return firstRecord;
-                    //                    GTRecord temp = new GTRecord(info);
-                    //                    temp.loadColumns(columns, inputBuffer);
-                    //                    return temp;
-
-                    //                    counter++;
-                    //                    int index = counter % 4;
-                    //                    if (index == 1) {
-                    //                        firstRecord.loadColumns(columns, inputBuffer);
-                    //                        //logger.info("A GTRecord: " + System.identityHashCode(this) + " " + firstRecord + " " + System.identityHashCode(firstRecord));
-                    //                        return firstRecord;
-                    //                    } else if (index == 2) {
-                    //                        secondRecord.loadColumns(columns, inputBuffer);
-                    //                        //logger.info("B GTRecord: " + System.identityHashCode(this) + " " + secondRecord + " " + System.identityHashCode(secondRecord));
-                    //                        return secondRecord;
-                    //                    } else if (index == 3) {
-                    //                        thirdRecord.loadColumns(columns, inputBuffer);
-                    //                        //logger.info("C GTRecord: " + System.identityHashCode(this) + " " + thirdRecord + " " + System.identityHashCode(thirdRecord));
-                    //                        return thirdRecord;
-                    //                    } else {
-                    //                        fourthRecord.loadColumns(columns, inputBuffer);
-                    //                        //logger.info("D GTRecord: " + System.identityHashCode(this) + " " + fourthRecord + " " + System.identityHashCode(fourthRecord));
-                    //                        return fourthRecord;
-                    //                    }
-                }
-
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
-                }
-            };
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index ffe41c5..13a7b53 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -246,6 +246,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
             final MutableBoolean scanNormalComplete = new MutableBoolean(true);
             final long deadline = scanReq.getTimeout() + this.serviceStartTime;
+            logger.info("deadline is " + deadline);
             final long storagePushDownLimit = scanReq.getStoragePushDownLimit();
 
             final CellListIterator cellListIterator = new CellListIterator() {


[2/2] kylin git commit: minor refactors on StorageSideBehavior

Posted by ma...@apache.org.
minor refactors on StorageSideBehavior


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a201c5b0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a201c5b0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a201c5b0

Branch: refs/heads/master
Commit: a201c5b0f8e7706ef2cf7cbf9b6d43d3a6bc4a57
Parents: 6db4b17
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Sep 12 13:41:05 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Mon Sep 12 23:53:48 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/gridtable/GTScanRequest.java   | 10 +++++-----
 .../kylin/gridtable/StorageSideBehavior.java    | 14 +++++++++++++-
 .../apache/kylin/query/ITKylinQueryTest.java    | 20 ++++++++++++++++++--
 .../observer/AggregationScanner.java            |  6 +++---
 .../hbase/cube/v2/HBaseReadonlyStore.java       |  4 ++++
 .../coprocessor/endpoint/CubeVisitService.java  |  7 ++-----
 6 files changed, 45 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/a201c5b0/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 3e57e86..4f68806 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -158,14 +158,14 @@ public class GTScanRequest {
     }
 
     /**
-     * doFilter,doAggr,doMemCheck are only for profiling use.
+     * filterToggledOn,aggrToggledOn are only for profiling/test use.
      * in normal cases they are all true.
-     * <p/>
+     * 
      * Refer to CoprocessorBehavior for explanation
      */
-    public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr, long deadline) throws IOException {
+    public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn, long deadline) throws IOException {
         IGTScanner result = scanner;
-        if (!doFilter) { //Skip reading this section if you're not profiling! 
+        if (!filterToggledOn) { //Skip reading this section if you're not profiling! 
             int scanned = lookAndForget(result);
             return new EmptyGTScanner(scanned);
         } else {
@@ -174,7 +174,7 @@ public class GTScanRequest {
                 result = new GTFilterScanner(result, this);
             }
 
-            if (!doAggr) {//Skip reading this section if you're not profiling! 
+            if (!aggrToggledOn) {//Skip reading this section if you're not profiling! 
                 long scanned = result.getScannedRowCount();
                 lookAndForget(result);
                 return new EmptyGTScanner(scanned);

http://git-wip-us.apache.org/repos/asf/kylin/blob/a201c5b0/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
index 7fa93e7..b01ac3f 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
@@ -26,5 +26,17 @@ public enum StorageSideBehavior {
     SCAN_FILTER, //only scan+filter used,used for profiling filter speed.  Will not return any result
     SCAN_FILTER_AGGR, //aggregate the result.  Will return results
     SCAN_FILTER_AGGR_CHECKMEM, //default full operations. Will return results
-    SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY, // on each scan operation, delay for 10s to simulate slow queries, for test use
+    SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY; // on each scan operation, delay for 10s to simulate slow queries, for test use
+
+    public boolean filterToggledOn() {
+        return this.ordinal() >= SCAN_FILTER.ordinal();
+    }
+
+    public boolean aggrToggledOn() {
+        return this.ordinal() >= SCAN_FILTER_AGGR.ordinal();
+    }
+
+    public boolean delayToggledOn() {
+        return this.ordinal() >= SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.ordinal();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a201c5b0/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index de68c7a..c1c9767 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -129,10 +129,21 @@ public class ITKylinQueryTest extends KylinTestBase {
         thrown.expect(SQLException.class);
 
         //should not break at table duplicate check, should fail at model duplicate check
-        thrown.expectCause(new BaseMatcher<Throwable>() {
+        thrown.expect(new BaseMatcher<Throwable>() {
             @Override
             public boolean matches(Object item) {
-                if (item instanceof GTScanSelfTerminatedException) {
+
+                //find the "root"
+                Throwable throwable = (Throwable) item;
+                while (true) {
+                    if (throwable.getCause() != null) {
+                        throwable = throwable.getCause();
+                    } else {
+                        break;
+                    }
+                }
+
+                if (throwable instanceof GTScanSelfTerminatedException) {
                     return true;
                 }
                 return false;
@@ -143,6 +154,11 @@ public class ITKylinQueryTest extends KylinTestBase {
             }
         });
 
+        runTimetoutQueries();
+
+    }
+
+    protected void runTimetoutQueries() throws Exception {
         try {
 
             Map<String, String> toggles = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/kylin/blob/a201c5b0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
index a77f988..a900ea1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
@@ -25,9 +25,9 @@ import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.gridtable.StorageSideBehavior;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
-import org.apache.kylin.gridtable.StorageSideBehavior;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
@@ -86,11 +86,11 @@ public class AggregationScanner implements RegionScanner {
                     meaninglessByte += cell.getRowArray()[i];
                 }
             } else {
-                if (behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER.ordinal()) {
+                if (behavior.filterToggledOn()) {
                     if (filter != null && filter.evaluate(tuple) == false)
                         continue;
 
-                    if (behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER_AGGR.ordinal()) {
+                    if (behavior.aggrToggledOn()) {
                         AggrKey aggKey = projector.getAggrKey(results);
                         MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
                         aggregators.aggregate(bufs, results);

http://git-wip-us.apache.org/repos/asf/kylin/blob/a201c5b0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
index 1d8ad79..4c02dff 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
@@ -45,6 +45,10 @@ public class HBaseReadonlyStore implements IGTStore {
     private int rowkeyPreambleSize;
     private boolean withDelay = false;
 
+
+    /**
+     * @param withDelay is for test use
+     */
     public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize, boolean withDelay) {
         this.cellListIterator = cellListIterator;
         this.info = gtScanRequest.getInfo();

http://git-wip-us.apache.org/repos/asf/kylin/blob/a201c5b0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 36adca1..ffe41c5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -284,13 +284,10 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 }
             };
 
-            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, //
-                    request.getRowkeyPreambleSize(), StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString().equals(scanReq.getStorageBehavior()));
+            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn());
 
             IGTScanner rawScanner = store.scan(scanReq);
-            IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, //
-                    behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER.ordinal(), //
-                    behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER_AGGR.ordinal(), deadline);
+            IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(), behavior.aggrToggledOn(), deadline);
 
             ByteBuffer buffer = ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);