You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/05/06 06:59:10 UTC

[kylin] 09/38: KYLIN-5526 fix unique queue async query count more than setting

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 4a889f08ab94df6bccb5743cdc51d32a5d1b2b66
Author: Jiawei Li <10...@qq.com>
AuthorDate: Thu Feb 16 17:37:19 2023 +0800

    KYLIN-5526 fix unique queue async query count more than setting
    
    *  KYLIN-5526 add check when add unique queue async query count
---
 .../apache/kylin/rest/controller/NAsyncQueryController.java |  8 ++++++--
 .../java/org/apache/kylin/rest/service/QueryService.java    | 13 +++++--------
 .../org/apache/kylin/rest/util/AsyncQueryRequestLimits.java |  8 +++++++-
 3 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
index 8f84c63ddf..99a3f4ee7a 100644
--- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
+++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
@@ -125,10 +125,11 @@ public class NAsyncQueryController extends NBasicController {
         if (StringUtils.isEmpty(sqlRequest.getSeparator())) {
             sqlRequest.setSeparator(",");
         }
+        AsyncQueryRequestLimits asyncQueryRequestLimits = null;
         if (NProjectManager.getProjectConfig(sqlRequest.getProject()).isUniqueAsyncQueryYarnQueue()) {
-            AsyncQueryRequestLimits.checkCount();
+            asyncQueryRequestLimits = new AsyncQueryRequestLimits();
         }
-
+        AsyncQueryRequestLimits finalAsyncQueryRequestLimits = asyncQueryRequestLimits;
         executorService.submit(Objects.requireNonNull(TtlRunnable.get(() -> {
             String format = sqlRequest.getFormat().toLowerCase(Locale.ROOT);
             String encode = sqlRequest.getEncode().toLowerCase(Locale.ROOT);
@@ -166,6 +167,9 @@ public class NAsyncQueryController extends NBasicController {
                     throw new RuntimeException(e1);
                 }
             } finally {
+                if (finalAsyncQueryRequestLimits != null) {
+                    finalAsyncQueryRequestLimits.close();
+                }
                 logger.info("Async query with queryId: {} end", queryContext.getQueryId());
                 QueryContext.current().close();
             }
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 846e7b6260..42fa8425b5 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -142,7 +142,6 @@ import org.apache.kylin.rest.response.TableMetaCacheResultV2;
 import org.apache.kylin.rest.security.MutableAclRecord;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.AclPermissionUtil;
-import org.apache.kylin.rest.util.AsyncQueryRequestLimits;
 import org.apache.kylin.rest.util.PrepareSQLUtils;
 import org.apache.kylin.rest.util.QueryCacheSignatureUtil;
 import org.apache.kylin.rest.util.QueryRequestLimits;
@@ -304,13 +303,11 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
                 if (StringUtils.isNotEmpty(sqlRequest.getSparkQueue())) {
                     queryParams.setSparkQueue(sqlRequest.getSparkQueue());
                 }
-                try (AsyncQueryRequestLimits ignored = new AsyncQueryRequestLimits()) {
-                    AsyncQueryJob asyncQueryJob = new AsyncQueryJob();
-                    asyncQueryJob.setProject(queryParams.getProject());
-                    asyncQueryJob.submit(queryParams);
-                    return buildSqlResponse(false, Collections.emptyList(), 0, Lists.newArrayList(),
-                            sqlRequest.getProject());
-                }
+                AsyncQueryJob asyncQueryJob = new AsyncQueryJob();
+                asyncQueryJob.setProject(queryParams.getProject());
+                asyncQueryJob.submit(queryParams);
+                return buildSqlResponse(false, Collections.emptyList(), 0, Lists.newArrayList(),
+                        sqlRequest.getProject());
             }
 
             SQLResponse fakeResponse = TableauInterceptor.tableauIntercept(queryParams.getSql());
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java b/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java
index ea3df6e9e4..382635f2aa 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java
@@ -25,17 +25,23 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.msg.MsgPicker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AsyncQueryRequestLimits implements AutoCloseable {
+    private static final Logger logger = LoggerFactory.getLogger(AsyncQueryRequestLimits.class);
+
     private static volatile AtomicInteger asyncQueryCount = new AtomicInteger(0);
 
     private static final int MAX_COUNT = KylinConfig.getInstanceFromEnv().getAsyncQueryMaxConcurrentJobs();
 
-    private static void openAsyncQueryRequest() {
+    private static synchronized void openAsyncQueryRequest() {
         if (MAX_COUNT <= 0) {
             return;
         }
+        checkCount();
         asyncQueryCount.incrementAndGet();
+        logger.debug("current async query job count is {}.", asyncQueryCount.get());
 
     }