You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/06 05:53:07 UTC

[kylin] branch master updated (50c374b -> bbe9be3)

This is an automated email from the ASF dual-hosted git repository.

nic pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from 50c374b  KYLIN-4447 Upload a complete example for CDH5.7 env(2 job + 2 query)
     new 5c69a12  KYLIN-4408 Change kylin.query.max-return-rows to kylin.query.max-return-bytes
     new d3788e0  KYLIN-4406 QueryRequestLimits limit should consider the queries hit/wait cache
     new 1142469  KYLIN-4409 Limit the number of simultaneous rpcs to HBase for cube
     new bbe9be3  KYLIN-4410 Limit the number of simultaneous rpcs to HBase for project

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kylin/common/KylinConfigBase.java   |  14 ++-
 .../java/org/apache/kylin/common/QueryContext.java |  60 +++++++---
 .../apache/kylin/common/QueryContextFacade.java    |   6 +-
 .../apache/kylin/common/SubThreadPoolExecutor.java | 127 +++++++++++++++++++++
 .../kylin/common/SubThreadPoolExecutorTest.java    |  97 ++++++++++++++++
 .../apache/kylin/rest/service/QueryService.java    |  35 +++---
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        | 118 +++++++++++++------
 7 files changed, 385 insertions(+), 72 deletions(-)
 create mode 100644 core-common/src/main/java/org/apache/kylin/common/SubThreadPoolExecutor.java
 create mode 100644 core-common/src/test/java/org/apache/kylin/common/SubThreadPoolExecutorTest.java


[kylin] 04/04: KYLIN-4410 Limit the number of simultaneous rpcs to HBase for project

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit bbe9be3326e3ef74d9ba04a3f069f66151286dc2
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue Mar 10 13:02:25 2020 +0800

    KYLIN-4410 Limit the number of simultaneous rpcs to HBase for project
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 ++
 .../java/org/apache/kylin/common/QueryContext.java |  9 +++++
 .../apache/kylin/rest/service/QueryService.java    |  9 +++--
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        | 43 +++++++++++++++++++++-
 4 files changed, 60 insertions(+), 5 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index dee57d1..3490b7a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1370,6 +1370,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.storage.hbase.max-hconnection-threads-per-query", "400"));
     }
 
+    public int getHBaseMaxConnectionThreadsPerProject() {
+        return Integer.parseInt(getOptional("kylin.storage.hbase.max-hconnection-threads-per-project", "800"));
+    }
+
     // ============================================================================
     // ENGINE.MR
     // ============================================================================
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 30d410d..5d349c3 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -54,6 +54,7 @@ public class QueryContext {
 
     private final String queryId;
     private String username;
+    private String project;
     private Set<String> groups;
     private AtomicLong scannedRows = new AtomicLong();
     private AtomicLong returnedRows = new AtomicLong();
@@ -122,6 +123,14 @@ public class QueryContext {
         this.username = username;
     }
 
+    public String getProject() {
+        return project;
+    }
+
+    public void setProject(String project) {
+        this.project = project;
+    }
+
     public Set<String> getGroups() {
         return groups;
     }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index de101d9..de0c9bc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -404,7 +404,11 @@ public class QueryService extends BasicService {
         if (sqlRequest.getBackdoorToggles() != null)
             BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles());
 
+        // set initial info when starting a query
         final QueryContext queryContext = QueryContextFacade.current();
+        queryContext.setUsername(SecurityContextHolder.getContext().getAuthentication().getName());
+        queryContext.setGroups(AclPermissionUtil.getCurrentUserGroups());
+        queryContext.setProject(sqlRequest.getProject());
 
         try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) {
             SQLResponse sqlResponse = null;
@@ -629,10 +633,7 @@ public class QueryService extends BasicService {
 
         try {
             conn = QueryConnection.getConnection(sqlRequest.getProject());
-            String userInfo = SecurityContextHolder.getContext().getAuthentication().getName();
-            QueryContext context = QueryContextFacade.current();
-            context.setUsername(userInfo);
-            context.setGroups(AclPermissionUtil.getCurrentUserGroups());
+            String userInfo = QueryContextFacade.current().getUsername();
             final Collection<? extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext()
                     .getAuthentication().getAuthorities();
             for (GrantedAuthority grantedAuthority : grantedAuthorities) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index dde4956..2eb6372 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryContext.CubeSegmentStatistics;
+import org.apache.kylin.common.SubThreadPoolExecutor;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exceptions.KylinTimeoutException;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
@@ -57,7 +58,10 @@ import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.GTUtil;
 import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
 import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
@@ -71,6 +75,9 @@ import org.apache.kylin.storage.hbase.util.HBaseUnionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.HBaseZeroCopyByteString;
@@ -81,6 +88,38 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
     private static ExecutorService executorService = new LoggableCachedThreadPool();
 
+    private static LoadingCache<String, ExecutorService> projectThreadPoolMap = CacheBuilder.newBuilder()
+            .build(new CacheLoader<String, ExecutorService>() {
+                @Override
+                public ExecutorService load(String projName) throws Exception {
+                    ExecutorService sharedPool = HBaseConnection.getCoprocessorPool();
+                    ProjectInstance projInst = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
+                            .getProject(projName);
+                    return new SubThreadPoolExecutor(sharedPool, "PROJECT",
+                            projInst.getConfig().getHBaseMaxConnectionThreadsPerProject());
+                }
+            });
+
+    private static class ProjectThreadPoolSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            projectThreadPoolMap.invalidateAll();
+            logger.info("Project level thread pools are cleared");
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey)
+                throws IOException {
+            projectThreadPoolMap.invalidate(cacheKey);
+            logger.info("Thread pool map for project {} is cleared", cacheKey);
+        }
+    }
+
+    static {
+        Broadcaster.getInstance(KylinConfig.getInstanceFromEnv())
+                .registerStaticListener(new ProjectThreadPoolSyncListener(), "project");
+    }
+    
     public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
         super(segment, cuboid, fullGTInfo, context);
     }
@@ -258,8 +297,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
         try {
             final Connection conn =  HBaseUnionUtil.getConnection(cubeSeg.getConfig(), cubeSeg.getStorageLocationIdentifier());
+            ExecutorService projThreadPool = projectThreadPoolMap
+                    .get(queryContext.getProject().toUpperCase(Locale.ROOT));
             final Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()),
-                    queryContext.getConnectionPool(HBaseConnection.getCoprocessorPool()));
+                    queryContext.getConnectionPool(projThreadPool));
 
             table.coprocessorService(CubeVisitService.class, startKey, endKey, //
                     new Batch.Call<CubeVisitService, CubeVisitResponse>() {


[kylin] 03/04: KYLIN-4409 Limit the number of simultaneous rpcs to HBase for cube

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 1142469c2558f0a9d9caa03d6c574b6363ff227d
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue Mar 10 12:37:17 2020 +0800

    KYLIN-4409 Limit the number of simultaneous rpcs to HBase for cube
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../java/org/apache/kylin/common/QueryContext.java |  25 +++-
 .../apache/kylin/common/QueryContextFacade.java    |   6 +-
 .../apache/kylin/common/SubThreadPoolExecutor.java | 127 +++++++++++++++++++++
 .../kylin/common/SubThreadPoolExecutorTest.java    |  97 ++++++++++++++++
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        |   2 +-
 6 files changed, 255 insertions(+), 6 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index e2d7543..dee57d1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1366,6 +1366,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.storage.clean-after-delete-operation", FALSE));
     }
 
+    public int getHBaseMaxConnectionThreadsPerQuery() {
+        return Integer.parseInt(getOptional("kylin.storage.hbase.max-hconnection-threads-per-query", "400"));
+    }
+
     // ============================================================================
     // ENGINE.MR
     // ============================================================================
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 85cc5f8..30d410d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -68,13 +69,31 @@ public class QueryContext {
     private List<RPCStatistics> rpcStatisticsList = Lists.newCopyOnWriteArrayList();
     private Map<Integer, CubeSegmentStatisticsResult> cubeSegmentStatisticsResultMap = Maps.newConcurrentMap();
 
-    QueryContext() {
-        this(System.currentTimeMillis());
+    final int maxConnThreads;
+
+    private ExecutorService connPool;
+
+    QueryContext(int maxConnThreads) {
+        this(maxConnThreads, System.currentTimeMillis());
     }
 
-    QueryContext(long startMills) {
+    QueryContext(int maxConnThreads, long startMills) {
         queryId = RandomUtil.randomUUID().toString();
         queryStartMillis = startMills;
+        this.maxConnThreads = maxConnThreads;
+    }
+
+    public ExecutorService getConnectionPool(ExecutorService sharedConnPool) {
+        if (connPool != null) {
+            return connPool;
+        }
+
+        synchronized (this) {
+            if (connPool == null) {
+                connPool = new SubThreadPoolExecutor(sharedConnPool, "QUERY", maxConnThreads);
+            }
+            return connPool;
+        }
     }
 
     public long getQueryStartMillis() {
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
index bd22f68..b92df50 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
@@ -38,7 +38,8 @@ public class QueryContextFacade {
     private static final InternalThreadLocal<QueryContext> CURRENT_CTX = new InternalThreadLocal<QueryContext>() {
         @Override
         protected QueryContext initialValue() {
-            QueryContext queryContext = new QueryContext();
+            QueryContext queryContext = new QueryContext(
+                    KylinConfig.getInstanceFromEnv().getHBaseMaxConnectionThreadsPerQuery());
             RUNNING_CTX_MAP.put(queryContext.getQueryId(), queryContext);
             return queryContext;
         }
@@ -96,7 +97,8 @@ public class QueryContextFacade {
      */
     public static TreeSet<QueryContext> getLongRunningQueries(long runningTime) {
         SortedSet<QueryContext> allRunningQueries = getAllRunningQueries();
-        QueryContext tmpCtx = new QueryContext(runningTime + 1L); // plus 1 to include those contexts in same accumulatedMills but different uuid
+        QueryContext tmpCtx = new QueryContext(KylinConfig.getInstanceFromEnv().getHBaseMaxConnectionThreadsPerQuery(),
+                runningTime + 1L); // plus 1 to include those contexts in same accumulatedMills but different uuid
         return (TreeSet<QueryContext>) allRunningQueries.headSet(tmpCtx);
     }
 }
diff --git a/core-common/src/main/java/org/apache/kylin/common/SubThreadPoolExecutor.java b/core-common/src/main/java/org/apache/kylin/common/SubThreadPoolExecutor.java
new file mode 100644
index 0000000..986ad7f
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/SubThreadPoolExecutor.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class SubThreadPoolExecutor extends AbstractExecutorService {
+
+    private final static Logger logger = LoggerFactory.getLogger(SubThreadPoolExecutor.class);
+
+    private final Semaphore semaphore;
+
+    private final ExecutorService impl;
+
+    private final String subject;
+
+    public SubThreadPoolExecutor(ExecutorService impl, String subject, int maxThreads) {
+        this.impl = impl;
+        this.subject = subject;
+        this.semaphore = new Semaphore(maxThreads);
+    }
+
+    // Obtain a thread resource. If no resources, block it
+    private void obtainThread() {
+        try {
+            semaphore.acquire();
+            logger.debug("Obtain thread for {}", subject);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    // Release a thread resource
+    private void releaseThread() {
+        semaphore.release();
+        logger.debug("Release thread for {}", subject);
+    }
+
+    public void execute(Runnable command) {
+        obtainThread();
+        impl.execute(command);
+    }
+
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+        ListenableFutureTask<T> ret = impl instanceof SubThreadPoolExecutor
+                ? (ListenableFutureTask) ((SubThreadPoolExecutor) impl).newTaskFor(runnable, value)
+                : ListenableFutureTask.create(runnable, value);
+        ret.addListener(new Runnable() {
+            @Override
+            public void run() {
+                releaseThread();
+            }
+        }, MoreExecutors.sameThreadExecutor());
+        return ret;
+    }
+
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+        ListenableFutureTask<T> ret = impl instanceof SubThreadPoolExecutor
+                ? (ListenableFutureTask) ((SubThreadPoolExecutor) impl).newTaskFor(callable)
+                : ListenableFutureTask.create(callable);
+        ret.addListener(new Runnable() {
+            @Override
+            public void run() {
+                releaseThread();
+            }
+        }, MoreExecutors.sameThreadExecutor());
+        return ret;
+    }
+
+    @Override
+    public void shutdown() {
+        throw new IllegalStateException(
+                "Manual shutdown not supported - SubThreadPoolExecutor is dependent on an external lifecycle");
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        throw new IllegalStateException(
+                "Manual shutdown not supported - SubThreadPoolExecutor is dependent on an external lifecycle");
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        throw new IllegalStateException(
+                "Manual shutdown not supported - SubThreadPoolExecutor is dependent on an external lifecycle");
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return false;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/core-common/src/test/java/org/apache/kylin/common/SubThreadPoolExecutorTest.java b/core-common/src/test/java/org/apache/kylin/common/SubThreadPoolExecutorTest.java
new file mode 100644
index 0000000..3f9cc25
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/SubThreadPoolExecutorTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+public class SubThreadPoolExecutorTest {
+    private final static Logger logger = LoggerFactory.getLogger(SubThreadPoolExecutorTest.class);
+
+    private static final long SleepTime = 50L;
+
+    @Test
+    public void testOneLayer() throws InterruptedException, ExecutionException {
+        int nTotalThread = 10;
+        int nTask = 5;
+        int nSubMaxThread = 2;
+        long minNRound = (nTask - 1) / nSubMaxThread + 1;
+
+        ExecutorService executor = Executors.newFixedThreadPool(nTotalThread);
+        SubThreadPoolExecutor subExecutor = new SubThreadPoolExecutor(executor, "layOne", nSubMaxThread);
+        testInner(subExecutor, nTask, minNRound);
+    }
+
+    @Test
+    public void testTwoLayer() throws InterruptedException, ExecutionException {
+        testTwoLayer(5);
+        testTwoLayer(1);
+    }
+
+    private void testTwoLayer(int nSubMaxThread) throws InterruptedException, ExecutionException {
+        int nTotalThread = 10;
+        int nTask = 5;
+        int nSSubMaxThread = 2;
+        long minNRound = (nTask - 1) / Math.min(nSubMaxThread, nSSubMaxThread) + 1;
+
+        ExecutorService executor = Executors.newFixedThreadPool(nTotalThread);
+        SubThreadPoolExecutor subExecutor = new SubThreadPoolExecutor(executor, "layOne", nSubMaxThread);
+        SubThreadPoolExecutor ssubExecutor = new SubThreadPoolExecutor(subExecutor, "layTwo", nSSubMaxThread);
+        testInner(ssubExecutor, nTask, minNRound);
+    }
+
+    private void testInner(SubThreadPoolExecutor subExecutor, int nTask, long minNRound)
+            throws InterruptedException, ExecutionException {
+        Stopwatch sw = new Stopwatch();
+
+        sw.start();
+        List<Future<?>> futureList = Lists.newArrayListWithExpectedSize(nTask);
+        for (int i = 0; i < nTask; i++) {
+            futureList.add(subExecutor.submit(new TestTask()));
+        }
+        for (Future<?> future : futureList) {
+            future.get();
+        }
+        long timeCost = sw.elapsed(TimeUnit.MILLISECONDS);
+        Assert.assertTrue("Time cost should be more than " + timeCost + "ms", timeCost >= minNRound * SleepTime);
+        logger.info("time cost: {}ms", timeCost);
+    }
+
+    private static class TestTask implements Runnable {
+        @Override
+        public void run() {
+            try {
+                Thread.sleep(SleepTime);
+            } catch (InterruptedException e) {
+            }
+        }
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 56538af..dde4956 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -259,7 +259,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         try {
             final Connection conn =  HBaseUnionUtil.getConnection(cubeSeg.getConfig(), cubeSeg.getStorageLocationIdentifier());
             final Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()),
-                    HBaseConnection.getCoprocessorPool());
+                    queryContext.getConnectionPool(HBaseConnection.getCoprocessorPool()));
 
             table.coprocessorService(CubeVisitService.class, startKey, endKey, //
                     new Batch.Call<CubeVisitService, CubeVisitResponse>() {


[kylin] 02/04: KYLIN-4406 QueryRequestLimits limit should consider the queries hit/wait cache

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit d3788e08a20b61cf981b7949c3a204d7aaf1a415
Author: Ma Gang <mg...@163.com>
AuthorDate: Tue Mar 10 12:04:04 2020 +0800

    KYLIN-4406 QueryRequestLimits limit should consider the queries hit/wait cache
---
 .../apache/kylin/rest/service/QueryService.java    | 26 +++++++++++-----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 652cbfc..de101d9 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -421,22 +421,22 @@ public class QueryService extends BasicService {
             sql = result.getSecond();
             sqlRequest.setSql(sql);
 
-            // try some cheap executions
-            if (sqlResponse == null && isQueryInspect) {
-                sqlResponse = new SQLResponse(null, null, 0, false, sqlRequest.getSql());
-            }
+            try (QueryRequestLimits limit = new QueryRequestLimits(sqlRequest.getProject())) {
+                // try some cheap executions
+                if (sqlResponse == null && isQueryInspect) {
+                    sqlResponse = new SQLResponse(null, null, 0, false, sqlRequest.getSql());
+                }
 
-            if (sqlResponse == null && isCreateTempStatement) {
-                sqlResponse = new SQLResponse(null, null, 0, false, null);
-            }
+                if (sqlResponse == null && isCreateTempStatement) {
+                    sqlResponse = new SQLResponse(null, null, 0, false, null);
+                }
 
-            if (sqlResponse == null && isQueryCacheEnabled) {
-                sqlResponse = searchQueryInCache(sqlRequest);
-            }
+                if (sqlResponse == null && isQueryCacheEnabled) {
+                    sqlResponse = searchQueryInCache(sqlRequest);
+                }
 
-            // real execution if required
-            if (sqlResponse == null) {
-                try (QueryRequestLimits limit = new QueryRequestLimits(sqlRequest.getProject())) {
+                // real execution if required
+                if (sqlResponse == null) {
                     sqlResponse = queryAndUpdateCache(sqlRequest, isQueryCacheEnabled);
                 }
             }


[kylin] 01/04: KYLIN-4408 Change kylin.query.max-return-rows to kylin.query.max-return-bytes

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5c69a12037d2b8c99cbffe7541bfbc721c44c0b3
Author: Wang Ken <mi...@ebay.com>
AuthorDate: Tue Mar 10 11:39:49 2020 +0800

    KYLIN-4408 Change kylin.query.max-return-rows to kylin.query.max-return-bytes
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  6 +-
 .../java/org/apache/kylin/common/QueryContext.java | 26 +++++---
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        | 75 +++++++++++-----------
 3 files changed, 58 insertions(+), 49 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 8eefc02..e2d7543 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1730,8 +1730,10 @@ public abstract class KylinConfigBase implements Serializable {
         return value > 0 ? value : Long.MAX_VALUE;
     }
 
-    public long getQueryMaxReturnRows() {
-        return Integer.parseInt(this.getOptional("kylin.query.max-return-rows", "5000000"));
+    public long getQueryMaxReturnBytes() {
+        long value = Long
+                .parseLong(this.getOptional("kylin.query.max-return-bytes", String.valueOf(3L * 1024 * 1024 * 1024)));
+        return value > 0 ? value : Long.MAX_VALUE;
     }
 
     public int getTranslatedInClauseMaxSize() {
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 88caa7d..85cc5f8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -57,6 +57,7 @@ public class QueryContext {
     private AtomicLong scannedRows = new AtomicLong();
     private AtomicLong returnedRows = new AtomicLong();
     private AtomicLong scannedBytes = new AtomicLong();
+    private AtomicLong returnedBytes = new AtomicLong();
     private Object calcitePlan;
 
     private AtomicBoolean isRunning = new AtomicBoolean(true);
@@ -142,6 +143,14 @@ public class QueryContext {
         return scannedBytes.addAndGet(deltaBytes);
     }
 
+    public long getReturnedBytes() {
+        return returnedBytes.get();
+    }
+
+    public long addAndGetReturnedBytes(long deltaBytes) {
+        return returnedBytes.addAndGet(deltaBytes);
+    }
+
     public void addQueryStopListener(QueryStopListener listener) {
         this.stopListeners.add(listener);
     }
@@ -222,14 +231,12 @@ public class QueryContext {
         }
         ConcurrentMap<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
         if (segmentStatisticsMap == null) {
-            logger.warn(
-                    "cubeSegmentStatistic should be initialized for cube {}", cubeName);
+            logger.warn("cubeSegmentStatistic should be initialized for cube {}", cubeName);
             return null;
         }
         CubeSegmentStatistics segmentStatistics = segmentStatisticsMap.get(segmentName);
         if (segmentStatistics == null) {
-            logger.warn(
-                    "segmentStatistics should be initialized for cube {} with segment{}", cubeName, segmentName);
+            logger.warn("segmentStatistics should be initialized for cube {} with segment{}", cubeName, segmentName);
             return null;
         }
         return segmentStatistics;
@@ -280,16 +287,15 @@ public class QueryContext {
         if (old == null) {
             segmentStatistics.setWrapper(cubeName, segmentName, sourceCuboidId, targetCuboidId, filterMask);
         } else if (segmentStatistics.sourceCuboidId != sourceCuboidId
-                || segmentStatistics.targetCuboidId != targetCuboidId
-                || segmentStatistics.filterMask != filterMask) {
+                || segmentStatistics.targetCuboidId != targetCuboidId || segmentStatistics.filterMask != filterMask) {
             StringBuilder inconsistency = new StringBuilder();
             if (segmentStatistics.sourceCuboidId != sourceCuboidId) {
-                inconsistency.append(
-                        "sourceCuboidId exist " + segmentStatistics.sourceCuboidId + INPUT + sourceCuboidId);
+                inconsistency
+                        .append("sourceCuboidId exist " + segmentStatistics.sourceCuboidId + INPUT + sourceCuboidId);
             }
             if (segmentStatistics.targetCuboidId != targetCuboidId) {
-                inconsistency.append(
-                        "targetCuboidId exist " + segmentStatistics.targetCuboidId + INPUT + targetCuboidId);
+                inconsistency
+                        .append("targetCuboidId exist " + segmentStatistics.targetCuboidId + INPUT + targetCuboidId);
             }
             if (segmentStatistics.filterMask != filterMask) {
                 inconsistency.append("filterMask exist " + segmentStatistics.filterMask + INPUT + filterMask);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 9a12ed8..56538af 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -362,14 +362,26 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                                             - stats.getFilteredRowCount(),
                                     stats.getAggregatedRowCount(), stats.getScannedBytes());
 
+                            byte[] rawData = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows());
+                            byte[] queueData = rawData;
                             if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
                                 rpcException = new ResourceLimitExceededException(
                                         "Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold "
                                                 + cubeSeg.getConfig().getQueryMaxScanBytes());
-                            } else if (queryContext.getReturnedRows() > cubeSeg.getConfig().getQueryMaxReturnRows()) {
-                                rpcException = new ResourceLimitExceededException(
-                                        "Query returned " + queryContext.getReturnedRows() + " rows exceeds threshold "
-                                                + cubeSeg.getConfig().getQueryMaxReturnRows());
+                            } else {
+                                try {
+                                    if (compressionResult) {
+                                        queueData = CompressionUtils.decompress(rawData);
+                                    }
+                                } catch (IOException | DataFormatException e) {
+                                    throw new RuntimeException(logHeader + "Error when decompressing", e);
+                                }
+                                if (queryContext.addAndGetReturnedBytes(queueData.length) > cubeSeg.getConfig()
+                                        .getQueryMaxReturnBytes()) {
+                                    rpcException = new ResourceLimitExceededException("Query returned "
+                                            + queryContext.getReturnedBytes() + " bytes exceeds threshold "
+                                            + cubeSeg.getConfig().getQueryMaxReturnBytes());
+                                }
                             }
 
                             if (rpcException != null) {
@@ -377,42 +389,31 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                                 return;
                             }
 
-                            try {
-                                byte[] rawData = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows());
-                                if (compressionResult) {
-                                    epResultItr.append(CompressionUtils.decompress(rawData));
-                                } else {
-                                    epResultItr.append(rawData);
-                                }
-                                // put segment query result to cache if cache is enabled
-                                if (querySegmentCacheEnabled) {
-                                    try {
-                                        segmentQueryResultBuilder.putRegionResult(rawData);
-                                        if (segmentQueryResultBuilder.isComplete()) {
-                                            CubeSegmentStatistics cubeSegmentStatistics = queryContext
-                                                    .getCubeSegmentStatistics(storageContext.ctxId,
-                                                            cubeSeg.getCubeInstance().getName(), cubeSeg.getName());
-                                            if (cubeSegmentStatistics != null) {
-                                                segmentQueryResultBuilder
-                                                        .setCubeSegmentStatistics(cubeSegmentStatistics);
-                                                logger.info(
-                                                        "Query-{}: try to put segment query result to cache for segment:{}",
-                                                        queryContext.getQueryId(), cubeSeg);
-                                                SegmentQueryResult segmentQueryResult = segmentQueryResultBuilder
-                                                        .build();
-                                                SegmentQueryCache.getInstance().put(segmentQueryCacheKey,
-                                                        segmentQueryResult);
-                                                logger.info(
-                                                        "Query-{}: successfully put segment query result to cache for segment:{}",
-                                                        queryContext.getQueryId(), cubeSeg);
-                                            }
+                            epResultItr.append(queueData);
+                            // put segment query result to cache if cache is enabled
+                            if (querySegmentCacheEnabled) {
+                                try {
+                                    segmentQueryResultBuilder.putRegionResult(rawData);
+                                    if (segmentQueryResultBuilder.isComplete()) {
+                                        CubeSegmentStatistics cubeSegmentStatistics = queryContext
+                                                .getCubeSegmentStatistics(storageContext.ctxId,
+                                                        cubeSeg.getCubeInstance().getName(), cubeSeg.getName());
+                                        if (cubeSegmentStatistics != null) {
+                                            segmentQueryResultBuilder.setCubeSegmentStatistics(cubeSegmentStatistics);
+                                            logger.info(
+                                                    "Query-{}: try to put segment query result to cache for segment:{}",
+                                                    queryContext.getQueryId(), cubeSeg);
+                                            SegmentQueryResult segmentQueryResult = segmentQueryResultBuilder.build();
+                                            SegmentQueryCache.getInstance().put(segmentQueryCacheKey,
+                                                    segmentQueryResult);
+                                            logger.info(
+                                                    "Query-{}: successfully put segment query result to cache for segment:{}",
+                                                    queryContext.getQueryId(), cubeSeg);
                                         }
-                                    } catch (Throwable t) {
-                                        logger.error("Fail to put query segment result to cache", t);
                                     }
+                                } catch (Throwable t) {
+                                    logger.error("Fail to put query segment result to cache", t);
                                 }
-                            } catch (IOException | DataFormatException e) {
-                                throw new RuntimeException(logHeader + "Error when decompressing", e);
                             }
                         }
                     });