You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/21 02:58:51 UTC
[06/16] kylin git commit: KYLIN-2902 minor refine
KYLIN-2902 minor refine
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8690fd2d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8690fd2d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8690fd2d
Branch: refs/heads/master
Commit: 8690fd2d671ca7fa49576cc84eda41c3f24e539f
Parents: 7aef88a
Author: lidongsjtu <li...@apache.org>
Authored: Wed Dec 20 15:24:10 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Wed Dec 20 23:20:11 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/rest/service/QueryService.java | 27 +++---
.../kylin/rest/util/QueryRequestLimits.java | 90 ++++++++++++++++++++
.../kylin/rest/util/QueryRequestUtil.java | 90 --------------------
.../kylin/rest/util/QueryRequestLimitsTest.java | 69 +++++++++++++++
.../kylin/rest/util/QueryRequestUtilTest.java | 69 ---------------
5 files changed, 174 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/8690fd2d/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 84a184e..17f6b58 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -98,7 +98,7 @@ import org.apache.kylin.rest.request.PrepareSqlRequest;
import org.apache.kylin.rest.request.SQLRequest;
import org.apache.kylin.rest.response.SQLResponse;
import org.apache.kylin.rest.util.AclEvaluate;
-import org.apache.kylin.rest.util.QueryRequestUtil;
+import org.apache.kylin.rest.util.QueryRequestLimits;
import org.apache.kylin.rest.util.TableauInterceptor;
import org.apache.kylin.storage.hybrid.HybridInstance;
import org.slf4j.Logger;
@@ -412,25 +412,28 @@ public class QueryService extends BasicService {
final boolean isSelect = QueryUtil.isSelectStatement(sql);
final boolean isPushDownUpdateEnabled = kylinConfig.isPushDownEnabled()
&& kylinConfig.isPushDownUpdateEnabled();
+ final int maxConcurrentQuery = projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject();
if (!isSelect && !isPushDownUpdateEnabled) {
logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled");
throw new BadRequestException(msg.getNOT_SUPPORTED_SQL());
}
- int maxConcurrentQuery = projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject();
- if (!QueryRequestUtil.openQueryRequest(projectInstance.getName(), maxConcurrentQuery)) {
- logger.warn("Directly return exception as too many concurrent query requests for project:" + project);
- throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING());
- }
+ SQLResponse sqlResponse = null;
+
+ try {
+ // Check project level query request concurrency limitation per query server
+ if (!QueryRequestLimits.openQueryRequest(projectInstance.getName(), maxConcurrentQuery)) {
+ logger.warn(
+ "Directly return exception as too many concurrent query requests for project:" + project);
+ throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING());
+ }
- long startTime = System.currentTimeMillis();
+ long startTime = System.currentTimeMillis();
- // force clear the query context before a new query
- OLAPContext.clearThreadLocalContexts();
+ // force clear the query context before a new query
+ OLAPContext.clearThreadLocalContexts();
- SQLResponse sqlResponse = null;
- try { // to deal with the case that cache searching throws exception
boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(),
"query cache disabled in KylinConfig") && //
checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles");
@@ -498,7 +501,7 @@ public class QueryService extends BasicService {
}
}
} finally {
- QueryRequestUtil.closeQueryRequest(projectInstance.getName(), maxConcurrentQuery);
+ QueryRequestLimits.closeQueryRequest(projectInstance.getName(), maxConcurrentQuery);
}
logQuery(sqlRequest, sqlResponse);
http://git-wip-us.apache.org/repos/asf/kylin/blob/8690fd2d/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java
new file mode 100644
index 0000000..cddaa12
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.util;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+public class QueryRequestLimits {
+ private static final Logger logger = LoggerFactory.getLogger(QueryRequestLimits.class);
+
+ private static LoadingCache<String, AtomicInteger> runningStats = CacheBuilder.newBuilder()
+ .removalListener(new RemovalListener<String, AtomicInteger>() {
+ @Override
+ public void onRemoval(RemovalNotification<String, AtomicInteger> notification) {
+ logger.info("Current running query number " + notification.getValue().get() + " for project "
+ + notification.getKey() + " is removed due to " + notification.getCause());
+ }
+ }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, AtomicInteger>() {
+ @Override
+ public AtomicInteger load(String s) throws Exception {
+ return new AtomicInteger(0);
+ }
+ });
+
+ public static boolean openQueryRequest(String project, int maxConcurrentQuery) {
+ if (maxConcurrentQuery == 0) {
+ return true;
+ }
+ try {
+ AtomicInteger nRunningQueries = runningStats.get(project);
+ for (;;) {
+ int nRunning = nRunningQueries.get();
+ if (nRunning < maxConcurrentQuery) {
+ if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) {
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void closeQueryRequest(String project, int maxConcurrentQuery) {
+ if (maxConcurrentQuery == 0) {
+ return;
+ }
+ AtomicInteger nRunningQueries = runningStats.getIfPresent(project);
+ if (nRunningQueries != null) {
+ nRunningQueries.decrementAndGet();
+ }
+ }
+
+ public static Integer getCurrentRunningQuery(String project) {
+ AtomicInteger nRunningQueries = runningStats.getIfPresent(project);
+ if (nRunningQueries != null) {
+ return nRunningQueries.get();
+ } else {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/8690fd2d/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java
deleted file mode 100644
index 3eb1670..0000000
--- a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.rest.util;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-
-public class QueryRequestUtil {
- private static final Logger logger = LoggerFactory.getLogger(QueryRequestUtil.class);
-
- private static LoadingCache<String, AtomicInteger> queryRequestMap = CacheBuilder.newBuilder()
- .removalListener(new RemovalListener<String, AtomicInteger>() {
- @Override
- public void onRemoval(RemovalNotification<String, AtomicInteger> notification) {
- logger.info("Current running query number " + notification.getValue().get() + " for project "
- + notification.getKey() + " is removed due to " + notification.getCause());
- }
- }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, AtomicInteger>() {
- @Override
- public AtomicInteger load(String s) throws Exception {
- return new AtomicInteger(0);
- }
- });
-
- public static boolean openQueryRequest(String project, int maxConcurrentQuery) {
- if (maxConcurrentQuery == 0) {
- return true;
- }
- try {
- AtomicInteger nRunningQueries = queryRequestMap.get(project);
- for (;;) {
- int nRunning = nRunningQueries.get();
- if (nRunning < maxConcurrentQuery) {
- if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) {
- return true;
- }
- } else {
- return false;
- }
- }
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static void closeQueryRequest(String project, int maxConcurrentQuery) {
- if (maxConcurrentQuery == 0) {
- return;
- }
- AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project);
- if (nRunningQueries != null) {
- nRunningQueries.decrementAndGet();
- }
- }
-
- public static Integer getCurrentRunningQuery(String project) {
- AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project);
- if (nRunningQueries != null) {
- return nRunningQueries.get();
- } else {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/8690fd2d/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestLimitsTest.java
----------------------------------------------------------------------
diff --git a/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestLimitsTest.java b/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestLimitsTest.java
new file mode 100644
index 0000000..021c057
--- /dev/null
+++ b/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestLimitsTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.util;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class QueryRequestLimitsTest {
+
+ @Test
+ public void testOpenAndCloseQueryRequest() {
+ int nThread = 5;
+
+ final Integer maxConcurrentQuery = 2;
+ final String project = "test";
+
+ final AtomicInteger nQueryFailed = new AtomicInteger(0);
+
+ Thread[] threads = new Thread[nThread];
+ final CountDownLatch lock = new CountDownLatch(nThread);
+ for (int i = 0; i < nThread; i++) {
+ final int j = i;
+ threads[j] = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ boolean ifOpen = QueryRequestLimits.openQueryRequest(project, maxConcurrentQuery);
+ lock.countDown();
+ if (ifOpen) {
+ lock.await();
+ QueryRequestLimits.closeQueryRequest(project, maxConcurrentQuery);
+ } else {
+ nQueryFailed.incrementAndGet();
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ });
+ threads[j].start();
+ }
+ for (int i = 0; i < nThread; i++) {
+ try {
+ threads[i].join();
+ } catch (InterruptedException e) {
+ }
+ }
+ Assert.assertEquals(new Integer(0), QueryRequestLimits.getCurrentRunningQuery(project));
+ Assert.assertEquals(nThread - maxConcurrentQuery, nQueryFailed.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/8690fd2d/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java
----------------------------------------------------------------------
diff --git a/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java b/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java
deleted file mode 100644
index fb6d2ff..0000000
--- a/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.rest.util;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class QueryRequestUtilTest {
-
- @Test
- public void testOpenAndCloseQueryRequest() {
- int nThread = 5;
-
- final Integer maxConcurrentQuery = 2;
- final String project = "test";
-
- final AtomicInteger nQueryFailed = new AtomicInteger(0);
-
- Thread[] threads = new Thread[nThread];
- final CountDownLatch lock = new CountDownLatch(nThread);
- for (int i = 0; i < nThread; i++) {
- final int j = i;
- threads[j] = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- boolean ifOpen = QueryRequestUtil.openQueryRequest(project, maxConcurrentQuery);
- lock.countDown();
- if (ifOpen) {
- lock.await();
- QueryRequestUtil.closeQueryRequest(project, maxConcurrentQuery);
- } else {
- nQueryFailed.incrementAndGet();
- }
- } catch (InterruptedException e) {
- }
- }
- });
- threads[j].start();
- }
- for (int i = 0; i < nThread; i++) {
- try {
- threads[i].join();
- } catch (InterruptedException e) {
- }
- }
- Assert.assertEquals(new Integer(0), QueryRequestUtil.getCurrentRunningQuery(project));
- Assert.assertEquals(nThread - maxConcurrentQuery, nQueryFailed.get());
- }
-}