You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/02/24 10:46:57 UTC

kylin git commit: KYLIN-2464, replace ConcurrentHashMap with ConcurrentMap

Repository: kylin
Updated Branches:
  refs/heads/master 6adb73d3a -> 693c6faf2


KYLIN-2464, replace ConcurrentHashMap with ConcurrentMap

Signed-off-by: Li Yang <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/693c6faf
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/693c6faf
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/693c6faf

Branch: refs/heads/master
Commit: 693c6faf2f38d19816c8e0c9e0a48c6caaaf18cd
Parents: 6adb73d
Author: Cheng Wang <ch...@kyligence.io>
Authored: Fri Feb 24 18:33:48 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Fri Feb 24 18:46:44 2017 +0800

----------------------------------------------------------------------
 .../kylin/common/persistence/ResourceStore.java |  4 ++--
 .../common/util/MemoryBudgetController.java     |  3 ++-
 .../org/apache/kylin/cube/CubeDescManager.java  | 16 +++++++-------
 .../java/org/apache/kylin/cube/CubeManager.java |  3 ++-
 .../apache/kylin/dict/DictionaryManager.java    |  3 ++-
 .../kylin/dict/lookup/SnapshotManager.java      |  3 ++-
 .../org/apache/kylin/job/dao/ExecutableDao.java |  3 ++-
 .../kylin/job/execution/ExecutableManager.java  | 22 +++++++++++---------
 .../impl/threadpool/DistributedScheduler.java   |  3 ++-
 .../apache/kylin/metadata/MetadataManager.java  |  3 ++-
 .../badquery/BadQueryHistoryManager.java        |  3 ++-
 .../kylin/metadata/cachesync/Broadcaster.java   |  3 ++-
 .../kylin/metadata/project/ProjectManager.java  | 13 ++++++------
 .../realization/RealizationRegistry.java        |  3 ++-
 .../metadata/streaming/StreamingManager.java    |  3 ++-
 .../kylin/storage/hybrid/HybridManager.java     |  3 ++-
 .../kylin/source/kafka/KafkaConfigManager.java  |  9 ++++----
 17 files changed, 58 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 77143b0..b0e06f5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
@@ -63,10 +64,9 @@ abstract public class ResourceStore {
     public static final String CUBE_STATISTICS_ROOT = "/cube_statistics";
     public static final String BAD_QUERY_RESOURCE_ROOT = "/bad_query";
 
-
     protected static final String DEFAULT_STORE_NAME = "kylin_metadata";
 
-    private static final ConcurrentHashMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig, ResourceStore>();
+    private static final ConcurrentMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig, ResourceStore>();
 
     private static final ArrayList<Class<? extends ResourceStore>> knownImpl = new ArrayList<Class<? extends ResourceStore>>();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
index ade929c..7a0b919 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.common.util;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.slf4j.Logger;
@@ -64,7 +65,7 @@ public class MemoryBudgetController {
 
     // all budget numbers are in MB
     private final int totalBudgetMB;
-    private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
+    private final ConcurrentMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
     private int totalReservedMB;
     private final ReentrantLock lock = new ReentrantLock();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index 1bf7e97..00fa705 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -63,7 +64,7 @@ public class CubeDescManager {
     public static final Serializer<CubeDesc> CUBE_DESC_SERIALIZER = new JsonSerializer<CubeDesc>(CubeDesc.class);
 
     // static cached instances
-    private static final ConcurrentHashMap<KylinConfig, CubeDescManager> CACHE = new ConcurrentHashMap<KylinConfig, CubeDescManager>();
+    private static final ConcurrentMap<KylinConfig, CubeDescManager> CACHE = new ConcurrentHashMap<KylinConfig, CubeDescManager>();
 
     public static CubeDescManager getInstance(KylinConfig config) {
         CubeDescManager r = CACHE.get(config);
@@ -103,14 +104,14 @@ public class CubeDescManager {
         logger.info("Initializing CubeDescManager with config " + config);
         this.config = config;
         this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, "cube_desc");
-        
+
         // touch lower level metadata before registering my listener
         reloadAllCubeDesc();
         Broadcaster.getInstance(config).registerListener(new CubeDescSyncListener(), "cube_desc");
     }
-    
+
     private class CubeDescSyncListener extends Broadcaster.Listener {
-        
+
         @Override
         public void onClearAll(Broadcaster broadcaster) throws IOException {
             clearCache();
@@ -122,7 +123,7 @@ public class CubeDescManager {
             for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) {
                 if (real instanceof CubeInstance) {
                     String descName = ((CubeInstance) real).getDescName();
-                    reloadCubeDescLocal(descName);        
+                    reloadCubeDescLocal(descName);
                 }
             }
         }
@@ -132,12 +133,12 @@ public class CubeDescManager {
             String cubeDescName = cacheKey;
             CubeDesc cubeDesc = getCubeDesc(cubeDescName);
             String modelName = cubeDesc == null ? null : cubeDesc.getModel().getName();
-            
+
             if (event == Event.DROP)
                 removeLocalCubeDesc(cubeDescName);
             else
                 reloadCubeDescLocal(cubeDescName);
-            
+
             for (ProjectInstance prj : ProjectManager.getInstance(config).findProjectsByModel(modelName)) {
                 broadcaster.notifyProjectSchemaUpdate(prj.getName());
             }
@@ -237,7 +238,6 @@ public class CubeDescManager {
         return cubeDesc;
     }
 
-
     /**
      * if there is some change need be applied after getting a cubeDesc from front-end, do it here
      * @param cubeDesc

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 9670b89..073f516 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -33,6 +33,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -89,7 +90,7 @@ public class CubeManager implements IRealizationProvider {
     private static final Logger logger = LoggerFactory.getLogger(CubeManager.class);
 
     // static cached instances
-    private static final ConcurrentHashMap<KylinConfig, CubeManager> CACHE = new ConcurrentHashMap<KylinConfig, CubeManager>();
+    private static final ConcurrentMap<KylinConfig, CubeManager> CACHE = new ConcurrentHashMap<KylinConfig, CubeManager>();
 
     public static CubeManager getInstance(KylinConfig config) {
         CubeManager r = CACHE.get(config);

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 3ba24cf..427bd14 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -59,7 +60,7 @@ public class DictionaryManager {
     private static final DictionaryInfo NONE_INDICATOR = new DictionaryInfo();
 
     // static cached instances
-    private static final ConcurrentHashMap<KylinConfig, DictionaryManager> CACHE = new ConcurrentHashMap<KylinConfig, DictionaryManager>();
+    private static final ConcurrentMap<KylinConfig, DictionaryManager> CACHE = new ConcurrentHashMap<KylinConfig, DictionaryManager>();
 
     public static DictionaryManager getInstance(KylinConfig config) {
         DictionaryManager r = CACHE.get(config);

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index b45d017..a912696 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -21,6 +21,7 @@ package org.apache.kylin.dict.lookup;
 import java.io.IOException;
 import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -47,7 +48,7 @@ public class SnapshotManager {
     private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);
 
     // static cached instances
-    private static final ConcurrentHashMap<KylinConfig, SnapshotManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>();
+    private static final ConcurrentMap<KylinConfig, SnapshotManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>();
 
     public static SnapshotManager getInstance(KylinConfig config) {
         SnapshotManager r = SERVICE_CACHE.get(config);

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 96505e6..70799d8 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -43,7 +44,7 @@ public class ExecutableDao {
     private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<ExecutablePO>(ExecutablePO.class);
     private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
     private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class);
-    private static final ConcurrentHashMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>();
+    private static final ConcurrentMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>();
 
     private ResourceStore store;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 48cedb5..0c86d72 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -18,9 +18,13 @@
 
 package org.apache.kylin.job.execution;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.job.dao.ExecutableDao;
@@ -31,18 +35,16 @@ import org.apache.kylin.job.exception.PersistentException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Constructor;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  */
 public class ExecutableManager {
 
     private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class);
-    private static final ConcurrentHashMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>();
+    private static final ConcurrentMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>();
 
     private final KylinConfig config;
     private final ExecutableDao executableDao;
@@ -347,7 +349,7 @@ public class ExecutableManager {
             for (AbstractExecutable task : tasks) {
                 if (task.getId().compareTo(stepId) >= 0) {
                     logger.debug("rollback task : " + task);
-                    updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String>newHashMap(), "");
+                    updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "");
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 84e62d5..1f2e958 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -23,6 +23,7 @@ import java.net.UnknownHostException;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -72,7 +73,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
     private DistributedJobLock jobLock;
 
     private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
-    private static final ConcurrentHashMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>();
+    private static final ConcurrentMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>();
     //keep all segments having running job
     private final Set<String> segmentWithLocks = new CopyOnWriteArraySet<>();
     private volatile boolean initialized = false;

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
index 49ec96e..9427ace 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -65,7 +66,7 @@ public class MetadataManager {
     public static final Serializer<ExternalFilterDesc> EXTERNAL_FILTER_DESC_SERIALIZER = new JsonSerializer<ExternalFilterDesc>(ExternalFilterDesc.class);
 
     // static cached instances
-    private static final ConcurrentHashMap<KylinConfig, MetadataManager> CACHE = new ConcurrentHashMap<KylinConfig, MetadataManager>();
+    private static final ConcurrentMap<KylinConfig, MetadataManager> CACHE = new ConcurrentHashMap<KylinConfig, MetadataManager>();
 
     public static MetadataManager getInstance(KylinConfig config) {
         MetadataManager r = CACHE.get(config);

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
index 86e282e..c7eb133 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
@@ -21,6 +21,7 @@ package org.apache.kylin.metadata.badquery;
 import java.io.IOException;
 import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -35,7 +36,7 @@ public class BadQueryHistoryManager {
     public static final Serializer<BadQueryHistory> BAD_QUERY_INSTANCE_SERIALIZER = new JsonSerializer<>(BadQueryHistory.class);
     private static final Logger logger = LoggerFactory.getLogger(BadQueryHistoryManager.class);
 
-    private static final ConcurrentHashMap<KylinConfig, BadQueryHistoryManager> CACHE = new ConcurrentHashMap<>();
+    private static final ConcurrentMap<KylinConfig, BadQueryHistoryManager> CACHE = new ConcurrentHashMap<>();
     private KylinConfig kylinConfig;
 
     private BadQueryHistoryManager(KylinConfig config) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 17b644d..5b45d9e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -63,7 +64,7 @@ public class Broadcaster {
     public static final String SYNC_PRJ_DATA = "project_data"; // the special entity to indicate project data has change, e.g. cube/raw_table update
 
     // static cached instances
-    private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>();
+    private static final ConcurrentMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>();
 
     public static Broadcaster getInstance(KylinConfig config) {
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index ca4f7f1..bb1e3ed 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -48,7 +49,7 @@ import com.google.common.collect.Lists;
 
 public class ProjectManager {
     private static final Logger logger = LoggerFactory.getLogger(ProjectManager.class);
-    private static final ConcurrentHashMap<KylinConfig, ProjectManager> CACHE = new ConcurrentHashMap<KylinConfig, ProjectManager>();
+    private static final ConcurrentMap<KylinConfig, ProjectManager> CACHE = new ConcurrentHashMap<KylinConfig, ProjectManager>();
     public static final Serializer<ProjectInstance> PROJECT_SERIALIZER = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
 
     public static ProjectManager getInstance(KylinConfig config) {
@@ -98,7 +99,7 @@ public class ProjectManager {
     }
 
     private class ProjectSyncListener extends Broadcaster.Listener {
-        
+
         @Override
         public void onClearAll(Broadcaster broadcaster) throws IOException {
             clearCache();
@@ -107,12 +108,12 @@ public class ProjectManager {
         @Override
         public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
             String project = cacheKey;
-            
+
             if (event == Event.DROP)
                 removeProjectLocal(project);
             else
                 reloadProjectLocal(project);
-            
+
             broadcaster.notifyProjectSchemaUpdate(project);
             broadcaster.notifyProjectDataUpdate(project);
         }
@@ -249,7 +250,7 @@ public class ProjectManager {
         projectMap.remove(norm(proj.getName()));
         clearL2Cache();
     }
-    
+
     private void removeProjectLocal(String proj) {
         projectMap.remove(norm(proj));
         clearL2Cache();
@@ -393,7 +394,7 @@ public class ProjectManager {
         }
         return projects;
     }
-    
+
     public ExternalFilterDesc getExternalFilterDesc(String project, String extFilter) {
         return l2Cache.getExternalFilterDesc(project, extFilter);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
index 77e2679..2d1a4a5 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
@@ -37,7 +38,7 @@ import com.google.common.collect.Maps;
 public class RealizationRegistry {
 
     private static final Logger logger = LoggerFactory.getLogger(RealizationRegistry.class);
-    private static final ConcurrentHashMap<KylinConfig, RealizationRegistry> CACHE = new ConcurrentHashMap<KylinConfig, RealizationRegistry>();
+    private static final ConcurrentMap<KylinConfig, RealizationRegistry> CACHE = new ConcurrentHashMap<KylinConfig, RealizationRegistry>();
 
     public static RealizationRegistry getInstance(KylinConfig config) {
         RealizationRegistry r = CACHE.get(config);

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
index 8cfe87d..48febeb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -42,7 +43,7 @@ public class StreamingManager {
     private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class);
 
     // static cached instances
-    private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>();
+    private static final ConcurrentMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>();
 
     public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index 748e873..2d330c0 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -47,7 +48,7 @@ public class HybridManager implements IRealizationProvider {
     private static final Logger logger = LoggerFactory.getLogger(HybridManager.class);
 
     // static cached instances
-    private static final ConcurrentHashMap<KylinConfig, HybridManager> CACHE = new ConcurrentHashMap<KylinConfig, HybridManager>();
+    private static final ConcurrentMap<KylinConfig, HybridManager> CACHE = new ConcurrentHashMap<KylinConfig, HybridManager>();
 
     public static HybridManager getInstance(KylinConfig config) {
         HybridManager r = CACHE.get(config);

http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index 775f052..50295c3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -43,7 +44,7 @@ public class KafkaConfigManager {
     private static final Logger logger = LoggerFactory.getLogger(KafkaConfigManager.class);
 
     // static cached instances
-    private static final ConcurrentHashMap<KylinConfig, KafkaConfigManager> CACHE = new ConcurrentHashMap<KylinConfig, KafkaConfigManager>();
+    private static final ConcurrentMap<KylinConfig, KafkaConfigManager> CACHE = new ConcurrentHashMap<KylinConfig, KafkaConfigManager>();
 
     private KylinConfig config;
 
@@ -59,7 +60,7 @@ public class KafkaConfigManager {
     private KafkaConfigManager(KylinConfig config) throws IOException {
         this.config = config;
         this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, "kafka");
-        
+
         // touch lower level metadata before registering my listener
         reloadAllKafkaConfig();
         Broadcaster.getInstance(config).registerListener(new KafkaSyncListener(), "kafka");
@@ -195,7 +196,7 @@ public class KafkaConfigManager {
             throw new IllegalArgumentException("No topic info");
         }
 
-        if (kafkaConfig.getKafkaClusterConfigs() == null || kafkaConfig.getKafkaClusterConfigs().size() ==0) {
+        if (kafkaConfig.getKafkaClusterConfigs() == null || kafkaConfig.getKafkaClusterConfigs().size() == 0) {
             throw new IllegalArgumentException("No cluster info");
         }
 
@@ -213,7 +214,7 @@ public class KafkaConfigManager {
     private void removeKafkaConfigLocal(String name) {
         kafkaMap.remove(name);
     }
-    
+
     private void reloadAllKafkaConfig() throws IOException {
         ResourceStore store = getStore();
         logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));