You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/06 05:53:11 UTC

[kylin] 04/04: KYLIN-4410 Limit the number of simultaneous rpcs to HBase for project

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit bbe9be3326e3ef74d9ba04a3f069f66151286dc2
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue Mar 10 13:02:25 2020 +0800

    KYLIN-4410 Limit the number of simultaneous rpcs to HBase for project
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 ++
 .../java/org/apache/kylin/common/QueryContext.java |  9 +++++
 .../apache/kylin/rest/service/QueryService.java    |  9 +++--
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        | 43 +++++++++++++++++++++-
 4 files changed, 60 insertions(+), 5 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index dee57d1..3490b7a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1370,6 +1370,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.storage.hbase.max-hconnection-threads-per-query", "400"));
     }
 
+    public int getHBaseMaxConnectionThreadsPerProject() {
+        return Integer.parseInt(getOptional("kylin.storage.hbase.max-hconnection-threads-per-project", "800"));
+    }
+
     // ============================================================================
     // ENGINE.MR
     // ============================================================================
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 30d410d..5d349c3 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -54,6 +54,7 @@ public class QueryContext {
 
     private final String queryId;
     private String username;
+    private String project;
     private Set<String> groups;
     private AtomicLong scannedRows = new AtomicLong();
     private AtomicLong returnedRows = new AtomicLong();
@@ -122,6 +123,14 @@ public class QueryContext {
         this.username = username;
     }
 
+    public String getProject() {
+        return project;
+    }
+
+    public void setProject(String project) {
+        this.project = project;
+    }
+
     public Set<String> getGroups() {
         return groups;
     }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index de101d9..de0c9bc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -404,7 +404,11 @@ public class QueryService extends BasicService {
         if (sqlRequest.getBackdoorToggles() != null)
             BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles());
 
+        // set initial info when starting a query
         final QueryContext queryContext = QueryContextFacade.current();
+        queryContext.setUsername(SecurityContextHolder.getContext().getAuthentication().getName());
+        queryContext.setGroups(AclPermissionUtil.getCurrentUserGroups());
+        queryContext.setProject(sqlRequest.getProject());
 
         try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) {
             SQLResponse sqlResponse = null;
@@ -629,10 +633,7 @@ public class QueryService extends BasicService {
 
         try {
             conn = QueryConnection.getConnection(sqlRequest.getProject());
-            String userInfo = SecurityContextHolder.getContext().getAuthentication().getName();
-            QueryContext context = QueryContextFacade.current();
-            context.setUsername(userInfo);
-            context.setGroups(AclPermissionUtil.getCurrentUserGroups());
+            String userInfo = QueryContextFacade.current().getUsername();
             final Collection<? extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext()
                     .getAuthentication().getAuthorities();
             for (GrantedAuthority grantedAuthority : grantedAuthorities) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index dde4956..2eb6372 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryContext.CubeSegmentStatistics;
+import org.apache.kylin.common.SubThreadPoolExecutor;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exceptions.KylinTimeoutException;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
@@ -57,7 +58,10 @@ import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.GTUtil;
 import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
 import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
@@ -71,6 +75,9 @@ import org.apache.kylin.storage.hbase.util.HBaseUnionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.HBaseZeroCopyByteString;
@@ -81,6 +88,38 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
     private static ExecutorService executorService = new LoggableCachedThreadPool();
 
+    private static LoadingCache<String, ExecutorService> projectThreadPoolMap = CacheBuilder.newBuilder()
+            .build(new CacheLoader<String, ExecutorService>() {
+                @Override
+                public ExecutorService load(String projName) throws Exception {
+                    ExecutorService sharedPool = HBaseConnection.getCoprocessorPool();
+                    ProjectInstance projInst = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
+                            .getProject(projName);
+                    return new SubThreadPoolExecutor(sharedPool, "PROJECT",
+                            projInst.getConfig().getHBaseMaxConnectionThreadsPerProject());
+                }
+            });
+
+    private static class ProjectThreadPoolSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            projectThreadPoolMap.invalidateAll();
+            logger.info("Project level thread pools are cleared");
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey)
+                throws IOException {
+            projectThreadPoolMap.invalidate(cacheKey);
+            logger.info("Thread pool map for project {} is cleared", cacheKey);
+        }
+    }
+
+    static {
+        Broadcaster.getInstance(KylinConfig.getInstanceFromEnv())
+                .registerStaticListener(new ProjectThreadPoolSyncListener(), "project");
+    }
+    
     public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
         super(segment, cuboid, fullGTInfo, context);
     }
@@ -258,8 +297,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
         try {
             final Connection conn =  HBaseUnionUtil.getConnection(cubeSeg.getConfig(), cubeSeg.getStorageLocationIdentifier());
+            ExecutorService projThreadPool = projectThreadPoolMap
+                    .get(queryContext.getProject().toUpperCase(Locale.ROOT));
             final Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()),
-                    queryContext.getConnectionPool(HBaseConnection.getCoprocessorPool()));
+                    queryContext.getConnectionPool(projThreadPool));
 
             table.coprocessorService(CubeVisitService.class, startKey, endKey, //
                     new Batch.Call<CubeVisitService, CubeVisitResponse>() {