You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/22 23:49:59 UTC
[15/16] kylin git commit: half way
half way
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/52d0bc17
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/52d0bc17
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/52d0bc17
Branch: refs/heads/KYLIN-2033
Commit: 52d0bc1703d89b68548005b071e4da9bb25234ed
Parents: 478066d
Author: Yang Li <li...@apache.org>
Authored: Thu Sep 22 21:03:39 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Thu Sep 22 21:04:05 2016 +0800
----------------------------------------------------------------------
.../kylin/metadata/project/ProjectManager.java | 3 --
.../kylin/storage/hybrid/HybridManager.java | 54 ++++++++++++--------
.../engine/streaming/StreamingManager.java | 14 ++---
.../kylin/source/kafka/KafkaConfigManager.java | 18 ++++---
4 files changed, 51 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 972d40f..be69df3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -31,9 +31,6 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.common.restclient.Broadcaster.Event;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.badquery.BadQueryHistoryManager;
http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index f772777..d73a1a9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -29,8 +29,9 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.common.restclient.Broadcaster.Event;
-import org.apache.kylin.cube.CubeManager.SyncListener;
import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.project.RealizationEntry;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.IRealizationProvider;
@@ -85,42 +86,49 @@ public class HybridManager implements IRealizationProvider {
private HybridManager(KylinConfig config) throws IOException {
logger.info("Initializing HybridManager with config " + config);
this.config = config;
- this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, "hybrid");
- Broadcaster.getInstance(config).registerListener(new SyncListener(), "hybrid", "cube");
- loadAllHybridInstance();
+ this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, "hybrid", new SyncListener());
+ reloadAllHybridInstance();
}
- private class SyncListener implements Broadcaster.Listener {
+ private class SyncListener extends Broadcaster.Listener {
+
@Override
- public void clearAll() {
- // TODO Auto-generated method stub
-
+ public void onClearAll(Broadcaster broadcaster) throws IOException {
+ clearCache();
}
@Override
- public void notify(String entity, Event event, String cacheKey) {
- if (event == Event.CREATE || event == Event.UPDATE) {
- switch (entity) {
- case "hybrid":
- loadAllHybridInstance();
- break;
- case "cube":
- reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
- break;
+ public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
+ for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) {
+ if (real instanceof HybridInstance) {
+ reloadHybridInstance(real.getName());
}
}
+ }
+
+ @Override
+ public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+ String hybridName = cacheKey;
+
+ if (event == Event.DROP)
+ hybridMap.removeLocal(hybridName);
+ else
+ reloadHybridInstance(hybridName);
+ for (ProjectInstance prj : ProjectManager.getInstance(config).findProjects(RealizationType.HYBRID, hybridName)) {
+ broadcaster.notifyProjectSchemaUpdate(prj.getName());
+ }
}
}
- private void loadAllHybridInstance() throws IOException {
+ private void reloadAllHybridInstance() throws IOException {
ResourceStore store = getStore();
List<String> paths = store.collectResourceRecursively(ResourceStore.HYBRID_RESOURCE_ROOT, ".json");
logger.debug("Loading Hybrid from folder " + store.getReadableResourcePath(ResourceStore.HYBRID_RESOURCE_ROOT));
for (String path : paths) {
- loadHybridInstance(path);
+ reloadHybridInstanceAt(path);
}
logger.debug("Loaded " + paths.size() + " Hybrid(s)");
@@ -137,11 +145,15 @@ public class HybridManager implements IRealizationProvider {
}
if (includes == true)
- loadHybridInstance(HybridInstance.concatResourcePath(hybridInstance.getName()));
+ reloadHybridInstance(hybridInstance.getName());
}
}
- private synchronized HybridInstance loadHybridInstance(String path) {
+ public void reloadHybridInstance(String name) {
+ reloadHybridInstanceAt(HybridInstance.concatResourcePath(name));
+ }
+
+ private synchronized HybridInstance reloadHybridInstanceAt(String path) {
ResourceStore store = getStore();
HybridInstance hybridInstance = null;
http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index 87dd5d5..5a3f104 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -77,18 +77,18 @@ public class StreamingManager {
reloadAllStreaming();
}
- private class SyncListener implements Broadcaster.Listener {
+ private class SyncListener extends Broadcaster.Listener {
@Override
- public void clearAll() {
- // TODO Auto-generated method stub
-
+ public void onClearAll(Broadcaster broadcaster) throws IOException {
+ clearCache();
}
@Override
- public void notify(String entity, Event event, String cacheKey) throws IOException {
- if (event == Event.CREATE || event == Event.UPDATE) {
+ public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+ if (event == Event.DROP)
+ removeStreamingLocal(cacheKey);
+ else
reloadStreamingConfigLocal(cacheKey);
- }
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index a3b675b..8b982e2 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -78,18 +78,18 @@ public class KafkaConfigManager {
reloadAllKafkaConfig();
}
- private class SyncListener implements Broadcaster.Listener {
+ private class SyncListener extends Broadcaster.Listener {
@Override
- public void clearAll() {
- // TODO Auto-generated method stub
-
+ public void onClearAll(Broadcaster broadcaster) throws IOException {
+ clearCache();
}
@Override
- public void notify(String entity, Event event, String cacheKey) throws IOException {
- if (event == Event.CREATE || event == Event.UPDATE) {
+ public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+ if (event == Event.DROP)
+ removeKafkaConfigLocal(cacheKey);
+ else
reloadKafkaConfigLocal(cacheKey);
- }
}
}
@@ -215,6 +215,10 @@ public class KafkaConfigManager {
kafkaMap.remove(kafkaConfig.getName());
}
+ private void removeKafkaConfigLocal(String name) {
+ kafkaMap.remove(name);
+ }
+
private void reloadAllKafkaConfig() throws IOException {
ResourceStore store = getStore();
logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));