You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/25 05:55:07 UTC
kylin git commit: KYLIN-2033 Broadcaster stronger sync locking and
more comments
Repository: kylin
Updated Branches:
refs/heads/master bf127a916 -> e10f2b922
KYLIN-2033 Broadcaster stronger sync locking and more comments
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e10f2b92
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e10f2b92
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e10f2b92
Branch: refs/heads/master
Commit: e10f2b922006a002ea9cb58ff11a4ecd9aa749c9
Parents: bf127a9
Author: Yang Li <li...@apache.org>
Authored: Sun Sep 25 13:54:51 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Sep 25 13:54:51 2016 +0800
----------------------------------------------------------------------
.../kylin/metadata/cachesync/Broadcaster.java | 119 ++++++++++---------
1 file changed, 65 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/e10f2b92/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 75b2333..8d34cc0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -43,7 +43,16 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
- * Broadcast kylin event out
+ * Broadcast metadata changes across all Kylin servers.
+ *
+ * The origin server announce the event via Rest API to all Kylin servers including itself.
+ * On target server, listeners are registered to process events. As part of processing, a
+ * listener can re-notify a new event to other local listeners.
+ *
+ * A typical project schema change event:
+ * - model is update on origin server, a "model" update event is announced
+ * - on all servers, model listener is invoked, reload the model, and notify a "project_schema" update event
+ * - all listeners respond to the "project_schema" update -- reload cube desc, clear project L2 cache, clear calcite data source etc
*/
public class Broadcaster {
@@ -57,13 +66,9 @@ public class Broadcaster {
private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>();
public static Broadcaster getInstance(KylinConfig config) {
- Broadcaster r = CACHE.get(config);
- if (r != null) {
- return r;
- }
- synchronized (Broadcaster.class) {
- r = CACHE.get(config);
+ synchronized (CACHE) {
+ Broadcaster r = CACHE.get(config);
if (r != null) {
return r;
}
@@ -79,7 +84,9 @@ public class Broadcaster {
// call Broadcaster.getInstance().notifyClearAll() to clear cache
static void clearCache() {
- CACHE.clear();
+ synchronized (CACHE) {
+ CACHE.clear();
+ }
}
// ============================================================================
@@ -134,22 +141,24 @@ public class Broadcaster {
}
public void registerListener(Listener listener, String... entities) {
- // ignore re-registration
- List<Listener> all = listenerMap.get(SYNC_ALL);
- if (all != null && all.contains(listener)) {
- return;
- }
+ synchronized (CACHE) {
+ // ignore re-registration
+ List<Listener> all = listenerMap.get(SYNC_ALL);
+ if (all != null && all.contains(listener)) {
+ return;
+ }
- for (String entity : entities) {
- if (!StringUtils.isBlank(entity))
- addListener(entity, listener);
+ for (String entity : entities) {
+ if (!StringUtils.isBlank(entity))
+ addListener(entity, listener);
+ }
+ addListener(SYNC_ALL, listener);
+ addListener(SYNC_PRJ_SCHEMA, listener);
+ addListener(SYNC_PRJ_DATA, listener);
}
- addListener(SYNC_ALL, listener);
- addListener(SYNC_PRJ_SCHEMA, listener);
- addListener(SYNC_PRJ_DATA, listener);
}
- synchronized private void addListener(String entity, Listener listener) {
+ private void addListener(String entity, Listener listener) {
List<Listener> list = listenerMap.get(entity);
if (list == null) {
list = new ArrayList<>();
@@ -170,42 +179,44 @@ public class Broadcaster {
notifyListener(SYNC_PRJ_DATA, Event.UPDATE, project);
}
- public synchronized void notifyListener(String entity, Event event, String cacheKey) throws IOException {
- List<Listener> list = listenerMap.get(entity);
- if (list == null)
- return;
-
- logger.debug("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey + ", listeners=" + list);
-
- // prevents concurrent modification exception
- list = Lists.newArrayList(list);
- switch (entity) {
- case SYNC_ALL:
- for (Listener l : list) {
- l.onClearAll(this);
- }
- clearCache(); // clear broadcaster too in the end
- break;
- case SYNC_PRJ_SCHEMA:
- ProjectManager.getInstance(config).clearL2Cache();
- for (Listener l : list) {
- l.onProjectSchemaChange(this, cacheKey);
- }
- break;
- case SYNC_PRJ_DATA:
- ProjectManager.getInstance(config).clearL2Cache(); // cube's first becoming ready leads to schema change too
- for (Listener l : list) {
- l.onProjectDataChange(this, cacheKey);
- }
- break;
- default:
- for (Listener l : list) {
- l.onEntityChange(this, entity, event, cacheKey);
+ public void notifyListener(String entity, Event event, String cacheKey) throws IOException {
+ synchronized (CACHE) {
+ List<Listener> list = listenerMap.get(entity);
+ if (list == null)
+ return;
+
+ logger.debug("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey + ", listeners=" + list);
+
+ // prevents concurrent modification exception
+ list = Lists.newArrayList(list);
+ switch (entity) {
+ case SYNC_ALL:
+ for (Listener l : list) {
+ l.onClearAll(this);
+ }
+ clearCache(); // clear broadcaster too in the end
+ break;
+ case SYNC_PRJ_SCHEMA:
+ ProjectManager.getInstance(config).clearL2Cache();
+ for (Listener l : list) {
+ l.onProjectSchemaChange(this, cacheKey);
+ }
+ break;
+ case SYNC_PRJ_DATA:
+ ProjectManager.getInstance(config).clearL2Cache(); // cube's first becoming ready leads to schema change too
+ for (Listener l : list) {
+ l.onProjectDataChange(this, cacheKey);
+ }
+ break;
+ default:
+ for (Listener l : list) {
+ l.onEntityChange(this, entity, event, cacheKey);
+ }
+ break;
}
- break;
+
+ logger.debug("Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey);
}
-
- logger.debug("Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey);
}
/**