You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/10/29 11:27:58 UTC
[kylin] 06/12: KYLIN-2897 improve the query execution for a set of
duplicate queries in a short period
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit b9aab0105cbf00a3581409a357be3816d0cd89a6
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Thu Oct 18 18:33:16 2018 +0800
KYLIN-2897 improve the query execution for a set of duplicate queries in a short period
---
.../org/apache/kylin/common/KylinConfigBase.java | 8 ++++++
.../apache/kylin/rest/response/SQLResponse.java | 18 ++++++++++++
.../apache/kylin/rest/service/QueryService.java | 33 ++++++++++++++++++++++
3 files changed, 59 insertions(+)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index b19f2e9..778b5bf 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1363,6 +1363,14 @@ abstract public class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.query.scan-threshold", "10000000"));
}
+ public boolean isLazyQueryEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.query.lazy-query-enabled", "false"));
+ }
+
+ public long getLazyQueryWaitingTimeoutMilliSeconds() {
+ return Long.parseLong(getOptional("kylin.query.lazy-query-waiting-timeout-milliseconds", "60000L"));
+ }
+
public int getQueryConcurrentRunningThresholdForProject() {
// by default there's no limitation
return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "0"));
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 0502798..1721efe 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -79,6 +79,10 @@ public class SQLResponse implements Serializable {
// it's sql response signature for cache checking, no need to return and should be JsonIgnore
protected String signature;
+ // it's a temporary flag, no need to return and should be JsonIgnore
+ // indicating the lazy query start time, -1 indicating not enabled
+ protected long lazyQueryStartTime = -1L;
+
public SQLResponse() {
}
@@ -219,6 +223,20 @@ public class SQLResponse implements Serializable {
}
@JsonIgnore
+ public boolean isRunning() {
+ return this.lazyQueryStartTime >= 0;
+ }
+
+ @JsonIgnore
+ public long getLazyQueryStartTime() {
+ return lazyQueryStartTime;
+ }
+
+ public void setLazyQueryStartTime(long lazyQueryStartTime) {
+ this.lazyQueryStartTime = lazyQueryStartTime;
+ }
+
+ @JsonIgnore
public List<QueryContext.CubeSegmentStatisticsResult> getCubeSegmentStatisticsList() {
try {
return queryStatistics == null ? Lists.<QueryContext.CubeSegmentStatisticsResult> newArrayList()
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index abcab7f..78068eb 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -443,8 +443,16 @@ public class QueryService extends BasicService {
Message msg = MsgPicker.getMsg();
final QueryContext queryContext = QueryContextFacade.current();
+ boolean isDummpyResponseEnabled = queryCacheEnabled && kylinConfig.isLazyQueryEnabled();
SQLResponse sqlResponse = null;
try {
+ // Add dummy response which will be updated or evicted when query finishes
+ if (isDummpyResponseEnabled) {
+ SQLResponse dummyResponse = new SQLResponse();
+ dummyResponse.setLazyQueryStartTime(System.currentTimeMillis());
+ cacheManager.getCache(QUERY_CACHE).put(sqlRequest.getCacheKey(), dummyResponse);
+ }
+
final boolean isSelect = QueryUtil.isSelectStatement(sqlRequest.getSql());
if (isSelect) {
sqlResponse = query(sqlRequest, queryContext.getQueryId());
@@ -481,6 +489,8 @@ public class QueryService extends BasicService {
"query response is too large: {} ({})", sqlResponse.getResults().size(),
kylinConfig.getLargeQueryThreshold())) {
cacheManager.getCache(QUERY_CACHE).put(sqlRequest.getCacheKey(), sqlResponse);
+ } else if (isDummpyResponseEnabled) {
+ cacheManager.getCache(QUERY_CACHE).evict(sqlRequest.getCacheKey());
}
} catch (Throwable e) { // calcite may throw AssertError
@@ -529,6 +539,29 @@ public class QueryService extends BasicService {
if (response == null) {
return null;
}
+
+ // Check whether duplicate query exists
+ while (response.isRunning()) {
+ // Wait at most one minute
+ if (System.currentTimeMillis() - response.getLazyQueryStartTime() >= getConfig()
+ .getLazyQueryWaitingTimeoutMilliSeconds()) {
+ cache.evict(sqlRequest.getCacheKey());
+ return null;
+ }
+ logger.info("Duplicated SQL request is running, waiting...");
+ try {
+ Thread.sleep(100L);
+ } catch (InterruptedException e) {
+ }
+ wrapper = cache.get(sqlRequest.getCacheKey());
+ if (wrapper == null) {
+ return null;
+ }
+ response = (SQLResponse) wrapper.get();
+ if (response == null) {
+ return null;
+ }
+ }
logger.info("The sqlResponse is found in QUERY_CACHE");
if (getConfig().isQueryCacheSignatureEnabled()
&& !SQLResponseSignatureUtil.checkSignature(getConfig(), response, sqlRequest.getProject())) {