You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/11/29 06:11:42 UTC
kylin git commit: KYLIN-2847 avoid doing useless work by checking
query deadline
Repository: kylin
Updated Branches:
refs/heads/master 87bf2a143 -> 1b665cb88
KYLIN-2847 avoid doing useless work by checking query deadline
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1b665cb8
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1b665cb8
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1b665cb8
Branch: refs/heads/master
Commit: 1b665cb8824b891deb2107ff0ee68f2e785afe6a
Parents: 87bf2a1
Author: gaodayue <ga...@meituan.com>
Authored: Tue Sep 5 17:04:30 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 29 14:11:34 2017 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/QueryContext.java | 32 +++++++++++++++++++-
.../apache/kylin/storage/StorageContext.java | 14 ---------
.../gtrecord/GTCubeStorageQueryBase.java | 6 ++--
.../gtrecord/SequentialCubeTupleIterator.java | 6 ++--
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 2 +-
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 12 +++++---
.../hbase/cube/v2/ExpectedSizeIterator.java | 4 +--
.../coprocessor/endpoint/CubeVisitService.java | 30 ++++++++++++------
8 files changed, 70 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 5750e03..c7ded10 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kylin.common.exceptions.KylinTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +45,9 @@ public class QueryContext {
}
};
+ private long queryStartMillis;
+ private long deadline = Long.MAX_VALUE;
+
private String queryId;
private String username;
private AtomicLong scannedRows = new AtomicLong();
@@ -54,7 +58,7 @@ public class QueryContext {
private QueryContext() {
// use QueryContext.current() instead
-
+ queryStartMillis = System.currentTimeMillis();
queryId = UUID.randomUUID().toString();
}
@@ -66,6 +70,32 @@ public class QueryContext {
contexts.remove();
}
+ public long getQueryStartMillis() {
+ return queryStartMillis;
+ }
+
+ public void setDeadline(long timeoutMillis) {
+ if (timeoutMillis > 0) {
+ deadline = queryStartMillis + timeoutMillis;
+ }
+ }
+
+ public long getDeadline() {
+ return deadline;
+ }
+
+ /**
+ * @return millis before deadline
+ * @throws KylinTimeoutException if deadline has passed
+ */
+ public long checkMillisBeforeDeadline() {
+ long remain = deadline - System.currentTimeMillis();
+ if (remain <= 0) {
+ throw new KylinTimeoutException("Query timeout");
+ }
+ return remain;
+ }
+
public String getQueryId() {
return queryId == null ? "" : queryId;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 87f7bd0..501dff8 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -47,7 +47,6 @@ public class StorageContext {
private StorageLimitLevel storageLimitLevel = StorageLimitLevel.NO_LIMIT;
private boolean hasSort = false;
private boolean acceptPartialResult = false;
- private long deadline;
private boolean exactAggregation = false;
private boolean needStorageAggregation = false;
@@ -174,19 +173,6 @@ public class StorageContext {
return isValidPushDownLimit(finalPushDownLimit);
}
- public long getDeadline() {
- return this.deadline;
- }
-
- public void setDeadline(IRealization realization) {
- int timeout = realization.getConfig().getQueryTimeoutSeconds() * 1000;
- if (timeout == 0) {
- this.deadline = Long.MAX_VALUE;
- } else {
- this.deadline = timeout + System.currentTimeMillis();
- }
- }
-
public void markSort() {
this.hasSort = true;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 0a9d828..e9c6885 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -158,8 +159,9 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
sqlDigest.aggregations, context);
// set whether to aggregate results from multiple partitions
enableStreamAggregateIfBeneficial(cuboid, groupsD, context);
- // set query deadline
- context.setDeadline(cubeInstance);
+ // set and check query deadline
+ QueryContext.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000);
+ QueryContext.current().checkMillisBeforeDeadline();
// push down having clause filter if possible
TupleFilter havingFilter = checkHavingCanPushDown(sqlDigest.havingFilter, groupsD, sqlDigest.aggregations,
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index 3cbb538..ede5ff9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-import org.apache.kylin.common.exceptions.KylinTimeoutException;
+import org.apache.kylin.common.QueryContext;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -141,8 +141,8 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
@Override
public ITuple next() {
- if (scanCount++ % 100 == 1 && System.currentTimeMillis() > context.getDeadline()) {
- throw new KylinTimeoutException("Query timeout after \"kylin.query.timeout-seconds\" seconds");
+ if (scanCount++ % 100 == 1) {
+ QueryContext.current().checkMillisBeforeDeadline();
}
if (++scanCountDelta >= 1000)
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 03f8937..96f8f06 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -130,7 +130,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
rawScanByteString = serializeRawScans(rawScans);
- int coprocessorTimeout = getCoprocessorTimeoutMillis();
+ long coprocessorTimeout = getCoprocessorTimeoutMillis();
scanRequest.setTimeout(coprocessorTimeout);
scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it
scanRequestByteString = serializeGTScanReq(scanRequest);
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 188f554..6b4ac32 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -287,8 +287,8 @@ public abstract class CubeHBaseRPC implements IGTStorage {
logger.info(info.toString());
}
- protected int getCoprocessorTimeoutMillis() {
- int coopTimeout;
+ protected long getCoprocessorTimeoutMillis() {
+ long coopTimeout;
if (BackdoorToggles.getQueryTimeout() != -1) {
coopTimeout = BackdoorToggles.getQueryTimeout();
} else {
@@ -307,10 +307,14 @@ public abstract class CubeHBaseRPC implements IGTStorage {
// coprocessor timeout is 0 by default
if (coopTimeout <= 0) {
- coopTimeout = (int) (rpcTimeout * 0.9);
+ coopTimeout = (long) (rpcTimeout * 0.9);
}
+
+ long millisBeforeDeadline = queryContext.checkMillisBeforeDeadline();
+ coopTimeout = Math.min(coopTimeout, millisBeforeDeadline);
- logger.debug("{} = {} ms, use {} ms as timeout for coprocessor", HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, coopTimeout);
+ logger.debug("{} = {} ms, {} ms before deadline, use {} ms as timeout for coprocessor",
+ HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, millisBeforeDeadline, coopTimeout);
return coopTimeout;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index 59fe9e0..60d85b4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -32,11 +32,11 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
private BlockingQueue<byte[]> queue;
private int expectedSize;
private int current = 0;
- private int coprocessorTimeout;
+ private long coprocessorTimeout;
private long deadline;
private volatile Throwable coprocException;
- public ExpectedSizeIterator(int expectedSize, int coprocessorTimeout) {
+ public ExpectedSizeIterator(int expectedSize, long coprocessorTimeout) {
this.expectedSize = expectedSize;
this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b665cb8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index d99e6c5..b784ab9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
@@ -143,19 +144,17 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
private final Iterator<List<Cell>> delegate;
private final long rowCountLimit;
private final long bytesLimit;
- private final long timeout;
private final long deadline;
private long rowCount;
private long rowBytes;
- ResourceTrackingCellListIterator(Iterator<List<Cell>> delegate, long rowCountLimit, long bytesLimit,
- long timeout) {
+ ResourceTrackingCellListIterator(Iterator<List<Cell>> delegate,
+ long rowCountLimit, long bytesLimit, long deadline) {
this.delegate = delegate;
this.rowCountLimit = rowCountLimit;
this.bytesLimit = bytesLimit;
- this.timeout = timeout;
- this.deadline = System.currentTimeMillis() + timeout;
+ this.deadline = deadline;
}
@Override
@@ -167,8 +166,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
throw new ResourceLimitExceededException(
"scanned bytes " + rowBytes + " exceeds threshold " + bytesLimit);
}
- if ((rowCount % GTScanRequest.terminateCheckInterval == 1) && System.currentTimeMillis() > deadline) {
- throw new KylinTimeoutException("coprocessor timeout after " + timeout + " ms");
+ if ((rowCount % GTScanRequest.terminateCheckInterval == 0) && System.currentTimeMillis() > deadline) {
+ throw new KylinTimeoutException("coprocessor timeout after scanning " + rowCount + " rows");
}
return delegate.hasNext();
}
@@ -220,6 +219,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
sb.append(",");
}
+ private void checkDeadline(long deadline) throws DoNotRetryIOException {
+ if (System.currentTimeMillis() > deadline) {
+ logger.info("Deadline has passed, abort now!");
+ throw new DoNotRetryIOException("Coprocessor passed deadline! Maybe server is overloaded");
+ }
+ }
+
@SuppressWarnings("checkstyle:methodlength")
@Override
public void visitCube(final RpcController controller, final CubeVisitProtos.CubeVisitRequest request,
@@ -234,6 +240,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
CubeVisitProtos.CubeVisitResponse.ErrorInfo errorInfo = null;
String queryId = request.hasQueryId() ? request.getQueryId() : "UnknownId";
+ logger.info("start query {} in thread {}", queryId, Thread.currentThread().getName());
try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) {
final long serviceStartTime = System.currentTimeMillis();
@@ -248,6 +255,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
final GTScanRequest scanReq = GTScanRequest.serializer
.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
+ final long deadline = scanReq.getStartTime() + scanReq.getTimeout();
+ checkDeadline(deadline);
+
List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
for (IntList intList : request.getHbaseColumnsToGTList()) {
hbaseColumnsToGT.add(intList.getIntsList());
@@ -298,7 +308,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator(allCellLists,
scanReq.getStorageScanRowNumThreshold(), // for old client (scan threshold)
!request.hasMaxScanBytes() ? Long.MAX_VALUE : request.getMaxScanBytes(), // for new client
- scanReq.getTimeout());
+ deadline);
IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns,
hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn(),
@@ -401,6 +411,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
.setNormalComplete(errorInfo == null ? 1 : 0).build())
.build());
+ } catch (DoNotRetryIOException e) {
+ ResponseConverter.setControllerException(controller, e);
+
} catch (IOException ioe) {
logger.error(ioe.toString(), ioe);
IOException wrapped = new IOException("Error in coprocessor " + debugGitTag, ioe);
@@ -413,7 +426,6 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
try {
region.closeRegionOperation();
} catch (IOException e) {
- e.printStackTrace();
throw new RuntimeException(e);
}
}