You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/11/29 06:11:42 UTC

kylin git commit: KYLIN-2847 avoid doing useless work by checking query deadline

Repository: kylin
Updated Branches:
  refs/heads/master 87bf2a143 -> 1b665cb88


KYLIN-2847 avoid doing useless work by checking query deadline

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1b665cb8
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1b665cb8
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1b665cb8

Branch: refs/heads/master
Commit: 1b665cb8824b891deb2107ff0ee68f2e785afe6a
Parents: 87bf2a1
Author: gaodayue <ga...@meituan.com>
Authored: Tue Sep 5 17:04:30 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 29 14:11:34 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/QueryContext.java   | 32 +++++++++++++++++++-
 .../apache/kylin/storage/StorageContext.java    | 14 ---------
 .../gtrecord/GTCubeStorageQueryBase.java        |  6 ++--
 .../gtrecord/SequentialCubeTupleIterator.java   |  6 ++--
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  2 +-
 .../storage/hbase/cube/v2/CubeHBaseRPC.java     | 12 +++++---
 .../hbase/cube/v2/ExpectedSizeIterator.java     |  4 +--
 .../coprocessor/endpoint/CubeVisitService.java  | 30 ++++++++++++------
 8 files changed, 70 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 5750e03..c7ded10 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.kylin.common.exceptions.KylinTimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +45,9 @@ public class QueryContext {
         }
     };
 
+    private long queryStartMillis;
+    private long deadline = Long.MAX_VALUE;
+
     private String queryId;
     private String username;
     private AtomicLong scannedRows = new AtomicLong();
@@ -54,7 +58,7 @@ public class QueryContext {
 
     private QueryContext() {
         // use QueryContext.current() instead
-        
+        queryStartMillis = System.currentTimeMillis();
         queryId = UUID.randomUUID().toString();
     }
 
@@ -66,6 +70,32 @@ public class QueryContext {
         contexts.remove();
     }
 
+    public long getQueryStartMillis() {
+        return queryStartMillis;
+    }
+
+    public void setDeadline(long timeoutMillis) {
+        if (timeoutMillis > 0) {
+            deadline  = queryStartMillis + timeoutMillis;
+        }
+    }
+
+    public long getDeadline() {
+        return deadline;
+    }
+
+    /**
+     * @return millis before deadline
+     * @throws KylinTimeoutException if deadline has passed
+     */
+    public long checkMillisBeforeDeadline() {
+        long remain = deadline - System.currentTimeMillis();
+        if (remain <= 0) {
+            throw new KylinTimeoutException("Query timeout");
+        }
+        return remain;
+    }
+
     public String getQueryId() {
         return queryId == null ? "" : queryId;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 87f7bd0..501dff8 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -47,7 +47,6 @@ public class StorageContext {
     private StorageLimitLevel storageLimitLevel = StorageLimitLevel.NO_LIMIT;
     private boolean hasSort = false;
     private boolean acceptPartialResult = false;
-    private long deadline;
 
     private boolean exactAggregation = false;
     private boolean needStorageAggregation = false;
@@ -174,19 +173,6 @@ public class StorageContext {
         return isValidPushDownLimit(finalPushDownLimit);
     }
 
-    public long getDeadline() {
-        return this.deadline;
-    }
-
-    public void setDeadline(IRealization realization) {
-        int timeout = realization.getConfig().getQueryTimeoutSeconds() * 1000;
-        if (timeout == 0) {
-            this.deadline = Long.MAX_VALUE;
-        } else {
-            this.deadline = timeout + System.currentTimeMillis();
-        }
-    }
-
     public void markSort() {
         this.hasSort = true;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 0a9d828..e9c6885 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -158,8 +159,9 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
                 sqlDigest.aggregations, context);
         // set whether to aggregate results from multiple partitions
         enableStreamAggregateIfBeneficial(cuboid, groupsD, context);
-        // set query deadline
-        context.setDeadline(cubeInstance);
+        // set and check query deadline
+        QueryContext.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000);
+        QueryContext.current().checkMillisBeforeDeadline();
 
         // push down having clause filter if possible
         TupleFilter havingFilter = checkHavingCanPushDown(sqlDigest.havingFilter, groupsD, sqlDigest.aggregations,

http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index 3cbb538..ede5ff9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.kylin.common.exceptions.KylinTimeoutException;
+import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -141,8 +141,8 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
 
     @Override
     public ITuple next() {
-        if (scanCount++ % 100 == 1 && System.currentTimeMillis() > context.getDeadline()) {
-            throw new KylinTimeoutException("Query timeout after \"kylin.query.timeout-seconds\" seconds");
+        if (scanCount++ % 100 == 1) {
+            QueryContext.current().checkMillisBeforeDeadline();
         }
 
         if (++scanCountDelta >= 1000)

http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 03f8937..96f8f06 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -130,7 +130,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
         rawScanByteString = serializeRawScans(rawScans);
 
-        int coprocessorTimeout = getCoprocessorTimeoutMillis();
+        long coprocessorTimeout = getCoprocessorTimeoutMillis();
         scanRequest.setTimeout(coprocessorTimeout);
         scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it
         scanRequestByteString = serializeGTScanReq(scanRequest);

http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 188f554..6b4ac32 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -287,8 +287,8 @@ public abstract class CubeHBaseRPC implements IGTStorage {
         logger.info(info.toString());
     }
 
-    protected int getCoprocessorTimeoutMillis() {
-        int coopTimeout;
+    protected long getCoprocessorTimeoutMillis() {
+        long coopTimeout;
         if (BackdoorToggles.getQueryTimeout() != -1) {
             coopTimeout = BackdoorToggles.getQueryTimeout();
         } else {
@@ -307,10 +307,14 @@ public abstract class CubeHBaseRPC implements IGTStorage {
         
         // coprocessor timeout is 0 by default
         if (coopTimeout <= 0) {
-            coopTimeout = (int) (rpcTimeout * 0.9);
+            coopTimeout = (long) (rpcTimeout * 0.9);
         }
+
+        long millisBeforeDeadline = queryContext.checkMillisBeforeDeadline();
+        coopTimeout = Math.min(coopTimeout, millisBeforeDeadline);
         
-        logger.debug("{} = {} ms, use {} ms as timeout for coprocessor", HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, coopTimeout);
+        logger.debug("{} = {} ms, {} ms before deadline, use {} ms as timeout for coprocessor",
+                HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, millisBeforeDeadline, coopTimeout);
         return coopTimeout;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index 59fe9e0..60d85b4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -32,11 +32,11 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
     private BlockingQueue<byte[]> queue;
     private int expectedSize;
     private int current = 0;
-    private int coprocessorTimeout;
+    private long coprocessorTimeout;
     private long deadline;
     private volatile Throwable coprocException;
 
-    public ExpectedSizeIterator(int expectedSize, int coprocessorTimeout) {
+    public ExpectedSizeIterator(int expectedSize, long coprocessorTimeout) {
         this.expectedSize = expectedSize;
         this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index d99e6c5..b784ab9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
@@ -143,19 +144,17 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
         private final Iterator<List<Cell>> delegate;
         private final long rowCountLimit;
         private final long bytesLimit;
-        private final long timeout;
         private final long deadline;
 
         private long rowCount;
         private long rowBytes;
 
-        ResourceTrackingCellListIterator(Iterator<List<Cell>> delegate, long rowCountLimit, long bytesLimit,
-                long timeout) {
+        ResourceTrackingCellListIterator(Iterator<List<Cell>> delegate,
+                                         long rowCountLimit, long bytesLimit, long deadline) {
             this.delegate = delegate;
             this.rowCountLimit = rowCountLimit;
             this.bytesLimit = bytesLimit;
-            this.timeout = timeout;
-            this.deadline = System.currentTimeMillis() + timeout;
+            this.deadline = deadline;
         }
 
         @Override
@@ -167,8 +166,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 throw new ResourceLimitExceededException(
                         "scanned bytes " + rowBytes + " exceeds threshold " + bytesLimit);
             }
-            if ((rowCount % GTScanRequest.terminateCheckInterval == 1) && System.currentTimeMillis() > deadline) {
-                throw new KylinTimeoutException("coprocessor timeout after " + timeout + " ms");
+            if ((rowCount % GTScanRequest.terminateCheckInterval == 0) && System.currentTimeMillis() > deadline) {
+                throw new KylinTimeoutException("coprocessor timeout after scanning " + rowCount + " rows");
             }
             return delegate.hasNext();
         }
@@ -220,6 +219,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
         sb.append(",");
     }
 
+    private void checkDeadline(long deadline) throws DoNotRetryIOException {
+        if (System.currentTimeMillis() > deadline) {
+            logger.info("Deadline has passed, abort now!");
+            throw new DoNotRetryIOException("Coprocessor passed deadline! Maybe server is overloaded");
+        }
+    }
+
     @SuppressWarnings("checkstyle:methodlength")
     @Override
     public void visitCube(final RpcController controller, final CubeVisitProtos.CubeVisitRequest request,
@@ -234,6 +240,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
         CubeVisitProtos.CubeVisitResponse.ErrorInfo errorInfo = null;
 
         String queryId = request.hasQueryId() ? request.getQueryId() : "UnknownId";
+        logger.info("start query {} in thread {}", queryId, Thread.currentThread().getName());
         try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) {
             final long serviceStartTime = System.currentTimeMillis();
 
@@ -248,6 +255,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
             final GTScanRequest scanReq = GTScanRequest.serializer
                     .deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
+            final long deadline = scanReq.getStartTime() + scanReq.getTimeout();
+            checkDeadline(deadline);
+
             List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
             for (IntList intList : request.getHbaseColumnsToGTList()) {
                 hbaseColumnsToGT.add(intList.getIntsList());
@@ -298,7 +308,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator(allCellLists,
                     scanReq.getStorageScanRowNumThreshold(), // for old client (scan threshold)
                     !request.hasMaxScanBytes() ? Long.MAX_VALUE : request.getMaxScanBytes(), // for new client
-                    scanReq.getTimeout());
+                    deadline);
 
             IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns,
                     hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn(),
@@ -401,6 +411,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                             .setNormalComplete(errorInfo == null ? 1 : 0).build())
                     .build());
 
+        } catch (DoNotRetryIOException e) {
+            ResponseConverter.setControllerException(controller, e);
+
         } catch (IOException ioe) {
             logger.error(ioe.toString(), ioe);
             IOException wrapped = new IOException("Error in coprocessor " + debugGitTag, ioe);
@@ -413,7 +426,6 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 try {
                     region.closeRegionOperation();
                 } catch (IOException e) {
-                    e.printStackTrace();
                     throw new RuntimeException(e);
                 }
             }