You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/09 04:54:42 UTC
[2/3] incubator-kylin git commit: KYLIN-1127 Broadcaster be configured by KylinConfig (instead of spring profile)
KYLIN-1127 Broadcaster be configured by KylinConfig (instead of spring profile)
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2f4595ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2f4595ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2f4595ae
Branch: refs/heads/2.x-staging
Commit: 2f4595ae9db582a5b1c498534916f90db22b55b2
Parents: f1dedab
Author: Yang Li <li...@apache.org>
Authored: Mon Nov 9 11:15:12 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Nov 9 11:15:12 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/cache/CacheUpdater.java | 11 ----
.../kylin/common/cache/LocalCacheUpdater.java | 18 ------
.../kylin/common/cache/RemoteCacheUpdater.java | 14 -----
.../common/restclient/AbstractRestCache.java | 17 +++---
.../kylin/common/restclient/Broadcaster.java | 58 +++++++++++++++-----
.../restclient/CaseInsensitiveStringCache.java | 6 +-
.../common/restclient/SingleValueCache.java | 16 +++---
.../org/apache/kylin/cube/CubeDescManager.java | 3 +-
.../java/org/apache/kylin/cube/CubeManager.java | 4 +-
.../apache/kylin/metadata/MetadataManager.java | 10 +++-
.../kylin/metadata/project/ProjectManager.java | 3 +-
.../kylin/storage/hybrid/HybridManager.java | 4 +-
.../engine/streaming/StreamingManager.java | 3 +-
.../engine/streaming/cli/StreamingCLI.java | 7 +--
.../test_case_data/localmeta/kylin.properties | 2 +-
.../kylin/invertedindex/IIDescManager.java | 3 +-
.../apache/kylin/invertedindex/IIManager.java | 4 +-
.../kylin/rest/service/CacheServiceTest.java | 6 +-
.../kylin/source/kafka/KafkaConfigManager.java | 23 ++++----
19 files changed, 105 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
deleted file mode 100644
index 615ee14..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.kylin.common.cache;
-
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.apache.kylin.common.restclient.Broadcaster;
-
-/**
- */
-@SuppressWarnings("rawtypes")
-public interface CacheUpdater {
- void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache);
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
deleted file mode 100644
index 8d3b648..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.kylin.common.cache;
-
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.apache.kylin.common.restclient.Broadcaster;
-
-/**
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class LocalCacheUpdater implements CacheUpdater {
- @Override
- public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) {
- if (syncAction == Broadcaster.EVENT.CREATE || syncAction == Broadcaster.EVENT.UPDATE) {
- cache.putLocal(key, value);
- } else if (syncAction == Broadcaster.EVENT.DROP) {
- cache.removeLocal(key);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
deleted file mode 100644
index 2927d2d..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.kylin.common.cache;
-
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.apache.kylin.common.restclient.Broadcaster;
-
-/**
- */
-@SuppressWarnings("rawtypes")
-public class RemoteCacheUpdater implements CacheUpdater {
- @Override
- public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) {
- Broadcaster.getInstance().queue(type.getType(), syncAction.getType(), key.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
index 68d9be5..fc030b4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
@@ -18,8 +18,7 @@
package org.apache.kylin.common.restclient;
-import org.apache.kylin.common.cache.CacheUpdater;
-import org.apache.kylin.common.cache.RemoteCacheUpdater;
+import org.apache.kylin.common.KylinConfig;
/**
* @author xjiang
@@ -27,17 +26,17 @@ import org.apache.kylin.common.cache.RemoteCacheUpdater;
*/
public abstract class AbstractRestCache<K, V> {
- protected static CacheUpdater cacheUpdater = new RemoteCacheUpdater();
-
- public static void setCacheUpdater(CacheUpdater cu) {
- cacheUpdater = cu;
- }
-
+ protected final KylinConfig config;
protected final Broadcaster.TYPE syncType;
- protected AbstractRestCache(Broadcaster.TYPE syncType) {
+ protected AbstractRestCache(KylinConfig config, Broadcaster.TYPE syncType) {
+ this.config = config;
this.syncType = syncType;
}
+
+ public Broadcaster getBroadcaster() {
+ return Broadcaster.getInstance(config);
+ }
public abstract void put(K key, V value);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
index 80ec33c..871d77c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
@@ -43,24 +44,52 @@ public class Broadcaster {
private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class);
+ // static cached instances
+ private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>();
+
+ public static Broadcaster getInstance(KylinConfig config) {
+ Broadcaster r = CACHE.get(config);
+ if (r != null) {
+ return r;
+ }
+
+ synchronized (Broadcaster.class) {
+ r = CACHE.get(config);
+ if (r != null) {
+ return r;
+ }
+
+ r = new Broadcaster(config);
+ CACHE.put(config, r);
+ if (CACHE.size() > 1) {
+ logger.warn("More than one cubemanager singleton exist");
+ }
+ return r;
+ }
+ }
+
+ public static void clearCache() {
+ CACHE.clear();
+ }
+
+ // ============================================================================
+
private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<>();
private AtomicLong counter = new AtomicLong();
- static class BroadcasterHolder {
- static final Broadcaster INSTANCE = new Broadcaster();
- }
+ private Broadcaster(final KylinConfig config) {
+ final String[] nodes = config.getRestServers();
+ if (nodes == null || nodes.length < 1) {
+ logger.warn("There is no available rest server; check the 'kylin.rest.servers' config");
+ broadcastEvents = null; // disable the broadcaster
+ return;
+ }
+ logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
- private Broadcaster() {
Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
@Override
public void run() {
- final String[] nodes = KylinConfig.getInstanceFromEnv().getRestServers();
- if (nodes == null || nodes.length < 1) {//TODO if the node count is greater than 1, it means it is a cluster
- logger.warn("There is no available rest server; check the 'kylin.rest.servers' config");
- return;
- }
- logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
final List<RestClient> restClients = Lists.newArrayList();
for (String node : nodes) {
restClients.add(new RestClient(node));
@@ -90,10 +119,6 @@ public class Broadcaster {
});
}
- public static Broadcaster getInstance() {
- return BroadcasterHolder.INSTANCE;
- }
-
/**
* Broadcast the cubedesc event out
*
@@ -101,6 +126,9 @@ public class Broadcaster {
* event action
*/
public void queue(String type, String action, String key) {
+ if (broadcastEvents == null)
+ return;
+
try {
counter.incrementAndGet();
broadcastEvents.putFirst(new BroadcastEvent(type, action, key));
@@ -138,7 +166,7 @@ public class Broadcaster {
}
public enum TYPE {
- ALL("all"), CUBE("cube"),STREAMING("streaming"),KAFKA("kafka"),CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
+ ALL("all"), CUBE("cube"), STREAMING("streaming"), KAFKA("kafka"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
private String text;
TYPE(String text) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
index 68e3c04..2bcddbf 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
@@ -20,12 +20,14 @@ package org.apache.kylin.common.restclient;
import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.kylin.common.KylinConfig;
+
/**
*/
public class CaseInsensitiveStringCache<V> extends SingleValueCache<String, V> {
- public CaseInsensitiveStringCache(Broadcaster.TYPE syncType) {
- super(syncType, new ConcurrentSkipListMap<String, V>(String.CASE_INSENSITIVE_ORDER));
+ public CaseInsensitiveStringCache(KylinConfig config, Broadcaster.TYPE syncType) {
+ super(config, syncType, new ConcurrentSkipListMap<String, V>(String.CASE_INSENSITIVE_ORDER));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
index cb6c286..9acfeca 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
@@ -25,6 +25,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.kylin.common.KylinConfig;
+
/**
* @author xjiang
*
@@ -33,12 +35,12 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
private final ConcurrentMap<K, V> innerCache;
- public SingleValueCache(Broadcaster.TYPE syncType) {
- this(syncType, new ConcurrentHashMap<K, V>());
+ public SingleValueCache(KylinConfig config, Broadcaster.TYPE syncType) {
+ this(config, syncType, new ConcurrentHashMap<K, V>());
}
- public SingleValueCache(Broadcaster.TYPE syncType, ConcurrentMap<K, V> innerCache) {
- super(syncType);
+ public SingleValueCache(KylinConfig config, Broadcaster.TYPE syncType, ConcurrentMap<K, V> innerCache) {
+ super(config, syncType);
this.innerCache = innerCache;
}
@@ -48,9 +50,9 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
innerCache.put(key, value);
if (!exists) {
- cacheUpdater.updateCache(key, value, Broadcaster.EVENT.CREATE, syncType, this);
+ getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.CREATE.getType(), key.toString());
} else {
- cacheUpdater.updateCache(key, value, Broadcaster.EVENT.UPDATE, syncType, this);
+ getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.UPDATE.getType(), key.toString());
}
}
@@ -64,7 +66,7 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
innerCache.remove(key);
if (exists) {
- cacheUpdater.updateCache(key, null, Broadcaster.EVENT.DROP, syncType, this);
+ getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.DROP.getType(), key.toString());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index 8e75d29..c50836c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -57,7 +57,7 @@ public class CubeDescManager {
private KylinConfig config;
// name ==> CubeDesc
- private CaseInsensitiveStringCache<CubeDesc> cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(Broadcaster.TYPE.CUBE_DESC);
+ private CaseInsensitiveStringCache<CubeDesc> cubeDescMap;
public static CubeDescManager getInstance(KylinConfig config) {
CubeDescManager r = CACHE.get(config);
@@ -90,6 +90,7 @@ public class CubeDescManager {
private CubeDescManager(KylinConfig config) throws IOException {
logger.info("Initializing CubeDescManager with config " + config);
this.config = config;
+ this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, Broadcaster.TYPE.CUBE_DESC);
reloadAllCubeDesc();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 9b7a024..a249cd7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -116,7 +116,7 @@ public class CubeManager implements IRealizationProvider {
private KylinConfig config;
// cube name ==> CubeInstance
- private CaseInsensitiveStringCache<CubeInstance> cubeMap = new CaseInsensitiveStringCache<CubeInstance>(Broadcaster.TYPE.CUBE);
+ private CaseInsensitiveStringCache<CubeInstance> cubeMap;
// "table/column" ==> lookup table
// private SingleValueCache<String, LookupStringTable> lookupTables = new SingleValueCache<String, LookupStringTable>(Broadcaster.TYPE.METADATA);
@@ -126,7 +126,7 @@ public class CubeManager implements IRealizationProvider {
private CubeManager(KylinConfig config) throws IOException {
logger.info("Initializing CubeManager with config " + config);
this.config = config;
-
+ this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, Broadcaster.TYPE.CUBE);
loadAllCubeInstance();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
index b7e7dc5..c907afd 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
@@ -103,11 +103,11 @@ public class MetadataManager {
private KylinConfig config;
// table name ==> SourceTable
- private CaseInsensitiveStringCache<TableDesc> srcTableMap = new CaseInsensitiveStringCache<TableDesc>(Broadcaster.TYPE.TABLE);
+ private CaseInsensitiveStringCache<TableDesc> srcTableMap;
// name => value
- private CaseInsensitiveStringCache<Map<String, String>> srcTableExdMap = new CaseInsensitiveStringCache<Map<String, String>>(Broadcaster.TYPE.TABLE);
+ private CaseInsensitiveStringCache<Map<String, String>> srcTableExdMap;
// name => DataModelDesc
- private CaseInsensitiveStringCache<DataModelDesc> dataModelDescMap = new CaseInsensitiveStringCache<DataModelDesc>(Broadcaster.TYPE.DATA_MODEL);
+ private CaseInsensitiveStringCache<DataModelDesc> dataModelDescMap;
private MetadataManager(KylinConfig config) throws IOException {
init(config);
@@ -198,6 +198,10 @@ public class MetadataManager {
private void init(KylinConfig config) throws IOException {
this.config = config;
+ this.srcTableMap = new CaseInsensitiveStringCache<TableDesc>(config, Broadcaster.TYPE.TABLE);
+ this.srcTableExdMap = new CaseInsensitiveStringCache<Map<String, String>>(config, Broadcaster.TYPE.TABLE);
+ this.dataModelDescMap = new CaseInsensitiveStringCache<DataModelDesc>(config, Broadcaster.TYPE.DATA_MODEL);
+
reloadAllSourceTable();
reloadAllSourceTableExd();
reloadAllDataModel();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index fd41f59..8304128 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -81,11 +81,12 @@ public class ProjectManager {
private KylinConfig config;
private ProjectL2Cache l2Cache;
// project name => ProjrectInstance
- private CaseInsensitiveStringCache<ProjectInstance> projectMap = new CaseInsensitiveStringCache<ProjectInstance>(Broadcaster.TYPE.PROJECT);
+ private CaseInsensitiveStringCache<ProjectInstance> projectMap;
private ProjectManager(KylinConfig config) throws IOException {
logger.info("Initializing ProjectManager with metadata url " + config);
this.config = config;
+ this.projectMap = new CaseInsensitiveStringCache<ProjectInstance>(config, Broadcaster.TYPE.PROJECT);
this.l2Cache = new ProjectL2Cache(this);
reloadAllProjects();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index 0f00f1a..9392ef5 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -76,12 +76,12 @@ public class HybridManager implements IRealizationProvider {
private KylinConfig config;
- private CaseInsensitiveStringCache<HybridInstance> hybridMap = new CaseInsensitiveStringCache<HybridInstance>(Broadcaster.TYPE.HYBRID);
+ private CaseInsensitiveStringCache<HybridInstance> hybridMap;
private HybridManager(KylinConfig config) throws IOException {
logger.info("Initializing HybridManager with config " + config);
this.config = config;
-
+ this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, Broadcaster.TYPE.HYBRID);
loadAllHybridInstance();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index fa7d0f8..8cabe1b 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -79,7 +79,7 @@ public class StreamingManager {
private KylinConfig config;
// name ==> StreamingConfig
- private CaseInsensitiveStringCache<StreamingConfig> streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(Broadcaster.TYPE.STREAMING);
+ private CaseInsensitiveStringCache<StreamingConfig> streamingMap;
public static void clearCache() {
CACHE.clear();
@@ -87,6 +87,7 @@ public class StreamingManager {
private StreamingManager(KylinConfig config) throws IOException {
this.config = config;
+ this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, Broadcaster.TYPE.STREAMING);
reloadAllStreaming();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index e3a7133..a73a6ac 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -34,11 +34,10 @@
package org.apache.kylin.engine.streaming.cli;
-import com.google.common.base.Preconditions;
+import java.util.List;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.cache.RemoteCacheUpdater;
-import org.apache.kylin.common.restclient.AbstractRestCache;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.engine.streaming.BootstrapConfig;
import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
@@ -48,7 +47,7 @@ import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
+import com.google.common.base.Preconditions;
public class StreamingCLI {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index 92773a1..48f01f5 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -1,7 +1,7 @@
## Config for Kylin Engine ##
# List of web servers in use, this enables one web server instance to sync up with other servers.
-kylin.rest.servers=localhost:7070
+#kylin.rest.servers=localhost:7070
# The metadata store in hbase
kylin.metadata.url=
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
index a166ae7..917fe46 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
@@ -54,7 +54,7 @@ public class IIDescManager {
private KylinConfig config;
// name ==> IIDesc
- private CaseInsensitiveStringCache<IIDesc> iiDescMap = new CaseInsensitiveStringCache<IIDesc>(Broadcaster.TYPE.INVERTED_INDEX_DESC);
+ private CaseInsensitiveStringCache<IIDesc> iiDescMap;
public static IIDescManager getInstance(KylinConfig config) {
IIDescManager r = CACHE.get(config);
@@ -87,6 +87,7 @@ public class IIDescManager {
private IIDescManager(KylinConfig config) throws IOException {
logger.info("Initializing IIDescManager with config " + config);
this.config = config;
+ this.iiDescMap = new CaseInsensitiveStringCache<IIDesc>(config, Broadcaster.TYPE.INVERTED_INDEX_DESC);
reloadAllIIDesc();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
index 5633004..b6dfdf1 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
@@ -92,7 +92,7 @@ public class IIManager implements IRealizationProvider {
private KylinConfig config;
// ii name ==> IIInstance
- private CaseInsensitiveStringCache<IIInstance> iiMap = new CaseInsensitiveStringCache<IIInstance>(Broadcaster.TYPE.INVERTED_INDEX);
+ private CaseInsensitiveStringCache<IIInstance> iiMap;
// for generation hbase table name of a new segment
private Multimap<String, String> usedStorageLocation = HashMultimap.create();
@@ -100,7 +100,7 @@ public class IIManager implements IRealizationProvider {
private IIManager(KylinConfig config) throws IOException {
logger.info("Initializing IIManager with config " + config);
this.config = config;
-
+ this.iiMap = new CaseInsensitiveStringCache<IIInstance>(config, Broadcaster.TYPE.INVERTED_INDEX);
loadAllIIInstance();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index af1bbc0..b90c10a 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -71,7 +71,9 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
public static void beforeClass() throws Exception {
staticCreateTestMetadata();
configA = KylinConfig.getInstanceFromEnv();
+ configA.setProperty("kylin.rest.servers", "localhost:7070");
configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam());
+ configB.setProperty("kylin.rest.servers", "localhost:7070");
configB.setMetadataUrl("../examples/test_metadata");
server = new Server(7070);
@@ -209,7 +211,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
@Test
public void testCubeCRUD() throws Exception {
- final Broadcaster broadcaster = Broadcaster.getInstance();
+ final Broadcaster broadcaster = Broadcaster.getInstance(configA);
broadcaster.getCounterAndClear();
getStore().deleteResource("/cube/a_whole_new_cube.json");
@@ -306,7 +308,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
public void testMetaCRUD() throws Exception {
final MetadataManager metadataManager = MetadataManager.getInstance(configA);
final MetadataManager metadataManagerB = MetadataManager.getInstance(configB);
- final Broadcaster broadcaster = Broadcaster.getInstance();
+ final Broadcaster broadcaster = Broadcaster.getInstance(configA);
broadcaster.getCounterAndClear();
TableDesc tableDesc = createTestTableDesc();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index 3032d13..8cf51b6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -34,10 +34,12 @@
package org.apache.kylin.source.kafka;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
@@ -45,17 +47,15 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
-import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
/**
*/
@@ -69,7 +69,7 @@ public class KafkaConfigManager {
private KylinConfig config;
// name ==> StreamingConfig
- private CaseInsensitiveStringCache<KafkaConfig> kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(Broadcaster.TYPE.KAFKA);
+ private CaseInsensitiveStringCache<KafkaConfig> kafkaMap;
public static final Serializer<KafkaConfig> KAFKA_SERIALIZER = new JsonSerializer<KafkaConfig>(KafkaConfig.class);
@@ -79,6 +79,7 @@ public class KafkaConfigManager {
private KafkaConfigManager(KylinConfig config) throws IOException {
this.config = config;
+ this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, Broadcaster.TYPE.KAFKA);
reloadAllKafkaConfig();
}