You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/21 02:58:51 UTC

[06/16] kylin git commit: KYLIN-2902 minor refine

KYLIN-2902 minor refine


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8690fd2d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8690fd2d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8690fd2d

Branch: refs/heads/master
Commit: 8690fd2d671ca7fa49576cc84eda41c3f24e539f
Parents: 7aef88a
Author: lidongsjtu <li...@apache.org>
Authored: Wed Dec 20 15:24:10 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Wed Dec 20 23:20:11 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/rest/service/QueryService.java | 27 +++---
 .../kylin/rest/util/QueryRequestLimits.java     | 90 ++++++++++++++++++++
 .../kylin/rest/util/QueryRequestUtil.java       | 90 --------------------
 .../kylin/rest/util/QueryRequestLimitsTest.java | 69 +++++++++++++++
 .../kylin/rest/util/QueryRequestUtilTest.java   | 69 ---------------
 5 files changed, 174 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/8690fd2d/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 84a184e..17f6b58 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -98,7 +98,7 @@ import org.apache.kylin.rest.request.PrepareSqlRequest;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.util.AclEvaluate;
-import org.apache.kylin.rest.util.QueryRequestUtil;
+import org.apache.kylin.rest.util.QueryRequestLimits;
 import org.apache.kylin.rest.util.TableauInterceptor;
 import org.apache.kylin.storage.hybrid.HybridInstance;
 import org.slf4j.Logger;
@@ -412,25 +412,28 @@ public class QueryService extends BasicService {
             final boolean isSelect = QueryUtil.isSelectStatement(sql);
             final boolean isPushDownUpdateEnabled = kylinConfig.isPushDownEnabled()
                     && kylinConfig.isPushDownUpdateEnabled();
+            final int maxConcurrentQuery = projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject();
 
             if (!isSelect && !isPushDownUpdateEnabled) {
                 logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled");
                 throw new BadRequestException(msg.getNOT_SUPPORTED_SQL());
             }
 
-            int maxConcurrentQuery = projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject();
-            if (!QueryRequestUtil.openQueryRequest(projectInstance.getName(), maxConcurrentQuery)) {
-                logger.warn("Directly return exception as too many concurrent query requests for project:" + project);
-                throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING());
-            }
+            SQLResponse sqlResponse = null;
+
+            try {
+                // Check project level query request concurrency limitation per query server
+                if (!QueryRequestLimits.openQueryRequest(projectInstance.getName(), maxConcurrentQuery)) {
+                    logger.warn(
+                            "Directly return exception as too many concurrent query requests for project:" + project);
+                    throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING());
+                }
 
-            long startTime = System.currentTimeMillis();
+                long startTime = System.currentTimeMillis();
 
-            // force clear the query context before a new query
-            OLAPContext.clearThreadLocalContexts();
+                // force clear the query context before a new query
+                OLAPContext.clearThreadLocalContexts();
 
-            SQLResponse sqlResponse = null;
-            try { // to deal with the case that cache searching throws exception
                 boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(),
                         "query cache disabled in KylinConfig") && //
                         checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles");
@@ -498,7 +501,7 @@ public class QueryService extends BasicService {
                     }
                 }
             } finally {
-                QueryRequestUtil.closeQueryRequest(projectInstance.getName(), maxConcurrentQuery);
+                QueryRequestLimits.closeQueryRequest(projectInstance.getName(), maxConcurrentQuery);
             }
 
             logQuery(sqlRequest, sqlResponse);

http://git-wip-us.apache.org/repos/asf/kylin/blob/8690fd2d/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java
new file mode 100644
index 0000000..cddaa12
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.util;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+public class QueryRequestLimits {
+    private static final Logger logger = LoggerFactory.getLogger(QueryRequestLimits.class);
+
+    private static LoadingCache<String, AtomicInteger> runningStats = CacheBuilder.newBuilder()
+            .removalListener(new RemovalListener<String, AtomicInteger>() {
+                @Override
+                public void onRemoval(RemovalNotification<String, AtomicInteger> notification) {
+                    logger.info("Current running query number " + notification.getValue().get() + " for project "
+                            + notification.getKey() + " is removed due to " + notification.getCause());
+                }
+            }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, AtomicInteger>() {
+                @Override
+                public AtomicInteger load(String s) throws Exception {
+                    return new AtomicInteger(0);
+                }
+            });
+
+    public static boolean openQueryRequest(String project, int maxConcurrentQuery) {
+        if (maxConcurrentQuery == 0) {
+            return true;
+        }
+        try {
+            AtomicInteger nRunningQueries = runningStats.get(project);
+            for (;;) {
+                int nRunning = nRunningQueries.get();
+                if (nRunning < maxConcurrentQuery) {
+                    if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) {
+                        return true;
+                    }
+                } else {
+                    return false;
+                }
+            }
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static void closeQueryRequest(String project, int maxConcurrentQuery) {
+        if (maxConcurrentQuery == 0) {
+            return;
+        }
+        AtomicInteger nRunningQueries = runningStats.getIfPresent(project);
+        if (nRunningQueries != null) {
+            nRunningQueries.decrementAndGet();
+        }
+    }
+
+    public static Integer getCurrentRunningQuery(String project) {
+        AtomicInteger nRunningQueries = runningStats.getIfPresent(project);
+        if (nRunningQueries != null) {
+            return nRunningQueries.get();
+        } else {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/8690fd2d/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java
deleted file mode 100644
index 3eb1670..0000000
--- a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.rest.util;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-
-public class QueryRequestUtil {
-    private static final Logger logger = LoggerFactory.getLogger(QueryRequestUtil.class);
-
-    private static LoadingCache<String, AtomicInteger> queryRequestMap = CacheBuilder.newBuilder()
-            .removalListener(new RemovalListener<String, AtomicInteger>() {
-                @Override
-                public void onRemoval(RemovalNotification<String, AtomicInteger> notification) {
-                    logger.info("Current running query number " + notification.getValue().get() + " for project "
-                            + notification.getKey() + " is removed due to " + notification.getCause());
-                }
-            }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, AtomicInteger>() {
-                @Override
-                public AtomicInteger load(String s) throws Exception {
-                    return new AtomicInteger(0);
-                }
-            });
-
-    public static boolean openQueryRequest(String project, int maxConcurrentQuery) {
-        if (maxConcurrentQuery == 0) {
-            return true;
-        }
-        try {
-            AtomicInteger nRunningQueries = queryRequestMap.get(project);
-            for (;;) {
-                int nRunning = nRunningQueries.get();
-                if (nRunning < maxConcurrentQuery) {
-                    if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) {
-                        return true;
-                    }
-                } else {
-                    return false;
-                }
-            }
-        } catch (ExecutionException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public static void closeQueryRequest(String project, int maxConcurrentQuery) {
-        if (maxConcurrentQuery == 0) {
-            return;
-        }
-        AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project);
-        if (nRunningQueries != null) {
-            nRunningQueries.decrementAndGet();
-        }
-    }
-
-    public static Integer getCurrentRunningQuery(String project) {
-        AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project);
-        if (nRunningQueries != null) {
-            return nRunningQueries.get();
-        } else {
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/8690fd2d/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestLimitsTest.java
----------------------------------------------------------------------
diff --git a/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestLimitsTest.java b/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestLimitsTest.java
new file mode 100644
index 0000000..021c057
--- /dev/null
+++ b/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestLimitsTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.util;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class QueryRequestLimitsTest {
+
+    @Test
+    public void testOpenAndCloseQueryRequest() {
+        int nThread = 5;
+
+        final Integer maxConcurrentQuery = 2;
+        final String project = "test";
+
+        final AtomicInteger nQueryFailed = new AtomicInteger(0);
+
+        Thread[] threads = new Thread[nThread];
+        final CountDownLatch lock = new CountDownLatch(nThread);
+        for (int i = 0; i < nThread; i++) {
+            final int j = i;
+            threads[j] = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        boolean ifOpen = QueryRequestLimits.openQueryRequest(project, maxConcurrentQuery);
+                        lock.countDown();
+                        if (ifOpen) {
+                            lock.await();
+                            QueryRequestLimits.closeQueryRequest(project, maxConcurrentQuery);
+                        } else {
+                            nQueryFailed.incrementAndGet();
+                        }
+                    } catch (InterruptedException e) {
+                    }
+                }
+            });
+            threads[j].start();
+        }
+        for (int i = 0; i < nThread; i++) {
+            try {
+                threads[i].join();
+            } catch (InterruptedException e) {
+            }
+        }
+        Assert.assertEquals(new Integer(0), QueryRequestLimits.getCurrentRunningQuery(project));
+        Assert.assertEquals(nThread - maxConcurrentQuery, nQueryFailed.get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/8690fd2d/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java
----------------------------------------------------------------------
diff --git a/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java b/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java
deleted file mode 100644
index fb6d2ff..0000000
--- a/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.rest.util;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class QueryRequestUtilTest {
-
-    @Test
-    public void testOpenAndCloseQueryRequest() {
-        int nThread = 5;
-
-        final Integer maxConcurrentQuery = 2;
-        final String project = "test";
-
-        final AtomicInteger nQueryFailed = new AtomicInteger(0);
-
-        Thread[] threads = new Thread[nThread];
-        final CountDownLatch lock = new CountDownLatch(nThread);
-        for (int i = 0; i < nThread; i++) {
-            final int j = i;
-            threads[j] = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        boolean ifOpen = QueryRequestUtil.openQueryRequest(project, maxConcurrentQuery);
-                        lock.countDown();
-                        if (ifOpen) {
-                            lock.await();
-                            QueryRequestUtil.closeQueryRequest(project, maxConcurrentQuery);
-                        } else {
-                            nQueryFailed.incrementAndGet();
-                        }
-                    } catch (InterruptedException e) {
-                    }
-                }
-            });
-            threads[j].start();
-        }
-        for (int i = 0; i < nThread; i++) {
-            try {
-                threads[i].join();
-            } catch (InterruptedException e) {
-            }
-        }
-        Assert.assertEquals(new Integer(0), QueryRequestUtil.getCurrentRunningQuery(project));
-        Assert.assertEquals(nThread - maxConcurrentQuery, nQueryFailed.get());
-    }
-}