You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/06/29 05:48:27 UTC

[20/50] kylin git commit: #754 Change broadcast threads to daemon threads

#754 Change broadcast threads to daemon threads


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5a4e465d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5a4e465d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5a4e465d

Branch: refs/heads/master
Commit: 5a4e465dd8b8fda640db74e595491a2b2dd3c9db
Parents: 4d2a548
Author: auphyroc99 <45...@qq.com>
Authored: Tue Jun 20 14:44:01 2017 +0800
Committer: Roger Shi <ro...@gmail.com>
Committed: Tue Jun 20 15:01:11 2017 +0800

----------------------------------------------------------------------
 .../kylin/metadata/cachesync/Broadcaster.java   | 22 +++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5a4e465d/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 4fbfc7c..c9e1130 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -114,26 +114,28 @@ public class Broadcaster {
             @Override
             public void run() {
                 final Map<String, RestClient> restClientMap = Maps.newHashMap();
-                final ExecutorService wipingCachePool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+                final ExecutorService wipingCachePool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS,
+                        new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory());
 
                 while (true) {
                     try {
                         final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
                         String[] restServers = config.getRestServers();
-                        logger.info("Servers in the cluster: " + Arrays.toString(restServers));
+                        logger.debug("Servers in the cluster: " + Arrays.toString(restServers));
                         for (final String node : restServers) {
                             if (restClientMap.containsKey(node) == false) {
                                 restClientMap.put(node, new RestClient(node));
                             }
                         }
 
-                        logger.info("Announcing new broadcast event: " + broadcastEvent);
+                        logger.debug("Announcing new broadcast event: " + broadcastEvent);
                         for (final String node : restServers) {
                             wipingCachePool.execute(new Runnable() {
                                 @Override
                                 public void run() {
                                     try {
-                                        restClientMap.get(node).wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
+                                        restClientMap.get(node).wipeCache(broadcastEvent.getEntity(),
+                                                broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
                                     } catch (IOException e) {
                                         logger.warn("Thread failed during wipe cache at " + broadcastEvent, e);
                                     }
@@ -192,7 +194,8 @@ public class Broadcaster {
         if (list == null)
             return;
 
-        logger.trace("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey + ", listeners=" + list);
+        logger.trace("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey
+                + ", listeners=" + list);
 
         // prevents concurrent modification exception
         list = Lists.newArrayList(list);
@@ -222,7 +225,8 @@ public class Broadcaster {
             break;
         }
 
-        logger.debug("Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey);
+        logger.debug(
+                "Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey);
     }
 
     /**
@@ -279,7 +283,8 @@ public class Broadcaster {
         public void onProjectDataChange(Broadcaster broadcaster, String project) throws IOException {
         }
 
-        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey)
+                throws IOException {
         }
     }
 
@@ -343,7 +348,8 @@ public class Broadcaster {
 
         @Override
         public String toString() {
-            return Objects.toStringHelper(this).add("entity", entity).add("event", event).add("cacheKey", cacheKey).toString();
+            return Objects.toStringHelper(this).add("entity", entity).add("event", event).add("cacheKey", cacheKey)
+                    .toString();
         }
 
     }