You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/27 00:26:19 UTC

[26/50] [abbrv] kylin git commit: KYLIN-2033 refactor metadata sync mechanism

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index d594873..2de8527 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -44,9 +44,10 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,8 +74,26 @@ public class KafkaConfigManager {
 
     private KafkaConfigManager(KylinConfig config) throws IOException {
         this.config = config;
-        this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, Broadcaster.TYPE.KAFKA);
+        this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, "kafka");
+        
+        // touch lower level metadata before registering my listener
         reloadAllKafkaConfig();
+        Broadcaster.getInstance(config).registerListener(new KafkaSyncListener(), "kafka");
+    }
+
+    private class KafkaSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            if (event == Event.DROP)
+                removeKafkaConfigLocal(cacheKey);
+            else
+                reloadKafkaConfigLocal(cacheKey);
+        }
     }
 
     private ResourceStore getStore() {
@@ -199,6 +218,10 @@ public class KafkaConfigManager {
         kafkaMap.remove(kafkaConfig.getName());
     }
 
+    private void removeKafkaConfigLocal(String name) {
+        kafkaMap.remove(name);
+    }
+    
     private void reloadAllKafkaConfig() throws IOException {
         ResourceStore store = getStore();
         logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));

http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 3066fb5..52aa7ea 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -44,7 +44,6 @@ import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.restclient.RestClient;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.Dictionary;
@@ -57,6 +56,7 @@ import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -564,7 +564,7 @@ public class CubeMigrationCLI {
             RestClient restClient = new RestClient(node);
             try {
                 logger.info("update meta cache for " + node);
-                restClient.wipeCache(Broadcaster.TYPE.ALL.getType(), Broadcaster.EVENT.UPDATE.getType(), "all");
+                restClient.wipeCache(Broadcaster.SYNC_ALL, Broadcaster.Event.UPDATE.getType(), Broadcaster.SYNC_ALL);
             } catch (IOException e) {
                 logger.error(e.getMessage());
             }