You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/05/06 06:59:10 UTC
[kylin] 09/38: KYLIN-5526 fix unique queue async query count more than setting
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 4a889f08ab94df6bccb5743cdc51d32a5d1b2b66
Author: Jiawei Li <10...@qq.com>
AuthorDate: Thu Feb 16 17:37:19 2023 +0800
KYLIN-5526 fix unique queue async query count more than setting
* KYLIN-5526 add check when add unique queue async query count
---
.../apache/kylin/rest/controller/NAsyncQueryController.java | 8 ++++++--
.../java/org/apache/kylin/rest/service/QueryService.java | 13 +++++--------
.../org/apache/kylin/rest/util/AsyncQueryRequestLimits.java | 8 +++++++-
3 files changed, 18 insertions(+), 11 deletions(-)
diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
index 8f84c63ddf..99a3f4ee7a 100644
--- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
+++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
@@ -125,10 +125,11 @@ public class NAsyncQueryController extends NBasicController {
if (StringUtils.isEmpty(sqlRequest.getSeparator())) {
sqlRequest.setSeparator(",");
}
+ AsyncQueryRequestLimits asyncQueryRequestLimits = null;
if (NProjectManager.getProjectConfig(sqlRequest.getProject()).isUniqueAsyncQueryYarnQueue()) {
- AsyncQueryRequestLimits.checkCount();
+ asyncQueryRequestLimits = new AsyncQueryRequestLimits();
}
-
+ AsyncQueryRequestLimits finalAsyncQueryRequestLimits = asyncQueryRequestLimits;
executorService.submit(Objects.requireNonNull(TtlRunnable.get(() -> {
String format = sqlRequest.getFormat().toLowerCase(Locale.ROOT);
String encode = sqlRequest.getEncode().toLowerCase(Locale.ROOT);
@@ -166,6 +167,9 @@ public class NAsyncQueryController extends NBasicController {
throw new RuntimeException(e1);
}
} finally {
+ if (finalAsyncQueryRequestLimits != null) {
+ finalAsyncQueryRequestLimits.close();
+ }
logger.info("Async query with queryId: {} end", queryContext.getQueryId());
QueryContext.current().close();
}
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 846e7b6260..42fa8425b5 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -142,7 +142,6 @@ import org.apache.kylin.rest.response.TableMetaCacheResultV2;
import org.apache.kylin.rest.security.MutableAclRecord;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.util.AclPermissionUtil;
-import org.apache.kylin.rest.util.AsyncQueryRequestLimits;
import org.apache.kylin.rest.util.PrepareSQLUtils;
import org.apache.kylin.rest.util.QueryCacheSignatureUtil;
import org.apache.kylin.rest.util.QueryRequestLimits;
@@ -304,13 +303,11 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
if (StringUtils.isNotEmpty(sqlRequest.getSparkQueue())) {
queryParams.setSparkQueue(sqlRequest.getSparkQueue());
}
- try (AsyncQueryRequestLimits ignored = new AsyncQueryRequestLimits()) {
- AsyncQueryJob asyncQueryJob = new AsyncQueryJob();
- asyncQueryJob.setProject(queryParams.getProject());
- asyncQueryJob.submit(queryParams);
- return buildSqlResponse(false, Collections.emptyList(), 0, Lists.newArrayList(),
- sqlRequest.getProject());
- }
+ AsyncQueryJob asyncQueryJob = new AsyncQueryJob();
+ asyncQueryJob.setProject(queryParams.getProject());
+ asyncQueryJob.submit(queryParams);
+ return buildSqlResponse(false, Collections.emptyList(), 0, Lists.newArrayList(),
+ sqlRequest.getProject());
}
SQLResponse fakeResponse = TableauInterceptor.tableauIntercept(queryParams.getSql());
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java b/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java
index ea3df6e9e4..382635f2aa 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java
@@ -25,17 +25,23 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.msg.MsgPicker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AsyncQueryRequestLimits implements AutoCloseable {
+ private static final Logger logger = LoggerFactory.getLogger(AsyncQueryRequestLimits.class);
+
private static volatile AtomicInteger asyncQueryCount = new AtomicInteger(0);
private static final int MAX_COUNT = KylinConfig.getInstanceFromEnv().getAsyncQueryMaxConcurrentJobs();
- private static void openAsyncQueryRequest() {
+ private static synchronized void openAsyncQueryRequest() {
if (MAX_COUNT <= 0) {
return;
}
+ checkCount();
asyncQueryCount.incrementAndGet();
+ logger.debug("current async query job count is {}.", asyncQueryCount.get());
}