You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/06 05:53:10 UTC
[kylin] 03/04: KYLIN-4409 Limit the number of simultaneous rpcs to
HBase for cube
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 1142469c2558f0a9d9caa03d6c574b6363ff227d
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue Mar 10 12:37:17 2020 +0800
KYLIN-4409 Limit the number of simultaneous rpcs to HBase for cube
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 +
.../java/org/apache/kylin/common/QueryContext.java | 25 +++-
.../apache/kylin/common/QueryContextFacade.java | 6 +-
.../apache/kylin/common/SubThreadPoolExecutor.java | 127 +++++++++++++++++++++
.../kylin/common/SubThreadPoolExecutorTest.java | 97 ++++++++++++++++
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 2 +-
6 files changed, 255 insertions(+), 6 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index e2d7543..dee57d1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1366,6 +1366,10 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.storage.clean-after-delete-operation", FALSE));
}
+ /**
+  * Reads {@code kylin.storage.hbase.max-hconnection-threads-per-query} via
+  * {@code getOptional}, with "400" supplied as the default, and parses it as an int.
+  *
+  * @return the configured per-query limit
+  * @throws NumberFormatException if the configured text is not a valid integer
+  */
+ public int getHBaseMaxConnectionThreadsPerQuery() {
+     final String configured = getOptional("kylin.storage.hbase.max-hconnection-threads-per-query", "400");
+     return Integer.parseInt(configured);
+ }
+
// ============================================================================
// ENGINE.MR
// ============================================================================
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 85cc5f8..30d410d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -68,13 +69,31 @@ public class QueryContext {
private List<RPCStatistics> rpcStatisticsList = Lists.newCopyOnWriteArrayList();
private Map<Integer, CubeSegmentStatisticsResult> cubeSegmentStatisticsResultMap = Maps.newConcurrentMap();
- QueryContext() {
- this(System.currentTimeMillis());
+ final int maxConnThreads;
+
+ private ExecutorService connPool;
+
+ QueryContext(int maxConnThreads) {
+ this(maxConnThreads, System.currentTimeMillis());
}
- QueryContext(long startMills) {
+ QueryContext(int maxConnThreads, long startMills) {
queryId = RandomUtil.randomUUID().toString();
queryStartMillis = startMills;
+ this.maxConnThreads = maxConnThreads;
+ }
+
+ /**
+  * Returns this query's connection pool, lazily created as a SubThreadPoolExecutor
+  * over {@code sharedConnPool} capped at {@code maxConnThreads}; once created, later
+  * calls return the same pool and ignore their argument. Fully synchronized because
+  * {@code connPool} is not volatile, so it must not be read outside the lock.
+  */
+ public synchronized ExecutorService getConnectionPool(ExecutorService sharedConnPool) {
+     if (connPool == null) {
+         connPool = new SubThreadPoolExecutor(sharedConnPool, "QUERY", maxConnThreads);
+     }
+     return connPool;
}
public long getQueryStartMillis() {
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
index bd22f68..b92df50 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
@@ -38,7 +38,8 @@ public class QueryContextFacade {
private static final InternalThreadLocal<QueryContext> CURRENT_CTX = new InternalThreadLocal<QueryContext>() {
@Override
protected QueryContext initialValue() {
- QueryContext queryContext = new QueryContext();
+ QueryContext queryContext = new QueryContext(
+ KylinConfig.getInstanceFromEnv().getHBaseMaxConnectionThreadsPerQuery());
RUNNING_CTX_MAP.put(queryContext.getQueryId(), queryContext);
return queryContext;
}
@@ -96,7 +97,8 @@ public class QueryContextFacade {
*/
public static TreeSet<QueryContext> getLongRunningQueries(long runningTime) {
SortedSet<QueryContext> allRunningQueries = getAllRunningQueries();
- QueryContext tmpCtx = new QueryContext(runningTime + 1L); // plus 1 to include those contexts in same accumulatedMills but different uuid
+ QueryContext tmpCtx = new QueryContext(KylinConfig.getInstanceFromEnv().getHBaseMaxConnectionThreadsPerQuery(),
+ runningTime + 1L); // plus 1 to include those contexts in same accumulatedMills but different uuid
return (TreeSet<QueryContext>) allRunningQueries.headSet(tmpCtx);
}
}
diff --git a/core-common/src/main/java/org/apache/kylin/common/SubThreadPoolExecutor.java b/core-common/src/main/java/org/apache/kylin/common/SubThreadPoolExecutor.java
new file mode 100644
index 0000000..986ad7f
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/SubThreadPoolExecutor.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * An {@link ExecutorService} facade that runs tasks on a delegate executor while
+ * capping, with a {@link Semaphore}, how many tasks accepted through this facade
+ * may be outstanding at once.
+ *
+ * <p>A permit is taken in {@link #execute(Runnable)} (blocking the caller when none
+ * is free) and given back by a completion listener attached in {@code newTaskFor}.
+ * Instances may be stacked: when the delegate is itself a
+ * {@code SubThreadPoolExecutor}, the task is created by the delegate so that it
+ * carries one release listener per layer, and it acquires one permit per layer.
+ *
+ * <p>NOTE(review): a {@code Runnable} handed straight to {@link #execute(Runnable)}
+ * without going through {@code newTaskFor} has no release listener, so its permit
+ * is never returned; callers are expected to use {@code submit}/{@code invoke*}.
+ *
+ * <p>The facade does not own the delegate: the shutdown methods always throw and
+ * {@link #isShutdown()} / {@link #isTerminated()} always report {@code false}.
+ */
+public class SubThreadPoolExecutor extends AbstractExecutorService {
+
+    private static final Logger logger = LoggerFactory.getLogger(SubThreadPoolExecutor.class);
+
+    private static final String SHUTDOWN_NOT_SUPPORTED = //
+            "Manual shutdown not supported - SubThreadPoolExecutor is dependent on an external lifecycle";
+
+    /** Permits left for tasks accepted through this facade. */
+    private final Semaphore semaphore;
+
+    /** Executor that actually runs the tasks; may be another SubThreadPoolExecutor. */
+    private final ExecutorService impl;
+
+    /** Label used in the debug log lines only. */
+    private final String subject;
+
+    /**
+     * @param impl       executor the tasks are forwarded to
+     * @param subject    label written to the debug log on acquire/release
+     * @param maxThreads number of permits, i.e. the cap on outstanding tasks
+     */
+    public SubThreadPoolExecutor(ExecutorService impl, String subject, int maxThreads) {
+        this.impl = impl;
+        this.subject = subject;
+        this.semaphore = new Semaphore(maxThreads);
+    }
+
+    // Obtain a thread resource. If no resources, block it
+    private void obtainThread() {
+        try {
+            semaphore.acquire();
+            logger.debug("Obtain thread for {}", subject);
+        } catch (InterruptedException e) {
+            // Restore the flag so the caller can still observe the interruption;
+            // the wrapping RuntimeException alone would hide it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    // Release a thread resource
+    private void releaseThread() {
+        semaphore.release();
+        logger.debug("Release thread for {}", subject);
+    }
+
+    /**
+     * Takes a permit (blocking until one is free) and forwards {@code command} to
+     * the delegate. If the delegate throws instead of accepting the task, the
+     * permit is returned here, because the task will never run and therefore its
+     * completion listener cannot be relied on to return it.
+     *
+     * <p>NOTE(review): if a caller later cancels a future whose hand-off failed
+     * (AbstractExecutorService.invokeAll does this), the listener releases once
+     * more; confirm whether that path matters for this executor's users.
+     *
+     * @throws RuntimeException wrapping {@link InterruptedException} if the calling
+     *                          thread is interrupted while waiting for a permit
+     */
+    @Override
+    public void execute(Runnable command) {
+        obtainThread();
+        boolean handedOff = false;
+        try {
+            impl.execute(command);
+            handedOff = true;
+        } finally {
+            if (!handedOff) {
+                releaseThread();
+            }
+        }
+    }
+
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+        // Let a stacked delegate build the task so its own release listener is attached too.
+        ListenableFutureTask<T> task = impl instanceof SubThreadPoolExecutor
+                ? (ListenableFutureTask<T>) ((SubThreadPoolExecutor) impl).newTaskFor(runnable, value)
+                : ListenableFutureTask.create(runnable, value);
+        return releaseOnCompletion(task);
+    }
+
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+        // Let a stacked delegate build the task so its own release listener is attached too.
+        ListenableFutureTask<T> task = impl instanceof SubThreadPoolExecutor
+                ? (ListenableFutureTask<T>) ((SubThreadPoolExecutor) impl).newTaskFor(callable)
+                : ListenableFutureTask.create(callable);
+        return releaseOnCompletion(task);
+    }
+
+    // Registers a listener that returns this layer's permit once the task is done.
+    private <T> ListenableFutureTask<T> releaseOnCompletion(ListenableFutureTask<T> task) {
+        task.addListener(new Runnable() {
+            @Override
+            public void run() {
+                releaseThread();
+            }
+        }, MoreExecutors.sameThreadExecutor());
+        return task;
+    }
+
+    /** Always throws: the delegate's lifecycle is managed elsewhere. */
+    @Override
+    public void shutdown() {
+        throw new IllegalStateException(SHUTDOWN_NOT_SUPPORTED);
+    }
+
+    /** Always throws: the delegate's lifecycle is managed elsewhere. */
+    @Override
+    public List<Runnable> shutdownNow() {
+        throw new IllegalStateException(SHUTDOWN_NOT_SUPPORTED);
+    }
+
+    /** Always throws: the delegate's lifecycle is managed elsewhere. */
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        throw new IllegalStateException(SHUTDOWN_NOT_SUPPORTED);
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return false;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/core-common/src/test/java/org/apache/kylin/common/SubThreadPoolExecutorTest.java b/core-common/src/test/java/org/apache/kylin/common/SubThreadPoolExecutorTest.java
new file mode 100644
index 0000000..3f9cc25
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/SubThreadPoolExecutorTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+/**
+ * Checks that {@link SubThreadPoolExecutor} throttles tasks: with a cap of k
+ * permits, n sleeping tasks need at least ceil(n / k) sleep rounds, so the elapsed
+ * time must not be shorter than that many sleeps.
+ */
+public class SubThreadPoolExecutorTest {
+    private static final Logger logger = LoggerFactory.getLogger(SubThreadPoolExecutorTest.class);
+
+    /** How long each test task sleeps, in milliseconds. */
+    private static final long SleepTime = 50L;
+
+    @Test
+    public void testOneLayer() throws InterruptedException, ExecutionException {
+        int nTotalThread = 10;
+        int nTask = 5;
+        int nSubMaxThread = 2;
+        long minNRound = (nTask - 1) / nSubMaxThread + 1;
+
+        ExecutorService executor = Executors.newFixedThreadPool(nTotalThread);
+        try {
+            SubThreadPoolExecutor subExecutor = new SubThreadPoolExecutor(executor, "layOne", nSubMaxThread);
+            testInner(subExecutor, nTask, minNRound);
+        } finally {
+            // The backing pool uses non-daemon threads; stop it so they do not outlive the test.
+            executor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testTwoLayer() throws InterruptedException, ExecutionException {
+        testTwoLayer(5);
+        testTwoLayer(1);
+    }
+
+    // Stacks two limiters; the tighter of the two caps determines the expected number of rounds.
+    private void testTwoLayer(int nSubMaxThread) throws InterruptedException, ExecutionException {
+        int nTotalThread = 10;
+        int nTask = 5;
+        int nSSubMaxThread = 2;
+        long minNRound = (nTask - 1) / Math.min(nSubMaxThread, nSSubMaxThread) + 1;
+
+        ExecutorService executor = Executors.newFixedThreadPool(nTotalThread);
+        try {
+            SubThreadPoolExecutor subExecutor = new SubThreadPoolExecutor(executor, "layOne", nSubMaxThread);
+            SubThreadPoolExecutor ssubExecutor = new SubThreadPoolExecutor(subExecutor, "layTwo", nSSubMaxThread);
+            testInner(ssubExecutor, nTask, minNRound);
+        } finally {
+            // The backing pool uses non-daemon threads; stop it so they do not outlive the test.
+            executor.shutdownNow();
+        }
+    }
+
+    // Submits nTask sleeping tasks, waits for all of them and asserts the lower bound on elapsed time.
+    private void testInner(SubThreadPoolExecutor subExecutor, int nTask, long minNRound)
+            throws InterruptedException, ExecutionException {
+        long minCost = minNRound * SleepTime;
+        Stopwatch sw = new Stopwatch();
+
+        sw.start();
+        List<Future<?>> futureList = Lists.newArrayListWithExpectedSize(nTask);
+        for (int i = 0; i < nTask; i++) {
+            futureList.add(subExecutor.submit(new TestTask()));
+        }
+        for (Future<?> future : futureList) {
+            future.get();
+        }
+        long timeCost = sw.elapsed(TimeUnit.MILLISECONDS);
+        // Report the expected bound and the measured value, not the measured value alone.
+        Assert.assertTrue("Time cost should be at least " + minCost + "ms but was " + timeCost + "ms",
+                timeCost >= minCost);
+        logger.info("time cost: {}ms", timeCost);
+    }
+
+    /** Task that just sleeps for {@link #SleepTime} milliseconds. */
+    private static class TestTask implements Runnable {
+        @Override
+        public void run() {
+            try {
+                Thread.sleep(SleepTime);
+            } catch (InterruptedException e) {
+                // Nothing to clean up; keep the interrupt status for the pool thread.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 56538af..dde4956 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -259,7 +259,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
try {
final Connection conn = HBaseUnionUtil.getConnection(cubeSeg.getConfig(), cubeSeg.getStorageLocationIdentifier());
final Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()),
- HBaseConnection.getCoprocessorPool());
+ queryContext.getConnectionPool(HBaseConnection.getCoprocessorPool()));
table.coprocessorService(CubeVisitService.class, startKey, endKey, //
new Batch.Call<CubeVisitService, CubeVisitResponse>() {