You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/06 05:53:10 UTC

[kylin] 03/04: KYLIN-4409 Limit the number of simultaneous rpcs to HBase for cube

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 1142469c2558f0a9d9caa03d6c574b6363ff227d
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue Mar 10 12:37:17 2020 +0800

    KYLIN-4409 Limit the number of simultaneous rpcs to HBase for cube
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../java/org/apache/kylin/common/QueryContext.java |  25 +++-
 .../apache/kylin/common/QueryContextFacade.java    |   6 +-
 .../apache/kylin/common/SubThreadPoolExecutor.java | 127 +++++++++++++++++++++
 .../kylin/common/SubThreadPoolExecutorTest.java    |  97 ++++++++++++++++
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        |   2 +-
 6 files changed, 255 insertions(+), 6 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index e2d7543..dee57d1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1366,6 +1366,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.storage.clean-after-delete-operation", FALSE));
     }
 
+    public int getHBaseMaxConnectionThreadsPerQuery() {
+        return Integer.parseInt(getOptional("kylin.storage.hbase.max-hconnection-threads-per-query", "400"));
+    }
+
     // ============================================================================
     // ENGINE.MR
     // ============================================================================
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 85cc5f8..30d410d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -68,13 +69,31 @@ public class QueryContext {
     private List<RPCStatistics> rpcStatisticsList = Lists.newCopyOnWriteArrayList();
     private Map<Integer, CubeSegmentStatisticsResult> cubeSegmentStatisticsResultMap = Maps.newConcurrentMap();
 
-    QueryContext() {
-        this(System.currentTimeMillis());
+    final int maxConnThreads;
+
+    private ExecutorService connPool;
+
+    QueryContext(int maxConnThreads) {
+        this(maxConnThreads, System.currentTimeMillis());
     }
 
-    QueryContext(long startMills) {
+    QueryContext(int maxConnThreads, long startMills) {
         queryId = RandomUtil.randomUUID().toString();
         queryStartMillis = startMills;
+        this.maxConnThreads = maxConnThreads;
+    }
+
+    public ExecutorService getConnectionPool(ExecutorService sharedConnPool) {
+        if (connPool != null) {
+            return connPool;
+        }
+
+        synchronized (this) {
+            if (connPool == null) {
+                connPool = new SubThreadPoolExecutor(sharedConnPool, "QUERY", maxConnThreads);
+            }
+            return connPool;
+        }
     }
 
     public long getQueryStartMillis() {
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
index bd22f68..b92df50 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
@@ -38,7 +38,8 @@ public class QueryContextFacade {
     private static final InternalThreadLocal<QueryContext> CURRENT_CTX = new InternalThreadLocal<QueryContext>() {
         @Override
         protected QueryContext initialValue() {
-            QueryContext queryContext = new QueryContext();
+            QueryContext queryContext = new QueryContext(
+                    KylinConfig.getInstanceFromEnv().getHBaseMaxConnectionThreadsPerQuery());
             RUNNING_CTX_MAP.put(queryContext.getQueryId(), queryContext);
             return queryContext;
         }
@@ -96,7 +97,8 @@ public class QueryContextFacade {
      */
     public static TreeSet<QueryContext> getLongRunningQueries(long runningTime) {
         SortedSet<QueryContext> allRunningQueries = getAllRunningQueries();
-        QueryContext tmpCtx = new QueryContext(runningTime + 1L); // plus 1 to include those contexts in same accumulatedMills but different uuid
+        QueryContext tmpCtx = new QueryContext(KylinConfig.getInstanceFromEnv().getHBaseMaxConnectionThreadsPerQuery(),
+                runningTime + 1L); // plus 1 to include those contexts in same accumulatedMills but different uuid
         return (TreeSet<QueryContext>) allRunningQueries.headSet(tmpCtx);
     }
 }
diff --git a/core-common/src/main/java/org/apache/kylin/common/SubThreadPoolExecutor.java b/core-common/src/main/java/org/apache/kylin/common/SubThreadPoolExecutor.java
new file mode 100644
index 0000000..986ad7f
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/SubThreadPoolExecutor.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class SubThreadPoolExecutor extends AbstractExecutorService {
+
+    private final static Logger logger = LoggerFactory.getLogger(SubThreadPoolExecutor.class);
+
+    private final Semaphore semaphore;
+
+    private final ExecutorService impl;
+
+    private final String subject;
+
+    public SubThreadPoolExecutor(ExecutorService impl, String subject, int maxThreads) {
+        this.impl = impl;
+        this.subject = subject;
+        this.semaphore = new Semaphore(maxThreads);
+    }
+
+    // Obtain a thread resource. If no resources, block it
+    private void obtainThread() {
+        try {
+            semaphore.acquire();
+            logger.debug("Obtain thread for {}", subject);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    // Release a thread resource
+    private void releaseThread() {
+        semaphore.release();
+        logger.debug("Release thread for {}", subject);
+    }
+
+    public void execute(Runnable command) {
+        obtainThread();
+        impl.execute(command);
+    }
+
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+        ListenableFutureTask<T> ret = impl instanceof SubThreadPoolExecutor
+                ? (ListenableFutureTask) ((SubThreadPoolExecutor) impl).newTaskFor(runnable, value)
+                : ListenableFutureTask.create(runnable, value);
+        ret.addListener(new Runnable() {
+            @Override
+            public void run() {
+                releaseThread();
+            }
+        }, MoreExecutors.sameThreadExecutor());
+        return ret;
+    }
+
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+        ListenableFutureTask<T> ret = impl instanceof SubThreadPoolExecutor
+                ? (ListenableFutureTask) ((SubThreadPoolExecutor) impl).newTaskFor(callable)
+                : ListenableFutureTask.create(callable);
+        ret.addListener(new Runnable() {
+            @Override
+            public void run() {
+                releaseThread();
+            }
+        }, MoreExecutors.sameThreadExecutor());
+        return ret;
+    }
+
+    @Override
+    public void shutdown() {
+        throw new IllegalStateException(
+                "Manual shutdown not supported - SubThreadPoolExecutor is dependent on an external lifecycle");
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        throw new IllegalStateException(
+                "Manual shutdown not supported - SubThreadPoolExecutor is dependent on an external lifecycle");
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        throw new IllegalStateException(
+                "Manual shutdown not supported - SubThreadPoolExecutor is dependent on an external lifecycle");
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return false;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/core-common/src/test/java/org/apache/kylin/common/SubThreadPoolExecutorTest.java b/core-common/src/test/java/org/apache/kylin/common/SubThreadPoolExecutorTest.java
new file mode 100644
index 0000000..3f9cc25
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/SubThreadPoolExecutorTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+public class SubThreadPoolExecutorTest {
+    private final static Logger logger = LoggerFactory.getLogger(SubThreadPoolExecutorTest.class);
+
+    private static final long SleepTime = 50L;
+
+    @Test
+    public void testOneLayer() throws InterruptedException, ExecutionException {
+        int nTotalThread = 10;
+        int nTask = 5;
+        int nSubMaxThread = 2;
+        long minNRound = (nTask - 1) / nSubMaxThread + 1;
+
+        ExecutorService executor = Executors.newFixedThreadPool(nTotalThread);
+        SubThreadPoolExecutor subExecutor = new SubThreadPoolExecutor(executor, "layOne", nSubMaxThread);
+        testInner(subExecutor, nTask, minNRound);
+    }
+
+    @Test
+    public void testTwoLayer() throws InterruptedException, ExecutionException {
+        testTwoLayer(5);
+        testTwoLayer(1);
+    }
+
+    private void testTwoLayer(int nSubMaxThread) throws InterruptedException, ExecutionException {
+        int nTotalThread = 10;
+        int nTask = 5;
+        int nSSubMaxThread = 2;
+        long minNRound = (nTask - 1) / Math.min(nSubMaxThread, nSSubMaxThread) + 1;
+
+        ExecutorService executor = Executors.newFixedThreadPool(nTotalThread);
+        SubThreadPoolExecutor subExecutor = new SubThreadPoolExecutor(executor, "layOne", nSubMaxThread);
+        SubThreadPoolExecutor ssubExecutor = new SubThreadPoolExecutor(subExecutor, "layTwo", nSSubMaxThread);
+        testInner(ssubExecutor, nTask, minNRound);
+    }
+
+    private void testInner(SubThreadPoolExecutor subExecutor, int nTask, long minNRound)
+            throws InterruptedException, ExecutionException {
+        Stopwatch sw = new Stopwatch();
+
+        sw.start();
+        List<Future<?>> futureList = Lists.newArrayListWithExpectedSize(nTask);
+        for (int i = 0; i < nTask; i++) {
+            futureList.add(subExecutor.submit(new TestTask()));
+        }
+        for (Future<?> future : futureList) {
+            future.get();
+        }
+        long timeCost = sw.elapsed(TimeUnit.MILLISECONDS);
+        Assert.assertTrue("Time cost should be more than " + timeCost + "ms", timeCost >= minNRound * SleepTime);
+        logger.info("time cost: {}ms", timeCost);
+    }
+
+    private static class TestTask implements Runnable {
+        @Override
+        public void run() {
+            try {
+                Thread.sleep(SleepTime);
+            } catch (InterruptedException e) {
+            }
+        }
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 56538af..dde4956 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -259,7 +259,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         try {
             final Connection conn =  HBaseUnionUtil.getConnection(cubeSeg.getConfig(), cubeSeg.getStorageLocationIdentifier());
             final Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()),
-                    HBaseConnection.getCoprocessorPool());
+                    queryContext.getConnectionPool(HBaseConnection.getCoprocessorPool()));
 
             table.coprocessorService(CubeVisitService.class, startKey, endKey, //
                     new Batch.Call<CubeVisitService, CubeVisitResponse>() {