Posted to commits@kylin.apache.org by ma...@apache.org on 2016/09/09 10:53:11 UTC

[3/5] kylin git commit: KYLIN-1922 optimize needStorageAggregation check logic and make sure self-termination in coprocessor works
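The self-termination half of this change comes down to one pattern in the coprocessor's row loop: re-check a deadline every GTScanRequest.terminateCheckInterval rows and abort the scan once it passes, instead of relying on the client to give up. Below is a minimal, self-contained sketch of that pattern only; the class and method names are illustrative, while the real code (see the CubeVisitService hunk further down) throws GTScanTimeoutException and surfaces the failure to the client as GTScanSelfTerminatedException via CubeHBaseEndpointRPC.

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    // Minimal sketch of the periodic deadline check applied in CubeVisitService below.
    // CHECK_INTERVAL stands in for GTScanRequest.terminateCheckInterval; everything else
    // here is illustrative and not Kylin API.
    public class SelfTerminationSketch {
        static final long CHECK_INTERVAL = 100_000L;

        static long scanWithDeadline(Iterator<byte[]> rows, long deadlineMillis) {
            long rowCount = 0;
            while (rows.hasNext()) {
                rows.next(); // consume the row (aggregation/serialization elided)
                // check the clock only once per CHECK_INTERVAL rows to keep the hot loop cheap
                if (rowCount % CHECK_INTERVAL == 1 && System.currentTimeMillis() > deadlineMillis) {
                    throw new RuntimeException("scan self-terminated after " + rowCount + " rows");
                }
                rowCount++;
            }
            return rowCount;
        }

        public static void main(String[] args) {
            List<byte[]> rows = Collections.nCopies(500_000, new byte[8]);
            long deadline = System.currentTimeMillis() + 10_000; // 10s scan budget
            System.out.println("scanned " + scanWithDeadline(rows.iterator(), deadline) + " rows");
        }
    }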

http://git-wip-us.apache.org/repos/asf/kylin/blob/dcd47f86/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
new file mode 100644
index 0000000..0cdfa7e
--- /dev/null
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesSerializer;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.gridtable.CubeCodeSystem;
+import org.apache.kylin.dict.NumberDictionaryBuilder;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionaryBuilder;
+import org.apache.kylin.dimension.DictionaryDimEnc;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
+import org.apache.kylin.gridtable.GTUtil;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.GTFilterScanner.FilterResultCache;
+import org.apache.kylin.gridtable.GTInfo.Builder;
+import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.LongMutable;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.ExtractTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class DictGridTableTest extends LocalFileMetadataTestCase {
+
+    private GridTable table;
+    private GTInfo info;
+    private CompareTupleFilter timeComp0;
+    private CompareTupleFilter timeComp1;
+    private CompareTupleFilter timeComp2;
+    private CompareTupleFilter timeComp3;
+    private CompareTupleFilter timeComp4;
+    private CompareTupleFilter timeComp5;
+    private CompareTupleFilter timeComp6;
+    private CompareTupleFilter ageComp1;
+    private CompareTupleFilter ageComp2;
+    private CompareTupleFilter ageComp3;
+    private CompareTupleFilter ageComp4;
+
+    @After
+    public void after() throws Exception {
+
+        this.cleanupTestMetadata();
+    }
+
+    @Before
+    public void setup() throws IOException {
+
+        this.createTestMetadata();
+
+        table = newTestTable();
+        info = table.getInfo();
+
+        timeComp0 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-14"));
+        timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
+        timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
+        timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
+        timeComp5 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-15"));
+        timeComp6 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-14"));
+        ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
+        ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
+        ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
+        ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
+
+    }
+
+    @Test
+    public void verifySegmentSkipping() {
+
+        ByteArray segmentStart = enc(info, 0, "2015-01-14");
+        ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when the partition col is dict encoded, the time format can vary freely
+        ByteArray segmentEnd = enc(info, 0, "2015-01-15");
+        assertEquals(segmentStart, segmentStartX);
+
+        {
+            LogicalTupleFilter filter = and(timeComp0, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());//scan ranges are [closed, closed]
+            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
+            assertEquals(1, r.get(0).fuzzyKeys.size());
+            assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
+        }
+        {
+            LogicalTupleFilter filter = and(timeComp2, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(0, r.size());
+        }
+        {
+            LogicalTupleFilter filter = and(timeComp4, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(0, r.size());
+        }
+        {
+            LogicalTupleFilter filter = and(timeComp5, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(0, r.size());
+        }
+        {
+            LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1), and(timeComp6, ageComp1));
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+            assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString());
+            assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
+        }
+        {
+            LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+            assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString());
+            assertEquals(0, r.get(0).fuzzyKeys.size());
+        }
+        {
+            //skip FALSE filter
+            LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(0, r.size());
+        }
+        {
+            //TRUE or FALSE filter
+            LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+            assertEquals("[null, null]-[null, null]", r.get(0).toString());
+        }
+        {
+            //TRUE or other filter
+            LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+            assertEquals("[null, null]-[null, null]", r.get(0).toString());
+        }
+    }
+
+    @Test
+    public void verifySegmentSkipping2() {
+        ByteArray segmentEnd = enc(info, 0, "2015-01-15");
+
+        {
+            LogicalTupleFilter filter = and(timeComp0, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());//scan ranges are [closed, closed]
+            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
+            assertEquals(1, r.get(0).fuzzyKeys.size());
+            assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
+        }
+
+        {
+            LogicalTupleFilter filter = and(timeComp5, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(0, r.size());//scan ranges are [closed, closed]
+        }
+    }
+
+    @Test
+    public void verifyScanRangePlanner() {
+
+        // flatten or-and & hbase fuzzy value
+        {
+            LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+            assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
+            assertEquals("[[null, 10, null, null, null], [null, 20, null, null, null]]", r.get(0).fuzzyKeys.toString());
+        }
+
+        // pre-evaluates to always false
+        {
+            LogicalTupleFilter filter = and(timeComp1, timeComp2);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(0, r.size());
+        }
+
+        // pre-evaluates to always true
+        {
+            LogicalTupleFilter filter = or(timeComp1, ageComp4);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals("[[null, null]-[null, null]]", r.toString());
+        }
+
+        // merge overlap range
+        {
+            LogicalTupleFilter filter = or(timeComp1, timeComp3);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals("[[null, null]-[null, null]]", r.toString());
+        }
+
+        // merge too many ranges
+        {
+            LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2), and(timeComp4, ageComp3));
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(3, r.size());
+            assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
+            assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
+            assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
+            planner.setMaxScanRanges(2);
+            List<GTScanRange> r2 = planner.planScanRanges();
+            assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString());
+        }
+    }
+
+    @Test
+    public void verifyFirstRow() throws IOException {
+        doScanAndVerify(table, new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest(), "[1421193600000, 30, Yang, 10, 10.5]", //
+                "[1421193600000, 30, Luke, 10, 10.5]", //
+                "[1421280000000, 20, Dong, 10, 10.5]", //
+                "[1421280000000, 20, Jason, 10, 10.5]", //
+                "[1421280000000, 30, Xu, 10, 10.5]", //
+                "[1421366400000, 20, Mahone, 10, 10.5]", //
+                "[1421366400000, 20, Qianhao, 10, 10.5]", //
+                "[1421366400000, 30, George, 10, 10.5]", //
+                "[1421366400000, 30, Shaofeng, 10, 10.5]", //
+                "[1421452800000, 10, Kejia, 10, 10.5]");
+    }
+
+    //for testing GTScanRequest serialization and deserialization
+    public static GTScanRequest useDeserializedGTScanRequest(GTScanRequest origin) {
+        ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
+        GTScanRequest.serializer.serialize(origin, buffer);
+        buffer.flip();
+        GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer);
+
+        Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs());
+        Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01);
+        return sGTScanRequest;
+    }
+
+    @Test
+    public void verifyScanWithUnevaluatableFilter() throws IOException {
+        GTInfo info = table.getInfo();
+
+        CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1));
+        LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
+        LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
+
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" }).setFilterPushDown(filter).createGTScanRequest();
+
+        // note the unevaluatable column 1 in the filter is added to the group by
+        assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
+
+        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]", "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]", "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]");
+    }
+
+    @Test
+    public void verifyScanWithEvaluatableFilter() throws IOException {
+        GTInfo info = table.getInfo();
+
+        CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" }).setFilterPushDown(filter).createGTScanRequest();
+        // note the evaluatable column 1 in the filter is added to the returned columns but not to the group by
+        assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
+
+        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]");
+    }
+
+    @Test
+    public void testFilterScannerPerf() throws IOException {
+        GridTable table = newTestPerfTable();
+        GTInfo info = table.getInfo();
+
+        CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+
+        FilterResultCache.ENABLED = false;
+        testFilterScannerPerfInner(table, info, filter);
+        FilterResultCache.ENABLED = true;
+        testFilterScannerPerfInner(table, info, filter);
+        FilterResultCache.ENABLED = false;
+        testFilterScannerPerfInner(table, info, filter);
+        FilterResultCache.ENABLED = true;
+        testFilterScannerPerfInner(table, info, filter);
+    }
+
+    @SuppressWarnings("unused")
+    private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter) throws IOException {
+        long start = System.currentTimeMillis();
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(filter).createGTScanRequest();
+        IGTScanner scanner = table.scan(req);
+        int i = 0;
+        for (GTRecord r : scanner) {
+            i++;
+        }
+        scanner.close();
+        long end = System.currentTimeMillis();
+        System.out.println((end - start) + "ms with filter cache enabled=" + FilterResultCache.ENABLED + ", " + i + " rows");
+    }
+
+    @Test
+    public void verifyConvertFilterConstants1() {
+        GTInfo info = table.getInfo();
+
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef();
+        TblColRef extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef();
+
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]", newFilter.toString());
+    }
+
+    @Test
+    public void verifyConvertFilterConstants2() {
+        GTInfo info = table.getInfo();
+
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef();
+        TblColRef extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef();
+
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LT, "9");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+
+        // $1<"9" round up to $1<"10"
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]", newFilter.toString());
+    }
+
+    @Test
+    public void verifyConvertFilterConstants3() {
+        GTInfo info = table.getInfo();
+
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef();
+        TblColRef extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef();
+
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LTE, "9");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+
+        // $1<="9" round down to FALSE
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], []]", newFilter.toString());
+    }
+
+    @Test
+    public void verifyConvertFilterConstants4() {
+        GTInfo info = table.getInfo();
+
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef();
+        TblColRef extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef();
+
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+
+        // $1 in ("9", "10", "15") has only "10" left
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]", newFilter.toString());
+    }
+
+    private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException {
+        System.out.println(req);
+        IGTScanner scanner = table.scan(req);
+        int i = 0;
+        for (GTRecord r : scanner) {
+            System.out.println(r);
+            if (verifyRows == null || i >= verifyRows.length) {
+                Assert.fail();
+            }
+            assertEquals(verifyRows[i], r.toString());
+            i++;
+        }
+        scanner.close();
+    }
+
+    public static ByteArray enc(GTInfo info, int col, String value) {
+        ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
+        info.getCodeSystem().encodeColumnValue(col, value, buf);
+        return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position());
+    }
+
+    public static ExtractTupleFilter unevaluatable(TblColRef col) {
+        ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT);
+        r.addChild(new ColumnTupleFilter(col));
+        return r;
+    }
+
+    public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) {
+        CompareTupleFilter result = new CompareTupleFilter(op);
+        result.addChild(new ColumnTupleFilter(col));
+        result.addChild(new ConstantTupleFilter(Arrays.asList(value)));
+        return result;
+    }
+
+    public static LogicalTupleFilter and(TupleFilter... children) {
+        return logic(FilterOperatorEnum.AND, children);
+    }
+
+    public static LogicalTupleFilter or(TupleFilter... children) {
+        return logic(FilterOperatorEnum.OR, children);
+    }
+
+    public static LogicalTupleFilter not(TupleFilter child) {
+        return logic(FilterOperatorEnum.NOT, child);
+    }
+
+    public static LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... children) {
+        LogicalTupleFilter result = new LogicalTupleFilter(op);
+        for (TupleFilter c : children) {
+            result.addChild(c);
+        }
+        return result;
+    }
+
+    public static GridTable newTestTable() throws IOException {
+        GTInfo info = newInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        GTRecord r = new GTRecord(table.getInfo());
+        GTBuilder builder = table.rebuild();
+
+        builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5")));
+        builder.close();
+
+        return table;
+    }
+
+    static GridTable newTestPerfTable() throws IOException {
+        GTInfo info = newInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        GTRecord r = new GTRecord(table.getInfo());
+        GTBuilder builder = table.rebuild();
+
+        for (int i = 0; i < 100000; i++) {
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5")));
+        }
+        builder.close();
+
+        return table;
+    }
+
+    static GTInfo newInfo() {
+        Builder builder = GTInfo.builder();
+        builder.setCodeSystem(newDictCodeSystem());
+        builder.setColumns( //
+                DataType.getType("timestamp"), //
+                DataType.getType("integer"), //
+                DataType.getType("varchar(10)"), //
+                DataType.getType("bigint"), //
+                DataType.getType("decimal") //
+        );
+        builder.setPrimaryKey(setOf(0, 1));
+        builder.setColumnPreferIndex(setOf(0));
+        builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) });
+        builder.enableRowBlock(4);
+        GTInfo info = builder.build();
+        return info;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static CubeCodeSystem newDictCodeSystem() {
+        DimensionEncoding[] dimEncs = new DimensionEncoding[3];
+        dimEncs[1] = new DictionaryDimEnc(newDictionaryOfInteger());
+        dimEncs[2] = new DictionaryDimEnc(newDictionaryOfString());
+        return new CubeCodeSystem(dimEncs);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static Dictionary newDictionaryOfString() {
+        TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+        builder.addValue("Dong");
+        builder.addValue("George");
+        builder.addValue("Jason");
+        builder.addValue("Kejia");
+        builder.addValue("Luke");
+        builder.addValue("Mahone");
+        builder.addValue("Qianhao");
+        builder.addValue("Shaofeng");
+        builder.addValue("Xu");
+        builder.addValue("Yang");
+        return builder.build(0);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static Dictionary newDictionaryOfInteger() {
+        NumberDictionaryBuilder<String> builder = new NumberDictionaryBuilder<>(new StringBytesConverter());
+        builder.addValue("10");
+        builder.addValue("20");
+        builder.addValue("30");
+        builder.addValue("40");
+        builder.addValue("50");
+        builder.addValue("60");
+        builder.addValue("70");
+        builder.addValue("80");
+        builder.addValue("90");
+        builder.addValue("100");
+        return builder.build(0);
+    }
+
+    public static ImmutableBitSet setOf(int... values) {
+        BitSet set = new BitSet();
+        for (int i : values)
+            set.set(i);
+        return new ImmutableBitSet(set);
+    }
+}

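The verifyConvertFilterConstants tests above all hinge on one rule: when filter constants are rewritten into dictionary codes and a literal is missing from the dictionary, the bound is clamped to the nearest dictionary value that preserves the predicate's meaning, or the predicate collapses entirely (to FALSE for the LTE case, to a shorter list for IN). The sketch below only illustrates that rule over the integer dictionary built in newDictionaryOfInteger(); it is not Kylin's GTUtil implementation, and every name in it is made up.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.NavigableSet;
    import java.util.TreeSet;

    // Illustration (not Kylin code) of the constant-rounding behavior the assertions encode.
    public class DictRoundingSketch {
        static final NavigableSet<Integer> DICT =
                new TreeSet<>(Arrays.asList(10, 20, 30, 40, 50, 60, 70, 80, 90, 100));

        // x < bound : safe to raise the bound to the next dictionary value (no dict value in between)
        static Integer roundLt(int bound) { return DICT.contains(bound) ? bound : DICT.ceiling(bound); }

        // x <= bound : rewrite as x <= the largest dict value not above the bound; null means FALSE
        static Integer roundLte(int bound) { return DICT.floor(bound); }

        // x IN (...) : keep only the literals the dictionary actually contains
        static List<Integer> roundIn(List<Integer> values) {
            List<Integer> kept = new ArrayList<>();
            for (Integer v : values) if (DICT.contains(v)) kept.add(v);
            return kept;
        }

        public static void main(String[] args) {
            System.out.println("x < 9   -> x < " + roundLt(9));                               // x < 10
            System.out.println("x <= 9  -> " + (roundLte(9) == null ? "FALSE" : "x <= " + roundLte(9)));
            System.out.println("x IN (9,10,15) -> x IN " + roundIn(Arrays.asList(9, 10, 15))); // [10]
        }
    }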
http://git-wip-us.apache.org/repos/asf/kylin/blob/dcd47f86/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 375b198..fc2fd52 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -22,13 +22,16 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.sql.DriverManager;
+import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.query.enumerator.OLAPQuery;
@@ -37,18 +40,26 @@ import org.apache.kylin.query.routing.Candidate;
 import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule;
 import org.apache.kylin.query.schema.OLAPSchemaFactory;
 import org.apache.kylin.storage.hbase.HBaseStorage;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
 import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.dbunit.database.DatabaseConnection;
 import org.dbunit.database.IDatabaseConnection;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import com.google.common.collect.Maps;
 
 public class ITKylinQueryTest extends KylinTestBase {
 
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
     @BeforeClass
     public static void setUp() throws Exception {
         printInfo("setUp in ITKylinQueryTest");
@@ -108,10 +119,52 @@ public class ITKylinQueryTest extends KylinTestBase {
         return "";
     }
 
-    @Ignore("this is only for debug")
+    @Test
+    public void testTimeoutQuery() throws Exception {
+
+        thrown.expect(SQLException.class);
+
+        //the query should time out inside the coprocessor and surface GTScanSelfTerminatedException as the cause
+        thrown.expectCause(new BaseMatcher<Throwable>() {
+            @Override
+            public boolean matches(Object item) {
+                if (item instanceof GTScanSelfTerminatedException) {
+                    return true;
+                }
+                return false;
+            }
+
+            @Override
+            public void describeTo(Description description) {
+            }
+        });
+
+        Map<String, String> toggles = Maps.newHashMap();
+        toggles.put(BackdoorToggles.DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR, CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString());//delay 10ms for every scan
+        BackdoorToggles.setToggles(toggles);
+
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "0.03");//set timeout to 9s
+
+        //these two cubes have RAW measures, which will disturb limit push down
+        RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_without_slr_left_join_empty]");
+        RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]");
+
+        execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_timeout", null, true);
+
+        //these two cubes have RAW measures, which will disturb limit push down
+        RemoveBlackoutRealizationsRule.blackouts.remove("CUBE[name=test_kylin_cube_without_slr_left_join_empty]");
+        RemoveBlackoutRealizationsRule.blackouts.remove("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]");
+
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "1");//set timeout to 9s 
+        BackdoorToggles.cleanToggles();
+    }
+
+    //don't ignore this test; clean your "temp" folder instead
     @Test
     public void testTempQuery() throws Exception {
+        PRINT_RESULT = true;
         execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/temp", null, true);
+        PRINT_RESULT = false;
     }
 
     @Ignore

http://git-wip-us.apache.org/repos/asf/kylin/blob/dcd47f86/kylin-it/src/test/resources/query/sql_timeout/query01.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_timeout/query01.sql b/kylin-it/src/test/resources/query/sql_timeout/query01.sql
new file mode 100644
index 0000000..3b9a837
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_timeout/query01.sql
@@ -0,0 +1,19 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select * from test_kylin_fact limit 1200

http://git-wip-us.apache.org/repos/asf/kylin/blob/dcd47f86/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java
index 75533cd..5f21351 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java
@@ -26,4 +26,5 @@ public enum CoprocessorBehavior {
     SCAN_FILTER, //only scan+filter used, for profiling filter speed. Will not return any result
     SCAN_FILTER_AGGR, //aggregate the result.  Will return results
     SCAN_FILTER_AGGR_CHECKMEM, //default full operations. Will return results
+    SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY, // delay 10ms on each scan operation to simulate slow queries; for test use only
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/dcd47f86/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 07a3cc3..5b48351 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -43,8 +43,8 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.ISegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
@@ -106,7 +106,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
         final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior();
 
-        logger.debug("New scanner for current segment {} will use {} as endpoint's behavior", cubeSeg, toggle);
+        logger.info("New scanner for current segment {} will use {} as endpoint's behavior", cubeSeg, toggle);
 
         Pair<Short, Short> shardNumAndBaseShard = getShardNumAndBaseShard();
         short shardNum = shardNumAndBaseShard.getFirst();
@@ -146,7 +146,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                 rawScanBufferSize *= 4;
             }
         }
-        scanRequest.setGTScanRanges(Lists.<GTScanRange> newArrayList());//since raw scans are sent to coprocessor, we don't need to duplicate sending it
+        scanRequest.clearScanRanges();//since raw scans are sent to the coprocessor, there is no need to send the scan ranges again
 
         int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
         while (true) {
@@ -248,7 +248,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                     }
 
                     if (abnormalFinish[0]) {
-                        Throwable ex = new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout or scan threshold(check region server log), failing current query...");
+                        Throwable ex = new GTScanSelfTerminatedException(logHeader + "The coprocessor thread stopped itself due to scan timeout or scan threshold(check region server log), failing current query...");
                         logger.error(logHeader + "Error when visiting cubes by endpoint", ex); // double log coz the query thread may already timeout
                         epResultItr.notifyCoprocException(ex);
                         return;

http://git-wip-us.apache.org/repos/asf/kylin/blob/dcd47f86/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index a359d19..f1e5dab 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -213,7 +213,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
             }
         };
 
-        IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize());
+        IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize(), false);
         IGTScanner rawScanner = store.scan(scanRequest);
 
         final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner);

http://git-wip-us.apache.org/repos/asf/kylin/blob/dcd47f86/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index 7d48c1a..442963f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -18,20 +18,22 @@
 
 package org.apache.kylin.storage.hbase.cube.v2;
 
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 class ExpectedSizeIterator implements Iterator<byte[]> {
     private static final Logger logger = LoggerFactory.getLogger(ExpectedSizeIterator.class);
 
@@ -48,22 +50,24 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
         this.expectedSize = expectedSize;
         this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
 
+        StringBuilder sb = new StringBuilder();
         Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+
         this.rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
         this.timeout = this.rpcTimeout * hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-        logger.info("rpc timeout is {} and after multiply retry times become {}", this.rpcTimeout, this.timeout);
-        this.timeout = Math.max(this.timeout, 5 * 60000);
+        sb.append("rpc timeout is " + this.rpcTimeout + " and after multiply retry times becomes " + this.timeout);
+
         this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
+        sb.append(" after multiply kylin.query.cube.visit.timeout.times becomes " + this.timeout);
+
+        logger.info(sb.toString());
 
         if (BackdoorToggles.getQueryTimeout() != -1) {
             this.timeout = BackdoorToggles.getQueryTimeout();
+            logger.info("rpc timeout is overwritten to " + this.timeout);
         }
 
-        this.timeout *= 1.1; // allow for some delay
-
-        logger.info("Final Timeout for ExpectedSizeIterator is: " + this.timeout);
-
-        this.timeoutTS = System.currentTimeMillis() + this.timeout;
+        this.timeoutTS = System.currentTimeMillis() + 2 * this.timeout;//wait longer than the coprocessor so that the query thread does not time out first (see the arithmetic sketch after this file)
     }
 
     @Override
@@ -85,9 +89,14 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
             }
 
             if (coprocException != null) {
-                throw new RuntimeException("Error in coprocessor", coprocException);
+                if (coprocException instanceof GTScanSelfTerminatedException)
+                    throw (GTScanSelfTerminatedException) coprocException;
+                else
+                    throw new RuntimeException("Error in coprocessor",coprocException);
+                
             } else if (ret == null) {
-                throw new RuntimeException("Timeout visiting cube!");
+                throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + //
+                        GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + timeout + ") cannot support this many scans?");
             } else {
                 return ret;
             }
@@ -110,7 +119,7 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
     }
 
     public long getRpcTimeout() {
-        return this.rpcTimeout;
+        return this.timeout;
     }
 
     public void notifyCoprocException(Throwable ex) {

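For reference, the timeout that ExpectedSizeIterator now derives works out as rpcTimeout x clientRetries x kylin.query.cube.visit.timeout.times, with the wait deadline set to twice that so the query thread outlives the coprocessor. The sketch below simply replays that arithmetic; the 60s RPC timeout and 5 retries are assumed values chosen so that the 0.03 multiplier used in ITKylinQueryTest lands on the "9s" its comment mentions, not values this commit asserts.

    // Hedged sketch of the ExpectedSizeIterator timeout arithmetic; the first two values
    // are assumptions for illustration, not HBase defaults guaranteed by this commit.
    public class TimeoutMathSketch {
        public static void main(String[] args) {
            long rpcTimeout = 60_000L;        // stands in for hbase.rpc.timeout
            int clientRetries = 5;            // stands in for hbase.client.retries.number
            double visitTimeoutTimes = 0.03;  // kylin.query.cube.visit.timeout.times, as set by the test

            long timeout = (long) (rpcTimeout * clientRetries * visitTimeoutTimes); // 9000 ms
            long timeoutTS = System.currentTimeMillis() + 2 * timeout; // client waits longer than the coprocessor
            System.out.println("coprocessor budget = " + timeout + " ms, client deadline at " + timeoutTS);
        }
    }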
http://git-wip-us.apache.org/repos/asf/kylin/blob/dcd47f86/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
index 4b9b4fa..1d8ad79 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
@@ -43,13 +43,15 @@ public class HBaseReadonlyStore implements IGTStore {
     private List<Pair<byte[], byte[]>> hbaseColumns;
     private List<List<Integer>> hbaseColumnsToGT;
     private int rowkeyPreambleSize;
+    private boolean withDelay = false;
 
-    public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize) {
+    public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize, boolean withDelay) {
         this.cellListIterator = cellListIterator;
         this.info = gtScanRequest.getInfo();
         this.hbaseColumns = hbaseColumns;
         this.hbaseColumnsToGT = hbaseColumnsToGT;
         this.rowkeyPreambleSize = rowkeyPreambleSize;
+        this.withDelay = withDelay;
     }
 
     @Override
@@ -95,6 +97,13 @@ public class HBaseReadonlyStore implements IGTStore {
 
                     @Override
                     public boolean hasNext() {
+                        if (withDelay) {
+                            try {
+                                Thread.sleep(10);
+                            } catch (InterruptedException e) {
+                                e.printStackTrace();
+                            }
+                        }
                         return cellListIterator.hasNext();
                     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/dcd47f86/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index b29d0d1..064d100 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -170,7 +170,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
     @SuppressWarnings("checkstyle:methodlength")
     @Override
-    public void visitCube(final RpcController controller, CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
+    public void visitCube(final RpcController controller, final CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
         List<RegionScanner> regionScanners = Lists.newArrayList();
         HRegion region = null;
 
@@ -241,7 +241,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             }
 
             if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
-                scanReq.setAggCacheMemThreshold(0); // disable mem check if so told
+                scanReq.disableAggCacheMemCheck(); // disable mem check if so told
             }
 
             final MutableBoolean scanNormalComplete = new MutableBoolean(true);
@@ -266,7 +266,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                         throw new GTScanExceedThresholdException("Exceed scan threshold at " + counter);
                     }
 
-                    if (counter % 100000 == 1) {
+                    if (counter % (10 * GTScanRequest.terminateCheckInterval) == 1) {
                         logger.info("Scanned " + counter + " rows from HBase.");
                     }
                     counter++;
@@ -284,7 +284,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 }
             };
 
-            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());
+            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, //
+                    request.getRowkeyPreambleSize(), CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString().equals(request.getBehavior()));
 
             IGTScanner rawScanner = store.scan(scanReq);
             IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, //
@@ -299,14 +300,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             try {
                 for (GTRecord oneRecord : finalScanner) {
 
-                    if (finalRowCount > storagePushDownLimit) {
-                        logger.info("The finalScanner aborted because storagePushDownLimit is satisfied");
-                        break;
-                    }
-
-                    if (finalRowCount % 100000 == 1) {
+                    if (finalRowCount % GTScanRequest.terminateCheckInterval == 1) {
                         if (System.currentTimeMillis() > deadline) {
-                            throw new GTScanTimeoutException("finalScanner timeouts after scanned " + finalRowCount);
+                            throw new GTScanTimeoutException("finalScanner timeouts after contributed " + finalRowCount);
                         }
                     }
 
@@ -319,7 +315,15 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                     }
 
                     outputStream.write(buffer.array(), 0, buffer.position());
+
                     finalRowCount++;
+
+                    //if storage aggregation is in progress, rely on GTAggregateScanner's limit check instead
+                    if (!scanReq.isDoingStorageAggregation() && finalRowCount >= storagePushDownLimit) {
+                        //read one more record than limit
+                        logger.info("The finalScanner aborted because storagePushDownLimit is satisfied");
+                        break;
+                    }
                 }
             } catch (GTScanTimeoutException e) {
                 scanNormalComplete.setValue(false);