You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/06 05:53:11 UTC
[kylin] 04/04: KYLIN-4410 Limit the number of simultaneous rpcs to
HBase for project
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit bbe9be3326e3ef74d9ba04a3f069f66151286dc2
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue Mar 10 13:02:25 2020 +0800
KYLIN-4410 Limit the number of simultaneous rpcs to HBase for project
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 ++
.../java/org/apache/kylin/common/QueryContext.java | 9 +++++
.../apache/kylin/rest/service/QueryService.java | 9 +++--
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 43 +++++++++++++++++++++-
4 files changed, 60 insertions(+), 5 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index dee57d1..3490b7a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1370,6 +1370,10 @@ public abstract class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.storage.hbase.max-hconnection-threads-per-query", "400"));
}
+ public int getHBaseMaxConnectionThreadsPerProject() { // per-project counterpart of the max-hconnection-threads-per-query setting
+ return Integer.parseInt(getOptional("kylin.storage.hbase.max-hconnection-threads-per-project", "800")); // "800" is the default passed to getOptional
+ }
+
// ============================================================================
// ENGINE.MR
// ============================================================================
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 30d410d..5d349c3 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -54,6 +54,7 @@ public class QueryContext {
private final String queryId;
private String username;
+ private String project;
private Set<String> groups;
private AtomicLong scannedRows = new AtomicLong();
private AtomicLong returnedRows = new AtomicLong();
@@ -122,6 +123,14 @@ public class QueryContext {
this.username = username;
}
+ public String getProject() { // returns the value last passed to setProject
+ return project; // NOTE(review): the field has no initializer, so this is presumably null until setProject is called
+ }
+
+ public void setProject(String project) { // stores the project name on this query context, as given (no validation or case change)
+ this.project = project;
+ }
+
public Set<String> getGroups() {
return groups;
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index de101d9..de0c9bc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -404,7 +404,11 @@ public class QueryService extends BasicService {
if (sqlRequest.getBackdoorToggles() != null)
BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles());
+ // set initial info when starting a query
final QueryContext queryContext = QueryContextFacade.current();
+ queryContext.setUsername(SecurityContextHolder.getContext().getAuthentication().getName());
+ queryContext.setGroups(AclPermissionUtil.getCurrentUserGroups());
+ queryContext.setProject(sqlRequest.getProject());
try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) {
SQLResponse sqlResponse = null;
@@ -629,10 +633,7 @@ public class QueryService extends BasicService {
try {
conn = QueryConnection.getConnection(sqlRequest.getProject());
- String userInfo = SecurityContextHolder.getContext().getAuthentication().getName();
- QueryContext context = QueryContextFacade.current();
- context.setUsername(userInfo);
- context.setGroups(AclPermissionUtil.getCurrentUserGroups());
+ String userInfo = QueryContextFacade.current().getUsername();
final Collection<? extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext()
.getAuthentication().getAuthorities();
for (GrantedAuthority grantedAuthority : grantedAuthorities) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index dde4956..2eb6372 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.QueryContext.CubeSegmentStatistics;
+import org.apache.kylin.common.SubThreadPoolExecutor;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.exceptions.KylinTimeoutException;
import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
@@ -57,7 +58,10 @@ import org.apache.kylin.gridtable.GTScanRange;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GTUtil;
import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
@@ -71,6 +75,9 @@ import org.apache.kylin.storage.hbase.util.HBaseUnionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;
@@ -81,6 +88,38 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
private static ExecutorService executorService = new LoggableCachedThreadPool();
+ private static LoadingCache<String, ExecutorService> projectThreadPoolMap = CacheBuilder.newBuilder() // one ExecutorService per key, built on first lookup
+ .build(new CacheLoader<String, ExecutorService>() {
+ @Override
+ public ExecutorService load(String projName) throws Exception { // invoked by the cache when projName has no entry yet
+ ExecutorService sharedPool = HBaseConnection.getCoprocessorPool();
+ ProjectInstance projInst = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
+ .getProject(projName); // NOTE(review): dereferenced below without a null check -- confirm getProject cannot return null here
+ return new SubThreadPoolExecutor(sharedPool, "PROJECT", // sharedPool is handed to the new executor as its first argument
+ projInst.getConfig().getHBaseMaxConnectionThreadsPerProject()); // third argument comes from the project's own config
+ }
+ });
+
+ private static class ProjectThreadPoolSyncListener extends Broadcaster.Listener { // evicts cached project executors when Broadcaster events arrive
+ @Override
+ public void onClearAll(Broadcaster broadcaster) throws IOException { // drops every cached project executor
+ projectThreadPoolMap.invalidateAll();
+ logger.info("Project level thread pools are cleared");
+ }
+
+ @Override
+ public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey)
+ throws IOException {
+ projectThreadPoolMap.invalidate(cacheKey.toUpperCase(Locale.ROOT)); // lookups use the upper-cased project name, so evict under the same key
+ logger.info("Thread pool map for project {} is cleared", cacheKey);
+ }
+ }
+
+ static { // static initializer: runs once when CubeHBaseEndpointRPC is initialized
+ Broadcaster.getInstance(KylinConfig.getInstanceFromEnv())
+ .registerStaticListener(new ProjectThreadPoolSyncListener(), "project"); // listener is registered for the "project" entity
+ }
+
public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
super(segment, cuboid, fullGTInfo, context);
}
@@ -258,8 +297,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
try {
final Connection conn = HBaseUnionUtil.getConnection(cubeSeg.getConfig(), cubeSeg.getStorageLocationIdentifier());
+ ExecutorService projThreadPool = projectThreadPoolMap
+ .get(queryContext.getProject().toUpperCase(Locale.ROOT));
final Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()),
- queryContext.getConnectionPool(HBaseConnection.getCoprocessorPool()));
+ queryContext.getConnectionPool(projThreadPool));
table.coprocessorService(CubeVisitService.class, startKey, endKey, //
new Batch.Call<CubeVisitService, CubeVisitResponse>() {