You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/01/26 14:56:24 UTC

[16/34] kylin git commit: KYLIN-3157, enhance Kylin's query timeout.

KYLIN-3157, enhance Kylin's query timeout.


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1f33dd93
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1f33dd93
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1f33dd93

Branch: refs/heads/master
Commit: 1f33dd93123dd59795a16193a3b5064419f4b532
Parents: 87a0058
Author: Li Yang <li...@apache.org>
Authored: Fri Jan 26 18:39:57 2018 +0800
Committer: Li Yang <li...@apache.org>
Committed: Fri Jan 26 22:54:58 2018 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  13 ++-
 .../org/apache/kylin/common/QueryContext.java   |  21 +---
 .../apache/kylin/storage/StorageFactory.java    |   4 +
 .../gtrecord/GTCubeStorageQueryBase.java        |   3 +-
 .../gtrecord/SequentialCubeTupleIterator.java   |   5 +-
 .../kylin/rest/service/BadQueryDetector.java    |  17 ++-
 .../apache/kylin/rest/service/QueryService.java |   1 +
 .../rest/service/BadQueryDetectorTest.java      |   4 +-
 .../rest/service/KyilnQueryTimeoutTest.java     | 104 +++++++++++++++++++
 .../storage/hbase/cube/v2/CubeHBaseRPC.java     |   7 +-
 10 files changed, 144 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 8efd260..b053daa 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1266,7 +1266,11 @@ abstract public class KylinConfigBase implements Serializable {
     }
 
     public int getBadQueryDefaultDetectIntervalSeconds() {
-        return Integer.parseInt(getOptional("kylin.query.badquery-detect-interval", "60"));
+        int time = getQueryTimeoutSeconds() / 2; // half of query timeout
+        if (time == 0) {
+            time = 60; // 60 sec
+        }
+        return time;
     }
 
     public boolean getBadQueryPersistentEnabled() {
@@ -1324,7 +1328,12 @@ abstract public class KylinConfigBase implements Serializable {
     }
 
     public int getQueryTimeoutSeconds() {
-        return Integer.parseInt(this.getOptional("kylin.query.timeout-seconds", "0"));
+        int time = Integer.parseInt(this.getOptional("kylin.query.timeout-seconds", "0"));
+        if (time != 0 && time <= 60) {
+            logger.warn("query timeout seconds less than 60 sec, set to 60 sec.");
+            time = 60;
+        }
+        return time;
     }
 
     public boolean isPushDownEnabled() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index cb1b09c..1aa94d3 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -45,7 +45,6 @@ public class QueryContext {
     }
 
     private long queryStartMillis;
-    private long deadline = Long.MAX_VALUE;
 
     private final String queryId;
     private String username;
@@ -75,26 +74,10 @@ public class QueryContext {
         return queryStartMillis;
     }
 
-    public void setDeadline(long timeoutMillis) {
-        if (timeoutMillis > 0) {
-            deadline = queryStartMillis + timeoutMillis;
-        }
-    }
-
-    public long getDeadline() {
-        return deadline;
-    }
-
-    /**
-     * @return millis before deadline
-     * @throws KylinTimeoutException if deadline has passed
-     */
-    public long checkMillisBeforeDeadline() {
-        long remain = deadline - System.currentTimeMillis();
-        if (remain <= 0) {
+    public void checkMillisBeforeDeadline() {
+        if (Thread.interrupted()) {
             throw new KylinTimeoutException("Query timeout");
         }
-        return remain;
     }
 
     public String getQueryId() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
index 79b93fe..3505708 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
@@ -39,6 +39,10 @@ public class StorageFactory {
         return current.get(aware.getStorageType());
     }
 
+    public static void clearCache() {
+        storages.remove();
+    }
+
     public static IStorageQuery createQuery(IRealization realization) {
         return storage(realization).createQuery(realization);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index ae1f64f..2f69b76 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -160,8 +160,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
                 sqlDigest.aggregations, context);
         // set whether to aggregate results from multiple partitions
         enableStreamAggregateIfBeneficial(cuboid, groupsD, context);
-        // set and check query deadline
-        QueryContextFacade.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000);
+        // check query deadline
         QueryContextFacade.current().checkMillisBeforeDeadline();
 
         // push down having clause filter if possible

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index 72417bf..c067e33 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -144,10 +144,9 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
         if (scanCount++ % 100 == 1) {
             QueryContextFacade.current().checkMillisBeforeDeadline();
         }
-
-        if (++scanCountDelta >= 1000)
+        if (++scanCountDelta >= 1000) {
             flushScanCountDelta();
-
+        }
         return tupleIterator.next();
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
index 7410c9c..4f7bccf 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
@@ -44,7 +44,8 @@ public class BadQueryDetector extends Thread {
     private final int alertMB;
     private final int alertRunningSec;
     private KylinConfig kylinConfig;
-    private ArrayList<Notifier> notifiers = new ArrayList<Notifier>();
+    private ArrayList<Notifier> notifiers = new ArrayList<>();
+    private int queryTimeoutSeconds;
 
     public BadQueryDetector() {
         super("BadQueryDetector");
@@ -53,17 +54,19 @@ public class BadQueryDetector extends Thread {
         this.detectionInterval = kylinConfig.getBadQueryDefaultDetectIntervalSeconds() * 1000L;
         this.alertMB = 100;
         this.alertRunningSec = kylinConfig.getBadQueryDefaultAlertingSeconds();
+        this.queryTimeoutSeconds = kylinConfig.getQueryTimeoutSeconds();
 
         initNotifiers();
     }
 
-    public BadQueryDetector(long detectionInterval, int alertMB, int alertRunningSec) {
+    public BadQueryDetector(long detectionInterval, int alertMB, int alertRunningSec, int queryTimeoutSeconds) {
         super("BadQueryDetector");
         this.setDaemon(true);
         this.detectionInterval = detectionInterval;
         this.alertMB = alertMB;
         this.alertRunningSec = alertRunningSec;
         this.kylinConfig = KylinConfig.getInstanceFromEnv();
+        this.queryTimeoutSeconds = queryTimeoutSeconds;
 
         initNotifiers();
     }
@@ -121,6 +124,7 @@ public class BadQueryDetector extends Thread {
             notify(badReason, entry);
     }
 
+    @Override
     public void run() {
         while (true) {
             try {
@@ -147,6 +151,8 @@ public class BadQueryDetector extends Thread {
         // report if query running long
         for (Entry e : entries) {
             float runningSec = (float) (now - e.startTime) / 1000;
+            setQueryThreadInterrupted(e, runningSec);
+
             if (runningSec >= alertRunningSec) {
                 notify(BadQueryEntry.ADJ_SLOW, e);
                 dumpStackTrace(e.thread);
@@ -161,6 +167,13 @@ public class BadQueryDetector extends Thread {
         }
     }
 
+    private void setQueryThreadInterrupted(Entry e, float runningSec) {
+        if (queryTimeoutSeconds != 0 && runningSec >= queryTimeoutSeconds) {
+            e.thread.interrupt();
+            logger.error("Trying to cancel query:" + e.thread.getName());
+        }
+    }
+
     // log the stack trace of bad query thread for further analysis
     private void dumpStackTrace(Thread t) {
         int maxStackTraceDepth = kylinConfig.getBadQueryStackTraceDepth();

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 7b30606..56fab34 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -194,6 +194,7 @@ public class QueryService extends BasicService {
         } finally {
             String badReason = (ret != null && ret.isPushDown()) ? BadQueryEntry.ADJ_PUSHDOWN : null;
             badQueryDetector.queryEnd(Thread.currentThread(), badReason);
+            Thread.interrupted(); //reset if interrupted
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
index fc18d92..d61dfbe 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
@@ -50,7 +50,7 @@ public class BadQueryDetectorTest extends LocalFileMetadataTestCase {
         String mockSql = "select * from just_a_test";
         final ArrayList<String[]> alerts = new ArrayList<>();
 
-        BadQueryDetector badQueryDetector = new BadQueryDetector(alertRunningSec * 1000, alertMB, alertRunningSec);
+        BadQueryDetector badQueryDetector = new BadQueryDetector(alertRunningSec * 1000, alertMB, alertRunningSec, 1000);
         badQueryDetector.registerNotifier(new BadQueryDetector.Notifier() {
             @Override
             public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, String user, Thread t) {
@@ -72,7 +72,7 @@ public class BadQueryDetectorTest extends LocalFileMetadataTestCase {
             badQueryDetector.queryEnd(Thread.currentThread(), BadQueryEntry.ADJ_PUSHDOWN);
         }
 
-        badQueryDetector.stop();
+        badQueryDetector.interrupt();
 
         assertEquals(2, alerts.size());
         // second check founds a Slow

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/server/src/test/java/org/apache/kylin/rest/service/KyilnQueryTimeoutTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/KyilnQueryTimeoutTest.java b/server/src/test/java/org/apache/kylin/rest/service/KyilnQueryTimeoutTest.java
new file mode 100644
index 0000000..25ff75b
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/service/KyilnQueryTimeoutTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.service;
+import java.sql.SQLException;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exceptions.KylinTimeoutException;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.apache.kylin.query.security.QueryACLTestUtil;
+import org.apache.kylin.rest.request.SQLRequest;
+import org.apache.kylin.storage.IStorage;
+import org.apache.kylin.storage.IStorageQuery;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.StorageFactory;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class KyilnQueryTimeoutTest extends LocalFileMetadataTestCase {
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Before
+    public void setUp() {
+        this.createTestMetadata();
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        config.setProperty("kylin.storage.provider.2", MockQueryTimeoutStorage.class.getName());
+        config.setProperty("kylin.storage.default", "2");
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+        StorageFactory.clearCache();
+    }
+
+    @Test
+    public void testQueryTimeout() throws SQLException {
+        thrown.expectCause(CoreMatchers.isA(KylinTimeoutException.class));
+        thrown.expectMessage(CoreMatchers.containsString("Kylin query timeout"));
+        StorageFactory.clearCache();
+        BadQueryDetector detector = new BadQueryDetector(100, BadQueryDetector.getSystemAvailMB() * 2, 100, 1);
+        detector.start();
+        SQLRequest request = new SQLRequest();
+        request.setProject("default");
+        request.setSql("select count(*) from STREAMING_TABLE");
+        detector.queryStart(Thread.currentThread(), request, "ADMIN");
+        try {
+            QueryACLTestUtil.mockQuery("default", "select * from STREAMING_TABLE");
+        } finally{
+            detector.queryEnd(Thread.currentThread(), "timeout");
+            detector.interrupt();
+        }
+    }
+
+    public static class MockQueryTimeoutStorage implements IStorage {
+
+        @Override
+        public IStorageQuery createQuery(IRealization realization) {
+            return new MockQueryTimeoutQuery();
+        }
+
+        @Override
+        public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+            return null;
+        }
+    }
+
+    private static class MockQueryTimeoutQuery implements IStorageQuery {
+        @Override
+        public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+            try {
+                Thread.sleep(5 * 1000);
+            } catch (InterruptedException e) {
+                throw new KylinTimeoutException("Kylin query timeout");
+            }
+            return null;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 2e82140..634a3cd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -311,11 +311,8 @@ public abstract class CubeHBaseRPC implements IGTStorage {
             coopTimeout = (long) (rpcTimeout * 0.9);
         }
 
-        long millisBeforeDeadline = queryContext.checkMillisBeforeDeadline();
-        coopTimeout = Math.min(coopTimeout, millisBeforeDeadline);
-        
-        logger.debug("{} = {} ms, {} ms before deadline, use {} ms as timeout for coprocessor",
-                HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, millisBeforeDeadline, coopTimeout);
+        queryContext.checkMillisBeforeDeadline();
+        logger.debug("{} = {} ms, use {} ms as timeout for coprocessor", HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, coopTimeout);
         return coopTimeout;
     }