You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/22 23:49:59 UTC

[15/16] kylin git commit: half way

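half way (work in progress): change the SyncListener classes in HybridManager,
StreamingManager and KafkaConfigManager to extend Broadcaster.Listener and
override onClearAll/onEntityChange (plus onProjectSchemaChange in HybridManager),
handle Event.DROP by removing the local cache entry, and remove the
org.apache.kylin.cube imports from ProjectManager.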


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/52d0bc17
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/52d0bc17
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/52d0bc17

Branch: refs/heads/KYLIN-2033
Commit: 52d0bc1703d89b68548005b071e4da9bb25234ed
Parents: 478066d
Author: Yang Li <li...@apache.org>
Authored: Thu Sep 22 21:03:39 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Thu Sep 22 21:04:05 2016 +0800

----------------------------------------------------------------------
 .../kylin/metadata/project/ProjectManager.java  |  3 --
 .../kylin/storage/hybrid/HybridManager.java     | 54 ++++++++++++--------
 .../engine/streaming/StreamingManager.java      | 14 ++---
 .../kylin/source/kafka/KafkaConfigManager.java  | 18 ++++---
 4 files changed, 51 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 972d40f..be69df3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -31,9 +31,6 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.restclient.Broadcaster.Event;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.badquery.BadQueryHistoryManager;

http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index f772777..d73a1a9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -29,8 +29,9 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.restclient.Broadcaster.Event;
-import org.apache.kylin.cube.CubeManager.SyncListener;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.IRealizationProvider;
@@ -85,42 +86,49 @@ public class HybridManager implements IRealizationProvider {
     private HybridManager(KylinConfig config) throws IOException {
         logger.info("Initializing HybridManager with config " + config);
         this.config = config;
-        this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, "hybrid");
-        Broadcaster.getInstance(config).registerListener(new SyncListener(), "hybrid", "cube");
-        loadAllHybridInstance();
+        this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, "hybrid", new SyncListener());
+        reloadAllHybridInstance();
     }
 
-    private class SyncListener implements Broadcaster.Listener {
+    private class SyncListener extends Broadcaster.Listener {
+        
         @Override
-        public void clearAll() {
-            // TODO Auto-generated method stub
-            
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
         }
 
         @Override
-        public void notify(String entity, Event event, String cacheKey) {
-            if (event == Event.CREATE || event == Event.UPDATE) {
-                switch (entity) {
-                case "hybrid":
-                    loadAllHybridInstance();
-                    break;
-                case "cube":
-                    reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
-                    break;
+        public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
+            for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) {
+                if (real instanceof HybridInstance) {
+                    reloadHybridInstance(real.getName());
                 }
             }
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            String hybridName = cacheKey;
+            
+            if (event == Event.DROP)
+                hybridMap.removeLocal(hybridName);
+            else
+                reloadHybridInstance(hybridName);
             
+            for (ProjectInstance prj : ProjectManager.getInstance(config).findProjects(RealizationType.HYBRID, hybridName)) {
+                broadcaster.notifyProjectSchemaUpdate(prj.getName());
+            }
         }
     }
 
-    private void loadAllHybridInstance() throws IOException {
+    private void reloadAllHybridInstance() throws IOException {
         ResourceStore store = getStore();
         List<String> paths = store.collectResourceRecursively(ResourceStore.HYBRID_RESOURCE_ROOT, ".json");
 
         logger.debug("Loading Hybrid from folder " + store.getReadableResourcePath(ResourceStore.HYBRID_RESOURCE_ROOT));
 
         for (String path : paths) {
-            loadHybridInstance(path);
+            reloadHybridInstanceAt(path);
         }
 
         logger.debug("Loaded " + paths.size() + " Hybrid(s)");
@@ -137,11 +145,15 @@ public class HybridManager implements IRealizationProvider {
             }
 
             if (includes == true)
-                loadHybridInstance(HybridInstance.concatResourcePath(hybridInstance.getName()));
+                reloadHybridInstance(hybridInstance.getName());
         }
     }
 
-    private synchronized HybridInstance loadHybridInstance(String path) {
+    public void reloadHybridInstance(String name) {
+        reloadHybridInstanceAt(HybridInstance.concatResourcePath(name));
+    }
+    
+    private synchronized HybridInstance reloadHybridInstanceAt(String path) {
         ResourceStore store = getStore();
 
         HybridInstance hybridInstance = null;

http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index 87dd5d5..5a3f104 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -77,18 +77,18 @@ public class StreamingManager {
         reloadAllStreaming();
     }
 
-    private class SyncListener implements Broadcaster.Listener {
+    private class SyncListener extends Broadcaster.Listener {
         @Override
-        public void clearAll() {
-            // TODO Auto-generated method stub
-            
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
         }
 
         @Override
-        public void notify(String entity, Event event, String cacheKey) throws IOException {
-            if (event == Event.CREATE || event == Event.UPDATE) {
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            if (event == Event.DROP)
+                removeStreamingLocal(cacheKey);
+            else
                 reloadStreamingConfigLocal(cacheKey);
-            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index a3b675b..8b982e2 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -78,18 +78,18 @@ public class KafkaConfigManager {
         reloadAllKafkaConfig();
     }
 
-    private class SyncListener implements Broadcaster.Listener {
+    private class SyncListener extends Broadcaster.Listener {
         @Override
-        public void clearAll() {
-            // TODO Auto-generated method stub
-            
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
         }
 
         @Override
-        public void notify(String entity, Event event, String cacheKey) throws IOException {
-            if (event == Event.CREATE || event == Event.UPDATE) {
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            if (event == Event.DROP)
+                removeKafkaConfigLocal(cacheKey);
+            else
                 reloadKafkaConfigLocal(cacheKey);
-            }
         }
     }
 
@@ -215,6 +215,10 @@ public class KafkaConfigManager {
         kafkaMap.remove(kafkaConfig.getName());
     }
 
+    private void removeKafkaConfigLocal(String name) {
+        kafkaMap.remove(name);
+    }
+    
     private void reloadAllKafkaConfig() throws IOException {
         ResourceStore store = getStore();
         logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));