You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/02/24 10:46:57 UTC
kylin git commit: KYLIN_2464,
replace ConcurrentHashMap with ConcurrentMap
Repository: kylin
Updated Branches:
refs/heads/master 6adb73d3a -> 693c6faf2
KYLIN_2464, replace ConcurrentHashMap with ConcurrentMap
Signed-off-by: Li Yang <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/693c6faf
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/693c6faf
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/693c6faf
Branch: refs/heads/master
Commit: 693c6faf2f38d19816c8e0c9e0a48c6caaaf18cd
Parents: 6adb73d
Author: Cheng Wang <ch...@kyligence.io>
Authored: Fri Feb 24 18:33:48 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Fri Feb 24 18:46:44 2017 +0800
----------------------------------------------------------------------
.../kylin/common/persistence/ResourceStore.java | 4 ++--
.../common/util/MemoryBudgetController.java | 3 ++-
.../org/apache/kylin/cube/CubeDescManager.java | 16 +++++++-------
.../java/org/apache/kylin/cube/CubeManager.java | 3 ++-
.../apache/kylin/dict/DictionaryManager.java | 3 ++-
.../kylin/dict/lookup/SnapshotManager.java | 3 ++-
.../org/apache/kylin/job/dao/ExecutableDao.java | 3 ++-
.../kylin/job/execution/ExecutableManager.java | 22 +++++++++++---------
.../impl/threadpool/DistributedScheduler.java | 3 ++-
.../apache/kylin/metadata/MetadataManager.java | 3 ++-
.../badquery/BadQueryHistoryManager.java | 3 ++-
.../kylin/metadata/cachesync/Broadcaster.java | 3 ++-
.../kylin/metadata/project/ProjectManager.java | 13 ++++++------
.../realization/RealizationRegistry.java | 3 ++-
.../metadata/streaming/StreamingManager.java | 3 ++-
.../kylin/storage/hybrid/HybridManager.java | 3 ++-
.../kylin/source/kafka/KafkaConfigManager.java | 9 ++++----
17 files changed, 58 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 77143b0..b0e06f5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -30,6 +30,7 @@ import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@@ -63,10 +64,9 @@ abstract public class ResourceStore {
public static final String CUBE_STATISTICS_ROOT = "/cube_statistics";
public static final String BAD_QUERY_RESOURCE_ROOT = "/bad_query";
-
protected static final String DEFAULT_STORE_NAME = "kylin_metadata";
- private static final ConcurrentHashMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig, ResourceStore>();
+ private static final ConcurrentMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig, ResourceStore>();
private static final ArrayList<Class<? extends ResourceStore>> knownImpl = new ArrayList<Class<? extends ResourceStore>>();
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
index ade929c..7a0b919 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -19,6 +19,7 @@
package org.apache.kylin.common.util;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
@@ -64,7 +65,7 @@ public class MemoryBudgetController {
// all budget numbers are in MB
private final int totalBudgetMB;
- private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
+ private final ConcurrentMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
private int totalReservedMB;
private final ReentrantLock lock = new ReentrantLock();
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index 1bf7e97..00fa705 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
@@ -63,7 +64,7 @@ public class CubeDescManager {
public static final Serializer<CubeDesc> CUBE_DESC_SERIALIZER = new JsonSerializer<CubeDesc>(CubeDesc.class);
// static cached instances
- private static final ConcurrentHashMap<KylinConfig, CubeDescManager> CACHE = new ConcurrentHashMap<KylinConfig, CubeDescManager>();
+ private static final ConcurrentMap<KylinConfig, CubeDescManager> CACHE = new ConcurrentHashMap<KylinConfig, CubeDescManager>();
public static CubeDescManager getInstance(KylinConfig config) {
CubeDescManager r = CACHE.get(config);
@@ -103,14 +104,14 @@ public class CubeDescManager {
logger.info("Initializing CubeDescManager with config " + config);
this.config = config;
this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, "cube_desc");
-
+
// touch lower level metadata before registering my listener
reloadAllCubeDesc();
Broadcaster.getInstance(config).registerListener(new CubeDescSyncListener(), "cube_desc");
}
-
+
private class CubeDescSyncListener extends Broadcaster.Listener {
-
+
@Override
public void onClearAll(Broadcaster broadcaster) throws IOException {
clearCache();
@@ -122,7 +123,7 @@ public class CubeDescManager {
for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) {
if (real instanceof CubeInstance) {
String descName = ((CubeInstance) real).getDescName();
- reloadCubeDescLocal(descName);
+ reloadCubeDescLocal(descName);
}
}
}
@@ -132,12 +133,12 @@ public class CubeDescManager {
String cubeDescName = cacheKey;
CubeDesc cubeDesc = getCubeDesc(cubeDescName);
String modelName = cubeDesc == null ? null : cubeDesc.getModel().getName();
-
+
if (event == Event.DROP)
removeLocalCubeDesc(cubeDescName);
else
reloadCubeDescLocal(cubeDescName);
-
+
for (ProjectInstance prj : ProjectManager.getInstance(config).findProjectsByModel(modelName)) {
broadcaster.notifyProjectSchemaUpdate(prj.getName());
}
@@ -237,7 +238,6 @@ public class CubeDescManager {
return cubeDesc;
}
-
/**
* if there is some change need be applied after getting a cubeDesc from front-end, do it here
* @param cubeDesc
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 9670b89..073f516 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -33,6 +33,7 @@ import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
@@ -89,7 +90,7 @@ public class CubeManager implements IRealizationProvider {
private static final Logger logger = LoggerFactory.getLogger(CubeManager.class);
// static cached instances
- private static final ConcurrentHashMap<KylinConfig, CubeManager> CACHE = new ConcurrentHashMap<KylinConfig, CubeManager>();
+ private static final ConcurrentMap<KylinConfig, CubeManager> CACHE = new ConcurrentHashMap<KylinConfig, CubeManager>();
public static CubeManager getInstance(KylinConfig config) {
CubeManager r = CACHE.get(config);
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 3ba24cf..427bd14 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -59,7 +60,7 @@ public class DictionaryManager {
private static final DictionaryInfo NONE_INDICATOR = new DictionaryInfo();
// static cached instances
- private static final ConcurrentHashMap<KylinConfig, DictionaryManager> CACHE = new ConcurrentHashMap<KylinConfig, DictionaryManager>();
+ private static final ConcurrentMap<KylinConfig, DictionaryManager> CACHE = new ConcurrentHashMap<KylinConfig, DictionaryManager>();
public static DictionaryManager getInstance(KylinConfig config) {
DictionaryManager r = CACHE.get(config);
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index b45d017..a912696 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -21,6 +21,7 @@ package org.apache.kylin.dict.lookup;
import java.io.IOException;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -47,7 +48,7 @@ public class SnapshotManager {
private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);
// static cached instances
- private static final ConcurrentHashMap<KylinConfig, SnapshotManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>();
+ private static final ConcurrentMap<KylinConfig, SnapshotManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>();
public static SnapshotManager getInstance(KylinConfig config) {
SnapshotManager r = SERVICE_CACHE.get(config);
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 96505e6..70799d8 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
@@ -43,7 +44,7 @@ public class ExecutableDao {
private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<ExecutablePO>(ExecutablePO.class);
private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class);
- private static final ConcurrentHashMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>();
+ private static final ConcurrentMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>();
private ResourceStore store;
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 48cedb5..0c86d72 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -18,9 +18,13 @@
package org.apache.kylin.job.execution;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.job.dao.ExecutableDao;
@@ -31,18 +35,16 @@ import org.apache.kylin.job.exception.PersistentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Constructor;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
*/
public class ExecutableManager {
private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class);
- private static final ConcurrentHashMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>();
+ private static final ConcurrentMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>();
private final KylinConfig config;
private final ExecutableDao executableDao;
@@ -347,7 +349,7 @@ public class ExecutableManager {
for (AbstractExecutable task : tasks) {
if (task.getId().compareTo(stepId) >= 0) {
logger.debug("rollback task : " + task);
- updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String>newHashMap(), "");
+ updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "");
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 84e62d5..1f2e958 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -23,6 +23,7 @@ import java.net.UnknownHostException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -72,7 +73,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
private DistributedJobLock jobLock;
private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
- private static final ConcurrentHashMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>();
+ private static final ConcurrentMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>();
//keep all segments having running job
private final Set<String> segmentWithLocks = new CopyOnWriteArraySet<>();
private volatile boolean initialized = false;
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
index 49ec96e..9427ace 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
@@ -65,7 +66,7 @@ public class MetadataManager {
public static final Serializer<ExternalFilterDesc> EXTERNAL_FILTER_DESC_SERIALIZER = new JsonSerializer<ExternalFilterDesc>(ExternalFilterDesc.class);
// static cached instances
- private static final ConcurrentHashMap<KylinConfig, MetadataManager> CACHE = new ConcurrentHashMap<KylinConfig, MetadataManager>();
+ private static final ConcurrentMap<KylinConfig, MetadataManager> CACHE = new ConcurrentHashMap<KylinConfig, MetadataManager>();
public static MetadataManager getInstance(KylinConfig config) {
MetadataManager r = CACHE.get(config);
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
index 86e282e..c7eb133 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
@@ -21,6 +21,7 @@ package org.apache.kylin.metadata.badquery;
import java.io.IOException;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
@@ -35,7 +36,7 @@ public class BadQueryHistoryManager {
public static final Serializer<BadQueryHistory> BAD_QUERY_INSTANCE_SERIALIZER = new JsonSerializer<>(BadQueryHistory.class);
private static final Logger logger = LoggerFactory.getLogger(BadQueryHistoryManager.class);
- private static final ConcurrentHashMap<KylinConfig, BadQueryHistoryManager> CACHE = new ConcurrentHashMap<>();
+ private static final ConcurrentMap<KylinConfig, BadQueryHistoryManager> CACHE = new ConcurrentHashMap<>();
private KylinConfig kylinConfig;
private BadQueryHistoryManager(KylinConfig config) throws IOException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 17b644d..5b45d9e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
@@ -63,7 +64,7 @@ public class Broadcaster {
public static final String SYNC_PRJ_DATA = "project_data"; // the special entity to indicate project data has change, e.g. cube/raw_table update
// static cached instances
- private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>();
+ private static final ConcurrentMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>();
public static Broadcaster getInstance(KylinConfig config) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index ca4f7f1..bb1e3ed 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
@@ -48,7 +49,7 @@ import com.google.common.collect.Lists;
public class ProjectManager {
private static final Logger logger = LoggerFactory.getLogger(ProjectManager.class);
- private static final ConcurrentHashMap<KylinConfig, ProjectManager> CACHE = new ConcurrentHashMap<KylinConfig, ProjectManager>();
+ private static final ConcurrentMap<KylinConfig, ProjectManager> CACHE = new ConcurrentHashMap<KylinConfig, ProjectManager>();
public static final Serializer<ProjectInstance> PROJECT_SERIALIZER = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
public static ProjectManager getInstance(KylinConfig config) {
@@ -98,7 +99,7 @@ public class ProjectManager {
}
private class ProjectSyncListener extends Broadcaster.Listener {
-
+
@Override
public void onClearAll(Broadcaster broadcaster) throws IOException {
clearCache();
@@ -107,12 +108,12 @@ public class ProjectManager {
@Override
public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
String project = cacheKey;
-
+
if (event == Event.DROP)
removeProjectLocal(project);
else
reloadProjectLocal(project);
-
+
broadcaster.notifyProjectSchemaUpdate(project);
broadcaster.notifyProjectDataUpdate(project);
}
@@ -249,7 +250,7 @@ public class ProjectManager {
projectMap.remove(norm(proj.getName()));
clearL2Cache();
}
-
+
private void removeProjectLocal(String proj) {
projectMap.remove(norm(proj));
clearL2Cache();
@@ -393,7 +394,7 @@ public class ProjectManager {
}
return projects;
}
-
+
public ExternalFilterDesc getExternalFilterDesc(String project, String extFilter) {
return l2Cache.getExternalFilterDesc(project, extFilter);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
index 77e2679..2d1a4a5 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
@@ -37,7 +38,7 @@ import com.google.common.collect.Maps;
public class RealizationRegistry {
private static final Logger logger = LoggerFactory.getLogger(RealizationRegistry.class);
- private static final ConcurrentHashMap<KylinConfig, RealizationRegistry> CACHE = new ConcurrentHashMap<KylinConfig, RealizationRegistry>();
+ private static final ConcurrentMap<KylinConfig, RealizationRegistry> CACHE = new ConcurrentHashMap<KylinConfig, RealizationRegistry>();
public static RealizationRegistry getInstance(KylinConfig config) {
RealizationRegistry r = CACHE.get(config);
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
index 8cfe87d..48febeb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
@@ -42,7 +43,7 @@ public class StreamingManager {
private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class);
// static cached instances
- private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>();
+ private static final ConcurrentMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>();
public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index 748e873..2d330c0 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
@@ -47,7 +48,7 @@ public class HybridManager implements IRealizationProvider {
private static final Logger logger = LoggerFactory.getLogger(HybridManager.class);
// static cached instances
- private static final ConcurrentHashMap<KylinConfig, HybridManager> CACHE = new ConcurrentHashMap<KylinConfig, HybridManager>();
+ private static final ConcurrentMap<KylinConfig, HybridManager> CACHE = new ConcurrentHashMap<KylinConfig, HybridManager>();
public static HybridManager getInstance(KylinConfig config) {
HybridManager r = CACHE.get(config);
http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index 775f052..50295c3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
@@ -43,7 +44,7 @@ public class KafkaConfigManager {
private static final Logger logger = LoggerFactory.getLogger(KafkaConfigManager.class);
// static cached instances
- private static final ConcurrentHashMap<KylinConfig, KafkaConfigManager> CACHE = new ConcurrentHashMap<KylinConfig, KafkaConfigManager>();
+ private static final ConcurrentMap<KylinConfig, KafkaConfigManager> CACHE = new ConcurrentHashMap<KylinConfig, KafkaConfigManager>();
private KylinConfig config;
@@ -59,7 +60,7 @@ public class KafkaConfigManager {
private KafkaConfigManager(KylinConfig config) throws IOException {
this.config = config;
this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, "kafka");
-
+
// touch lower level metadata before registering my listener
reloadAllKafkaConfig();
Broadcaster.getInstance(config).registerListener(new KafkaSyncListener(), "kafka");
@@ -195,7 +196,7 @@ public class KafkaConfigManager {
throw new IllegalArgumentException("No topic info");
}
- if (kafkaConfig.getKafkaClusterConfigs() == null || kafkaConfig.getKafkaClusterConfigs().size() ==0) {
+ if (kafkaConfig.getKafkaClusterConfigs() == null || kafkaConfig.getKafkaClusterConfigs().size() == 0) {
throw new IllegalArgumentException("No cluster info");
}
@@ -213,7 +214,7 @@ public class KafkaConfigManager {
private void removeKafkaConfigLocal(String name) {
kafkaMap.remove(name);
}
-
+
private void reloadAllKafkaConfig() throws IOException {
ResourceStore store = getStore();
logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));