You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/01/26 11:20:38 UTC
[12/33] kylin git commit: KYLIN-3157, enhance Kylin's query timeout.
KYLIN-3157, enhance Kylin's query timeout.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/13f64f48
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/13f64f48
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/13f64f48
Branch: refs/heads/sync
Commit: 13f64f48cc8111382840f4b0510b40df08448ace
Parents: 98b1047
Author: Li Yang <li...@apache.org>
Authored: Fri Jan 26 18:39:57 2018 +0800
Committer: Li Yang <li...@apache.org>
Committed: Fri Jan 26 18:39:57 2018 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 13 ++-
.../org/apache/kylin/common/QueryContext.java | 21 +---
.../apache/kylin/storage/StorageFactory.java | 4 +
.../gtrecord/GTCubeStorageQueryBase.java | 3 +-
.../gtrecord/SequentialCubeTupleIterator.java | 5 +-
.../kylin/rest/service/BadQueryDetector.java | 17 ++-
.../apache/kylin/rest/service/QueryService.java | 1 +
.../rest/service/BadQueryDetectorTest.java | 4 +-
.../rest/service/KyilnQueryTimeoutTest.java | 104 +++++++++++++++++++
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 7 +-
10 files changed, 144 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/13f64f48/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 761bb9a..df01842 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1266,7 +1266,11 @@ abstract public class KylinConfigBase implements Serializable {
}
public int getBadQueryDefaultDetectIntervalSeconds() {
- return Integer.parseInt(getOptional("kylin.query.badquery-detect-interval", "60"));
+ int time = getQueryTimeoutSeconds() / 2; // half of query timeout
+ if (time == 0) {
+ time = 60; // 60 sec
+ }
+ return time;
}
public boolean getBadQueryPersistentEnabled() {
@@ -1324,7 +1328,12 @@ abstract public class KylinConfigBase implements Serializable {
}
public int getQueryTimeoutSeconds() {
- return Integer.parseInt(this.getOptional("kylin.query.timeout-seconds", "0"));
+ int time = Integer.parseInt(this.getOptional("kylin.query.timeout-seconds", "0"));
+ if (time != 0 && time <= 60) {
+ logger.warn("query timeout seconds less than 60 sec, set to 60 sec.");
+ time = 60;
+ }
+ return time;
}
public boolean isPushDownEnabled() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/13f64f48/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index cb1b09c..1aa94d3 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -45,7 +45,6 @@ public class QueryContext {
}
private long queryStartMillis;
- private long deadline = Long.MAX_VALUE;
private final String queryId;
private String username;
@@ -75,26 +74,10 @@ public class QueryContext {
return queryStartMillis;
}
- public void setDeadline(long timeoutMillis) {
- if (timeoutMillis > 0) {
- deadline = queryStartMillis + timeoutMillis;
- }
- }
-
- public long getDeadline() {
- return deadline;
- }
-
- /**
- * @return millis before deadline
- * @throws KylinTimeoutException if deadline has passed
- */
- public long checkMillisBeforeDeadline() {
- long remain = deadline - System.currentTimeMillis();
- if (remain <= 0) {
+ public void checkMillisBeforeDeadline() {
+ if (Thread.interrupted()) {
throw new KylinTimeoutException("Query timeout");
}
- return remain;
}
public String getQueryId() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/13f64f48/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
index 79b93fe..3505708 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
@@ -39,6 +39,10 @@ public class StorageFactory {
return current.get(aware.getStorageType());
}
+ public static void clearCache() {
+ storages.remove();
+ }
+
public static IStorageQuery createQuery(IRealization realization) {
return storage(realization).createQuery(realization);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/13f64f48/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index ae1f64f..2f69b76 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -160,8 +160,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
sqlDigest.aggregations, context);
// set whether to aggregate results from multiple partitions
enableStreamAggregateIfBeneficial(cuboid, groupsD, context);
- // set and check query deadline
- QueryContextFacade.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000);
+ // check query deadline
QueryContextFacade.current().checkMillisBeforeDeadline();
// push down having clause filter if possible
http://git-wip-us.apache.org/repos/asf/kylin/blob/13f64f48/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index 72417bf..c067e33 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -144,10 +144,9 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
if (scanCount++ % 100 == 1) {
QueryContextFacade.current().checkMillisBeforeDeadline();
}
-
- if (++scanCountDelta >= 1000)
+ if (++scanCountDelta >= 1000) {
flushScanCountDelta();
-
+ }
return tupleIterator.next();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/13f64f48/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
index 7410c9c..4f7bccf 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
@@ -44,7 +44,8 @@ public class BadQueryDetector extends Thread {
private final int alertMB;
private final int alertRunningSec;
private KylinConfig kylinConfig;
- private ArrayList<Notifier> notifiers = new ArrayList<Notifier>();
+ private ArrayList<Notifier> notifiers = new ArrayList<>();
+ private int queryTimeoutSeconds;
public BadQueryDetector() {
super("BadQueryDetector");
@@ -53,17 +54,19 @@ public class BadQueryDetector extends Thread {
this.detectionInterval = kylinConfig.getBadQueryDefaultDetectIntervalSeconds() * 1000L;
this.alertMB = 100;
this.alertRunningSec = kylinConfig.getBadQueryDefaultAlertingSeconds();
+ this.queryTimeoutSeconds = kylinConfig.getQueryTimeoutSeconds();
initNotifiers();
}
- public BadQueryDetector(long detectionInterval, int alertMB, int alertRunningSec) {
+ public BadQueryDetector(long detectionInterval, int alertMB, int alertRunningSec, int queryTimeoutSeconds) {
super("BadQueryDetector");
this.setDaemon(true);
this.detectionInterval = detectionInterval;
this.alertMB = alertMB;
this.alertRunningSec = alertRunningSec;
this.kylinConfig = KylinConfig.getInstanceFromEnv();
+ this.queryTimeoutSeconds = queryTimeoutSeconds;
initNotifiers();
}
@@ -121,6 +124,7 @@ public class BadQueryDetector extends Thread {
notify(badReason, entry);
}
+ @Override
public void run() {
while (true) {
try {
@@ -147,6 +151,8 @@ public class BadQueryDetector extends Thread {
// report if query running long
for (Entry e : entries) {
float runningSec = (float) (now - e.startTime) / 1000;
+ setQueryThreadInterrupted(e, runningSec);
+
if (runningSec >= alertRunningSec) {
notify(BadQueryEntry.ADJ_SLOW, e);
dumpStackTrace(e.thread);
@@ -161,6 +167,13 @@ public class BadQueryDetector extends Thread {
}
}
+ private void setQueryThreadInterrupted(Entry e, float runningSec) {
+ if (queryTimeoutSeconds != 0 && runningSec >= queryTimeoutSeconds) {
+ e.thread.interrupt();
+ logger.error("Trying to cancel query:" + e.thread.getName());
+ }
+ }
+
// log the stack trace of bad query thread for further analysis
private void dumpStackTrace(Thread t) {
int maxStackTraceDepth = kylinConfig.getBadQueryStackTraceDepth();
http://git-wip-us.apache.org/repos/asf/kylin/blob/13f64f48/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 7b30606..56fab34 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -194,6 +194,7 @@ public class QueryService extends BasicService {
} finally {
String badReason = (ret != null && ret.isPushDown()) ? BadQueryEntry.ADJ_PUSHDOWN : null;
badQueryDetector.queryEnd(Thread.currentThread(), badReason);
+ Thread.interrupted(); //reset if interrupted
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/13f64f48/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
index fc18d92..d61dfbe 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
@@ -50,7 +50,7 @@ public class BadQueryDetectorTest extends LocalFileMetadataTestCase {
String mockSql = "select * from just_a_test";
final ArrayList<String[]> alerts = new ArrayList<>();
- BadQueryDetector badQueryDetector = new BadQueryDetector(alertRunningSec * 1000, alertMB, alertRunningSec);
+ BadQueryDetector badQueryDetector = new BadQueryDetector(alertRunningSec * 1000, alertMB, alertRunningSec, 1000);
badQueryDetector.registerNotifier(new BadQueryDetector.Notifier() {
@Override
public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, String user, Thread t) {
@@ -72,7 +72,7 @@ public class BadQueryDetectorTest extends LocalFileMetadataTestCase {
badQueryDetector.queryEnd(Thread.currentThread(), BadQueryEntry.ADJ_PUSHDOWN);
}
- badQueryDetector.stop();
+ badQueryDetector.interrupt();
assertEquals(2, alerts.size());
// second check founds a Slow
http://git-wip-us.apache.org/repos/asf/kylin/blob/13f64f48/server/src/test/java/org/apache/kylin/rest/service/KyilnQueryTimeoutTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/KyilnQueryTimeoutTest.java b/server/src/test/java/org/apache/kylin/rest/service/KyilnQueryTimeoutTest.java
new file mode 100644
index 0000000..25ff75b
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/service/KyilnQueryTimeoutTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.service;
+import java.sql.SQLException;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exceptions.KylinTimeoutException;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.apache.kylin.query.security.QueryACLTestUtil;
+import org.apache.kylin.rest.request.SQLRequest;
+import org.apache.kylin.storage.IStorage;
+import org.apache.kylin.storage.IStorageQuery;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.StorageFactory;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class KyilnQueryTimeoutTest extends LocalFileMetadataTestCase {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Before
+ public void setUp() {
+ this.createTestMetadata();
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ config.setProperty("kylin.storage.provider.2", MockQueryTimeoutStorage.class.getName());
+ config.setProperty("kylin.storage.default", "2");
+ }
+
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ StorageFactory.clearCache();
+ }
+
+ @Test
+ public void testQueryTimeout() throws SQLException {
+ thrown.expectCause(CoreMatchers.isA(KylinTimeoutException.class));
+ thrown.expectMessage(CoreMatchers.containsString("Kylin query timeout"));
+ StorageFactory.clearCache();
+ BadQueryDetector detector = new BadQueryDetector(100, BadQueryDetector.getSystemAvailMB() * 2, 100, 1);
+ detector.start();
+ SQLRequest request = new SQLRequest();
+ request.setProject("default");
+ request.setSql("select count(*) from STREAMING_TABLE");
+ detector.queryStart(Thread.currentThread(), request, "ADMIN");
+ try {
+ QueryACLTestUtil.mockQuery("default", "select * from STREAMING_TABLE");
+ } finally{
+ detector.queryEnd(Thread.currentThread(), "timeout");
+ detector.interrupt();
+ }
+ }
+
+ public static class MockQueryTimeoutStorage implements IStorage {
+
+ @Override
+ public IStorageQuery createQuery(IRealization realization) {
+ return new MockQueryTimeoutQuery();
+ }
+
+ @Override
+ public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+ return null;
+ }
+ }
+
+ private static class MockQueryTimeoutQuery implements IStorageQuery {
+ @Override
+ public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+ try {
+ Thread.sleep(5 * 1000);
+ } catch (InterruptedException e) {
+ throw new KylinTimeoutException("Kylin query timeout");
+ }
+ return null;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/13f64f48/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 2e82140..634a3cd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -311,11 +311,8 @@ public abstract class CubeHBaseRPC implements IGTStorage {
coopTimeout = (long) (rpcTimeout * 0.9);
}
- long millisBeforeDeadline = queryContext.checkMillisBeforeDeadline();
- coopTimeout = Math.min(coopTimeout, millisBeforeDeadline);
-
- logger.debug("{} = {} ms, {} ms before deadline, use {} ms as timeout for coprocessor",
- HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, millisBeforeDeadline, coopTimeout);
+ queryContext.checkMillisBeforeDeadline();
+ logger.debug("{} = {} ms, use {} ms as timeout for coprocessor", HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, coopTimeout);
return coopTimeout;
}