You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/09 04:54:42 UTC

[2/3] incubator-kylin git commit: KYLIN-1127 Broadcaster be configured by KylinConfig (instead of spring profile)

KYLIN-1127 Broadcaster be configured by KylinConfig (instead of spring profile)


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2f4595ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2f4595ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2f4595ae

Branch: refs/heads/2.x-staging
Commit: 2f4595ae9db582a5b1c498534916f90db22b55b2
Parents: f1dedab
Author: Yang Li <li...@apache.org>
Authored: Mon Nov 9 11:15:12 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Nov 9 11:15:12 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/cache/CacheUpdater.java | 11 ----
 .../kylin/common/cache/LocalCacheUpdater.java   | 18 ------
 .../kylin/common/cache/RemoteCacheUpdater.java  | 14 -----
 .../common/restclient/AbstractRestCache.java    | 17 +++---
 .../kylin/common/restclient/Broadcaster.java    | 58 +++++++++++++++-----
 .../restclient/CaseInsensitiveStringCache.java  |  6 +-
 .../common/restclient/SingleValueCache.java     | 16 +++---
 .../org/apache/kylin/cube/CubeDescManager.java  |  3 +-
 .../java/org/apache/kylin/cube/CubeManager.java |  4 +-
 .../apache/kylin/metadata/MetadataManager.java  | 10 +++-
 .../kylin/metadata/project/ProjectManager.java  |  3 +-
 .../kylin/storage/hybrid/HybridManager.java     |  4 +-
 .../engine/streaming/StreamingManager.java      |  3 +-
 .../engine/streaming/cli/StreamingCLI.java      |  7 +--
 .../test_case_data/localmeta/kylin.properties   |  2 +-
 .../kylin/invertedindex/IIDescManager.java      |  3 +-
 .../apache/kylin/invertedindex/IIManager.java   |  4 +-
 .../kylin/rest/service/CacheServiceTest.java    |  6 +-
 .../kylin/source/kafka/KafkaConfigManager.java  | 23 ++++----
 19 files changed, 105 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
deleted file mode 100644
index 615ee14..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.kylin.common.cache;
-
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.apache.kylin.common.restclient.Broadcaster;
-
-/**
- */
-@SuppressWarnings("rawtypes")
-public interface CacheUpdater {
-    void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache);
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
deleted file mode 100644
index 8d3b648..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.kylin.common.cache;
-
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.apache.kylin.common.restclient.Broadcaster;
-
-/**
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class LocalCacheUpdater implements CacheUpdater {
-    @Override
-    public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) {
-        if (syncAction == Broadcaster.EVENT.CREATE || syncAction == Broadcaster.EVENT.UPDATE) {
-            cache.putLocal(key, value);
-        } else if (syncAction == Broadcaster.EVENT.DROP) {
-            cache.removeLocal(key);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
deleted file mode 100644
index 2927d2d..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.kylin.common.cache;
-
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.apache.kylin.common.restclient.Broadcaster;
-
-/**
- */
-@SuppressWarnings("rawtypes")
-public class RemoteCacheUpdater implements CacheUpdater {
-    @Override
-    public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) {
-        Broadcaster.getInstance().queue(type.getType(), syncAction.getType(), key.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
index 68d9be5..fc030b4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
@@ -18,8 +18,7 @@
 
 package org.apache.kylin.common.restclient;
 
-import org.apache.kylin.common.cache.CacheUpdater;
-import org.apache.kylin.common.cache.RemoteCacheUpdater;
+import org.apache.kylin.common.KylinConfig;
 
 /**
  * @author xjiang
@@ -27,17 +26,17 @@ import org.apache.kylin.common.cache.RemoteCacheUpdater;
  */
 public abstract class AbstractRestCache<K, V> {
 
-    protected static CacheUpdater cacheUpdater = new RemoteCacheUpdater();
-
-    public static void setCacheUpdater(CacheUpdater cu) {
-        cacheUpdater = cu;
-    }
-
+    protected final KylinConfig config;
     protected final Broadcaster.TYPE syncType;
 
-    protected AbstractRestCache(Broadcaster.TYPE syncType) {
+    protected AbstractRestCache(KylinConfig config, Broadcaster.TYPE syncType) {
+        this.config = config;
         this.syncType = syncType;
     }
+    
+    public Broadcaster getBroadcaster() {
+        return Broadcaster.getInstance(config);
+    }
 
     public abstract void put(K key, V value);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
index 80ec33c..871d77c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -43,24 +44,52 @@ public class Broadcaster {
 
     private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class);
 
+    // static cached instances
+    private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>();
+
+    public static Broadcaster getInstance(KylinConfig config) {
+        Broadcaster r = CACHE.get(config);
+        if (r != null) {
+            return r;
+        }
+
+        synchronized (Broadcaster.class) {
+            r = CACHE.get(config);
+            if (r != null) {
+                return r;
+            }
+
+            r = new Broadcaster(config);
+            CACHE.put(config, r);
+            if (CACHE.size() > 1) {
+                logger.warn("More than one cubemanager singleton exist");
+            }
+            return r;
+        }
+    }
+
+    public static void clearCache() {
+        CACHE.clear();
+    }
+
+    // ============================================================================
+
     private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<>();
 
     private AtomicLong counter = new AtomicLong();
 
-    static class BroadcasterHolder {
-        static final Broadcaster INSTANCE = new Broadcaster();
-    }
+    private Broadcaster(final KylinConfig config) {
+        final String[] nodes = config.getRestServers();
+        if (nodes == null || nodes.length < 1) {
+            logger.warn("There is no available rest server; check the 'kylin.rest.servers' config");
+            broadcastEvents = null; // disable the broadcaster
+            return;
+        }
+        logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
 
-    private Broadcaster() {
         Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
             @Override
             public void run() {
-                final String[] nodes = KylinConfig.getInstanceFromEnv().getRestServers();
-                if (nodes == null || nodes.length < 1) {//TODO if the node count is greater than 1, it means it is a cluster
-                    logger.warn("There is no available rest server; check the 'kylin.rest.servers' config");
-                    return;
-                }
-                logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
                 final List<RestClient> restClients = Lists.newArrayList();
                 for (String node : nodes) {
                     restClients.add(new RestClient(node));
@@ -90,10 +119,6 @@ public class Broadcaster {
         });
     }
 
-    public static Broadcaster getInstance() {
-        return BroadcasterHolder.INSTANCE;
-    }
-
     /**
      * Broadcast the cubedesc event out
      * 
@@ -101,6 +126,9 @@ public class Broadcaster {
      *            event action
      */
     public void queue(String type, String action, String key) {
+        if (broadcastEvents == null)
+            return;
+
         try {
             counter.incrementAndGet();
             broadcastEvents.putFirst(new BroadcastEvent(type, action, key));
@@ -138,7 +166,7 @@ public class Broadcaster {
     }
 
     public enum TYPE {
-        ALL("all"), CUBE("cube"),STREAMING("streaming"),KAFKA("kafka"),CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
+        ALL("all"), CUBE("cube"), STREAMING("streaming"), KAFKA("kafka"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
         private String text;
 
         TYPE(String text) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
index 68e3c04..2bcddbf 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
@@ -20,12 +20,14 @@ package org.apache.kylin.common.restclient;
 
 import java.util.concurrent.ConcurrentSkipListMap;
 
+import org.apache.kylin.common.KylinConfig;
+
 /**
  */
 public class CaseInsensitiveStringCache<V> extends SingleValueCache<String, V> {
 
-    public CaseInsensitiveStringCache(Broadcaster.TYPE syncType) {
-        super(syncType, new ConcurrentSkipListMap<String, V>(String.CASE_INSENSITIVE_ORDER));
+    public CaseInsensitiveStringCache(KylinConfig config, Broadcaster.TYPE syncType) {
+        super(config, syncType, new ConcurrentSkipListMap<String, V>(String.CASE_INSENSITIVE_ORDER));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
index cb6c286..9acfeca 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
@@ -25,6 +25,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.kylin.common.KylinConfig;
+
 /**
  * @author xjiang
  * 
@@ -33,12 +35,12 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
 
     private final ConcurrentMap<K, V> innerCache;
 
-    public SingleValueCache(Broadcaster.TYPE syncType) {
-        this(syncType, new ConcurrentHashMap<K, V>());
+    public SingleValueCache(KylinConfig config, Broadcaster.TYPE syncType) {
+        this(config, syncType, new ConcurrentHashMap<K, V>());
     }
 
-    public SingleValueCache(Broadcaster.TYPE syncType, ConcurrentMap<K, V> innerCache) {
-        super(syncType);
+    public SingleValueCache(KylinConfig config, Broadcaster.TYPE syncType, ConcurrentMap<K, V> innerCache) {
+        super(config, syncType);
         this.innerCache = innerCache;
     }
 
@@ -48,9 +50,9 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
         innerCache.put(key, value);
 
         if (!exists) {
-            cacheUpdater.updateCache(key, value, Broadcaster.EVENT.CREATE, syncType, this);
+            getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.CREATE.getType(), key.toString());
         } else {
-            cacheUpdater.updateCache(key, value, Broadcaster.EVENT.UPDATE, syncType, this);
+            getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.UPDATE.getType(), key.toString());
         }
     }
 
@@ -64,7 +66,7 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
         innerCache.remove(key);
         
         if (exists) {
-            cacheUpdater.updateCache(key, null, Broadcaster.EVENT.DROP, syncType, this);
+            getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.DROP.getType(), key.toString());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index 8e75d29..c50836c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -57,7 +57,7 @@ public class CubeDescManager {
 
     private KylinConfig config;
     // name ==> CubeDesc
-    private CaseInsensitiveStringCache<CubeDesc> cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(Broadcaster.TYPE.CUBE_DESC);
+    private CaseInsensitiveStringCache<CubeDesc> cubeDescMap;
 
     public static CubeDescManager getInstance(KylinConfig config) {
         CubeDescManager r = CACHE.get(config);
@@ -90,6 +90,7 @@ public class CubeDescManager {
     private CubeDescManager(KylinConfig config) throws IOException {
         logger.info("Initializing CubeDescManager with config " + config);
         this.config = config;
+        this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, Broadcaster.TYPE.CUBE_DESC);
         reloadAllCubeDesc();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 9b7a024..a249cd7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -116,7 +116,7 @@ public class CubeManager implements IRealizationProvider {
 
     private KylinConfig config;
     // cube name ==> CubeInstance
-    private CaseInsensitiveStringCache<CubeInstance> cubeMap = new CaseInsensitiveStringCache<CubeInstance>(Broadcaster.TYPE.CUBE);
+    private CaseInsensitiveStringCache<CubeInstance> cubeMap;
     // "table/column" ==> lookup table
     //    private SingleValueCache<String, LookupStringTable> lookupTables = new SingleValueCache<String, LookupStringTable>(Broadcaster.TYPE.METADATA);
 
@@ -126,7 +126,7 @@ public class CubeManager implements IRealizationProvider {
     private CubeManager(KylinConfig config) throws IOException {
         logger.info("Initializing CubeManager with config " + config);
         this.config = config;
-
+        this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, Broadcaster.TYPE.CUBE);
         loadAllCubeInstance();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
index b7e7dc5..c907afd 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
@@ -103,11 +103,11 @@ public class MetadataManager {
 
     private KylinConfig config;
     // table name ==> SourceTable
-    private CaseInsensitiveStringCache<TableDesc> srcTableMap = new CaseInsensitiveStringCache<TableDesc>(Broadcaster.TYPE.TABLE);
+    private CaseInsensitiveStringCache<TableDesc> srcTableMap;
     // name => value
-    private CaseInsensitiveStringCache<Map<String, String>> srcTableExdMap = new CaseInsensitiveStringCache<Map<String, String>>(Broadcaster.TYPE.TABLE);
+    private CaseInsensitiveStringCache<Map<String, String>> srcTableExdMap;
     // name => DataModelDesc
-    private CaseInsensitiveStringCache<DataModelDesc> dataModelDescMap = new CaseInsensitiveStringCache<DataModelDesc>(Broadcaster.TYPE.DATA_MODEL);
+    private CaseInsensitiveStringCache<DataModelDesc> dataModelDescMap;
 
     private MetadataManager(KylinConfig config) throws IOException {
         init(config);
@@ -198,6 +198,10 @@ public class MetadataManager {
 
     private void init(KylinConfig config) throws IOException {
         this.config = config;
+        this.srcTableMap = new CaseInsensitiveStringCache<TableDesc>(config, Broadcaster.TYPE.TABLE);
+        this.srcTableExdMap = new CaseInsensitiveStringCache<Map<String, String>>(config, Broadcaster.TYPE.TABLE);
+        this.dataModelDescMap = new CaseInsensitiveStringCache<DataModelDesc>(config, Broadcaster.TYPE.DATA_MODEL);
+        
         reloadAllSourceTable();
         reloadAllSourceTableExd();
         reloadAllDataModel();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index fd41f59..8304128 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -81,11 +81,12 @@ public class ProjectManager {
     private KylinConfig config;
     private ProjectL2Cache l2Cache;
     // project name => ProjrectInstance
-    private CaseInsensitiveStringCache<ProjectInstance> projectMap = new CaseInsensitiveStringCache<ProjectInstance>(Broadcaster.TYPE.PROJECT);
+    private CaseInsensitiveStringCache<ProjectInstance> projectMap;
 
     private ProjectManager(KylinConfig config) throws IOException {
         logger.info("Initializing ProjectManager with metadata url " + config);
         this.config = config;
+        this.projectMap = new CaseInsensitiveStringCache<ProjectInstance>(config, Broadcaster.TYPE.PROJECT);
         this.l2Cache = new ProjectL2Cache(this);
 
         reloadAllProjects();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index 0f00f1a..9392ef5 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -76,12 +76,12 @@ public class HybridManager implements IRealizationProvider {
 
     private KylinConfig config;
 
-    private CaseInsensitiveStringCache<HybridInstance> hybridMap = new CaseInsensitiveStringCache<HybridInstance>(Broadcaster.TYPE.HYBRID);
+    private CaseInsensitiveStringCache<HybridInstance> hybridMap;
 
     private HybridManager(KylinConfig config) throws IOException {
         logger.info("Initializing HybridManager with config " + config);
         this.config = config;
-
+        this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, Broadcaster.TYPE.HYBRID);
         loadAllHybridInstance();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index fa7d0f8..8cabe1b 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -79,7 +79,7 @@ public class StreamingManager {
     private KylinConfig config;
 
     // name ==> StreamingConfig
-    private CaseInsensitiveStringCache<StreamingConfig> streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(Broadcaster.TYPE.STREAMING);
+    private CaseInsensitiveStringCache<StreamingConfig> streamingMap;
 
     public static void clearCache() {
         CACHE.clear();
@@ -87,6 +87,7 @@ public class StreamingManager {
 
     private StreamingManager(KylinConfig config) throws IOException {
         this.config = config;
+        this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, Broadcaster.TYPE.STREAMING);
         reloadAllStreaming();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index e3a7133..a73a6ac 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -34,11 +34,10 @@
 
 package org.apache.kylin.engine.streaming.cli;
 
-import com.google.common.base.Preconditions;
+import java.util.List;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.cache.RemoteCacheUpdater;
-import org.apache.kylin.common.restclient.AbstractRestCache;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.streaming.BootstrapConfig;
 import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
@@ -48,7 +47,7 @@ import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
+import com.google.common.base.Preconditions;
 
 public class StreamingCLI {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index 92773a1..48f01f5 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -1,7 +1,7 @@
 ## Config for Kylin Engine ##
 
 # List of web servers in use, this enables one web server instance to sync up with other servers.
-kylin.rest.servers=localhost:7070
+#kylin.rest.servers=localhost:7070
 
 # The metadata store in hbase
 kylin.metadata.url=

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
index a166ae7..917fe46 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
@@ -54,7 +54,7 @@ public class IIDescManager {
 
     private KylinConfig config;
     // name ==> IIDesc
-    private CaseInsensitiveStringCache<IIDesc> iiDescMap = new CaseInsensitiveStringCache<IIDesc>(Broadcaster.TYPE.INVERTED_INDEX_DESC);
+    private CaseInsensitiveStringCache<IIDesc> iiDescMap;
 
     public static IIDescManager getInstance(KylinConfig config) {
         IIDescManager r = CACHE.get(config);
@@ -87,6 +87,7 @@ public class IIDescManager {
     private IIDescManager(KylinConfig config) throws IOException {
         logger.info("Initializing IIDescManager with config " + config);
         this.config = config;
+        this.iiDescMap = new CaseInsensitiveStringCache<IIDesc>(config, Broadcaster.TYPE.INVERTED_INDEX_DESC);
         reloadAllIIDesc();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
index 5633004..b6dfdf1 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
@@ -92,7 +92,7 @@ public class IIManager implements IRealizationProvider {
 
     private KylinConfig config;
     // ii name ==> IIInstance
-    private CaseInsensitiveStringCache<IIInstance> iiMap = new CaseInsensitiveStringCache<IIInstance>(Broadcaster.TYPE.INVERTED_INDEX);
+    private CaseInsensitiveStringCache<IIInstance> iiMap;
 
     // for generation hbase table name of a new segment
     private Multimap<String, String> usedStorageLocation = HashMultimap.create();
@@ -100,7 +100,7 @@ public class IIManager implements IRealizationProvider {
     private IIManager(KylinConfig config) throws IOException {
         logger.info("Initializing IIManager with config " + config);
         this.config = config;
-
+        this.iiMap = new CaseInsensitiveStringCache<IIInstance>(config, Broadcaster.TYPE.INVERTED_INDEX);
         loadAllIIInstance();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index af1bbc0..b90c10a 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -71,7 +71,9 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
     public static void beforeClass() throws Exception {
         staticCreateTestMetadata();
         configA = KylinConfig.getInstanceFromEnv();
+        configA.setProperty("kylin.rest.servers", "localhost:7070");
         configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam());
+        configB.setProperty("kylin.rest.servers", "localhost:7070");
         configB.setMetadataUrl("../examples/test_metadata");
 
         server = new Server(7070);
@@ -209,7 +211,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testCubeCRUD() throws Exception {
-        final Broadcaster broadcaster = Broadcaster.getInstance();
+        final Broadcaster broadcaster = Broadcaster.getInstance(configA);
         broadcaster.getCounterAndClear();
 
         getStore().deleteResource("/cube/a_whole_new_cube.json");
@@ -306,7 +308,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
     public void testMetaCRUD() throws Exception {
         final MetadataManager metadataManager = MetadataManager.getInstance(configA);
         final MetadataManager metadataManagerB = MetadataManager.getInstance(configB);
-        final Broadcaster broadcaster = Broadcaster.getInstance();
+        final Broadcaster broadcaster = Broadcaster.getInstance(configA);
         broadcaster.getCounterAndClear();
 
         TableDesc tableDesc = createTestTableDesc();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index 3032d13..8cf51b6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -34,10 +34,12 @@
 
 package org.apache.kylin.source.kafka;
 
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -45,17 +47,15 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
-import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
 
 /**
  */
@@ -69,7 +69,7 @@ public class KafkaConfigManager {
     private KylinConfig config;
 
     // name ==> StreamingConfig
-    private CaseInsensitiveStringCache<KafkaConfig> kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(Broadcaster.TYPE.KAFKA);
+    private CaseInsensitiveStringCache<KafkaConfig> kafkaMap;
 
     public static final Serializer<KafkaConfig> KAFKA_SERIALIZER = new JsonSerializer<KafkaConfig>(KafkaConfig.class);
 
@@ -79,6 +79,7 @@ public class KafkaConfigManager {
 
     private KafkaConfigManager(KylinConfig config) throws IOException {
         this.config = config;
+        this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, Broadcaster.TYPE.KAFKA);
         reloadAllKafkaConfig();
     }