You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2019/01/03 07:21:48 UTC

[kylin] branch 2.5.x updated: KYLIN-3752 Increase broadcaster's concurrency to avoid exceptions

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.5.x
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/2.5.x by this push:
     new 19b2bf7  KYLIN-3752 Increase broadcaster's concurrency to avoid exceptions
19b2bf7 is described below

commit 19b2bf7e5f544d2dc5513c7dd4b7e3ac90196a1e
Author: xbirbird <31...@users.noreply.github.com>
AuthorDate: Wed Jan 2 10:43:19 2019 +0800

    KYLIN-3752 Increase broadcaster's concurrency to avoid exceptions
---
 .../java/org/apache/kylin/metadata/cachesync/Broadcaster.java | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index d5ecc16..b67d10c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -91,14 +91,17 @@ public class Broadcaster {
         this.config = config;
         this.syncErrorHandler = getSyncErrorHandler(config);
         this.announceMainLoop = Executors.newSingleThreadExecutor(new DaemonThreadFactory());
-        this.announceThreadPool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory());
-
+        
         final String[] nodes = config.getRestServers();
         if (nodes == null || nodes.length < 1) {
             logger.warn("There is no available rest server; check the 'kylin.server.cluster-servers' config");
         }
-        logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
+        logger.debug("{} nodes in the cluster: {}", (nodes == null ? 0 : nodes.length), Arrays.toString(nodes));
+
+        int corePoolSize = (nodes == null || nodes.length < 1)? 1 : nodes.length;
+        int maximumPoolSize = (nodes == null || nodes.length < 1)? 10 : nodes.length * 2;
+        this.announceThreadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory());
 
         announceMainLoop.execute(new Runnable() {
             @Override