You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/27 00:35:00 UTC

[36/50] [abbrv] kylin git commit: KYLIN-2033 Broadcaster stronger sync locking and more comments

KYLIN-2033 Broadcaster stronger sync locking and more comments


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e10f2b92
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e10f2b92
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e10f2b92

Branch: refs/heads/1.5.x-CDH5.7
Commit: e10f2b922006a002ea9cb58ff11a4ecd9aa749c9
Parents: bf127a9
Author: Yang Li <li...@apache.org>
Authored: Sun Sep 25 13:54:51 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Sep 25 13:54:51 2016 +0800

----------------------------------------------------------------------
 .../kylin/metadata/cachesync/Broadcaster.java   | 119 ++++++++++---------
 1 file changed, 65 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/e10f2b92/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 75b2333..8d34cc0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -43,7 +43,16 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
- * Broadcast kylin event out
+ * Broadcast metadata changes across all Kylin servers.
+ * 
+ * The origin server announces the event via REST API to all Kylin servers, including itself.
+ * On each target server, listeners are registered to process events. As part of processing, a
+ * listener can re-notify a new event to other local listeners.
+ * 
+ * A typical project schema change event:
+ * - the model is updated on the origin server, and a "model" update event is announced
+ * - on all servers, the model listener is invoked, reloads the model, and notifies a "project_schema" update event
+ * - all listeners respond to the "project_schema" update -- reload cube desc, clear project L2 cache, clear calcite data source, etc.
  */
 public class Broadcaster {
 
@@ -57,13 +66,9 @@ public class Broadcaster {
     private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>();
 
     public static Broadcaster getInstance(KylinConfig config) {
-        Broadcaster r = CACHE.get(config);
-        if (r != null) {
-            return r;
-        }
 
-        synchronized (Broadcaster.class) {
-            r = CACHE.get(config);
+        synchronized (CACHE) {
+            Broadcaster r = CACHE.get(config);
             if (r != null) {
                 return r;
             }
@@ -79,7 +84,9 @@ public class Broadcaster {
 
     // call Broadcaster.getInstance().notifyClearAll() to clear cache
     static void clearCache() {
-        CACHE.clear();
+        synchronized (CACHE) {
+            CACHE.clear();
+        }
     }
 
     // ============================================================================
@@ -134,22 +141,24 @@ public class Broadcaster {
     }
 
     public void registerListener(Listener listener, String... entities) {
-        // ignore re-registration
-        List<Listener> all = listenerMap.get(SYNC_ALL);
-        if (all != null && all.contains(listener)) {
-            return;
-        }
+        synchronized (CACHE) {
+            // ignore re-registration
+            List<Listener> all = listenerMap.get(SYNC_ALL);
+            if (all != null && all.contains(listener)) {
+                return;
+            }
 
-        for (String entity : entities) {
-            if (!StringUtils.isBlank(entity))
-                addListener(entity, listener);
+            for (String entity : entities) {
+                if (!StringUtils.isBlank(entity))
+                    addListener(entity, listener);
+            }
+            addListener(SYNC_ALL, listener);
+            addListener(SYNC_PRJ_SCHEMA, listener);
+            addListener(SYNC_PRJ_DATA, listener);
         }
-        addListener(SYNC_ALL, listener);
-        addListener(SYNC_PRJ_SCHEMA, listener);
-        addListener(SYNC_PRJ_DATA, listener);
     }
 
-    synchronized private void addListener(String entity, Listener listener) {
+    private void addListener(String entity, Listener listener) {
         List<Listener> list = listenerMap.get(entity);
         if (list == null) {
             list = new ArrayList<>();
@@ -170,42 +179,44 @@ public class Broadcaster {
         notifyListener(SYNC_PRJ_DATA, Event.UPDATE, project);
     }
 
-    public synchronized void notifyListener(String entity, Event event, String cacheKey) throws IOException {
-        List<Listener> list = listenerMap.get(entity);
-        if (list == null)
-            return;
-        
-        logger.debug("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey + ", listeners=" + list);
-        
-        // prevents concurrent modification exception
-        list = Lists.newArrayList(list);
-        switch (entity) {
-        case SYNC_ALL:
-            for (Listener l : list) {
-                l.onClearAll(this);
-            }
-            clearCache(); // clear broadcaster too in the end
-            break;
-        case SYNC_PRJ_SCHEMA:
-            ProjectManager.getInstance(config).clearL2Cache();
-            for (Listener l : list) {
-                l.onProjectSchemaChange(this, cacheKey);
-            }
-            break;
-        case SYNC_PRJ_DATA:
-            ProjectManager.getInstance(config).clearL2Cache(); // cube's first becoming ready leads to schema change too
-            for (Listener l : list) {
-                l.onProjectDataChange(this, cacheKey);
-            }
-            break;
-        default:
-            for (Listener l : list) {
-                l.onEntityChange(this, entity, event, cacheKey);
+    public void notifyListener(String entity, Event event, String cacheKey) throws IOException {
+        synchronized (CACHE) {
+            List<Listener> list = listenerMap.get(entity);
+            if (list == null)
+                return;
+
+            logger.debug("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey + ", listeners=" + list);
+
+            // prevents concurrent modification exception
+            list = Lists.newArrayList(list);
+            switch (entity) {
+            case SYNC_ALL:
+                for (Listener l : list) {
+                    l.onClearAll(this);
+                }
+                clearCache(); // clear broadcaster too in the end
+                break;
+            case SYNC_PRJ_SCHEMA:
+                ProjectManager.getInstance(config).clearL2Cache();
+                for (Listener l : list) {
+                    l.onProjectSchemaChange(this, cacheKey);
+                }
+                break;
+            case SYNC_PRJ_DATA:
+                ProjectManager.getInstance(config).clearL2Cache(); // cube's first becoming ready leads to schema change too
+                for (Listener l : list) {
+                    l.onProjectDataChange(this, cacheKey);
+                }
+                break;
+            default:
+                for (Listener l : list) {
+                    l.onEntityChange(this, entity, event, cacheKey);
+                }
+                break;
             }
-            break;
+
+            logger.debug("Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey);
         }
-        
-        logger.debug("Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey);
     }
 
     /**