You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/02/12 14:31:29 UTC

[05/13] kylin git commit: KYLIN-2438 replace scan threshold with max scan bytes

KYLIN-2438 replace scan threshold with max scan bytes


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/09a08668
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/09a08668
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/09a08668

Branch: refs/heads/master-hbase0.98
Commit: 09a086688a664585c57b715046a9869b75351a52
Parents: edf6cef
Author: gaodayue <ga...@meituan.com>
Authored: Thu Feb 9 20:18:54 2017 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Fri Feb 10 18:07:58 2017 +0800

----------------------------------------------------------------------
 build/conf/kylin.properties                     |  16 ++-
 .../apache/kylin/common/KylinConfigBase.java    |  15 ++-
 .../org/apache/kylin/common/QueryContext.java   |  54 ++++----
 .../kylin-backward-compatibility.properties     |   8 +-
 .../apache/kylin/storage/StorageContext.java    |  14 ---
 .../exception/ScanOutOfLimitException.java      |  31 -----
 .../storage/gtrecord/CubeScanRangePlanner.java  |   4 +-
 .../gtrecord/GTCubeStorageQueryBase.java        |  28 -----
 .../gtrecord/SequentialCubeTupleIterator.java   |  15 +--
 .../org/apache/kylin/query/KylinTestBase.java   |   4 +-
 .../kylin/storage/hbase/ITStorageTest.java      |  12 --
 .../kylin/query/enumerator/OLAPEnumerator.java  |  18 ---
 .../kylin/query/enumerator/OLAPQuery.java       |   2 -
 .../apache/kylin/rest/service/CacheService.java |   6 -
 .../apache/kylin/rest/service/QueryService.java |   9 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  12 +-
 .../coprocessor/endpoint/CubeVisitService.java  |   4 +-
 .../endpoint/generated/CubeVisitProtos.java     | 124 ++++++++++++++++---
 .../endpoint/protobuf/CubeVisit.proto           |   1 +
 19 files changed, 186 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 1232c47..095a53f 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -86,6 +86,14 @@ kylin.storage.hbase.owner-tag=whoami@kylin.apache.org
 
 kylin.storage.hbase.coprocessor-mem-gb=3
 
+# By default, Kylin can spill a query's intermediate results to disk when the query is consuming too much memory.
+# Set this to false if you want the query to abort immediately in that situation.
+kylin.storage.hbase.coprocessor-spill-enabled=true
+
+# The maximum number of bytes each coprocessor is allowed to scan.
+# To allow arbitrarily large scans, set it to 0.
+kylin.storage.hbase.coprocessor-max-scan-bytes=3221225472
+
 # The default coprocessor timeout is (hbase.rpc.timeout * 0.9) / 1000 seconds,
 # You can set it to a smaller value. 0 means use default.
 # kylin.storage.hbase.coprocessor-timeout-seconds=0
@@ -148,13 +156,13 @@ kylin.snapshot.max-mb=300
 
 ### QUERY ###
 
-kylin.query.scan-threshold=10000000
+# Controls the maximum number of bytes a query is allowed to scan from storage.
+# The default value 0 means no limit.
+# The counterpart kylin.storage.hbase.coprocessor-max-scan-bytes sets the maximum per coprocessor.
+kylin.query.max-scan-bytes=0
 
 kylin.query.udf.version=org.apache.kylin.query.udf.VersionUDF
 
-# 3G
-kylin.query.memory-budget-bytes=3221225472
-
 kylin.query.cache-enabled=true
 
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 894e28a..c77788b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -613,6 +613,11 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(this.getOptional("kylin.storage.hbase.coprocessor-spill-enabled", "true"));
     }
 
+    public long getQueryCoprocessorMaxScanBytes() {
+        long value = Long.parseLong(this.getOptional("kylin.storage.hbase.coprocessor-max-scan-bytes", String.valueOf(3L * 1024 * 1024 * 1024)));
+        return value > 0 ? value : Long.MAX_VALUE;
+    }
+
     public int getQueryCoprocessorTimeoutSeconds() {
         return Integer.parseInt(this.getOptional("kylin.storage.hbase.coprocessor-timeout-seconds", "0"));
     }
@@ -807,10 +812,16 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.query.max-limit-pushdown", "10000"));
     }
 
+    @Deprecated
     public int getScanThreshold() {
         return Integer.parseInt(getOptional("kylin.query.scan-threshold", "10000000"));
     }
 
+    public long getQueryMaxScanBytes() {
+        long value = Long.parseLong(getOptional("kylin.query.max-scan-bytes", "0"));
+        return value > 0 ? value : Long.MAX_VALUE;
+    }
+
     public int getLargeQueryThreshold() {
         return Integer.parseInt(getOptional("kylin.query.large-query-threshold", String.valueOf((int) (getScanThreshold() * 0.1))));
     }
@@ -851,10 +862,6 @@ abstract public class KylinConfigBase implements Serializable {
         return Long.parseLong(this.getOptional("kylin.query.cache-threshold-scan-count", String.valueOf(10 * 1024)));
     }
 
-    public long getQueryMemBudget() {
-        return Long.parseLong(this.getOptional("kylin.query.memory-budget-bytes", String.valueOf(3L * 1024 * 1024 * 1024)));
-    }
-
     public boolean isQuerySecureEnabled() {
         return Boolean.parseBoolean(this.getOptional("kylin.query.security-enabled", "true"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 93b8556..3a73993 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -18,44 +18,48 @@
 
 package org.apache.kylin.common;
 
-import java.util.Map;
-
-import com.google.common.collect.Maps;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * checkout {@link org.apache.kylin.common.debug.BackdoorToggles} for comparison
  */
 public class QueryContext {
-    private static final ThreadLocal<Map<String, String>> _queryContext = new ThreadLocal<Map<String, String>>();
 
-    public final static String KEY_QUERY_ID = "QUERY_ID";
+    private static final ThreadLocal<QueryContext> contexts = new ThreadLocal<QueryContext>() {
+        @Override
+        protected QueryContext initialValue() {
+            return new QueryContext();
+        }
+    };
+
+    private String queryId;
+    private AtomicLong scanBytes = new AtomicLong();
 
-    public static String getQueryId() {
-        return getString(KEY_QUERY_ID);
+    private QueryContext() {
+        // use QueryContext.current() instead
     }
 
-    public static void setQueryId(String uuid) {
-        setString(KEY_QUERY_ID, uuid);
+    public static QueryContext current() {
+        return contexts.get();
     }
 
-    private static void setString(String key, String value) {
-        Map<String, String> context = _queryContext.get();
-        if (context == null) {
-            Map<String, String> newMap = Maps.newHashMap();
-            newMap.put(key, value);
-            _queryContext.set(newMap);
-        } else {
-            context.put(key, value);
-        }
+    public static void reset() {
+        contexts.remove();
     }
 
-    private static String getString(String key) {
-        Map<String, String> context = _queryContext.get();
-        if (context == null) {
-            return null;
-        } else {
-            return context.get(key);
-        }
+    public String getQueryId() {
+        return queryId;
     }
 
+    public void setQueryId(String queryId) {
+        this.queryId = queryId;
+    }
+
+    public long getScanBytes() {
+        return scanBytes.get();
+    }
+
+    public long addAndGetScanBytes(long delta) {
+        return scanBytes.addAndGet(delta);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-common/src/main/resources/kylin-backward-compatibility.properties
----------------------------------------------------------------------
diff --git a/core-common/src/main/resources/kylin-backward-compatibility.properties b/core-common/src/main/resources/kylin-backward-compatibility.properties
index 16871d8..ab9bcb1 100644
--- a/core-common/src/main/resources/kylin-backward-compatibility.properties
+++ b/core-common/src/main/resources/kylin-backward-compatibility.properties
@@ -142,8 +142,6 @@ kylin.query.hbase.hconnection.threads.alive.seconds=kylin.storage.hbase.hconnect
 
 ### QUERY ###
 
-kylin.query.pushdown.limit.max=kylin.query.max-limit-pushdown
-kylin.query.scan.threshold=kylin.query.scan-threshold
 kylin.query.filter.derived_in.max=kylin.query.derived-filter-translation-threshold
 kylin.query.badquery.stacktrace.depth=kylin.query.badquery-stacktrace-depth
 kylin.query.badquery.history.num=kylin.query.badquery-history-number
@@ -154,13 +152,17 @@ kylin.query.transformers=kylin.query.transformers
 kylin.query.cache.enabled=kylin.query.cache-enabled
 kylin.query.cache.threshold.duration=kylin.query.cache-threshold-duration
 kylin.query.cache.threshold.scancount=kylin.query.cache-threshold-scan-count
-kylin.query.mem.budget=kylin.query.memory-budget-bytes
+kylin.query.mem.budget=kylin.storage.hbase.coprocessor-max-scan-bytes
 kylin.query.ignore_unknown_function=kylin.query.ignore-unknown-function
 kylin.query.dim.distinct.max=kylin.query.max-dimension-count-distinct
 kylin.query.security.enabled=kylin.query.security-enabled
 kylin.query.access.controller=kylin.query.access-controller
 kylin.query.udf.=kylin.query.udf.
 
+#deprecated
+kylin.query.pushdown.limit.max=kylin.query.max-limit-pushdown
+kylin.query.scan.threshold=kylin.query.scan-threshold
+
 
 ### SERVER ###
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 708dfde..0f52c53 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -20,7 +20,6 @@ package org.apache.kylin.storage;
 
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.slf4j.Logger;
@@ -35,7 +34,6 @@ public class StorageContext {
     private static final Logger logger = LoggerFactory.getLogger(StorageContext.class);
 
     private String connUrl;
-    private int threshold;
     private int limit = Integer.MAX_VALUE;
     private int offset = 0;
     private int finalPushDownLimit = Integer.MAX_VALUE;
@@ -54,10 +52,6 @@ public class StorageContext {
     private Cuboid cuboid;
     private boolean partialResultReturned = false;
 
-    public StorageContext() {
-        this.threshold = KylinConfig.getInstanceFromEnv().getScanThreshold();
-    }
-
     private Range<Long> reusedPeriod;
 
     public String getConnUrl() {
@@ -68,14 +62,6 @@ public class StorageContext {
         this.connUrl = connUrl;
     }
 
-    public int getThreshold() {
-        return threshold;
-    }
-
-    public void setThreshold(int t) {
-        threshold = t;
-    }
-
     public int getLimit() {
         return limit;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-storage/src/main/java/org/apache/kylin/storage/exception/ScanOutOfLimitException.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/exception/ScanOutOfLimitException.java b/core-storage/src/main/java/org/apache/kylin/storage/exception/ScanOutOfLimitException.java
deleted file mode 100644
index f77cc35..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/exception/ScanOutOfLimitException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.exception;
-
-/**
- * @author ysong1
- * 
- */
-public class ScanOutOfLimitException extends RuntimeException {
-    private static final long serialVersionUID = 2045169570038227895L;
-
-    public ScanOutOfLimitException(String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index b05a629..6911827 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -150,8 +150,8 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         if (scanRanges != null && scanRanges.size() != 0) {
             scanRequest = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions).//
                     setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs).setFilterPushDown(gtFilter).//
-                    setAllowStorageAggregation(context.isNeedStorageAggregation()).setAggCacheMemThreshold(cubeSegment.getCubeInstance().getConfig().getQueryCoprocessorMemGB()).//
-                    setStoragePushDownLimit(context.getFinalPushDownLimit()).setStorageScanRowNumThreshold(context.getThreshold()).createGTScanRequest();
+                    setAllowStorageAggregation(context.isNeedStorageAggregation()).setAggCacheMemThreshold(cubeSegment.getConfig().getQueryCoprocessorMemGB()).//
+                    setStoragePushDownLimit(context.getFinalPushDownLimit()).createGTScanRequest();
         } else {
             scanRequest = null;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 4dbdf94..a72460c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -122,8 +122,6 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, loosenedColumnD, sqlDigest.aggregations, context);
         // set query deadline
         context.setDeadline(cubeInstance);
-        // set cautious threshold to prevent out of memory
-        setThresholdIfNecessary(dimensionsD, metrics, context);
 
         List<CubeSegmentScanner> scanners = Lists.newArrayList();
         for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
@@ -323,32 +321,6 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
     }
 
-    private void setThresholdIfNecessary(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
-        boolean hasMemHungryMeasure = false;
-        for (FunctionDesc func : metrics) {
-            hasMemHungryMeasure |= func.getMeasureType().isMemoryHungry();
-        }
-
-        // need to limit the memory usage for memory hungry measures
-        if (hasMemHungryMeasure == false) {
-            return;
-        }
-
-        int rowSizeEst = dimensions.size() * 3;
-        for (FunctionDesc func : metrics) {
-            // FIXME getStorageBytesEstimate() is not appropriate as here we want size in memory (not in storage)
-            rowSizeEst += func.getReturnDataType().getStorageBytesEstimate();
-        }
-
-        long rowEst = this.cubeInstance.getConfig().getQueryMemBudget() / rowSizeEst;
-        if (rowEst > 0) {
-            logger.info("Memory budget is set to " + rowEst + " rows");
-            context.setThreshold((int) rowEst);
-        } else {
-            logger.info("Memory budget is not set.");
-        }
-    }
-
     private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Set<TblColRef> loosenedColumnD, Collection<FunctionDesc> functionDescs, StorageContext context) {
         boolean possible = true;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index c6b2c6c..bb2d7f9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -26,7 +26,6 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.gridtable.GTScanTimeoutException;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -35,7 +34,6 @@ import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.metadata.tuple.TupleInfo;
 import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.exception.ScanOutOfLimitException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,8 +46,6 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
 
     private static final Logger logger = LoggerFactory.getLogger(SequentialCubeTupleIterator.class);
 
-    private final int SCAN_THRESHOLD = KylinConfig.getInstanceFromEnv().getScanThreshold();
-
     protected List<CubeSegmentScanner> scanners;
     protected List<SegmentCubeTupleIterator> segmentCubeTupleIterators;
     protected Iterator<ITuple> tupleIterator;
@@ -142,15 +138,10 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
 
     @Override
     public ITuple next() {
-        if (scanCount % 100 == 1 && System.currentTimeMillis() > context.getDeadline()) {
+        if (scanCount++ % 100 == 1 && System.currentTimeMillis() > context.getDeadline()) {
             throw new GTScanTimeoutException("Query Timeout!");
         }
 
-        // prevent the big query to make the Query Server OOM
-        if (scanCount++ > SCAN_THRESHOLD) {
-            throw new ScanOutOfLimitException("Scan count exceed the scan threshold: " + SCAN_THRESHOLD);
-        }
-
         if (++scanCountDelta >= 1000)
             flushScanCountDelta();
 
@@ -181,10 +172,6 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
         }
     }
 
-    public int getScanCount() {
-        return scanCount;
-    }
-
     private void flushScanCountDelta() {
         context.increaseTotalScanCount(scanCountDelta);
         scanCountDelta = 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index 6b9397d..d83ad75 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -39,7 +39,6 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.logging.LogManager;
@@ -642,8 +641,7 @@ public class KylinTestBase {
 
         //setup cube conn
         File olapTmp = OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, config);
-        Properties props = new Properties();
-        cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath(), props);
+        cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath());
 
         //setup h2
         h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++) + ";CACHE_SIZE=32072", "sa", "");

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
index aea8bef..733ca06 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
@@ -39,13 +39,11 @@ import org.apache.kylin.storage.IStorageQuery;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.StorageFactory;
 import org.apache.kylin.storage.StorageMockUtils;
-import org.apache.kylin.storage.exception.ScanOutOfLimitException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.Sets;
@@ -84,16 +82,6 @@ public class ITStorageTest extends HBaseMetadataTestCase {
         this.cleanupTestMetadata();
     }
 
-    @Test(expected = ScanOutOfLimitException.class)
-    @Ignore
-    public void testScanOutOfLimit() {
-        context.setThreshold(1);
-        List<TblColRef> groups = mockup.buildGroups();
-        List<FunctionDesc> aggregations = mockup.buildAggregations();
-
-        search(groups, aggregations, null, context);
-    }
-
     @Test
     public void test01() {
         List<TblColRef> groups = mockup.buildGroups();

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
index f012638..56b82b9 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
@@ -20,12 +20,9 @@ package org.apache.kylin.query.enumerator;
 
 import java.util.Arrays;
 import java.util.Map;
-import java.util.Properties;
 
 import org.apache.calcite.DataContext;
-import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.calcite.linq4j.Enumerator;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
@@ -111,9 +108,6 @@ public class OLAPEnumerator implements Enumerator<Object[]> {
     private ITupleIterator queryStorage() {
         logger.debug("query storage...");
 
-        // set connection properties
-        setConnectionProperties();
-
         // bind dynamic variables
         bindVariable(olapContext.filter);
 
@@ -156,16 +150,4 @@ public class OLAPEnumerator implements Enumerator<Object[]> {
             }
         }
     }
-
-    private void setConnectionProperties() {
-        CalciteConnection conn = (CalciteConnection) optiqContext.getQueryProvider();
-        Properties connProps = conn.getProperties();
-
-        String propThreshold = connProps.getProperty(OLAPQuery.PROP_SCAN_THRESHOLD);
-        if (!StringUtils.isBlank(propThreshold)) {
-            int threshold = Integer.valueOf(propThreshold);
-            olapContext.storageContext.setThreshold(threshold);
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
index 27d8c94..8318a07 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
@@ -28,8 +28,6 @@ import org.apache.kylin.query.relnode.OLAPContext;
  */
 public class OLAPQuery extends AbstractEnumerable<Object[]> implements Enumerable<Object[]> {
 
-    public static final String PROP_SCAN_THRESHOLD = "scan_threshold";
-
     public enum EnumeratorTypeEnum {
         OLAP, //finish query with Cube or II, or a combination of both
         LOOKUP_TABLE, //using a snapshot of lookup table

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 0938e95..af680a5 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -21,7 +21,6 @@ package org.apache.kylin.rest.service;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
-import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -30,11 +29,9 @@ import javax.sql.DataSource;
 import org.apache.calcite.jdbc.Driver;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
 import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.query.enumerator.OLAPQuery;
 import org.apache.kylin.query.schema.OLAPSchemaFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -166,9 +163,6 @@ public class CacheService extends BasicService {
             }
 
             DriverManagerDataSource ds = new DriverManagerDataSource();
-            Properties props = new Properties();
-            props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, String.valueOf(KylinConfig.getInstanceFromEnv().getScanThreshold()));
-            ds.setConnectionProperties(props);
             ds.setDriverClassName(Driver.class.getName());
             ds.setUrl("jdbc:calcite:model=" + modelJson.getAbsolutePath());
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 9ccda03..7d9e24d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -63,6 +63,7 @@ import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTScanExceedThresholdException;
 import org.apache.kylin.metadata.project.RealizationEntry;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.query.relnode.OLAPContext;
@@ -79,7 +80,6 @@ import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.util.QueryUtil;
 import org.apache.kylin.rest.util.Serializer;
 import org.apache.kylin.rest.util.TableauInterceptor;
-import org.apache.kylin.storage.exception.ScanOutOfLimitException;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hybrid.HybridInstance;
 import org.slf4j.Logger;
@@ -267,7 +267,7 @@ public class QueryService extends BasicService {
         StringBuilder stringBuilder = new StringBuilder();
         stringBuilder.append(newLine);
         stringBuilder.append("==========================[QUERY]===============================").append(newLine);
-        stringBuilder.append("Query Id: ").append(QueryContext.getQueryId()).append(newLine);
+        stringBuilder.append("Query Id: ").append(QueryContext.current().getQueryId()).append(newLine);
         stringBuilder.append("SQL: ").append(request.getSql()).append(newLine);
         stringBuilder.append("User: ").append(user).append(newLine);
         stringBuilder.append("Success: ").append((null == response.getExceptionMessage())).append(newLine);
@@ -331,7 +331,7 @@ public class QueryService extends BasicService {
         final String queryId = UUID.randomUUID().toString();
         if (sqlRequest.getBackdoorToggles() != null)
             BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles());
-        QueryContext.setQueryId(queryId);
+        QueryContext.current().setQueryId(queryId);
 
         try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) {
             String sql = sqlRequest.getSql();
@@ -383,7 +383,7 @@ public class QueryService extends BasicService {
                 sqlResponse = new SQLResponse(null, null, 0, true, errMsg);
 
-                // for exception queries, only cache ScanOutOfLimitException
+                // for exception queries, only cache GTScanExceedThresholdException
-                if (queryCacheEnabled && e instanceof ScanOutOfLimitException) {
+                if (queryCacheEnabled && e instanceof GTScanExceedThresholdException) {
                     Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
                     exceptionCache.put(new Element(sqlRequest, sqlResponse));
                 }
@@ -400,6 +400,7 @@ public class QueryService extends BasicService {
 
         } finally {
             BackdoorToggles.cleanToggles();
+            QueryContext.reset();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index a2b2611..68a84c1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -43,6 +43,7 @@ import org.apache.kylin.common.util.LoggableCachedThreadPool;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTScanExceedThresholdException;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
 import org.apache.kylin.gridtable.IGTScanner;
@@ -106,6 +107,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
     @SuppressWarnings("checkstyle:methodlength")
     @Override
     public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
+        final QueryContext queryContext = QueryContext.current();
 
         Pair<Short, Short> shardNumAndBaseShard = getShardNumAndBaseShard();
         short shardNum = shardNumAndBaseShard.getFirst();
@@ -160,11 +162,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         }
         builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
         builder.setKylinProperties(kylinConfig.getConfigAsString());
-        final String queryId = QueryContext.getQueryId();
+        final String queryId = queryContext.getQueryId();
         if (queryId != null) {
             builder.setQueryId(queryId);
         }
         builder.setSpillEnabled(cubeSeg.getConfig().getQueryCoprocessorSpillEnabled());
+        builder.setMaxScanBytes(cubeSeg.getConfig().getQueryCoprocessorMaxScanBytes());
 
         for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
             executorService.submit(new Runnable() {
@@ -199,10 +202,15 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                                         if (region == null)
                                             return;
 
-                                        context.increaseTotalScanBytes(result.getStats().getScannedBytes());
+                                        final long scanBytes = result.getStats().getScannedBytes();
+                                        context.increaseTotalScanBytes(scanBytes);
                                         totalScannedCount.addAndGet(result.getStats().getScannedRowCount());
                                         logger.info(logHeader + getStatsString(region, result));
 
+                                        if (queryContext.addAndGetScanBytes(scanBytes) > cubeSeg.getConfig().getQueryMaxScanBytes()) {
+                                            throw new GTScanExceedThresholdException("Query scanned " + queryContext.getScanBytes() + " bytes exceeds threshold " + cubeSeg.getConfig().getQueryMaxScanBytes());
+                                        }
+
                                         if (result.getStats().getNormalComplete() != 1) {
                                             abnormalFinish[0] = true;
                                             return;

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 1f6425f..5fd9740 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -297,8 +297,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
             ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator(
                     allCellLists,
-                    scanReq.getStorageScanRowNumThreshold(),
-                    Long.MAX_VALUE,
+                    scanReq.getStorageScanRowNumThreshold(), // for old client (scan threshold)
+                    request.getMaxScanBytes() == 0 ? Long.MAX_VALUE : request.getMaxScanBytes(), // for new client
                     scanReq.getTimeout());
 
             IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn());

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
index 5a3aa5a..4b6fc95 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
@@ -117,6 +117,16 @@ public final class CubeVisitProtos {
      * <code>optional bool spillEnabled = 7 [default = true];</code>
      */
     boolean getSpillEnabled();
+
+    // optional int64 maxScanBytes = 8;
+    /**
+     * <code>optional int64 maxScanBytes = 8;</code>
+     */
+    boolean hasMaxScanBytes();
+    /**
+     * <code>optional int64 maxScanBytes = 8;</code>
+     */
+    long getMaxScanBytes();
   }
   /**
    * Protobuf type {@code CubeVisitRequest}
@@ -207,6 +217,11 @@ public final class CubeVisitProtos {
               spillEnabled_ = input.readBool();
               break;
             }
+            case 64: {
+              bitField0_ |= 0x00000040;
+              maxScanBytes_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -952,6 +967,22 @@ public final class CubeVisitProtos {
       return spillEnabled_;
     }
 
+    // optional int64 maxScanBytes = 8;
+    public static final int MAXSCANBYTES_FIELD_NUMBER = 8;
+    private long maxScanBytes_;
+    /**
+     * <code>optional int64 maxScanBytes = 8;</code>
+     */
+    public boolean hasMaxScanBytes() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    /**
+     * <code>optional int64 maxScanBytes = 8;</code>
+     */
+    public long getMaxScanBytes() {
+      return maxScanBytes_;
+    }
+
     private void initFields() {
       gtScanRequest_ = com.google.protobuf.ByteString.EMPTY;
       hbaseRawScan_ = com.google.protobuf.ByteString.EMPTY;
@@ -960,6 +991,7 @@ public final class CubeVisitProtos {
       kylinProperties_ = "";
       queryId_ = "";
       spillEnabled_ = true;
+      maxScanBytes_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1010,6 +1042,9 @@ public final class CubeVisitProtos {
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         output.writeBool(7, spillEnabled_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeInt64(8, maxScanBytes_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1047,6 +1082,10 @@ public final class CubeVisitProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(7, spillEnabled_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(8, maxScanBytes_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1102,6 +1141,11 @@ public final class CubeVisitProtos {
         result = result && (getSpillEnabled()
             == other.getSpillEnabled());
       }
+      result = result && (hasMaxScanBytes() == other.hasMaxScanBytes());
+      if (hasMaxScanBytes()) {
+        result = result && (getMaxScanBytes()
+            == other.getMaxScanBytes());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1143,6 +1187,10 @@ public final class CubeVisitProtos {
         hash = (37 * hash) + SPILLENABLED_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getSpillEnabled());
       }
+      if (hasMaxScanBytes()) {
+        hash = (37 * hash) + MAXSCANBYTES_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getMaxScanBytes());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1271,6 +1319,8 @@ public final class CubeVisitProtos {
         bitField0_ = (bitField0_ & ~0x00000020);
         spillEnabled_ = true;
         bitField0_ = (bitField0_ & ~0x00000040);
+        maxScanBytes_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000080);
         return this;
       }
 
@@ -1332,6 +1382,10 @@ public final class CubeVisitProtos {
           to_bitField0_ |= 0x00000020;
         }
         result.spillEnabled_ = spillEnabled_;
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.maxScanBytes_ = maxScanBytes_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1396,6 +1450,9 @@ public final class CubeVisitProtos {
         if (other.hasSpillEnabled()) {
           setSpillEnabled(other.getSpillEnabled());
         }
+        if (other.hasMaxScanBytes()) {
+          setMaxScanBytes(other.getMaxScanBytes());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1989,6 +2046,39 @@ public final class CubeVisitProtos {
         return this;
       }
 
+      // optional int64 maxScanBytes = 8;
+      private long maxScanBytes_ ;
+      /**
+       * <code>optional int64 maxScanBytes = 8;</code>
+       */
+      public boolean hasMaxScanBytes() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      /**
+       * <code>optional int64 maxScanBytes = 8;</code>
+       */
+      public long getMaxScanBytes() {
+        return maxScanBytes_;
+      }
+      /**
+       * <code>optional int64 maxScanBytes = 8;</code>
+       */
+      public Builder setMaxScanBytes(long value) {
+        bitField0_ |= 0x00000080;
+        maxScanBytes_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 maxScanBytes = 8;</code>
+       */
+      public Builder clearMaxScanBytes() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        maxScanBytes_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:CubeVisitRequest)
     }
 
@@ -4433,27 +4523,27 @@ public final class CubeVisitProtos {
     java.lang.String[] descriptorData = {
       "\npstorage-hbase/src/main/java/org/apache" +
       "/kylin/storage/hbase/cube/v2/coprocessor" +
-      "/endpoint/protobuf/CubeVisit.proto\"\357\001\n\020C" +
+      "/endpoint/protobuf/CubeVisit.proto\"\205\002\n\020C" +
       "ubeVisitRequest\022\025\n\rgtScanRequest\030\001 \002(\014\022\024" +
       "\n\014hbaseRawScan\030\002 \002(\014\022\032\n\022rowkeyPreambleSi" +
       "ze\030\003 \002(\005\0223\n\020hbaseColumnsToGT\030\004 \003(\0132\031.Cub" +
       "eVisitRequest.IntList\022\027\n\017kylinProperties" +
       "\030\005 \002(\t\022\017\n\007queryId\030\006 \001(\t\022\032\n\014spillEnabled\030" +
-      "\007 \001(\010:\004true\032\027\n\007IntList\022\014\n\004ints\030\001 \003(\005\"\347\002\n" +
-      "\021CubeVisitResponse\022\026\n\016compressedRows\030\001 \002",
-      "(\014\022\'\n\005stats\030\002 \002(\0132\030.CubeVisitResponse.St" +
-      "ats\032\220\002\n\005Stats\022\030\n\020serviceStartTime\030\001 \001(\003\022" +
-      "\026\n\016serviceEndTime\030\002 \001(\003\022\027\n\017scannedRowCou" +
-      "nt\030\003 \001(\003\022\032\n\022aggregatedRowCount\030\004 \001(\003\022\025\n\r" +
-      "systemCpuLoad\030\005 \001(\001\022\036\n\026freePhysicalMemor" +
-      "ySize\030\006 \001(\001\022\031\n\021freeSwapSpaceSize\030\007 \001(\001\022\020" +
-      "\n\010hostname\030\010 \001(\t\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016norm" +
-      "alComplete\030\n \001(\005\022\024\n\014scannedBytes\030\013 \001(\0032F" +
-      "\n\020CubeVisitService\0222\n\tvisitCube\022\021.CubeVi" +
-      "sitRequest\032\022.CubeVisitResponseB`\nEorg.ap",
-      "ache.kylin.storage.hbase.cube.v2.coproce" +
-      "ssor.endpoint.generatedB\017CubeVisitProtos" +
-      "H\001\210\001\001\240\001\001"
+      "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 \001(\003\032\027\n\007Int" +
+      "List\022\014\n\004ints\030\001 \003(\005\"\347\002\n\021CubeVisitResponse",
+      "\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030\002 \002(\0132" +
+      "\030.CubeVisitResponse.Stats\032\220\002\n\005Stats\022\030\n\020s" +
+      "erviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTime\030" +
+      "\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\003\022\032\n\022aggreg" +
+      "atedRowCount\030\004 \001(\003\022\025\n\rsystemCpuLoad\030\005 \001(" +
+      "\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022\031\n\021fre" +
+      "eSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 \001(\t\022\016" +
+      "\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n \001(\005\022\024" +
+      "\n\014scannedBytes\030\013 \001(\0032F\n\020CubeVisitService" +
+      "\0222\n\tvisitCube\022\021.CubeVisitRequest\032\022.CubeV",
+      "isitResponseB`\nEorg.apache.kylin.storage" +
+      ".hbase.cube.v2.coprocessor.endpoint.gene" +
+      "ratedB\017CubeVisitProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4465,7 +4555,7 @@ public final class CubeVisitProtos {
           internal_static_CubeVisitRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_CubeVisitRequest_descriptor,
-              new java.lang.String[] { "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", "SpillEnabled", });
+              new java.lang.String[] { "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", "SpillEnabled", "MaxScanBytes", });
           internal_static_CubeVisitRequest_IntList_descriptor =
             internal_static_CubeVisitRequest_descriptor.getNestedTypes().get(0);
           internal_static_CubeVisitRequest_IntList_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/kylin/blob/09a08668/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
index f416669..00015fc 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
@@ -37,6 +37,7 @@ message CubeVisitRequest {
     required string kylinProperties = 5; // kylin properties
     optional string queryId = 6;
     optional bool spillEnabled = 7 [default = true];
+    optional int64 maxScanBytes = 8; // 0 means no limit
     message IntList {
         repeated int32 ints = 1;
     }