You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/22 23:50:00 UTC

[16/16] kylin git commit: KYLIN-2033 code done, waiting test

KYLIN-2033 code done, waiting test


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0aaa6913
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0aaa6913
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0aaa6913

Branch: refs/heads/KYLIN-2033
Commit: 0aaa6913c50446302e5224a1387f51fd858b21eb
Parents: 52d0bc1
Author: Yang Li <li...@apache.org>
Authored: Fri Sep 23 07:49:02 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Fri Sep 23 07:49:02 2016 +0800

----------------------------------------------------------------------
 .../kylin/common/restclient/AbstractCache.java  |  50 ---
 .../kylin/common/restclient/Broadcaster.java    | 308 ------------------
 .../restclient/CaseInsensitiveStringCache.java  |  43 ---
 .../common/restclient/SingleValueCache.java     | 103 ------
 .../org/apache/kylin/cube/CubeDescManager.java  |  13 +-
 .../java/org/apache/kylin/cube/CubeManager.java |  13 +-
 .../apache/kylin/metadata/MetadataManager.java  |  20 +-
 .../kylin/metadata/cachesync/AbstractCache.java |  50 +++
 .../kylin/metadata/cachesync/Broadcaster.java   | 325 +++++++++++++++++++
 .../cachesync/CaseInsensitiveStringCache.java   |  42 +++
 .../metadata/cachesync/SingleValueCache.java    | 103 ++++++
 .../kylin/metadata/project/ProjectManager.java  |  17 +-
 .../kylin/storage/hybrid/HybridManager.java     |  13 +-
 .../engine/streaming/StreamingManager.java      |  13 +-
 .../kylin/rest/controller/CacheController.java  |  22 +-
 .../apache/kylin/rest/service/CacheService.java | 210 +++---------
 .../apache/kylin/rest/service/CubeService.java  |  30 --
 .../kylin/rest/service/CacheServiceTest.java    |  23 +-
 .../kylin/rest/service/ServiceTestBase.java     |  15 +-
 .../kylin/source/kafka/KafkaConfigManager.java  |  13 +-
 .../storage/hbase/util/CubeMigrationCLI.java    |   2 +-
 21 files changed, 634 insertions(+), 794 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractCache.java
deleted file mode 100644
index 42692ea..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractCache.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.restclient;
-
-import org.apache.kylin.common.KylinConfig;
-
-/**
- */
-public abstract class AbstractCache<K, V> {
-
-    protected final KylinConfig config;
-    protected final String syncEntity;
-
-    protected AbstractCache(KylinConfig config, String syncEntity) {
-        this.config = config;
-        this.syncEntity = syncEntity;
-    }
-
-    public Broadcaster getBroadcaster() {
-        return Broadcaster.getInstance(config);
-    }
-
-    public abstract void put(K key, V value);
-
-    public abstract void putLocal(K key, V value);
-
-    public abstract void remove(K key);
-
-    public abstract void removeLocal(K key);
-
-    public abstract void clear();
-
-    public abstract int size();
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
deleted file mode 100644
index d02859d..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.restclient;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.DaemonThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Broadcast kylin event out
- */
-public class Broadcaster {
-
-    private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class);
-
-    public static final String SYNC_ALL = "all";                   // the special entity to indicate clear all
-    public static final String SYNC_PRJ_SCHEMA = "project_schema"; // the special entity to indicate project schema has change, e.g. table/model/cube_desc update
-    public static final String SYNC_PRJ_DATA = "project_data";     // the special entity to indicate project data has change, e.g. cube/raw_table update
-
-    // static cached instances
-    private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>();
-
-    public static Broadcaster getInstance(KylinConfig config) {
-        Broadcaster r = CACHE.get(config);
-        if (r != null) {
-            return r;
-        }
-
-        synchronized (Broadcaster.class) {
-            r = CACHE.get(config);
-            if (r != null) {
-                return r;
-            }
-
-            r = new Broadcaster(config);
-            CACHE.put(config, r);
-            if (CACHE.size() > 1) {
-                logger.warn("More than one singleton exist");
-            }
-            return r;
-        }
-    }
-
-    public static void clearCache() {
-        CACHE.clear();
-    }
-
-    // ============================================================================
-
-    private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<>();
-
-    private AtomicLong counter = new AtomicLong();
-
-    private Map<String, List<Listener>> listenerMap = Maps.newConcurrentMap();
-
-    private Broadcaster(final KylinConfig config) {
-        final String[] nodes = config.getRestServers();
-        if (nodes == null || nodes.length < 1) {
-            logger.warn("There is no available rest server; check the 'kylin.rest.servers' config");
-            broadcastEvents = null; // disable the broadcaster
-            return;
-        }
-        logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
-
-        Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
-            @Override
-            public void run() {
-                final List<RestClient> restClients = Lists.newArrayList();
-                for (String node : nodes) {
-                    restClients.add(new RestClient(node));
-                }
-                final ExecutorService wipingCachePool = Executors.newFixedThreadPool(restClients.size());
-                while (true) {
-                    try {
-                        final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
-                        logger.info("new broadcast event:" + broadcastEvent);
-                        for (final RestClient restClient : restClients) {
-                            wipingCachePool.execute(new Runnable() {
-                                @Override
-                                public void run() {
-                                    try {
-                                        restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
-                                    } catch (IOException e) {
-                                        logger.warn("Thread failed during wipe cache at " + broadcastEvent);
-                                    }
-                                }
-                            });
-                        }
-                    } catch (Exception e) {
-                        logger.error("error running wiping", e);
-                    }
-                }
-            }
-        });
-    }
-
-    public void registerListener(Listener listener, String... entities) {
-        for (String entity : entities) {
-            addListener(entity, listener);
-        }
-        addListener(SYNC_ALL, listener);
-        addListener(SYNC_PRJ_SCHEMA, listener);
-        addListener(SYNC_PRJ_DATA, listener);
-    }
-
-    synchronized private void addListener(String entity, Listener listener) {
-        List<Listener> list = listenerMap.get(entity);
-        if (list == null) {
-            list = new ArrayList<>();
-        }
-        list.add(listener);
-        listenerMap.put(entity, list);
-    }
-
-    public void notifyClearAll() throws IOException {
-        notifyListener(SYNC_ALL, Event.UPDATE, SYNC_ALL);
-    }
-    
-    public void notifyProjectSchemaUpdate(String project) throws IOException {
-        notifyListener(SYNC_PRJ_SCHEMA, Event.UPDATE, project);
-    }
-    
-    public void notifyProjectDataUpdate(String project) throws IOException {
-        notifyListener(SYNC_PRJ_DATA, Event.UPDATE, project);
-    }
-    
-    public void notifyListener(String entity, Event event, String cacheKey) throws IOException {
-        List<Listener> list = listenerMap.get(entity);
-        if (list == null)
-            return;
-
-        switch (entity) {
-        case SYNC_ALL: 
-            for (Listener l : list)
-                l.onClearAll(this);
-            break;
-        case SYNC_PRJ_SCHEMA: 
-            for (Listener l : list)
-                l.onProjectSchemaChange(this, cacheKey);
-            break;
-        case SYNC_PRJ_DATA: 
-            for (Listener l : list)
-                l.onProjectDataChange(this, cacheKey);
-            break;
-        default:
-            for (Listener l : list)
-                l.onEntityChange(this, entity, event, cacheKey);
-            break;
-        }
-    }
-
-    /**
-     * Broadcast an event out
-     */
-    public void queue(String entity, String event, String key) {
-        if (broadcastEvents == null)
-            return;
-
-        try {
-            counter.incrementAndGet();
-            broadcastEvents.putFirst(new BroadcastEvent(entity, event, key));
-        } catch (Exception e) {
-            counter.decrementAndGet();
-            logger.error("error putting BroadcastEvent", e);
-        }
-    }
-
-    public long getCounterAndClear() {
-        return counter.getAndSet(0);
-    }
-
-    public enum Event {
-
-        CREATE("create"), UPDATE("update"), DROP("drop");
-        private String text;
-
-        Event(String text) {
-            this.text = text;
-        }
-
-        public String getType() {
-            return text;
-        }
-
-        public static Event getEvent(String event) {
-            for (Event one : values()) {
-                if (one.getType().equalsIgnoreCase(event)) {
-                    return one;
-                }
-            }
-
-            return null;
-        }
-    }
-
-    abstract public static class Listener {
-        public void onClearAll(Broadcaster broadcaster) throws IOException {
-        }
-
-        public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
-        }
-        
-        public void onProjectDataChange(Broadcaster broadcaster, String project) throws IOException {
-        }
-        
-        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
-        }
-    }
-
-    public static class BroadcastEvent {
-        private String entity;
-        private String event;
-        private String cacheKey;
-
-        public BroadcastEvent(String entity, String event, String cacheKey) {
-            super();
-            this.entity = entity;
-            this.event = event;
-            this.cacheKey = cacheKey;
-        }
-
-        public String getEntity() {
-            return entity;
-        }
-
-        public String getEvent() {
-            return event;
-        }
-
-        public String getCacheKey() {
-            return cacheKey;
-        }
-
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + ((event == null) ? 0 : event.hashCode());
-            result = prime * result + ((cacheKey == null) ? 0 : cacheKey.hashCode());
-            result = prime * result + ((entity == null) ? 0 : entity.hashCode());
-            return result;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (obj == null) {
-                return false;
-            }
-            if (this == obj) {
-                return true;
-            }
-            if (getClass() != obj.getClass()) {
-                return false;
-            }
-            BroadcastEvent other = (BroadcastEvent) obj;
-            if (!StringUtils.equals(event, other.event)) {
-                return false;
-            }
-            if (!StringUtils.equals(cacheKey, other.cacheKey)) {
-                return false;
-            }
-            if (!StringUtils.equals(entity, other.entity)) {
-                return false;
-            }
-            return true;
-        }
-
-        @Override
-        public String toString() {
-            return Objects.toStringHelper(this).add("type", entity).add("name", cacheKey).add("action", event).toString();
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
deleted file mode 100644
index acc50bd..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.restclient;
-
-import java.util.concurrent.ConcurrentSkipListMap;
-
-import org.apache.kylin.common.KylinConfig;
-
-/**
- */
-public class CaseInsensitiveStringCache<V> extends SingleValueCache<String, V> {
-
-    public CaseInsensitiveStringCache(KylinConfig config, String syncEntity, Broadcaster.Listener listener) {
-        super(config, syncEntity, new ConcurrentSkipListMap<String, V>(String.CASE_INSENSITIVE_ORDER));
-        getBroadcaster().registerListener(listener, syncEntity);
-    }
-
-    @Override
-    public void put(String key, V value) {
-        super.put(key, value);
-    }
-
-    @Override
-    public void putLocal(String key, V value) {
-        super.putLocal(key, value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
deleted file mode 100644
index 80dff33..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.restclient;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.kylin.common.KylinConfig;
-
-/**
- * @author xjiang
- */
-public abstract class SingleValueCache<K, V> extends AbstractCache<K, V> {
-
-    private final ConcurrentMap<K, V> innerCache;
-
-    public SingleValueCache(KylinConfig config, String syncEntity) {
-        this(config, syncEntity, new ConcurrentHashMap<K, V>());
-    }
-
-    public SingleValueCache(KylinConfig config, String syncEntity, ConcurrentMap<K, V> innerCache) {
-        super(config, syncEntity);
-        this.innerCache = innerCache;
-    }
-
-    public void put(K key, V value) {
-        boolean exists = innerCache.containsKey(key);
-
-        innerCache.put(key, value);
-
-        if (!exists) {
-            getBroadcaster().queue(syncEntity, Broadcaster.Event.CREATE.getType(), key.toString());
-        } else {
-            getBroadcaster().queue(syncEntity, Broadcaster.Event.UPDATE.getType(), key.toString());
-        }
-    }
-
-    public void putLocal(K key, V value) {
-        innerCache.put(key, value);
-    }
-
-    public void remove(K key) {
-        boolean exists = innerCache.containsKey(key);
-
-        innerCache.remove(key);
-
-        if (exists) {
-            getBroadcaster().queue(syncEntity, Broadcaster.Event.DROP.getType(), key.toString());
-        }
-    }
-
-    public void removeLocal(K key) {
-        innerCache.remove(key);
-    }
-
-    public void clear() {
-        innerCache.clear();
-    }
-
-    public int size() {
-        return innerCache.size();
-    }
-
-    public V get(K key) {
-        return innerCache.get(key);
-    }
-
-    public Collection<V> values() {
-        return innerCache.values();
-    }
-
-    public boolean containsKey(String key) {
-        return innerCache.containsKey(key);
-    }
-
-    public Map<K, V> getMap() {
-        return Collections.unmodifiableMap(innerCache);
-    }
-
-    public Set<K> keySet() {
-        return innerCache.keySet();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index 19e0eb8..ee98bcf 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -27,15 +27,15 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.Broadcaster.Event;
-import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.validation.CubeMetadataValidator;
 import org.apache.kylin.cube.model.validation.ValidateContext;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.IRealization;
@@ -94,11 +94,14 @@ public class CubeDescManager {
     private CubeDescManager(KylinConfig config) throws IOException {
         logger.info("Initializing CubeDescManager with config " + config);
         this.config = config;
-        this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, "cube_desc", new SyncListener());
+        this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, "cube_desc");
+        
+        // touch lower level metadata before registering my listener
         reloadAllCubeDesc();
+        Broadcaster.getInstance(config).registerListener(new CubeDescSyncListener(), "cube_desc");
     }
     
-    private class SyncListener extends Broadcaster.Listener {
+    private class CubeDescSyncListener extends Broadcaster.Listener {
         
         @Override
         public void onClearAll(Broadcaster broadcaster) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index f86301f..2fadedb 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -39,9 +39,6 @@ import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.Broadcaster.Event;
-import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -53,6 +50,9 @@ import org.apache.kylin.dict.lookup.LookupStringTable;
 import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -132,11 +132,14 @@ public class CubeManager implements IRealizationProvider {
     private CubeManager(KylinConfig config) throws IOException {
         logger.info("Initializing CubeManager with config " + config);
         this.config = config;
-        this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, "cube", new SyncListener());
+        this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, "cube");
+        
+        // touch lower level metadata before registering my listener
         loadAllCubeInstance();
+        Broadcaster.getInstance(config).registerListener(new CubeSyncListener(), "cube");
     }
 
-    private class SyncListener extends Broadcaster.Listener {
+    private class CubeSyncListener extends Broadcaster.Listener {
         @Override
         public void onClearAll(Broadcaster broadcaster) throws IOException {
             clearCache();

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
index 6803941..63da3f4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
@@ -37,10 +37,10 @@ import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.Broadcaster.Event;
-import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.ExternalFilterDesc;
@@ -254,15 +254,21 @@ public class MetadataManager {
 
     private void init(KylinConfig config) throws IOException {
         this.config = config;
-        this.srcTableMap = new CaseInsensitiveStringCache<>(config, "table", new SrcTableSyncListener());
-        this.srcTableExdMap = new CaseInsensitiveStringCache<>(config, "table_ext", new SrcTableExtSyncListener());
-        this.dataModelDescMap = new CaseInsensitiveStringCache<>(config, "data_model", new DataModelSyncListener());
-        this.extFilterMap = new CaseInsensitiveStringCache<>(config, "external_filter", new ExtFilterSyncListener());
+        this.srcTableMap = new CaseInsensitiveStringCache<>(config, "table");
+        this.srcTableExdMap = new CaseInsensitiveStringCache<>(config, "table_ext");
+        this.dataModelDescMap = new CaseInsensitiveStringCache<>(config, "data_model");
+        this.extFilterMap = new CaseInsensitiveStringCache<>(config, "external_filter");
 
         reloadAllSourceTable();
         reloadAllSourceTableExd();
         reloadAllDataModel();
         reloadAllExternalFilter();
+        
+        // touch lower level metadata before registering my listener
+        Broadcaster.getInstance(config).registerListener(new SrcTableSyncListener(), "table");
+        Broadcaster.getInstance(config).registerListener(new SrcTableExtSyncListener(), "table_ext");
+        Broadcaster.getInstance(config).registerListener(new DataModelSyncListener(), "data_model");
+        Broadcaster.getInstance(config).registerListener(new ExtFilterSyncListener(), "external_filter");
     }
 
     private class SrcTableSyncListener extends Broadcaster.Listener {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/AbstractCache.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/AbstractCache.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/AbstractCache.java
new file mode 100644
index 0000000..4894817
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/AbstractCache.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.cachesync;
+
+import org.apache.kylin.common.KylinConfig;
+
+/**
+ */
+public abstract class AbstractCache<K, V> {
+
+    protected final KylinConfig config;
+    protected final String syncEntity;
+
+    protected AbstractCache(KylinConfig config, String syncEntity) {
+        this.config = config;
+        this.syncEntity = syncEntity;
+    }
+
+    public Broadcaster getBroadcaster() {
+        return Broadcaster.getInstance(config);
+    }
+
+    public abstract void put(K key, V value);
+
+    public abstract void putLocal(K key, V value);
+
+    public abstract void remove(K key);
+
+    public abstract void removeLocal(K key);
+
+    public abstract void clear();
+
+    public abstract int size();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
new file mode 100644
index 0000000..d1baa64
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.cachesync;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.common.util.DaemonThreadFactory;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Broadcast kylin event out
+ */
+public class Broadcaster {
+
+    private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class);
+
+    public static final String SYNC_ALL = "all"; // the special entity to indicate clear all
+    public static final String SYNC_PRJ_SCHEMA = "project_schema"; // the special entity to indicate project schema has change, e.g. table/model/cube_desc update
+    public static final String SYNC_PRJ_DATA = "project_data"; // the special entity to indicate project data has change, e.g. cube/raw_table update
+
+    // static cached instances
+    private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>();
+
+    public static Broadcaster getInstance(KylinConfig config) {
+        Broadcaster r = CACHE.get(config);
+        if (r != null) {
+            return r;
+        }
+
+        synchronized (Broadcaster.class) {
+            r = CACHE.get(config);
+            if (r != null) {
+                return r;
+            }
+
+            r = new Broadcaster(config);
+            CACHE.put(config, r);
+            if (CACHE.size() > 1) {
+                logger.warn("More than one singleton exist");
+            }
+            return r;
+        }
+    }
+
+    public static void clearCache() {
+        CACHE.clear();
+    }
+
+    // ============================================================================
+
+    private KylinConfig config;
+
+    private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<>();
+    private Map<String, List<Listener>> listenerMap = Maps.newConcurrentMap();
+    private AtomicLong counter = new AtomicLong();
+
+    private Broadcaster(final KylinConfig config) {
+        this.config = config;
+
+        final String[] nodes = config.getRestServers();
+        if (nodes == null || nodes.length < 1) {
+            logger.warn("There is no available rest server; check the 'kylin.rest.servers' config");
+            broadcastEvents = null; // disable the broadcaster
+            return;
+        }
+        logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
+
+        Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
+            @Override
+            public void run() {
+                final List<RestClient> restClients = Lists.newArrayList();
+                for (String node : nodes) {
+                    restClients.add(new RestClient(node));
+                }
+                final ExecutorService wipingCachePool = Executors.newFixedThreadPool(restClients.size());
+                while (true) {
+                    try {
+                        final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
+                        logger.info("new broadcast event:" + broadcastEvent);
+                        for (final RestClient restClient : restClients) {
+                            wipingCachePool.execute(new Runnable() {
+                                @Override
+                                public void run() {
+                                    try {
+                                        restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
+                                    } catch (IOException e) {
+                                        logger.warn("Thread failed during wipe cache at " + broadcastEvent);
+                                    }
+                                }
+                            });
+                        }
+                    } catch (Exception e) {
+                        logger.error("error running wiping", e);
+                    }
+                }
+            }
+        });
+    }
+
+    public void registerListener(Listener listener, String... entities) {
+        // ignore re-registration
+        List<Listener> all = listenerMap.get(SYNC_ALL);
+        if (all != null && all.contains(listener)) {
+            return;
+        }
+
+        for (String entity : entities) {
+            if (!StringUtils.isBlank(entity))
+                addListener(entity, listener);
+        }
+        addListener(SYNC_ALL, listener);
+        addListener(SYNC_PRJ_SCHEMA, listener);
+        addListener(SYNC_PRJ_DATA, listener);
+    }
+
+    synchronized private void addListener(String entity, Listener listener) {
+        List<Listener> list = listenerMap.get(entity);
+        if (list == null) {
+            list = new ArrayList<>();
+        }
+        list.add(listener);
+        listenerMap.put(entity, list);
+    }
+
+    public void notifyClearAll() throws IOException {
+        notifyListener(SYNC_ALL, Event.UPDATE, SYNC_ALL);
+    }
+
+    public void notifyProjectSchemaUpdate(String project) throws IOException {
+        notifyListener(SYNC_PRJ_SCHEMA, Event.UPDATE, project);
+    }
+
+    public void notifyProjectDataUpdate(String project) throws IOException {
+        notifyListener(SYNC_PRJ_DATA, Event.UPDATE, project);
+    }
+
+    public void notifyListener(String entity, Event event, String cacheKey) throws IOException {
+        List<Listener> list = listenerMap.get(entity);
+        if (list == null)
+            return;
+
+        switch (entity) {
+        case SYNC_ALL:
+            for (Listener l : list) {
+                l.onClearAll(this);
+            }
+            clearCache(); // clear broadcaster too in the end
+            break;
+        case SYNC_PRJ_SCHEMA:
+            ProjectManager.getInstance(config).clearL2Cache();
+            for (Listener l : list) {
+                l.onProjectSchemaChange(this, cacheKey);
+            }
+            break;
+        case SYNC_PRJ_DATA:
+            for (Listener l : list) {
+                l.onProjectDataChange(this, cacheKey);
+            }
+            break;
+        default:
+            for (Listener l : list) {
+                l.onEntityChange(this, entity, event, cacheKey);
+            }
+            break;
+        }
+    }
+
+    /**
+     * Broadcast an event out
+     */
+    public void queue(String entity, String event, String key) {
+        if (broadcastEvents == null)
+            return;
+
+        try {
+            counter.incrementAndGet();
+            broadcastEvents.putFirst(new BroadcastEvent(entity, event, key));
+        } catch (Exception e) {
+            counter.decrementAndGet();
+            logger.error("error putting BroadcastEvent", e);
+        }
+    }
+
+    public long getCounterAndClear() {
+        return counter.getAndSet(0);
+    }
+
+    public enum Event {
+
+        CREATE("create"), UPDATE("update"), DROP("drop");
+        private String text;
+
+        Event(String text) {
+            this.text = text;
+        }
+
+        public String getType() {
+            return text;
+        }
+
+        public static Event getEvent(String event) {
+            for (Event one : values()) {
+                if (one.getType().equalsIgnoreCase(event)) {
+                    return one;
+                }
+            }
+
+            return null;
+        }
+    }
+
+    abstract public static class Listener {
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+        }
+
+        public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
+        }
+
+        public void onProjectDataChange(Broadcaster broadcaster, String project) throws IOException {
+        }
+
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+        }
+    }
+
+    public static class BroadcastEvent {
+        private String entity;
+        private String event;
+        private String cacheKey;
+
+        public BroadcastEvent(String entity, String event, String cacheKey) {
+            super();
+            this.entity = entity;
+            this.event = event;
+            this.cacheKey = cacheKey;
+        }
+
+        public String getEntity() {
+            return entity;
+        }
+
+        public String getEvent() {
+            return event;
+        }
+
+        public String getCacheKey() {
+            return cacheKey;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((event == null) ? 0 : event.hashCode());
+            result = prime * result + ((cacheKey == null) ? 0 : cacheKey.hashCode());
+            result = prime * result + ((entity == null) ? 0 : entity.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == null) {
+                return false;
+            }
+            if (this == obj) {
+                return true;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            BroadcastEvent other = (BroadcastEvent) obj;
+            if (!StringUtils.equals(event, other.event)) {
+                return false;
+            }
+            if (!StringUtils.equals(cacheKey, other.cacheKey)) {
+                return false;
+            }
+            if (!StringUtils.equals(entity, other.entity)) {
+                return false;
+            }
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            return Objects.toStringHelper(this).add("type", entity).add("name", cacheKey).add("action", event).toString();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CaseInsensitiveStringCache.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CaseInsensitiveStringCache.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CaseInsensitiveStringCache.java
new file mode 100644
index 0000000..b4d0438
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CaseInsensitiveStringCache.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.cachesync;
+
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.kylin.common.KylinConfig;
+
+/**
+ */
+public class CaseInsensitiveStringCache<V> extends SingleValueCache<String, V> {
+
+    public CaseInsensitiveStringCache(KylinConfig config, String syncEntity) {
+        super(config, syncEntity, new ConcurrentSkipListMap<String, V>(String.CASE_INSENSITIVE_ORDER));
+    }
+
+    @Override
+    public void put(String key, V value) {
+        super.put(key, value);
+    }
+
+    @Override
+    public void putLocal(String key, V value) {
+        super.putLocal(key, value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java
new file mode 100644
index 0000000..4bfaeae
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.cachesync;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.kylin.common.KylinConfig;
+
+/**
+ * @author xjiang
+ */
+public abstract class SingleValueCache<K, V> extends AbstractCache<K, V> {
+
+    private final ConcurrentMap<K, V> innerCache;
+
+    public SingleValueCache(KylinConfig config, String syncEntity) {
+        this(config, syncEntity, new ConcurrentHashMap<K, V>());
+    }
+
+    public SingleValueCache(KylinConfig config, String syncEntity, ConcurrentMap<K, V> innerCache) {
+        super(config, syncEntity);
+        this.innerCache = innerCache;
+    }
+
+    public void put(K key, V value) {
+        boolean exists = innerCache.containsKey(key);
+
+        innerCache.put(key, value);
+
+        if (!exists) {
+            getBroadcaster().queue(syncEntity, Broadcaster.Event.CREATE.getType(), key.toString());
+        } else {
+            getBroadcaster().queue(syncEntity, Broadcaster.Event.UPDATE.getType(), key.toString());
+        }
+    }
+
+    public void putLocal(K key, V value) {
+        innerCache.put(key, value);
+    }
+
+    public void remove(K key) {
+        boolean exists = innerCache.containsKey(key);
+
+        innerCache.remove(key);
+
+        if (exists) {
+            getBroadcaster().queue(syncEntity, Broadcaster.Event.DROP.getType(), key.toString());
+        }
+    }
+
+    public void removeLocal(K key) {
+        innerCache.remove(key);
+    }
+
+    public void clear() {
+        innerCache.clear();
+    }
+
+    public int size() {
+        return innerCache.size();
+    }
+
+    public V get(K key) {
+        return innerCache.get(key);
+    }
+
+    public Collection<V> values() {
+        return innerCache.values();
+    }
+
+    public boolean containsKey(String key) {
+        return innerCache.containsKey(key);
+    }
+
+    public Map<K, V> getMap() {
+        return Collections.unmodifiableMap(innerCache);
+    }
+
+    public Set<K> keySet() {
+        return innerCache.keySet();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index be69df3..2838e56 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -29,11 +29,11 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.Broadcaster.Event;
-import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.badquery.BadQueryHistoryManager;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.ExternalFilterDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -88,13 +88,15 @@ public class ProjectManager {
     private ProjectManager(KylinConfig config) throws IOException {
         logger.info("Initializing ProjectManager with metadata url " + config);
         this.config = config;
-        this.projectMap = new CaseInsensitiveStringCache<ProjectInstance>(config, "project", new SyncListener());
+        this.projectMap = new CaseInsensitiveStringCache<ProjectInstance>(config, "project");
         this.l2Cache = new ProjectL2Cache(this);
 
+        // touch lower level metadata before registering my listener
         reloadAllProjects();
+        Broadcaster.getInstance(config).registerListener(new ProjectSyncListener(), "project");
     }
 
-    private class SyncListener extends Broadcaster.Listener {
+    private class ProjectSyncListener extends Broadcaster.Listener {
         
         @Override
         public void onClearAll(Broadcaster broadcaster) throws IOException {
@@ -102,11 +104,6 @@ public class ProjectManager {
         }
 
         @Override
-        public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
-            reloadProjectLocal(project);
-        }
-
-        @Override
         public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
             String project = cacheKey;
             

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index d73a1a9..4f81b09 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -27,9 +27,9 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.Broadcaster.Event;
-import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
@@ -86,11 +86,14 @@ public class HybridManager implements IRealizationProvider {
     private HybridManager(KylinConfig config) throws IOException {
         logger.info("Initializing HybridManager with config " + config);
         this.config = config;
-        this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, "hybrid", new SyncListener());
+        this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, "hybrid");
+        
+        // touch lower level metadata before registering my listener
         reloadAllHybridInstance();
+        Broadcaster.getInstance(config).registerListener(new HybridSyncListener(), "hybrid");
     }
 
-    private class SyncListener extends Broadcaster.Listener {
+    private class HybridSyncListener extends Broadcaster.Listener {
         
         @Override
         public void onClearAll(Broadcaster broadcaster) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index 5a3f104..7bf6ca2 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -44,10 +44,10 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.Broadcaster.Event;
-import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,11 +73,14 @@ public class StreamingManager {
 
     private StreamingManager(KylinConfig config) throws IOException {
         this.config = config;
-        this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, "streaming", new SyncListener());
+        this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, "streaming");
+        
+        // touch lower level metadata before registering my listener
         reloadAllStreaming();
+        Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming");
     }
 
-    private class SyncListener extends Broadcaster.Listener {
+    private class StreamingSyncListener extends Broadcaster.Listener {
         @Override
         public void onClearAll(Broadcaster broadcaster) throws IOException {
             clearCache();

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
index dd9936f..667046b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
@@ -20,8 +20,7 @@ package org.apache.kylin.rest.controller;
 
 import java.io.IOException;
 
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.rest.service.CacheService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +39,8 @@ import org.springframework.web.bind.annotation.ResponseBody;
 @Controller
 @RequestMapping(value = "/cache")
 public class CacheController extends BasicController {
+    
+    @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(CacheController.class);
 
     @Autowired
@@ -57,22 +58,7 @@ public class CacheController extends BasicController {
     @RequestMapping(value = "/{entity}/{cacheKey}/{event}", method = { RequestMethod.PUT })
     @ResponseBody
     public void wipeCache(@PathVariable String entity, @PathVariable String event, @PathVariable String cacheKey) throws IOException {
-
-        Event wipeEvent = Broadcaster.Event.getEvent(event);
-
-        logger.info("wipe cache entity: " + entity + " event:" + wipeEvent + " cache key:" + cacheKey);
-
-        switch (wipeEvent) {
-        case CREATE:
-        case UPDATE:
-            cacheService.rebuildCache(entity, cacheKey);
-            break;
-        case DROP:
-            cacheService.removeCache(entity, cacheKey);
-            break;
-        default:
-            throw new RuntimeException("invalid event:" + wipeEvent);
-        }
+        cacheService.notifyMetadataChange(entity, Broadcaster.Event.getEvent(event), cacheKey);
     }
 
     public void setCacheService(CacheService cacheService) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
index c121d06..5d29dcd 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -21,37 +21,22 @@ package org.apache.kylin.rest.service;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
-import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import javax.annotation.PostConstruct;
 import javax.sql.DataSource;
 
 import org.apache.calcite.jdbc.Driver;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.Broadcaster.Event;
-import org.apache.kylin.cube.CubeDescManager;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
 import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.realization.RealizationRegistry;
-import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.query.enumerator.OLAPQuery;
 import org.apache.kylin.query.schema.OLAPSchemaFactory;
 import org.apache.kylin.rest.controller.QueryController;
-import org.apache.kylin.source.kafka.KafkaConfigManager;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hybrid.HybridManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -74,48 +59,57 @@ public class CacheService extends BasicService {
 
     @Autowired
     private CacheManager cacheManager;
+    
+    private Broadcaster.Listener cacheSyncListener = new Broadcaster.Listener() {
+        @Override
+        public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
+            removeOLAPDataSource(project);
+        }
 
-    @PostConstruct
-    public void initCacheListener() throws IOException {
-        
-        Broadcaster.getInstance(getConfig()).registerListener(new Broadcaster.Listener() {
-            @Override
-            public void notify(String entity, Event event, String cacheKey) throws IOException {
-                switch (entity) {
-                case "cube":
-                    String cubeName = cacheKey;
-                    CubeInstance cube = getCubeManager().getCube(cubeName);
+        @Override
+        public void onProjectDataChange(Broadcaster broadcaster, String project) throws IOException {
+            cleanDataCache(project);
+        }
 
-                    cleanDataCache(cube.getUuid());
-                    for (ProjectInstance prj : getProjectManager().findProjects(RealizationType.CUBE, cubeName)) {
-                        removeOLAPDataSource(prj.getName());
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            if ("cube".equals(entity) && event == Event.UPDATE) {
+                final String cubeName = cacheKey;
+                new Thread() { // do not block the event broadcast thread
+                    public void run() {
+                        try {
+                            Thread.sleep(1000);
+                            cubeService.updateOnNewSegmentReady(cubeName);
+                        } catch (Throwable ex) {
+                            logger.error("Error in updateOnNewSegmentReady()", ex);
+                        }
                     }
-                    break;
-                case "project":
-                    removeOLAPDataSource(cacheKey);
-                    break;
-                }
+                }.run();
             }
-            
-            @Override
-            public void clearAll() throws IOException {
-                
-            }
-        }, "cube", "project");
-    }
+        }
+    };
 
     // for test
     public void setCubeService(CubeService cubeService) {
         this.cubeService = cubeService;
     }
 
-    protected void cleanDataCache(String storageUUID) {
+    public void notifyMetadataChange(String entity, Event event, String cacheKey) throws IOException {
+        Broadcaster broadcaster = Broadcaster.getInstance(getConfig());
+        
+        // broadcaster can be clearCache() too, make sure listener is registered; re-registration will be ignored
+        broadcaster.registerListener(cacheSyncListener, "cube");
+        
+        broadcaster.notifyListener(entity, event, cacheKey);
+    }
+
+    protected void cleanDataCache(String project) {
         if (cacheManager != null) {
-            logger.info("cleaning cache for " + storageUUID + " (currently remove all entries)");
+            logger.info("cleaning cache for project" + project + " (currently remove all entries)");
             cacheManager.getCache(QueryController.SUCCESS_QUERY_CACHE).removeAll();
             cacheManager.getCache(QueryController.EXCEPTION_QUERY_CACHE).removeAll();
         } else {
-            logger.warn("skip cleaning cache for " + storageUUID);
+            logger.warn("skip cleaning cache for project " + project);
         }
     }
 
@@ -174,130 +168,4 @@ public class CacheService extends BasicService {
         return ret;
     }
 
-    public void rebuildCache(String entity, String cacheKey) {
-        final String log = "rebuild cache type: " + entity + " name:" + cacheKey;
-        logger.info(log);
-        try {
-            switch (entity) {
-            case "cube":
-                rebuildCubeCache(cacheKey);
-                break;
-            case "streaming":
-                
-                break;
-            case "kafka":
-                
-                break;
-            case "cube_desc":
-                
-                break;
-            case "project":
-                reloadProjectCache(cacheKey);
-                break;
-            case "table":
-                clearRealizationCache();
-                break;
-            case "external_filter":
-                
-                break;
-            case "data_model":
-                
-                break;
-            case "all":
-                DictionaryManager.clearCache();
-                MetadataManager.clearCache();
-                CubeDescManager.clearCache();
-                clearRealizationCache();
-                Cuboid.clearCache();
-                ProjectManager.clearCache();
-                KafkaConfigManager.clearCache();
-                StreamingManager.clearCache();
-                HBaseConnection.clearConnCache();
-
-                cleanAllDataCache();
-                removeAllOLAPDataSources();
-                break;
-            default:
-                logger.error("invalid cacheType:" + entity);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("error " + log, e);
-        }
-    }
-
-    private void clearRealizationCache() {
-        CubeManager.clearCache();
-        HybridManager.clearCache();
-        RealizationRegistry.clearCache();
-    }
-
-    private void rebuildCubeCache(String cubeName) {
-        //CubeInstance cube = getCubeManager().reloadCubeLocal(cubeName);
-        //getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cubeName);
-        //reloadProjectCache(getProjectManager().findProjects(RealizationType.CUBE, cubeName));
-        //clean query related cache first
-        if (cube != null) {
-            cleanDataCache(cube.getUuid());
-        }
-        cubeService.updateOnNewSegmentReady(cubeName);
-    }
-
-    public void removeCache(String entity, String cacheKey) {
-        final String log = "remove cache type: " + entity + " name:" + cacheKey;
-        try {
-            switch (entity) {
-            case "cube":
-                removeCubeCache(cacheKey, null);
-                break;
-            case "cube_desc":
-                getCubeDescManager().removeLocalCubeDesc(cacheKey);
-                break;
-            case "project":
-                ProjectManager.clearCache();
-                break;
-            case "table":
-                throw new UnsupportedOperationException(log);
-            case "external_filter":
-                throw new UnsupportedOperationException(log);
-            case "data_model":
-                getMetadataManager().removeModelCache(cacheKey);
-                break;
-            default:
-                throw new RuntimeException("invalid cacheType:" + entity);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("error " + log, e);
-        }
-    }
-
-    private void removeCubeCache(String cubeName, CubeInstance cube) {
-        // you may not get the cube instance if it's already removed from metadata
-        if (cube == null) {
-            cube = getCubeManager().getCube(cubeName);
-        }
-
-        getCubeManager().removeCubeLocal(cubeName);
-        getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cubeName);
-        reloadProjectCache(getProjectManager().findProjects(RealizationType.CUBE, cubeName));
-
-        if (cube != null) {
-            cleanDataCache(cube.getUuid());
-        }
-    }
-
-    private void reloadProjectCache(List<ProjectInstance> projects) {
-        for (ProjectInstance prj : projects) {
-            reloadProjectCache(prj.getName());
-        }
-    }
-
-    private void reloadProjectCache(String projectName) {
-        try {
-            getProjectManager().reloadProjectLocal(projectName);
-        } catch (IOException ex) {
-            logger.warn("Failed to reset project cache", ex);
-        }
-        removeOLAPDataSource(projectName);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index bdf317c..e446045 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -28,13 +28,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.WeakHashMap;
 
-import javax.annotation.PostConstruct;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.Broadcaster.Event;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -96,32 +92,6 @@ public class CubeService extends BasicService {
     @Autowired
     private AccessService accessService;
 
-    @PostConstruct
-    public void initCacheListener() throws IOException {
-        Broadcaster.getInstance(getConfig()).registerListener(new Broadcaster.Listener() {
-            @Override
-            public void notify(String entity, Event event, String cacheKey) throws IOException {
-                if (event == Event.UPDATE) {
-                    final String cubeName = cacheKey;
-                    new Thread() { // do not block the event broadcast thread
-                        public void run() {
-                            try {
-                                Thread.sleep(1000);
-                                updateOnNewSegmentReady(cubeName);
-                            } catch (Throwable ex) {
-                                logger.error("Error in updateOnNewSegmentReady()", ex);
-                            }
-                        }
-                    }.run();
-                }
-            }
-            
-            @Override
-            public void clearAll() throws IOException {
-            }
-        }, "cube");
-    }
-
     @PostFilter(Constant.ACCESS_POST_FILTER_READ)
     public List<CubeInstance> listAllCubes(final String cubeName, final String projectName, final String modelName) {
         List<CubeInstance> cubeInstances = null;

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 6101fb6..483ccc1 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -22,13 +22,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
@@ -36,6 +36,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.LookupDesc;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -109,31 +110,19 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
         };
 
         serviceA.setCubeService(cubeServiceA);
-        serviceA.initCubeChangeListener();
         serviceB.setCubeService(cubeServiceB);
-        serviceB.initCubeChangeListener();
 
         context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() {
             @Override
             public void handle(String entity, String cacheKey, String event) {
-
                 Broadcaster.Event wipeEvent = Broadcaster.Event.getEvent(event);
                 final String log = "wipe cache type: " + entity + " event:" + wipeEvent + " name:" + cacheKey;
                 logger.info(log);
                 try {
-                    switch (wipeEvent) {
-                    case CREATE:
-                    case UPDATE:
-                        serviceA.rebuildCache(entity, cacheKey);
-                        serviceB.rebuildCache(entity, cacheKey);
-                        break;
-                    case DROP:
-                        serviceA.removeCache(entity, cacheKey);
-                        serviceB.removeCache(entity, cacheKey);
-                        break;
-                    default:
-                        throw new RuntimeException("invalid type:" + wipeEvent);
-                    }
+                    serviceA.notifyMetadataChange(entity, wipeEvent, cacheKey);
+                    serviceB.notifyMetadataChange(entity, wipeEvent, cacheKey);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
                 } finally {
                     counter.incrementAndGet();
                 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
index ae4c089..3a587e4 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
@@ -18,12 +18,9 @@
 
 package org.apache.kylin.rest.service;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeDescManager;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.realization.RealizationRegistry;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -60,12 +57,8 @@ public class ServiceTestBase extends LocalFileMetadataTestCase {
     public void setup() throws Exception {
         this.createTestMetadata();
 
-        MetadataManager.clearCache();
-        CubeDescManager.clearCache();
-        CubeManager.clearCache();
-        RealizationRegistry.clearCache();
-        ProjectManager.clearCache();
-        CacheService.removeAllOLAPDataSources();
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        Broadcaster.getInstance(config).notifyClearAll();
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index 8b982e2..2de8527 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -44,10 +44,10 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.Broadcaster.Event;
-import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,11 +74,14 @@ public class KafkaConfigManager {
 
     private KafkaConfigManager(KylinConfig config) throws IOException {
         this.config = config;
-        this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, "kafka", new SyncListener());
+        this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, "kafka");
+        
+        // touch lower level metadata before registering my listener
         reloadAllKafkaConfig();
+        Broadcaster.getInstance(config).registerListener(new KafkaSyncListener(), "kafka");
     }
 
-    private class SyncListener extends Broadcaster.Listener {
+    private class KafkaSyncListener extends Broadcaster.Listener {
         @Override
         public void onClearAll(Broadcaster broadcaster) throws IOException {
             clearCache();

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aaa6913/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 71ab0d5..52aa7ea 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -44,7 +44,6 @@ import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.restclient.RestClient;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.Dictionary;
@@ -57,6 +56,7 @@ import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;