You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/10/29 11:27:58 UTC

[kylin] 06/12: KYLIN-2897 improve the query execution for a set of duplicate queries in a short period

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit b9aab0105cbf00a3581409a357be3816d0cd89a6
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Thu Oct 18 18:33:16 2018 +0800

    KYLIN-2897 improve the query execution for a set of duplicate queries in a short period
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  8 ++++++
 .../apache/kylin/rest/response/SQLResponse.java    | 18 ++++++++++++
 .../apache/kylin/rest/service/QueryService.java    | 33 ++++++++++++++++++++++
 3 files changed, 59 insertions(+)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index b19f2e9..778b5bf 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1363,6 +1363,14 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.query.scan-threshold", "10000000"));
     }
 
+    public boolean isLazyQueryEnabled() { // reads kylin.query.lazy-query-enabled
+        return Boolean.parseBoolean(getOptional("kylin.query.lazy-query-enabled", "false")); // opt-in: falls back to "false" when unset
+    }
+
+    public long getLazyQueryWaitingTimeoutMilliSeconds() { // timeout in milliseconds, 60000 when the property is unset
+        return Long.parseLong(getOptional("kylin.query.lazy-query-waiting-timeout-milliseconds", "60000")); // no 'L' suffix: Long.parseLong rejects it
+    }
+
     public int getQueryConcurrentRunningThresholdForProject() {
         // by default there's no limitation
         return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "0"));
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 0502798..1721efe 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -79,6 +79,10 @@ public class SQLResponse implements Serializable {
     // it's sql response signature for cache checking, no need to return and should be JsonIgnore
     protected String signature;
 
+    // Temporary marker that is not part of the returned payload (its getter is @JsonIgnore).
+    // Holds the lazy query start time; -1 means lazy query is not enabled for this response.
+    protected long lazyQueryStartTime = -1L;
+
     public SQLResponse() {
     }
 
@@ -219,6 +223,20 @@ public class SQLResponse implements Serializable {
     }
 
     @JsonIgnore
+    public boolean isRunning() { // true once a lazy query start time has been recorded on this response
+        return this.lazyQueryStartTime >= 0; // the field is initialised to -1, so an untouched response is not "running"
+    }
+
+    @JsonIgnore
+    public long getLazyQueryStartTime() { // returns the recorded start time, or the initial -1 if none was set
+        return lazyQueryStartTime;
+    }
+
+    public void setLazyQueryStartTime(long lazyQueryStartTime) { // a non-negative value makes isRunning() return true
+        this.lazyQueryStartTime = lazyQueryStartTime;
+    }
+
+    @JsonIgnore
     public List<QueryContext.CubeSegmentStatisticsResult> getCubeSegmentStatisticsList() {
         try {
             return queryStatistics == null ? Lists.<QueryContext.CubeSegmentStatisticsResult> newArrayList()
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index abcab7f..78068eb 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -443,8 +443,16 @@ public class QueryService extends BasicService {
         Message msg = MsgPicker.getMsg();
         final QueryContext queryContext = QueryContextFacade.current();
 
+        boolean isDummpyResponseEnabled = queryCacheEnabled && kylinConfig.isLazyQueryEnabled();
         SQLResponse sqlResponse = null;
         try {
+            // Add dummy response which will be updated or evicted when query finishes
+            if (isDummpyResponseEnabled) {
+                SQLResponse dummyResponse = new SQLResponse();
+                dummyResponse.setLazyQueryStartTime(System.currentTimeMillis());
+                cacheManager.getCache(QUERY_CACHE).put(sqlRequest.getCacheKey(), dummyResponse);
+            }
+
             final boolean isSelect = QueryUtil.isSelectStatement(sqlRequest.getSql());
             if (isSelect) {
                 sqlResponse = query(sqlRequest, queryContext.getQueryId());
@@ -481,6 +489,8 @@ public class QueryService extends BasicService {
                             "query response is too large: {} ({})", sqlResponse.getResults().size(),
                             kylinConfig.getLargeQueryThreshold())) {
                 cacheManager.getCache(QUERY_CACHE).put(sqlRequest.getCacheKey(), sqlResponse);
+            } else if (isDummpyResponseEnabled) {
+                cacheManager.getCache(QUERY_CACHE).evict(sqlRequest.getCacheKey());
             }
 
         } catch (Throwable e) { // calcite may throw AssertError
@@ -529,6 +539,29 @@ public class QueryService extends BasicService {
         if (response == null) {
             return null;
         }
+
+        // Check whether duplicate query exists
+        while (response.isRunning()) {
+            // Give up once the configured lazy-query waiting timeout has elapsed since the duplicate started
+            if (System.currentTimeMillis() - response.getLazyQueryStartTime() >= getConfig()
+                    .getLazyQueryWaitingTimeoutMilliSeconds()) {
+                cache.evict(sqlRequest.getCacheKey());
+                return null;
+            }
+            logger.info("Duplicated SQL request is running, waiting...");
+            try {
+                Thread.sleep(100L);
+            } catch (InterruptedException e) {
+            }
+            wrapper = cache.get(sqlRequest.getCacheKey());
+            if (wrapper == null) {
+                return null;
+            }
+            response = (SQLResponse) wrapper.get();
+            if (response == null) {
+                return null;
+            }
+        }
         logger.info("The sqlResponse is found in QUERY_CACHE");
         if (getConfig().isQueryCacheSignatureEnabled()
                 && !SQLResponseSignatureUtil.checkSignature(getConfig(), response, sqlRequest.getProject())) {