Posted to commits@kylin.apache.org by li...@apache.org on 2018/01/21 12:02:20 UTC

[10/13] kylin git commit: KYLIN-2881 Improve hbase coprocessor exception handling at kylin server side

KYLIN-2881 Improve hbase coprocessor exception handling at kylin server side

Signed-off-by: lidongsjtu <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fcac5fcd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fcac5fcd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fcac5fcd

Branch: refs/heads/KYLIN-2881-review
Commit: fcac5fcddd2d7ba56495217a337c617a4dff9026
Parents: 54266f8
Author: Zhong <nj...@apache.org>
Authored: Wed Sep 20 09:46:44 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Sun Jan 21 20:01:34 2018 +0800

----------------------------------------------------------------------
 .../apache/kylin/query/ITKylinQueryTest.java    |   4 +-
 .../apache/kylin/query/ITMassInQueryTest.java   |   4 +-
 .../org/apache/kylin/query/KylinTestBase.java   |  34 ++-
 .../apache/kylin/rest/service/QueryService.java |  14 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     | 252 ++++++++++++-------
 .../hbase/cube/v2/ExpectedSizeIterator.java     |  34 ++-
 6 files changed, 224 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
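
At a high level, this patch replaces the per-iterator coprocException channel with a shared, stoppable QueryContext: whichever side hits an error calls stop(Throwable), and the region sub-threads and the result iterator check isStopped() before doing any more work. Below is a minimal sketch of that pattern, using only the QueryContext/QueryContextManager calls that appear in the diff; the worker body itself is hypothetical.

    import org.apache.kylin.common.QueryContext;
    import org.apache.kylin.common.QueryContextManager;

    public class CooperativeStopSketch {

        // Illustration only: one unit of region work guarded by the shared query context.
        static void runRegionWork(Runnable regionScan) {
            QueryContext queryContext = QueryContextManager.current();
            try {
                if (queryContext.isStopped()) {
                    return; // another worker or the query thread already failed; do no more work
                }
                regionScan.run(); // hypothetical unit of work, e.g. one endpoint key range
            } catch (Throwable t) {
                // record the failure on the shared context; every other participant observes it
                // via isStopped()/getThrowable() instead of a per-iterator exception field
                queryContext.stop(t);
            }
        }
    }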


http://git-wip-us.apache.org/repos/asf/kylin/blob/fcac5fcd/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 4edfb3d..02a50ce 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -156,7 +156,7 @@ public class ITKylinQueryTest extends KylinTestBase {
         String sql = getTextFromFile(sqlFile);
         IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
 
-        executeQuery(kylinConn, queryFileName, sql, true);
+        execQueryUsingKylin(kylinConn, queryFileName, sql, true);
     }
 
     @Ignore
@@ -403,7 +403,7 @@ public class ITKylinQueryTest extends KylinTestBase {
         // execute Kylin
         logger.info("Query Result from Kylin - " + queryName);
         IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-        ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+        ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
         String queriedVersion = String.valueOf(kylinTable.getValue(0, "version"));
 
         // compare the result

http://git-wip-us.apache.org/repos/asf/kylin/blob/fcac5fcd/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
index cca0be6..16395fc 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
@@ -118,7 +118,7 @@ public class ITMassInQueryTest extends KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
             printResult(kylinTable);
 
         }
@@ -139,7 +139,7 @@ public class ITMassInQueryTest extends KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
 
             // execute H2
             sql = sql.replace("massin(test_kylin_fact.SELLER_ID,'vip_customers')", "test_kylin_fact.SELLER_ID in ( " + org.apache.commons.lang.StringUtils.join(vipSellers, ",") + ")");

http://git-wip-us.apache.org/repos/asf/kylin/blob/fcac5fcd/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index 2c5b556..e38bb1a 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -44,6 +44,7 @@ import java.util.logging.LogManager;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContextManager;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
@@ -228,6 +229,16 @@ public class KylinTestBase {
 
     // ////////////////////////////////////////////////////////////////////////////////////////
     // execute
+    private void initExecQueryUsingKylin(String sql) {
+        QueryContextManager.resetCurrent();
+        QueryContextManager.current();
+    }
+
+    protected ITable execQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort)
+            throws Exception {
+        initExecQueryUsingKylin(sql);
+        return executeQuery(dbConn, queryName, sql, needSort);
+    }
 
     protected ITable executeQuery(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort)
             throws Exception {
@@ -251,6 +262,7 @@ public class KylinTestBase {
     }
 
     protected int executeQuery(String sql, boolean needDisplay) throws Exception {
+        initExecQueryUsingKylin(sql);
 
         // change join type to match current setting
         sql = changeJoinType(sql, joinType);
@@ -302,6 +314,12 @@ public class KylinTestBase {
         return PushDownUtil.tryPushDownNonSelectQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, "DEFAULT", isPrepare);
     }
 
+    protected ITable execDynamicQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql,
+            List<String> parameters, boolean needSort) throws Exception {
+        initExecQueryUsingKylin(sql);
+        return executeDynamicQuery(dbConn, queryName, sql, parameters, needSort);
+    }
+
     protected ITable executeDynamicQuery(IDatabaseConnection dbConn, String queryName, String sql,
             List<String> parameters, boolean needSort) throws Exception {
 
@@ -382,7 +400,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
 
             // compare the result
             if (BackdoorToggles.getPrepareOnly())
@@ -426,7 +444,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
 
             // compare the result
             assertTableEquals(expectTable, kylinTable);
@@ -449,7 +467,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -478,7 +496,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + sql);
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, sql, sql, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, sql, sql, false);
 
             try {
                 // compare the result
@@ -510,7 +528,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sqlWithLimit, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sqlWithLimit, false);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -561,7 +579,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql1, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql1, needSort);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -601,7 +619,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeDynamicQuery(kylinConn, queryName, sql, parameters, needSort);
+            ITable kylinTable = execDynamicQueryUsingKylin(kylinConn, queryName, sql, parameters, needSort);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -709,7 +727,7 @@ public class KylinTestBase {
 
         //setup cube conn
         String project = ProjectInstance.DEFAULT_PROJECT_NAME;
-        cubeConnection = QueryConnection.getConnection(project);
+        cubeConnection = QueryDataSource.create(project, config).getConnection();
 
         //setup h2
         h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++) + ";CACHE_SIZE=32072", "sa",

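The renamed exec*UsingKylin entry points in the tests above all funnel through initExecQueryUsingKylin(), whose job is to reset the thread-local context before a Kylin query runs. The sketch below illustrates what that buys, assuming resetCurrent()/current() behave as they are used in this diff, i.e. each reset yields a fresh per-query context.

    import org.apache.kylin.common.QueryContext;
    import org.apache.kylin.common.QueryContextManager;

    class FreshContextSketch {

        // Illustration only: two Kylin queries on the same test thread should not share
        // scanned-row/byte counters or stop-state, so each query starts from a fresh context.
        static void runTwoQueries(Runnable firstQuery, Runnable secondQuery) {
            QueryContextManager.resetCurrent();
            QueryContext first = QueryContextManager.current();   // context for query #1
            firstQuery.run();

            QueryContextManager.resetCurrent();                   // drop query #1's context
            QueryContext second = QueryContextManager.current();  // fresh context for query #2
            secondQuery.run();

            // 'first' and 'second' are assumed to be distinct contexts, so statistics and any
            // stop() recorded by query #1 cannot leak into query #2.
        }
    }
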
http://git-wip-us.apache.org/repos/asf/kylin/blob/fcac5fcd/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 8e6642c..71b54e3 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -464,7 +464,7 @@ public class QueryService extends BasicService {
 
             sqlResponse.setDuration(System.currentTimeMillis() - startTime);
             sqlResponse.setTraceUrl(traceUrl);
-            logQuery(sqlRequest, sqlResponse);
+            logQuery(queryContext.getQueryId(), sqlRequest, sqlResponse);
             try {
                 recordMetric(sqlRequest, sqlResponse);
             } catch (Throwable th) {
@@ -477,7 +477,7 @@ public class QueryService extends BasicService {
 
         } finally {
             BackdoorToggles.cleanToggles();
-            QueryContext.reset();
+            QueryContextManager.resetCurrent();
             if (scope != null) {
                 scope.close();
             }
@@ -487,6 +487,7 @@ public class QueryService extends BasicService {
     private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, long startTime, boolean queryCacheEnabled) {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Message msg = MsgPicker.getMsg();
+        final QueryContext queryContext = QueryContextManager.current();
 
         SQLResponse sqlResponse = null;
         try {
@@ -530,13 +531,15 @@ public class QueryService extends BasicService {
             Trace.addTimelineAnnotation("response from execution");
 
         } catch (Throwable e) { // calcite may throw AssertError
+            queryContext.stop(e);
+
             logger.error("Exception while executing query", e);
             String errMsg = makeErrorMsgUserFriendly(e);
 
             sqlResponse = new SQLResponse(null, null, null, 0, true, errMsg, false, false);
-            QueryContext queryContext = QueryContext.current();
             sqlResponse.setTotalScanCount(queryContext.getScannedRows());
             sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
+            sqlResponse.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
 
             if (queryCacheEnabled && e.getCause() != null
                     && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) {
@@ -1046,6 +1049,8 @@ public class QueryService extends BasicService {
         QueryContext queryContext = QueryContextManager.current();
         if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for'
             for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) {
+                String realizationName = "NULL";
+                int realizationType = -1;
                 if (ctx.realization != null) {
                     isPartialResult |= ctx.storageContext.isPartialResultReturned();
                     if (cubeSb.length() > 0) {
@@ -1053,6 +1058,9 @@ public class QueryService extends BasicService {
                     }
                     cubeSb.append(ctx.realization.getCanonicalName());
                     logSb.append(ctx.storageContext.getProcessedRowCount()).append(" ");
+
+                    realizationName = ctx.realization.getName();
+                    realizationType = ctx.realization.getStorageType();
                 }
                 queryContext.setContextRealization(ctx.id, realizationName, realizationType);
             }

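In queryAndUpdateCache() the catch block now stops the shared QueryContext before the error response is assembled. That lets any in-flight coprocessor work sharing that context abort early, and it keeps the partial scan statistics available for the error SQLResponse. A simplified sketch of the failure path (method names from the diff; the response assembly itself is elided):

    import org.apache.kylin.common.QueryContext;

    class ErrorPathSketch {

        // Simplified sketch: stop first, then read whatever was already accumulated on the
        // same context so the error response can still report partial scan statistics.
        static void onQueryError(QueryContext queryContext, Throwable e) {
            queryContext.stop(e); // mark the query stopped; workers see isStopped() == true

            long scannedRows = queryContext.getScannedRows();
            long scannedBytes = queryContext.getScannedBytes();
            Object cubeSegmentStats = queryContext.getCubeSegmentStatisticsResultList();
            // in the diff, these three values are copied onto the error SQLResponse
        }
    }
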
http://git-wip-us.apache.org/repos/asf/kylin/blob/fcac5fcd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 26ab039..ddf62b7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -19,20 +19,23 @@
 package org.apache.kylin.storage.hbase.cube.v2;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.DataFormatException;
 
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exceptions.KylinTimeoutException;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
 import org.apache.kylin.common.util.Bytes;
@@ -52,7 +55,6 @@ import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
 import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
-import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats;
@@ -103,6 +105,16 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         return Pair.newPair(cubeSeg.getCuboidShardNum(cuboid.getId()), cubeSeg.getCuboidBaseShard(cuboid.getId()));
     }
 
+    static Field channelRowField = null;
+    static {
+        try {
+            channelRowField = RegionCoprocessorRpcChannel.class.getDeclaredField("row");
+            channelRowField.setAccessible(true);
+        } catch (Throwable t) {
+            logger.warn("error when getting the row field from the RegionCoprocessorRpcChannel class", t);
+        }
+    }
+
     @SuppressWarnings("checkstyle:methodlength")
     @Override
     public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
@@ -135,7 +147,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it
         scanRequestByteString = serializeGTScanReq(scanRequest);
 
-        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum, coprocessorTimeout);
+        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(queryContext, shardNum, coprocessorTimeout);
 
         logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
 
@@ -165,97 +177,14 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         builder.setMaxScanBytes(cubeSeg.getConfig().getPartitionMaxScanBytes());
         builder.setIsExactAggregate(storageContext.isExactAggregation());
 
+        final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryContext.getQueryId(),
+                Integer.toHexString(System.identityHashCode(scanRequest)));
         for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
             executorService.submit(new Runnable() {
                 @Override
                 public void run() {
-
-                    final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryId, Integer.toHexString(System.identityHashCode(scanRequest)));
-                    final AtomicReference<RuntimeException> regionErrorHolder = new AtomicReference<>();
-
-                    try {
-                        Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool());
-
-                        final CubeVisitRequest request = builder.build();
-                        final byte[] startKey = epRange.getFirst();
-                        final byte[] endKey = epRange.getSecond();
-
-                        table.coprocessorService(CubeVisitService.class, startKey, endKey, //
-                                new Batch.Call<CubeVisitService, CubeVisitResponse>() {
-                                    public CubeVisitResponse call(CubeVisitService rowsService) throws IOException {
-                                        ServerRpcController controller = new ServerRpcController();
-                                        BlockingRpcCallback<CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
-                                        rowsService.visitCube(controller, request, rpcCallback);
-                                        CubeVisitResponse response = rpcCallback.get();
-                                        if (controller.failedOnException()) {
-                                            throw controller.getFailedOn();
-                                        }
-                                        return response;
-                                    }
-                                }, new Batch.Callback<CubeVisitResponse>() {
-                                    @Override
-                                    public void update(byte[] region, byte[] row, CubeVisitResponse result) {
-                                        if (region == null) {
-                                            return;
-                                        }
-
-                                        logger.info(logHeader + getStatsString(region, result));
-
-                                        Stats stats = result.getStats();
-                                        queryContext.addAndGetScannedRows(stats.getScannedRowCount());
-                                        queryContext.addAndGetScannedBytes(stats.getScannedBytes());
-
-                                        RuntimeException rpcException = null;
-                                        if (result.getStats().getNormalComplete() != 1) {
-                                            rpcException = getCoprocessorException(result);
-                                        }
-                                        queryContext.addRPCStatistics(storageContext.ctxId, stats.getHostname(),
-                                                cubeSeg.getCubeDesc().getName(), cubeSeg.getName(), cuboid.getInputID(),
-                                                cuboid.getId(), storageContext.getFilterMask(), rpcException,
-                                                stats.getServiceEndTime() - stats.getServiceStartTime(), 0,
-                                                stats.getScannedRowCount(),
-                                                stats.getScannedRowCount() - stats.getAggregatedRowCount()
-                                                        - stats.getFilteredRowCount(),
-                                                stats.getAggregatedRowCount(), stats.getScannedBytes());
-
-                                        // if any other region has responded with error, skip further processing
-                                        if (regionErrorHolder.get() != null) {
-                                            return;
-                                        }
-
-                                        // record coprocessor error if happened
-                                        if (rpcException != null) {
-                                            regionErrorHolder.compareAndSet(null, rpcException);
-                                            return;
-                                        }
-
-                                        if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
-                                            throw new ResourceLimitExceededException("Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold " + cubeSeg.getConfig().getQueryMaxScanBytes());
-                                        }
-
-                                        try {
-                                            if (compressionResult) {
-                                                epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
-                                            } else {
-                                                epResultItr.append(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
-                                            }
-                                        } catch (IOException | DataFormatException e) {
-                                            throw new RuntimeException(logHeader + "Error when decompressing", e);
-                                        }
-                                    }
-                                });
-
-                    } catch (Throwable ex) {
-                        logger.error(logHeader + "Error when visiting cubes by endpoint", ex); // double log coz the query thread may already timeout
-                        epResultItr.notifyCoprocException(ex);
-                        return;
-                    }
-
-                    if (regionErrorHolder.get() != null) {
-                        RuntimeException exception = regionErrorHolder.get();
-                        logger.error(logHeader + "Error when visiting cubes by endpoint", exception); // double log coz the query thread may already timeout
-                        epResultItr.notifyCoprocException(exception);
-                    }
+                    runEPRange(queryContext, logHeader, compressionResult, builder.build(), conn, epRange.getFirst(),
+                            epRange.getSecond(), epResultItr);
                 }
             });
         }
@@ -263,6 +192,149 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         return new StorageResponseGTScatter(scanRequest, new DummyPartitionStreamer(epResultItr), storageContext);
     }
 
+    private void runEPRange(final QueryContext queryContext, final String logHeader, final boolean compressionResult,
+            final CubeVisitProtos.CubeVisitRequest request, final Connection conn, byte[] startKey, byte[] endKey,
+            final ExpectedSizeIterator epResultItr) {
+
+        final String queryId = queryContext.getQueryId();
+
+        try {
+            final Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()),
+                    HBaseConnection.getCoprocessorPool());
+
+            table.coprocessorService(CubeVisitService.class, startKey, endKey, //
+                    new Batch.Call<CubeVisitService, CubeVisitResponse>() {
+                        public CubeVisitResponse call(CubeVisitService rowsService) throws IOException {
+                            if (queryContext.isStopped()) {
+                                logger.warn(
+                                        "Query-{}: the query has been stopped; no more requests will be sent to region servers.",
+                                        queryId);
+                                return null;
+                            }
+
+                            HRegionLocation regionLocation = getStartRegionLocation(rowsService);
+                            String regionServerName = regionLocation == null ? "UNKNOWN" : regionLocation.getHostname();
+                            logger.info("Query-{}: sending request to the initial region server {} on table {}", queryId,
+                                    regionServerName, table.getName());
+
+                            queryContext.addQueryStopListener(new QueryContext.QueryStopListener() {
+                                private Thread hConnThread = Thread.currentThread();
+
+                                @Override
+                                public void stop(QueryContext query) {
+                                    try {
+                                        hConnThread.interrupt();
+                                    } catch (Exception e) {
+                                        logger.warn("Exception while interrupting thread {} due to {}",
+                                                hConnThread.getName(), e);
+                                    }
+                                }
+                            });
+
+                            ServerRpcController controller = new ServerRpcController();
+                            BlockingRpcCallback<CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
+                            try {
+                                rowsService.visitCube(controller, request, rpcCallback);
+                                CubeVisitResponse response = rpcCallback.get();
+                                if (controller.failedOnException()) {
+                                    throw controller.getFailedOn();
+                                }
+                                return response;
+                            } catch (Exception e) {
+                                throw e;
+                            } finally {
+                                // Reset the interrupted state
+                                Thread.interrupted();
+                            }
+                        }
+
+                        private HRegionLocation getStartRegionLocation(CubeVisitProtos.CubeVisitService rowsService) {
+                            try {
+                                CubeVisitProtos.CubeVisitService.Stub rowsServiceStub = (CubeVisitProtos.CubeVisitService.Stub) rowsService;
+                                RegionCoprocessorRpcChannel channel = (RegionCoprocessorRpcChannel) rowsServiceStub
+                                        .getChannel();
+                                byte[] row = (byte[]) channelRowField.get(channel);
+                                return conn.getRegionLocator(table.getName()).getRegionLocation(row, false);
+                            } catch (Throwable throwable) {
+                                logger.warn("error when getting region server name", throwable);
+                            }
+                            return null;
+                        }
+                    }, new Batch.Callback<CubeVisitResponse>() {
+                        @Override
+                        public void update(byte[] region, byte[] row, CubeVisitResponse result) {
+                            if (result == null) {
+                                return;
+                            }
+                            if (region == null) {
+                                return;
+                            }
+
+                            // if the query has been stopped, skip further processing;
+                            // this may be caused by:
+                            //      * another region responding with an error
+                            //      * ServerRpcController.failedOnException
+                            //      * ResourceLimitExceededException
+                            //      * an exception during CompressionUtils.decompress()
+                            //      * outside exceptions, like KylinTimeoutException in SequentialCubeTupleIterator
+                            if (queryContext.isStopped()) {
+                                return;
+                            }
+
+                            logger.info(logHeader + getStatsString(region, result));
+
+                            Stats stats = result.getStats();
+                            queryContext.addAndGetScannedRows(stats.getScannedRowCount());
+                            queryContext.addAndGetScannedBytes(stats.getScannedBytes());
+
+                            RuntimeException rpcException = null;
+                            if (result.getStats().getNormalComplete() != 1) {
+                                // record the coprocessor error if one happened
+                                rpcException = getCoprocessorException(result);
+                            }
+                            queryContext.addRPCStatistics(storageContext.ctxId, stats.getHostname(),
+                                    cubeSeg.getCubeDesc().getName(), cubeSeg.getName(), cuboid.getInputID(),
+                                    cuboid.getId(), storageContext.getFilterMask(), rpcException,
+                                    stats.getServiceEndTime() - stats.getServiceStartTime(), 0,
+                                    stats.getScannedRowCount(),
+                                    stats.getScannedRowCount() - stats.getAggregatedRowCount()
+                                            - stats.getFilteredRowCount(),
+                                    stats.getAggregatedRowCount(), stats.getScannedBytes());
+
+                            if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
+                                rpcException = new ResourceLimitExceededException(
+                                        "Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold "
+                                                + cubeSeg.getConfig().getQueryMaxScanBytes());
+                            }
+
+                            if (rpcException != null) {
+                                queryContext.stop(rpcException);
+                                return;
+                            }
+
+                            try {
+                                if (compressionResult) {
+                                    epResultItr.append(CompressionUtils.decompress(
+                                            HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
+                                } else {
+                                    epResultItr.append(
+                                            HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
+                                }
+                            } catch (IOException | DataFormatException e) {
+                                throw new RuntimeException(logHeader + "Error when decompressing", e);
+                            }
+                        }
+                    });
+
+        } catch (Throwable ex) {
+            queryContext.stop(ex);
+        }
+
+        if (queryContext.isStopped()) {
+            logger.error(logHeader + "Error when visiting cubes by endpoint", queryContext.getThrowable()); // log here as well because the query thread may have already timed out
+        }
+    }
+
     private ByteString serializeGTScanReq(GTScanRequest scanRequest) {
         ByteString scanRequestByteString;
         int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;

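Two details of the rewritten CubeHBaseEndpointRPC are worth calling out. The static channelRowField reflection exists only so getStartRegionLocation() can resolve which region server an endpoint call starts at, for logging. More importantly, each per-range worker registers a QueryStopListener that interrupts the thread blocked in the coprocessor RPC; that is how a stop() issued elsewhere (timeout, scan-limit breach, error in another region) actually unblocks the HBase call. A standalone sketch of that hook, with the blocking call itself left hypothetical:

    import org.apache.kylin.common.QueryContext;

    class StopListenerSketch {

        static void callRegion(QueryContext queryContext, Runnable blockingRegionCall) {
            final Thread rpcThread = Thread.currentThread();

            // addQueryStopListener()/QueryStopListener.stop() are the hooks used in runEPRange();
            // presumably QueryContext.stop(Throwable) invokes every registered listener.
            queryContext.addQueryStopListener(new QueryContext.QueryStopListener() {
                @Override
                public void stop(QueryContext query) {
                    rpcThread.interrupt(); // unblock the thread waiting on BlockingRpcCallback.get()
                }
            });

            try {
                blockingRegionCall.run(); // hypothetical stand-in for table.coprocessorService(...)
            } finally {
                Thread.interrupted();     // clear the interrupt flag, as the patch does
            }
        }
    }

The finally block mirrors the patch: clearing the interrupt flag before the thread returns to the pool presumably keeps a late stop() from affecting unrelated work picked up by the same thread.
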
http://git-wip-us.apache.org/repos/asf/kylin/blob/fcac5fcd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index 60d85b4..2cb0c7f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -24,19 +24,21 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.gridtable.GTScanRequest;
 
 import com.google.common.base.Throwables;
 
 class ExpectedSizeIterator implements Iterator<byte[]> {
-    private BlockingQueue<byte[]> queue;
-    private int expectedSize;
+    private final QueryContext queryContext;
+    private final int expectedSize;
+    private final BlockingQueue<byte[]> queue;
+    private final long coprocessorTimeout;
+    private final long deadline;
     private int current = 0;
-    private long coprocessorTimeout;
-    private long deadline;
-    private volatile Throwable coprocException;
 
-    public ExpectedSizeIterator(int expectedSize, long coprocessorTimeout) {
+    public ExpectedSizeIterator(QueryContext queryContext, int expectedSize, long coprocessorTimeout) {
+        this.queryContext = queryContext;
         this.expectedSize = expectedSize;
         this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
 
@@ -59,14 +61,11 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
             current++;
             byte[] ret = null;
 
-            while (ret == null && coprocException == null && deadline > System.currentTimeMillis()) {
+            while (ret == null && deadline > System.currentTimeMillis()) {
+                checkState();
                 ret = queue.poll(1000, TimeUnit.MILLISECONDS);
             }
 
-            if (coprocException != null) {
-                throw Throwables.propagate(coprocException);
-            }
-
             if (ret == null) {
                 throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + //
                         GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + coprocessorTimeout + ") cannot support this many scans?");
@@ -85,6 +84,8 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
     }
 
     public void append(byte[] data) {
+        checkState();
+
         try {
             queue.put(data);
         } catch (InterruptedException e) {
@@ -93,7 +94,14 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
         }
     }
 
-    public void notifyCoprocException(Throwable ex) {
-        coprocException = ex;
+    private void checkState() {
+        if (queryContext.isStopped()) {
+            Throwable throwable = queryContext.getThrowable();
+            if (throwable != null) {
+                throw Throwables.propagate(throwable);
+            } else {
+                throw new IllegalStateException("the query is stopped: " + queryContext.getStopReason());
+            }
+        }
     }
 }
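
With notifyCoprocException() gone, ExpectedSizeIterator surfaces failures through checkState(): both append() on the producer side and the polling loop in next() ask the shared QueryContext whether the query was stopped and, if so, rethrow the recorded throwable (or an IllegalStateException carrying the stop reason). From a consumer's point of view that looks roughly like this hypothetical drain loop:

    import java.util.Iterator;

    class ConsumerSketch {

        // Hypothetical consumer: a coprocessor failure, scan-limit breach or cancellation now
        // surfaces directly from next(), via Throwables.propagate() inside checkState(),
        // instead of through the removed notifyCoprocException() channel.
        static int drain(Iterator<byte[]> epResultItr) {
            int blocks = 0;
            while (epResultItr.hasNext()) {
                epResultItr.next(); // may rethrow queryContext.getThrowable() if the query was stopped
                blocks++;
            }
            return blocks;
        }
    }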