You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/23 14:16:32 UTC

[2/2] kylin git commit: KYLIN-2033 refactor metadata sync mechanism

KYLIN-2033 refactor metadata sync mechanism


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/72005ea5
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/72005ea5
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/72005ea5

Branch: refs/heads/master
Commit: 72005ea5b7e945268028b88e5fc8f197d7608861
Parents: 858fad6
Author: Li Yang <li...@apache.org>
Authored: Thu Sep 22 18:42:37 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Fri Sep 23 22:15:58 2016 +0800

----------------------------------------------------------------------
 .../common/restclient/AbstractRestCache.java    |  52 ---
 .../kylin/common/restclient/Broadcaster.java    | 272 ---------------
 .../restclient/CaseInsensitiveStringCache.java  |  42 ---
 .../kylin/common/restclient/RestClient.java     |   4 +-
 .../common/restclient/SingleValueCache.java     | 103 ------
 .../common/util/AbstractKylinTestCase.java      |   1 -
 .../org/apache/kylin/cube/CubeDescManager.java  |  52 ++-
 .../org/apache/kylin/cube/CubeInstance.java     |   4 -
 .../java/org/apache/kylin/cube/CubeManager.java |  41 ++-
 .../apache/kylin/metadata/MetadataManager.java  | 107 +++++-
 .../kylin/metadata/cachesync/AbstractCache.java |  50 +++
 .../kylin/metadata/cachesync/Broadcaster.java   | 332 +++++++++++++++++++
 .../cachesync/CaseInsensitiveStringCache.java   |  42 +++
 .../metadata/cachesync/SingleValueCache.java    | 103 ++++++
 .../kylin/metadata/project/ProjectManager.java  |  50 ++-
 .../kylin/storage/hybrid/HybridManager.java     |  57 +++-
 .../engine/streaming/StreamingManager.java      |  25 +-
 .../kylin/rest/controller/CacheController.java  |  33 +-
 .../apache/kylin/rest/service/CacheService.java | 214 +++---------
 .../kylin/rest/service/CacheServiceTest.java    |  32 +-
 .../kylin/rest/service/ServiceTestBase.java     |  15 +-
 .../kylin/source/kafka/KafkaConfigManager.java  |  29 +-
 .../storage/hbase/util/CubeMigrationCLI.java    |   4 +-
 23 files changed, 920 insertions(+), 744 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
deleted file mode 100644
index 584131d..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.restclient;
-
-import org.apache.kylin.common.KylinConfig;
-
-/**
- * @author xjiang
- * 
- */
-public abstract class AbstractRestCache<K, V> {
-
-    protected final KylinConfig config;
-    protected final Broadcaster.TYPE syncType;
-
-    protected AbstractRestCache(KylinConfig config, Broadcaster.TYPE syncType) {
-        this.config = config;
-        this.syncType = syncType;
-    }
-
-    public Broadcaster getBroadcaster() {
-        return Broadcaster.getInstance(config);
-    }
-
-    public abstract void put(K key, V value);
-
-    public abstract void putLocal(K key, V value);
-
-    public abstract void remove(K key);
-
-    public abstract void removeLocal(K key);
-
-    public abstract void clear();
-
-    public abstract int size();
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
deleted file mode 100644
index 230888f..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.restclient;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.DaemonThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-
-/**
- * Broadcast kylin event out
- */
-public class Broadcaster {
-
-    private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class);
-
-    // static cached instances
-    private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>();
-
-    public static Broadcaster getInstance(KylinConfig config) {
-        Broadcaster r = CACHE.get(config);
-        if (r != null) {
-            return r;
-        }
-
-        synchronized (Broadcaster.class) {
-            r = CACHE.get(config);
-            if (r != null) {
-                return r;
-            }
-
-            r = new Broadcaster(config);
-            CACHE.put(config, r);
-            if (CACHE.size() > 1) {
-                logger.warn("More than one singleton exist");
-            }
-            return r;
-        }
-    }
-
-    public static void clearCache() {
-        CACHE.clear();
-    }
-
-    // ============================================================================
-
-    private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<>();
-
-    private AtomicLong counter = new AtomicLong();
-
-    private Broadcaster(final KylinConfig config) {
-        final String[] nodes = config.getRestServers();
-        if (nodes == null || nodes.length < 1) {
-            logger.warn("There is no available rest server; check the 'kylin.rest.servers' config");
-            broadcastEvents = null; // disable the broadcaster
-            return;
-        }
-        logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
-
-        Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
-            @Override
-            public void run() {
-                final List<RestClient> restClients = Lists.newArrayList();
-                for (String node : nodes) {
-                    restClients.add(new RestClient(node));
-                }
-                final ExecutorService wipingCachePool = Executors.newFixedThreadPool(restClients.size());
-                while (true) {
-                    try {
-                        final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
-                        logger.info("new broadcast event:" + broadcastEvent);
-                        for (final RestClient restClient : restClients) {
-                            wipingCachePool.execute(new Runnable() {
-                                @Override
-                                public void run() {
-                                    try {
-                                        restClient.wipeCache(broadcastEvent.getType(), broadcastEvent.getAction(), broadcastEvent.getName());
-                                    } catch (IOException e) {
-                                        logger.warn("Thread failed during wipe cache at " + broadcastEvent);
-                                    }
-                                }
-                            });
-                        }
-                    } catch (Exception e) {
-                        logger.error("error running wiping", e);
-                    }
-                }
-            }
-        });
-    }
-
-    /**
-     * Broadcast the cubedesc event out
-     * 
-     * @param action
-     *            event action
-     */
-    public void queue(String type, String action, String key) {
-        if (broadcastEvents == null)
-            return;
-
-        try {
-            counter.incrementAndGet();
-            broadcastEvents.putFirst(new BroadcastEvent(type, action, key));
-        } catch (Exception e) {
-            counter.decrementAndGet();
-            logger.error("error putting BroadcastEvent", e);
-        }
-    }
-
-    public long getCounterAndClear() {
-        return counter.getAndSet(0);
-    }
-
-    public enum EVENT {
-
-        CREATE("create"), UPDATE("update"), DROP("drop");
-        private String text;
-
-        EVENT(String text) {
-            this.text = text;
-        }
-
-        public String getType() {
-            return text;
-        }
-
-        public static EVENT getEvent(String event) {
-            for (EVENT one : values()) {
-                if (one.getType().equalsIgnoreCase(event)) {
-                    return one;
-                }
-            }
-
-            return null;
-        }
-    }
-
-    public enum TYPE {
-        ALL("all"), //
-        PROJECT("project"), //
-        CUBE("cube"), //
-        CUBE_DESC("cube_desc"), //
-        STREAMING("streaming"), //
-        KAFKA("kafka"), //
-        INVERTED_INDEX("inverted_index"), //
-        INVERTED_INDEX_DESC("ii_desc"), // 
-        TABLE("table"), //
-        DATA_MODEL("data_model"), //
-        EXTERNAL_FILTER("external_filter"), //
-        HYBRID("hybrid");
-        
-        private String text;
-
-        TYPE(String text) {
-            this.text = text;
-        }
-
-        public String getType() {
-            return text;
-        }
-
-        /**
-         * @param type
-         * @return
-         */
-        public static TYPE getType(String type) {
-            for (TYPE one : values()) {
-                if (one.getType().equalsIgnoreCase(type)) {
-                    return one;
-                }
-            }
-
-            return null;
-        }
-    }
-
-    public static class BroadcastEvent {
-        private String type;
-        private String action;
-        private String name;
-
-        public BroadcastEvent(String type, String action, String name) {
-            super();
-            this.type = type;
-            this.action = action;
-            this.name = name;
-        }
-
-        public String getType() {
-            return type;
-        }
-
-        public String getAction() {
-            return action;
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + ((action == null) ? 0 : action.hashCode());
-            result = prime * result + ((name == null) ? 0 : name.hashCode());
-            result = prime * result + ((type == null) ? 0 : type.hashCode());
-            return result;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (obj == null) {
-                return false;
-            }
-            if (this == obj) {
-                return true;
-            }
-            if (getClass() != obj.getClass()) {
-                return false;
-            }
-            BroadcastEvent other = (BroadcastEvent) obj;
-            if (!StringUtils.equals(action, other.action)) {
-                return false;
-            }
-            if (!StringUtils.equals(name, other.name)) {
-                return false;
-            }
-            if (!StringUtils.equals(type, other.type)) {
-                return false;
-            }
-            return true;
-        }
-
-        @Override
-        public String toString() {
-            return Objects.toStringHelper(this).add("type", type).add("name", name).add("action", action).toString();
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
deleted file mode 100644
index 2bcddbf..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.restclient;
-
-import java.util.concurrent.ConcurrentSkipListMap;
-
-import org.apache.kylin.common.KylinConfig;
-
-/**
- */
-public class CaseInsensitiveStringCache<V> extends SingleValueCache<String, V> {
-
-    public CaseInsensitiveStringCache(KylinConfig config, Broadcaster.TYPE syncType) {
-        super(config, syncType, new ConcurrentSkipListMap<String, V>(String.CASE_INSENSITIVE_ORDER));
-    }
-
-    @Override
-    public void put(String key, V value) {
-        super.put(key, value);
-    }
-
-    @Override
-    public void putLocal(String key, V value) {
-        super.putLocal(key, value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index 050d911..46a9e9b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -89,8 +89,8 @@ public class RestClient {
         }
     }
 
-    public void wipeCache(String type, String action, String name) throws IOException {
-        String url = baseUrl + "/cache/" + type + "/" + name + "/" + action;
+    public void wipeCache(String entity, String event, String cacheKey) throws IOException {
+        String url = baseUrl + "/cache/" + entity + "/" + cacheKey + "/" + event;
         HttpPut request = new HttpPut(url);
 
         try {

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
deleted file mode 100644
index 5d1ca9a..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.restclient;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.kylin.common.KylinConfig;
-
-/**
- * @author xjiang
- */
-public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
-
-    private final ConcurrentMap<K, V> innerCache;
-
-    public SingleValueCache(KylinConfig config, Broadcaster.TYPE syncType) {
-        this(config, syncType, new ConcurrentHashMap<K, V>());
-    }
-
-    public SingleValueCache(KylinConfig config, Broadcaster.TYPE syncType, ConcurrentMap<K, V> innerCache) {
-        super(config, syncType);
-        this.innerCache = innerCache;
-    }
-
-    public void put(K key, V value) {
-        boolean exists = innerCache.containsKey(key);
-
-        innerCache.put(key, value);
-
-        if (!exists) {
-            getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.CREATE.getType(), key.toString());
-        } else {
-            getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.UPDATE.getType(), key.toString());
-        }
-    }
-
-    public void putLocal(K key, V value) {
-        innerCache.put(key, value);
-    }
-
-    public void remove(K key) {
-        boolean exists = innerCache.containsKey(key);
-
-        innerCache.remove(key);
-
-        if (exists) {
-            getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.DROP.getType(), key.toString());
-        }
-    }
-
-    public void removeLocal(K key) {
-        innerCache.remove(key);
-    }
-
-    public void clear() {
-        innerCache.clear();
-    }
-
-    public int size() {
-        return innerCache.size();
-    }
-
-    public V get(K key) {
-        return innerCache.get(key);
-    }
-
-    public Collection<V> values() {
-        return innerCache.values();
-    }
-
-    public boolean containsKey(String key) {
-        return innerCache.containsKey(key);
-    }
-
-    public Map<K, V> getMap() {
-        return Collections.unmodifiableMap(innerCache);
-    }
-
-    public Set<K> keySet() {
-        return innerCache.keySet();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
index 556019f..14bf90b 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
@@ -49,7 +49,6 @@ public abstract class AbstractKylinTestCase {
         cleanupCache();
         System.clearProperty(KylinConfig.KYLIN_CONF);
         KylinConfig.destroyInstance();
-
     }
 
     private static void cleanupCache() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index 1b1cf70..d6364fe 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -27,22 +27,26 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.validation.CubeMetadataValidator;
 import org.apache.kylin.cube.model.validation.ValidateContext;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Manager class for CubeDesc; extracted from #CubeManager
+ * 
  * @author shaoshi
- *
  */
 public class CubeDescManager {
 
@@ -90,8 +94,46 @@ public class CubeDescManager {
     private CubeDescManager(KylinConfig config) throws IOException {
         logger.info("Initializing CubeDescManager with config " + config);
         this.config = config;
-        this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, Broadcaster.TYPE.CUBE_DESC);
+        this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, "cube_desc");
+        
+        // touch lower level metadata before registering my listener
         reloadAllCubeDesc();
+        Broadcaster.getInstance(config).registerListener(new CubeDescSyncListener(), "cube_desc");
+    }
+    
+    private class CubeDescSyncListener extends Broadcaster.Listener {
+        
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+            Cuboid.clearCache();
+        }
+
+        @Override
+        public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
+            for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) {
+                if (real instanceof CubeInstance) {
+                    String descName = ((CubeInstance) real).getDescName();
+                    reloadCubeDescLocal(descName);        
+                }
+            }
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            String cubeDescName = cacheKey;
+            CubeDesc cubeDesc = getCubeDesc(cubeDescName);
+            String modelName = cubeDesc == null ? null : cubeDesc.getModel().getName();
+            
+            if (event == Event.DROP)
+                removeLocalCubeDesc(cubeDescName);
+            else
+                reloadCubeDescLocal(cubeDescName);
+            
+            for (ProjectInstance prj : ProjectManager.getInstance(config).findProjectsByModel(modelName)) {
+                broadcaster.notifyProjectSchemaUpdate(prj.getName());
+            }
+        }
     }
 
     public CubeDesc getCubeDesc(String name) {
@@ -130,6 +172,8 @@ public class CubeDescManager {
     private CubeDesc loadCubeDesc(String path, boolean allowBroken) throws IOException {
         ResourceStore store = getStore();
         CubeDesc ndesc = store.getResource(path, CubeDesc.class, CUBE_DESC_SERIALIZER);
+        if (ndesc == null)
+            throw new IllegalArgumentException("No cube desc found at " + path);
 
         try {
             ndesc.init(config, getMetadataManager().getAllTablesMap());

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index a2ed051..cad00e1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -249,10 +249,6 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
     }
 
     public String getDescName() {
-        return descName.toUpperCase();
-    }
-
-    public String getOriginDescName() {
         return descName;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index fd46b54..2fadedb 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -39,8 +39,6 @@ import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -52,9 +50,13 @@ import org.apache.kylin.dict.lookup.LookupStringTable;
 import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
@@ -130,8 +132,41 @@ public class CubeManager implements IRealizationProvider {
     private CubeManager(KylinConfig config) throws IOException {
         logger.info("Initializing CubeManager with config " + config);
         this.config = config;
-        this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, Broadcaster.TYPE.CUBE);
+        this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, "cube");
+        
+        // touch lower level metadata before registering my listener
         loadAllCubeInstance();
+        Broadcaster.getInstance(config).registerListener(new CubeSyncListener(), "cube");
+    }
+
+    private class CubeSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
+            for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) {
+                if (real instanceof CubeInstance) {
+                    reloadCubeLocal(real.getName());
+                }
+            }
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            String cubeName = cacheKey;
+            
+            if (event == Event.DROP)
+                removeCubeLocal(cubeName);
+            else
+                reloadCubeLocal(cubeName);
+            
+            for (ProjectInstance prj : ProjectManager.getInstance(config).findProjects(RealizationType.CUBE, cubeName)) {
+                broadcaster.notifyProjectDataUpdate(prj.getName());
+            }
+        }
     }
 
     public List<CubeInstance> listAllCubes() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
index a74dd58..9be3faf 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
@@ -37,9 +37,10 @@ import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.ExternalFilterDesc;
@@ -253,15 +254,100 @@ public class MetadataManager {
 
     private void init(KylinConfig config) throws IOException {
         this.config = config;
-        this.srcTableMap = new CaseInsensitiveStringCache<>(config, Broadcaster.TYPE.TABLE);
-        this.srcTableExdMap = new CaseInsensitiveStringCache<>(config, Broadcaster.TYPE.TABLE);
-        this.dataModelDescMap = new CaseInsensitiveStringCache<>(config, Broadcaster.TYPE.DATA_MODEL);
-        this.extFilterMap = new CaseInsensitiveStringCache<>(config, Broadcaster.TYPE.EXTERNAL_FILTER);
+        this.srcTableMap = new CaseInsensitiveStringCache<>(config, "table");
+        this.srcTableExdMap = new CaseInsensitiveStringCache<>(config, "table_ext");
+        this.dataModelDescMap = new CaseInsensitiveStringCache<>(config, "data_model");
+        this.extFilterMap = new CaseInsensitiveStringCache<>(config, "external_filter");
 
         reloadAllSourceTable();
         reloadAllSourceTableExd();
         reloadAllDataModel();
         reloadAllExternalFilter();
+        // Load everything first (this touches lower level metadata) and only then register the
+        // listeners, one per cache, under the same entity name that cache was created with.
+        Broadcaster.getInstance(config).registerListener(new SrcTableSyncListener(), "table");
+        Broadcaster.getInstance(config).registerListener(new SrcTableExtSyncListener(), "table_ext");
+        Broadcaster.getInstance(config).registerListener(new DataModelSyncListener(), "data_model");
+        Broadcaster.getInstance(config).registerListener(new ExtFilterSyncListener(), "external_filter");
+    }
+
+    private class SrcTableSyncListener extends Broadcaster.Listener { // keeps srcTableMap in step with broadcasts
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            if (Event.DROP == event) {
+                srcTableMap.removeLocal(cacheKey);
+            } else {
+                reloadSourceTable(cacheKey);
+            }
+            for (ProjectInstance owner : ProjectManager.getInstance(config).findProjectsByTable(cacheKey)) { // projects found for the table get a schema update
+                broadcaster.notifyProjectSchemaUpdate(owner.getName());
+            }
+        }
+    }
+
+    private class SrcTableExtSyncListener extends Broadcaster.Listener { // keeps srcTableExdMap in step with broadcasts
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            if (Event.DROP != event) {
+                reloadSourceTableExt(cacheKey);
+            } else {
+                srcTableExdMap.removeLocal(cacheKey);
+            }
+            for (ProjectInstance owner : ProjectManager.getInstance(config).findProjectsByTable(cacheKey)) { // projects found for the table get a schema update
+                broadcaster.notifyProjectSchemaUpdate(owner.getName());
+            }
+        }
+    }
+
+    private class DataModelSyncListener extends Broadcaster.Listener { // keeps dataModelDescMap in step with broadcasts
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
+            ProjectInstance prj = ProjectManager.getInstance(config).getProject(project);
+            if (prj == null)
+                return; // guards the case where the project is not cached (e.g. it was just dropped): nothing to reload
+            for (String model : prj.getModels())
+                reloadDataModelDesc(model);
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            if (event == Event.DROP)
+                dataModelDescMap.removeLocal(cacheKey);
+            else
+                reloadDataModelDesc(cacheKey);
+            // every project found for this model gets a schema update
+            for (ProjectInstance prj : ProjectManager.getInstance(config).findProjectsByModel(cacheKey))
+                broadcaster.notifyProjectSchemaUpdate(prj.getName());
+        }
+    }
+
+    private class ExtFilterSyncListener extends Broadcaster.Listener { // keeps extFilterMap in step with broadcasts
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            if (Event.DROP != event)
+                reloadExtFilter(cacheKey); // anything but a drop: re-read the filter
+            else
+                extFilterMap.removeLocal(cacheKey); // drop: forget it locally, no broadcast
+        }
     }
 
     private void reloadAllSourceTableExd() throws IOException {
@@ -454,8 +540,8 @@ public class MetadataManager {
             dataModelDesc.init(config, this.getAllTablesMap());
             dataModelDescMap.putLocal(dataModelDesc.getName(), dataModelDesc);
             return dataModelDesc;
-        } catch (IOException e) {
-            throw new IllegalStateException("Error to load" + path, e);
+        } catch (Exception e) {
+            throw new IllegalStateException("Error to load " + path, e);
         }
     }
 
@@ -484,9 +570,10 @@ public class MetadataManager {
         String name = desc.getName();
         if (dataModelDescMap.containsKey(name))
             throw new IllegalArgumentException("DataModelDesc '" + name + "' already exists");
-        ProjectManager.getInstance(config).updateModelToProject(name, projectName);
         desc.setOwner(owner);
-        return saveDataModelDesc(desc);
+        desc = saveDataModelDesc(desc);
+        ProjectManager.getInstance(config).updateModelToProject(name, projectName);
+        return desc;
     }
 
     public DataModelDesc updateDataModelDesc(DataModelDesc desc) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/AbstractCache.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/AbstractCache.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/AbstractCache.java
new file mode 100644
index 0000000..4894817
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/AbstractCache.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.cachesync;
+
+import org.apache.kylin.common.KylinConfig;
+
+/** Base class for the metadata caches in this package: keeps the KylinConfig and the sync entity
+ * name given at construction, and resolves the Broadcaster that belongs to that config. */
+public abstract class AbstractCache<K, V> {
+
+    protected final KylinConfig config;
+    protected final String syncEntity; // entity name handed to the constructor
+
+    protected AbstractCache(KylinConfig config, String syncEntity) {
+        this.config = config;
+        this.syncEntity = syncEntity;
+    }
+
+    public Broadcaster getBroadcaster() { // looked up on every call, not stored in a field
+        return Broadcaster.getInstance(config);
+    }
+
+    public abstract void put(K key, V value); // in SingleValueCache: stores the value and queues a CREATE/UPDATE broadcast
+
+    public abstract void putLocal(K key, V value); // in SingleValueCache: stores the value only
+
+    public abstract void remove(K key); // in SingleValueCache: removes the entry and queues a DROP broadcast if it existed
+
+    public abstract void removeLocal(K key); // in SingleValueCache: removes the entry only
+
+    public abstract void clear();
+
+    public abstract int size();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
new file mode 100644
index 0000000..73dd0a7
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.cachesync;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.common.util.DaemonThreadFactory;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Broadcast kylin event out
+ */
+public class Broadcaster {
+
+    private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class);
+
+    public static final String SYNC_ALL = "all"; // the special entity to indicate clear all
+    public static final String SYNC_PRJ_SCHEMA = "project_schema"; // the special entity to indicate project schema has change, e.g. table/model/cube_desc update
+    public static final String SYNC_PRJ_DATA = "project_data"; // the special entity to indicate project data has change, e.g. cube/raw_table update
+
+    // static cached instances
+    private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>();
+
+    public static Broadcaster getInstance(KylinConfig config) {
+        // fast path: an instance is already cached for this config
+        Broadcaster found = CACHE.get(config);
+        if (found == null) {
+            // slow path: create it under the class lock
+            synchronized (Broadcaster.class) {
+                // look again, another thread may have created it while we waited for the lock
+                found = CACHE.get(config);
+                if (found == null) {
+                    found = new Broadcaster(config);
+                    CACHE.put(config, found);
+                    // more than one cached instance is unexpected, so it is reported
+                    if (CACHE.size() > 1) {
+                        logger.warn("More than one singleton exist");
+                    }
+                }
+            }
+        }
+        return found;
+    }
+
+    public static void clearCache() {
+        CACHE.clear(); // NOTE(review): only forgets the instances; nothing here stops the sender thread a Broadcaster starts in its constructor
+    }
+
+    // ============================================================================
+
+    private KylinConfig config;
+
+    private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<>();
+    private Map<String, List<Listener>> listenerMap = Maps.newConcurrentMap();
+    private AtomicLong counter = new AtomicLong();
+
+    private Broadcaster(final KylinConfig config) {
+        this.config = config;
+
+        final String[] nodes = config.getRestServers();
+        if (nodes == null || nodes.length < 1) {
+            logger.warn("There is no available rest server; check the 'kylin.rest.servers' config");
+            broadcastEvents = null; // disable the broadcaster
+            return;
+        }
+        logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
+
+        Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() { // sender loop, never returns
+            @Override
+            public void run() {
+                final List<RestClient> restClients = Lists.newArrayList();
+                for (String node : nodes) {
+                    restClients.add(new RestClient(node));
+                }
+                final ExecutorService wipingCachePool = Executors.newFixedThreadPool(restClients.size(), new DaemonThreadFactory()); // daemon workers too, so the pool cannot keep the JVM alive
+                while (true) {
+                    try {
+                        final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst(); // blocks until an event is queued
+                        logger.info("Announcing new broadcast event:" + broadcastEvent);
+                        for (final RestClient restClient : restClients) { // one wipe task per rest server
+                            wipingCachePool.execute(new Runnable() {
+                                @Override
+                                public void run() {
+                                    try {
+                                        restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
+                                    } catch (IOException e) {
+                                        logger.warn("Thread failed during wipe cache at " + broadcastEvent, e); // keep the cause in the log
+                                    }
+                                }
+                            });
+                        }
+                    } catch (Exception e) {
+                        logger.error("error running wiping", e);
+                    }
+                }
+            }
+        });
+    }
+
+    public void registerListener(Listener listener, String... entities) {
+        // each registration also lands in the SYNC_ALL list (see below), so that list shows whether this listener is already known
+        List<Listener> everyone = listenerMap.get(SYNC_ALL);
+        if (everyone != null && everyone.contains(listener))
+            return;
+        for (String name : entities) {
+            if (StringUtils.isBlank(name))
+                continue;
+            addListener(name, listener);
+        }
+        // beyond its own entities, every listener also hears the three special sync entities
+        addListener(SYNC_ALL, listener);
+        addListener(SYNC_PRJ_SCHEMA, listener);
+        addListener(SYNC_PRJ_DATA, listener);
+    }
+
+    synchronized private void addListener(String entity, Listener listener) {
+        List<Listener> existing = listenerMap.get(entity);
+        // start a fresh list the first time an entity is seen
+        List<Listener> target = (existing != null) ? existing : new ArrayList<Listener>();
+        target.add(listener);
+        // (re-)store the list; this changes nothing when it was already mapped
+        listenerMap.put(entity, target);
+    }
+
+    public void notifyClearAll() throws IOException {
+        notifyListener(SYNC_ALL, Event.UPDATE, SYNC_ALL); // SYNC_ALL is passed as both the entity and the cache key
+    }
+
+    public void notifyProjectSchemaUpdate(String project) throws IOException {
+        notifyListener(SYNC_PRJ_SCHEMA, Event.UPDATE, project); // the project name is passed as the cache key
+    }
+
+    public void notifyProjectDataUpdate(String project) throws IOException {
+        notifyListener(SYNC_PRJ_DATA, Event.UPDATE, project); // the project name is passed as the cache key
+    }
+
+    public synchronized void notifyListener(String entity, Event event, String cacheKey) throws IOException {
+        List<Listener> list = listenerMap.get(entity);
+        if (list == null) // no listener list exists for this entity
+            return;
+        // dispatches one change to the listeners registered for the entity; which hook runs depends on the entity
+        logger.debug("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey + ", listeners=" + list);
+        
+        // iterate over a private copy, so the registered list may change while listeners run
+        list = Lists.newArrayList(list);
+        switch (entity) {
+        case SYNC_ALL:
+            for (Listener l : list) {
+                l.onClearAll(this);
+            }
+            clearCache(); // clear broadcaster too in the end
+            break;
+        case SYNC_PRJ_SCHEMA:
+            ProjectManager.getInstance(config).clearL2Cache(); // cleared before the listeners are told
+            for (Listener l : list) {
+                l.onProjectSchemaChange(this, cacheKey); // cacheKey is passed as the project argument
+            }
+            break;
+        case SYNC_PRJ_DATA:
+            ProjectManager.getInstance(config).clearL2Cache(); // cube's first becoming ready leads to schema change too
+            for (Listener l : list) {
+                l.onProjectDataChange(this, cacheKey); // cacheKey is passed as the project argument
+            }
+            break;
+        default:
+            for (Listener l : list) {
+                l.onEntityChange(this, entity, event, cacheKey);
+            }
+            break;
+        }
+        // not reached when a listener throws: the IOException leaves the method from inside the loop
+        logger.debug("Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey);
+    }
+
+    /**
+     * Queues an event for the sender thread in arrival order; does nothing when broadcasting is disabled.
+     */
+    public void queue(String entity, String event, String key) {
+        if (broadcastEvents == null)
+            return;
+
+        try {
+            counter.incrementAndGet();
+            broadcastEvents.putLast(new BroadcastEvent(entity, event, key)); // tail insert: the sender takes from the head, so order is FIFO
+        } catch (Exception e) {
+            counter.decrementAndGet(); // the event was not queued, so undo the count
+            logger.error("error putting BroadcastEvent", e);
+        }
+    }
+
+    public long getCounterAndClear() {
+        return counter.getAndSet(0); // returns the count built up by queue() and resets it to zero in one atomic step
+    }
+
+    /** Kind of change carried by a broadcast; getType() gives the lower-case text form. */
+    public enum Event {
+        CREATE("create"), UPDATE("update"), DROP("drop");
+        private String text;
+
+        Event(String label) {
+            this.text = label;
+        }
+
+        public String getType() {
+            return this.text;
+        }
+
+        /** Looks a constant up by its text form, ignoring case; returns null when nothing matches. */
+        public static Event getEvent(String event) {
+            Event match = null;
+            for (Event candidate : values()) {
+                if (match == null && candidate.text.equalsIgnoreCase(event))
+                    match = candidate;
+            }
+            return match;
+        }
+    }
+
+    abstract public static class Listener { // every hook is an empty method; subclasses override the ones they need
+        public void onClearAll(Broadcaster broadcaster) throws IOException { // invoked by notifyListener for SYNC_ALL
+        }
+
+        public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException { // invoked for SYNC_PRJ_SCHEMA
+        }
+
+        public void onProjectDataChange(Broadcaster broadcaster, String project) throws IOException { // invoked for SYNC_PRJ_DATA
+        }
+
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { // invoked for any other entity
+        }
+    }
+
+    /** One queued broadcast: entity, event and cache key, set in the constructor and read through getters. */
+    public static class BroadcastEvent {
+        private String entity;
+        private String event;
+        private String cacheKey;
+
+        /** Stores the three strings as given; none of them is checked here. */
+        public BroadcastEvent(String entity, String event, String cacheKey) {
+            super();
+            this.entity = entity;
+            this.event = event;
+            this.cacheKey = cacheKey;
+        }
+
+        /** @return the entity given at construction */
+        public String getEntity() {
+            return entity;
+        }
+
+        /** @return the event text given at construction */
+        public String getEvent() {
+            return event;
+        }
+
+        /** @return the cache key given at construction */
+        public String getCacheKey() {
+            return cacheKey;
+        }
+
+        /** Hash over event, cacheKey and entity, in that order; a null field contributes 0. */
+        @Override
+        public int hashCode() {
+            int h = 1;
+            for (String part : new String[] { event, cacheKey, entity }) {
+                h = 31 * h + (part == null ? 0 : part.hashCode());
+            }
+            return h;
+        }
+
+        /** Equal when the other object has exactly this class and all three fields match null-safely. */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            BroadcastEvent that = (BroadcastEvent) obj;
+            return StringUtils.equals(event, that.event) //
+                    && StringUtils.equals(cacheKey, that.cacheKey) //
+                    && StringUtils.equals(entity, that.entity);
+        }
+
+        /** Renders the fields under the labels "type" (entity), "name" (cacheKey) and "action" (event). */
+        @Override
+        public String toString() {
+            return Objects.toStringHelper(this) //
+                    .add("type", entity) //
+                    .add("name", cacheKey) //
+                    .add("action", event).toString();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CaseInsensitiveStringCache.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CaseInsensitiveStringCache.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CaseInsensitiveStringCache.java
new file mode 100644
index 0000000..b4d0438
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CaseInsensitiveStringCache.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.cachesync;
+
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.kylin.common.KylinConfig;
+
+/** A SingleValueCache with String keys whose backing map is a ConcurrentSkipListMap ordered by
+ * String.CASE_INSENSITIVE_ORDER, so keys that differ only in case address the same entry. */
+public class CaseInsensitiveStringCache<V> extends SingleValueCache<String, V> {
+
+    public CaseInsensitiveStringCache(KylinConfig config, String syncEntity) {
+        super(config, syncEntity, new ConcurrentSkipListMap<String, V>(String.CASE_INSENSITIVE_ORDER));
+    }
+
+    @Override
+    public void put(String key, V value) { // plain delegation to the superclass
+        super.put(key, value);
+    }
+
+    @Override
+    public void putLocal(String key, V value) { // plain delegation to the superclass
+        super.putLocal(key, value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java
new file mode 100644
index 0000000..4bfaeae
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.cachesync;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.kylin.common.KylinConfig;
+
+/**
+ * @author xjiang
+ */
+public abstract class SingleValueCache<K, V> extends AbstractCache<K, V> { // one value per key, held in a ConcurrentMap
+
+    private final ConcurrentMap<K, V> innerCache;
+
+    public SingleValueCache(KylinConfig config, String syncEntity) {
+        this(config, syncEntity, new ConcurrentHashMap<K, V>());
+    }
+
+    public SingleValueCache(KylinConfig config, String syncEntity, ConcurrentMap<K, V> innerCache) {
+        super(config, syncEntity);
+        this.innerCache = innerCache;
+    }
+
+    public void put(K key, V value) {
+        // Decide "existed before?" from put's own return value: a separate containsKey() check could
+        // race with a concurrent put/remove. (ConcurrentHashMap, the default map, rejects null values.)
+        boolean exists = innerCache.put(key, value) != null;
+
+        if (!exists) {
+            getBroadcaster().queue(syncEntity, Broadcaster.Event.CREATE.getType(), key.toString());
+        } else {
+            getBroadcaster().queue(syncEntity, Broadcaster.Event.UPDATE.getType(), key.toString());
+        }
+    }
+
+    public void putLocal(K key, V value) { // stores without queueing a broadcast
+        innerCache.put(key, value);
+    }
+
+    public void remove(K key) {
+        // remove() itself reports whether a mapping was taken out, so two concurrent removals of
+        // the same key cannot both queue a DROP
+        boolean exists = innerCache.remove(key) != null;
+
+        if (exists) {
+            getBroadcaster().queue(syncEntity, Broadcaster.Event.DROP.getType(), key.toString());
+        }
+    }
+
+    public void removeLocal(K key) { // removes without queueing a broadcast
+        innerCache.remove(key);
+    }
+
+    public void clear() { // empties the map; no broadcast is queued
+        innerCache.clear();
+    }
+
+    public int size() {
+        return innerCache.size();
+    }
+
+    public V get(K key) {
+        return innerCache.get(key);
+    }
+
+    public Collection<V> values() { // the map's own values view, not a copy
+        return innerCache.values();
+    }
+
+    public boolean containsKey(String key) { // NOTE(review): takes String rather than K; confirm that is intended
+        return innerCache.containsKey(key);
+    }
+
+    public Map<K, V> getMap() { // read-only wrapper around the backing map
+        return Collections.unmodifiableMap(innerCache);
+    }
+
+    public Set<K> keySet() { // the map's own key view, not a copy
+        return innerCache.keySet();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 1bf9804..2838e56 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -29,10 +29,11 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.badquery.BadQueryHistoryManager;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.ExternalFilterDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -87,10 +88,33 @@ public class ProjectManager {
     private ProjectManager(KylinConfig config) throws IOException {
         logger.info("Initializing ProjectManager with metadata url " + config);
         this.config = config;
-        this.projectMap = new CaseInsensitiveStringCache<ProjectInstance>(config, Broadcaster.TYPE.PROJECT);
+        this.projectMap = new CaseInsensitiveStringCache<ProjectInstance>(config, "project");
         this.l2Cache = new ProjectL2Cache(this);
 
+        // load all projects first (this touches lower level metadata), and only then register my listener
         reloadAllProjects();
+        Broadcaster.getInstance(config).registerListener(new ProjectSyncListener(), "project");
+    }
+
+    /** Applies broadcast project changes to the local project cache. */
+    private class ProjectSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            // cacheKey is used as the project name
+            if (Event.DROP != event) {
+                reloadProjectLocal(cacheKey);
+            } else {
+                removeProjectLocal(cacheKey);
+            }
+            // any project change is announced as both a schema update and a data update for that project
+            broadcaster.notifyProjectSchemaUpdate(cacheKey);
+            broadcaster.notifyProjectDataUpdate(cacheKey);
+        }
     }
 
     public void clearL2Cache() {
@@ -224,6 +248,11 @@ public class ProjectManager {
         projectMap.remove(norm(proj.getName()));
         clearL2Cache();
     }
+    
+    private void removeProjectLocal(String proj) {
+        projectMap.removeLocal(norm(proj)); // local only: remove() would queue another DROP broadcast for an event that was itself received
+        clearL2Cache();
+    }
 
     public boolean isModelInProject(String projectName, String modelName) {
         return this.getProject(projectName).containsModel(modelName);
@@ -235,7 +264,7 @@ public class ProjectManager {
     }
 
     public void removeModelFromProjects(String modelName) throws IOException {
-        for (ProjectInstance projectInstance : findProjects(modelName)) {
+        for (ProjectInstance projectInstance : findProjectsByModel(modelName)) {
             projectInstance.removeModel(modelName);
             updateProject(projectInstance);
         }
@@ -344,17 +373,26 @@ public class ProjectManager {
         return result;
     }
 
-    private List<ProjectInstance> findProjects(String modelName) {
+    public List<ProjectInstance> findProjectsByModel(String modelName) { // collects every cached project whose containsModel(modelName) is true
         List<ProjectInstance> projects = new ArrayList<ProjectInstance>();
         for (ProjectInstance projectInstance : projectMap.values()) {
             if (projectInstance.containsModel(modelName)) {
                 projects.add(projectInstance);
             }
         }
-
         return projects;
     }
 
+    public List<ProjectInstance> findProjectsByTable(String tableIdentity) {
+        List<ProjectInstance> matches = new ArrayList<ProjectInstance>();
+        for (ProjectInstance candidate : projectMap.values()) {
+            if (!candidate.containsTable(tableIdentity))
+                continue; // this project does not contain the table
+            matches.add(candidate);
+        }
+        return matches;
+    }
+    
     public ExternalFilterDesc getExternalFilterDesc(String project, String extFilter) {
         return l2Cache.getExternalFilterDesc(project, extFilter);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index 0f948cb..4f81b09 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -27,8 +27,11 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.IRealizationProvider;
@@ -83,18 +86,52 @@ public class HybridManager implements IRealizationProvider {
     private HybridManager(KylinConfig config) throws IOException {
         logger.info("Initializing HybridManager with config " + config);
         this.config = config;
-        this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, Broadcaster.TYPE.HYBRID);
-        loadAllHybridInstance();
+        this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, "hybrid");
+        
+        // touch lower level metadata before registering my listener
+        reloadAllHybridInstance();
+        Broadcaster.getInstance(config).registerListener(new HybridSyncListener(), "hybrid");
     }
 
-    private void loadAllHybridInstance() throws IOException {
+    private class HybridSyncListener extends Broadcaster.Listener {
+        // Broadcaster callbacks that keep the local hybridMap in step with metadata changes
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
+            for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) {
+                if (real instanceof HybridInstance) {
+                    reloadHybridInstance(real.getName());
+                }
+            }
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            String hybridName = cacheKey;
+            // a dropped hybrid is removed from the local cache; any other event reloads it
+            if (event == Event.DROP)
+                hybridMap.removeLocal(hybridName);
+            else
+                reloadHybridInstance(hybridName);
+            // then notify a schema update for every project found to hold this hybrid
+            for (ProjectInstance prj : ProjectManager.getInstance(config).findProjects(RealizationType.HYBRID, hybridName)) {
+                broadcaster.notifyProjectSchemaUpdate(prj.getName());
+            }
+        }
+    }
+
+    private void reloadAllHybridInstance() throws IOException {
         ResourceStore store = getStore();
         List<String> paths = store.collectResourceRecursively(ResourceStore.HYBRID_RESOURCE_ROOT, ".json");
 
         logger.debug("Loading Hybrid from folder " + store.getReadableResourcePath(ResourceStore.HYBRID_RESOURCE_ROOT));
 
         for (String path : paths) {
-            loadHybridInstance(path);
+            reloadHybridInstanceAt(path);
         }
 
         logger.debug("Loaded " + paths.size() + " Hybrid(s)");
@@ -111,11 +148,15 @@ public class HybridManager implements IRealizationProvider {
             }
 
             if (includes == true)
-                loadHybridInstance(HybridInstance.concatResourcePath(hybridInstance.getName()));
+                reloadHybridInstance(hybridInstance.getName());
         }
     }
 
-    private synchronized HybridInstance loadHybridInstance(String path) {
+    public void reloadHybridInstance(String name) {
+        reloadHybridInstanceAt(HybridInstance.concatResourcePath(name));
+    }
+    
+    private synchronized HybridInstance reloadHybridInstanceAt(String path) {
         ResourceStore store = getStore();
 
         HybridInstance hybridInstance = null;

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index e4e1359..7bf6ca2 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -44,9 +44,10 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,8 +73,26 @@ public class StreamingManager {
 
     private StreamingManager(KylinConfig config) throws IOException {
         this.config = config;
-        this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, Broadcaster.TYPE.STREAMING);
+        this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, "streaming");
+        
+        // touch lower level metadata before registering my listener
         reloadAllStreaming();
+        Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming");
+    }
+
+    private class StreamingSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            if (event == Event.DROP)
+                removeStreamingLocal(cacheKey);
+            else
+                reloadStreamingConfigLocal(cacheKey);
+        }
     }
 
     private ResourceStore getStore() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
index 845ffe0..667046b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
@@ -20,8 +20,7 @@ package org.apache.kylin.rest.controller;
 
 import java.io.IOException;
 
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.Broadcaster.EVENT;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.rest.service.CacheService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +39,8 @@ import org.springframework.web.bind.annotation.ResponseBody;
 @Controller
 @RequestMapping(value = "/cache")
 public class CacheController extends BasicController {
+    
+    @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(CacheController.class);
 
     @Autowired
@@ -48,32 +49,16 @@ public class CacheController extends BasicController {
     /**
      * Wipe system cache
      *
-     * @param type  {@link Broadcaster.TYPE}
-     * @param event {@link Broadcaster.EVENT}
-     * @param name
+     * @param entity  name of the entity type whose cache changed, e.g. "cube" or "hybrid"
+     * @param event {@link Broadcaster.Event}
+     * @param cacheKey key of the changed entry within that entity's cache
      * @return if the action success
      * @throws IOException
      */
-    @RequestMapping(value = "/{type}/{name}/{event}", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{entity}/{cacheKey}/{event}", method = { RequestMethod.PUT })
     @ResponseBody
-    public void wipeCache(@PathVariable String type, @PathVariable String event, @PathVariable String name) throws IOException {
-
-        Broadcaster.TYPE wipeType = Broadcaster.TYPE.getType(type);
-        EVENT wipeEvent = Broadcaster.EVENT.getEvent(event);
-
-        logger.info("wipe cache type: " + wipeType + " event:" + wipeEvent + " name:" + name);
-
-        switch (wipeEvent) {
-        case CREATE:
-        case UPDATE:
-            cacheService.rebuildCache(wipeType, name);
-            break;
-        case DROP:
-            cacheService.removeCache(wipeType, name);
-            break;
-        default:
-            throw new RuntimeException("invalid type:" + wipeEvent);
-        }
+    public void wipeCache(@PathVariable String entity, @PathVariable String event, @PathVariable String cacheKey) throws IOException {
+        cacheService.notifyMetadataChange(entity, Broadcaster.Event.getEvent(event), cacheKey);
     }
 
     public void setCacheService(CacheService cacheService) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 9d134d6..5d29dcd 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -21,36 +21,22 @@ package org.apache.kylin.rest.service;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
-import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import javax.annotation.PostConstruct;
 import javax.sql.DataSource;
 
 import org.apache.calcite.jdbc.Driver;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.cube.CubeDescManager;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
 import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.realization.RealizationRegistry;
-import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.query.enumerator.OLAPQuery;
 import org.apache.kylin.query.schema.OLAPSchemaFactory;
 import org.apache.kylin.rest.controller.QueryController;
-import org.apache.kylin.source.kafka.KafkaConfigManager;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hybrid.HybridManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -66,48 +52,64 @@ public class CacheService extends BasicService {
 
     private static final Logger logger = LoggerFactory.getLogger(CacheService.class);
 
-    private static ConcurrentMap<String, DataSource> olapDataSources = new ConcurrentHashMap<String, DataSource>();
+    private ConcurrentMap<String, DataSource> olapDataSources = new ConcurrentHashMap<String, DataSource>();
 
     @Autowired
     private CubeService cubeService;
 
     @Autowired
     private CacheManager cacheManager;
+    
+    private Broadcaster.Listener cacheSyncListener = new Broadcaster.Listener() {
+        @Override
+        public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
+            removeOLAPDataSource(project);
+        }
 
-    @PostConstruct
-    public void initCubeChangeListener() throws IOException {
-        CubeManager cubeMgr = CubeManager.getInstance(getConfig());
-        cubeMgr.setCubeChangeListener(new CubeManager.CubeChangeListener() {
-
-            @Override
-            public void afterCubeCreate(CubeInstance cube) {
-                // no cache need change
-            }
-
-            @Override
-            public void afterCubeUpdate(CubeInstance cube) {
-                rebuildCubeCache(cube.getName());
-            }
+        @Override
+        public void onProjectDataChange(Broadcaster broadcaster, String project) throws IOException {
+            cleanDataCache(project);
+        }
 
-            @Override
-            public void afterCubeDelete(CubeInstance cube) {
-                removeCubeCache(cube.getName(), cube);
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            if ("cube".equals(entity) && event == Event.UPDATE) {
+                final String cubeName = cacheKey;
+                new Thread() { // start() (not run()) so the sleep and update happen off the event broadcast thread
+                    public void run() {
+                        try {
+                            Thread.sleep(1000);
+                            cubeService.updateOnNewSegmentReady(cubeName);
+                        } catch (Throwable ex) {
+                            logger.error("Error in updateOnNewSegmentReady()", ex);
+                        }
+                    }
+                }.start();
             }
-        });
-    }
+        }
+    };
 
     // for test
     public void setCubeService(CubeService cubeService) {
         this.cubeService = cubeService;
     }
 
-    protected void cleanDataCache(String storageUUID) {
+    public void notifyMetadataChange(String entity, Event event, String cacheKey) throws IOException {
+        Broadcaster broadcaster = Broadcaster.getInstance(getConfig());
+        
+        // the broadcaster itself may have been reset by clearCache(), so register the listener on every call; re-registration is ignored
+        broadcaster.registerListener(cacheSyncListener, "cube");
+        
+        broadcaster.notifyListener(entity, event, cacheKey);
+    }
+
+    protected void cleanDataCache(String project) {
         if (cacheManager != null) {
-            logger.info("cleaning cache for " + storageUUID + " (currently remove all entries)");
+            logger.info("cleaning cache for project " + project + " (currently remove all entries)");
             cacheManager.getCache(QueryController.SUCCESS_QUERY_CACHE).removeAll();
             cacheManager.getCache(QueryController.EXCEPTION_QUERY_CACHE).removeAll();
         } else {
-            logger.warn("skip cleaning cache for " + storageUUID);
+            logger.warn("skip cleaning cache for project " + project);
         }
     }
 
@@ -120,7 +122,7 @@ public class CacheService extends BasicService {
         }
     }
 
-    private static void removeOLAPDataSource(String project) {
+    private void removeOLAPDataSource(String project) {
         logger.info("removeOLAPDataSource is called for project " + project);
         if (StringUtils.isEmpty(project))
             throw new IllegalArgumentException("removeOLAPDataSource: project name not given");
@@ -129,7 +131,7 @@ public class CacheService extends BasicService {
         olapDataSources.remove(project);
     }
 
-    public static void removeAllOLAPDataSources() {
+    public void removeAllOLAPDataSources() {
         // brutal, yet simplest way
         logger.info("removeAllOLAPDataSources is called.");
         olapDataSources.clear();
@@ -166,134 +168,4 @@ public class CacheService extends BasicService {
         return ret;
     }
 
-    public void rebuildCache(Broadcaster.TYPE cacheType, String cacheKey) {
-        final String log = "rebuild cache type: " + cacheType + " name:" + cacheKey;
-        logger.info(log);
-        try {
-            switch (cacheType) {
-            case CUBE:
-                rebuildCubeCache(cacheKey);
-                break;
-            case STREAMING:
-                getStreamingManager().reloadStreamingConfigLocal(cacheKey);
-                break;
-            case KAFKA:
-                getKafkaManager().reloadKafkaConfigLocal(cacheKey);
-                break;
-            case CUBE_DESC:
-                getCubeDescManager().reloadCubeDescLocal(cacheKey);
-                break;
-            case PROJECT:
-                reloadProjectCache(cacheKey);
-                break;
-            case TABLE:
-                getMetadataManager().reloadTableCache(cacheKey);
-                CubeDescManager.clearCache();
-                clearRealizationCache();
-                break;
-            case EXTERNAL_FILTER:
-                getMetadataManager().reloadExtFilter(cacheKey);
-                CubeDescManager.clearCache();
-                break;
-            case DATA_MODEL:
-                getMetadataManager().reloadDataModelDesc(cacheKey);
-                CubeDescManager.clearCache();
-                break;
-            case ALL:
-                DictionaryManager.clearCache();
-                MetadataManager.clearCache();
-                CubeDescManager.clearCache();
-                clearRealizationCache();
-                Cuboid.clearCache();
-                ProjectManager.clearCache();
-                KafkaConfigManager.clearCache();
-                StreamingManager.clearCache();
-                HBaseConnection.clearConnCache();
-
-                cleanAllDataCache();
-                removeAllOLAPDataSources();
-                break;
-            default:
-                logger.error("invalid cacheType:" + cacheType);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("error " + log, e);
-        }
-    }
-
-    private void clearRealizationCache() {
-        CubeManager.clearCache();
-        HybridManager.clearCache();
-        RealizationRegistry.clearCache();
-    }
-
-    private void rebuildCubeCache(String cubeName) {
-        CubeInstance cube = getCubeManager().reloadCubeLocal(cubeName);
-        getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cubeName);
-        reloadProjectCache(getProjectManager().findProjects(RealizationType.CUBE, cubeName));
-        //clean query related cache first
-        if (cube != null) {
-            cleanDataCache(cube.getUuid());
-        }
-        cubeService.updateOnNewSegmentReady(cubeName);
-    }
-
-    public void removeCache(Broadcaster.TYPE cacheType, String cacheKey) {
-        final String log = "remove cache type: " + cacheType + " name:" + cacheKey;
-        try {
-            switch (cacheType) {
-            case CUBE:
-                removeCubeCache(cacheKey, null);
-                break;
-            case CUBE_DESC:
-                getCubeDescManager().removeLocalCubeDesc(cacheKey);
-                break;
-            case PROJECT:
-                ProjectManager.clearCache();
-                break;
-            case TABLE:
-                throw new UnsupportedOperationException(log);
-            case EXTERNAL_FILTER:
-                throw new UnsupportedOperationException(log);
-            case DATA_MODEL:
-                getMetadataManager().removeModelCache(cacheKey);
-                break;
-            default:
-                throw new RuntimeException("invalid cacheType:" + cacheType);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("error " + log, e);
-        }
-    }
-
-    private void removeCubeCache(String cubeName, CubeInstance cube) {
-        // you may not get the cube instance if it's already removed from metadata
-        if (cube == null) {
-            cube = getCubeManager().getCube(cubeName);
-        }
-
-        getCubeManager().removeCubeLocal(cubeName);
-        getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cubeName);
-        reloadProjectCache(getProjectManager().findProjects(RealizationType.CUBE, cubeName));
-
-        if (cube != null) {
-            cleanDataCache(cube.getUuid());
-        }
-    }
-
-    private void reloadProjectCache(List<ProjectInstance> projects) {
-        for (ProjectInstance prj : projects) {
-            reloadProjectCache(prj.getName());
-        }
-    }
-
-    private void reloadProjectCache(String projectName) {
-        try {
-            getProjectManager().reloadProjectLocal(projectName);
-        } catch (IOException ex) {
-            logger.warn("Failed to reset project cache", ex);
-        }
-        removeOLAPDataSource(projectName);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 38cd93f..af9ccc0 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -22,13 +22,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
@@ -36,6 +36,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.LookupDesc;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -109,32 +110,19 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
         };
 
         serviceA.setCubeService(cubeServiceA);
-        serviceA.initCubeChangeListener();
         serviceB.setCubeService(cubeServiceB);
-        serviceB.initCubeChangeListener();
 
         context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() {
             @Override
-            public void handle(String type, String name, String event) {
-
-                Broadcaster.TYPE wipeType = Broadcaster.TYPE.getType(type);
-                Broadcaster.EVENT wipeEvent = Broadcaster.EVENT.getEvent(event);
-                final String log = "wipe cache type: " + wipeType + " event:" + wipeEvent + " name:" + name;
+            public void handle(String entity, String cacheKey, String event) {
+                Broadcaster.Event wipeEvent = Broadcaster.Event.getEvent(event);
+                final String log = "wipe cache type: " + entity + " event:" + wipeEvent + " name:" + cacheKey;
                 logger.info(log);
                 try {
-                    switch (wipeEvent) {
-                    case CREATE:
-                    case UPDATE:
-                        serviceA.rebuildCache(wipeType, name);
-                        serviceB.rebuildCache(wipeType, name);
-                        break;
-                    case DROP:
-                        serviceA.removeCache(wipeType, name);
-                        serviceB.removeCache(wipeType, name);
-                        break;
-                    default:
-                        throw new RuntimeException("invalid type:" + wipeEvent);
-                    }
+                    serviceA.notifyMetadataChange(entity, wipeEvent, cacheKey);
+                    serviceB.notifyMetadataChange(entity, wipeEvent, cacheKey);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
                 } finally {
                     counter.incrementAndGet();
                 }
@@ -153,12 +141,10 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
     @Before
     public void setUp() throws Exception {
         counter.set(0L);
-        createTestMetadata();
     }
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
     }
 
     private void waitForCounterAndClear(long count) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
index ae4c089..3a587e4 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
@@ -18,12 +18,9 @@
 
 package org.apache.kylin.rest.service;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeDescManager;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.realization.RealizationRegistry;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -60,12 +57,8 @@ public class ServiceTestBase extends LocalFileMetadataTestCase {
     public void setup() throws Exception {
         this.createTestMetadata();
 
-        MetadataManager.clearCache();
-        CubeDescManager.clearCache();
-        CubeManager.clearCache();
-        RealizationRegistry.clearCache();
-        ProjectManager.clearCache();
-        CacheService.removeAllOLAPDataSources();
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        Broadcaster.getInstance(config).notifyClearAll();
     }
 
     @After