You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/10/31 09:06:43 UTC

incubator-eagle git commit: [EAGLE-701] AlertEngine: bolt should check ZK for the latest spec on startup

Repository: incubator-eagle
Updated Branches:
  refs/heads/master e71d6d852 -> 7848680d3


[EAGLE-701] AlertEngine: bolt should check ZK for the latest spec on startup

Author: wujinhu <wu...@126.com>

Closes #586 from wujinhu/EAGLE-701.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/7848680d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/7848680d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/7848680d

Branch: refs/heads/master
Commit: 7848680d362e3c14480149e8c100efdfcd6782dc
Parents: e71d6d8
Author: wujinhu <wu...@126.com>
Authored: Mon Oct 31 17:06:33 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Mon Oct 31 17:06:33 2016 +0800

----------------------------------------------------------------------
 .../apache/eagle/alert/config/ConfigBusConsumer.java  | 14 ++++++++++----
 .../coordinator/IMetadataChangeNotifyService.java     |  4 ++++
 .../impl/ZKMetadataChangeNotifyService.java           |  5 +++++
 .../eagle/alert/engine/runner/AbstractStreamBolt.java |  5 +++++
 4 files changed, 24 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7848680d/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java
index c2849a2..d5e6b4e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java
@@ -30,17 +30,17 @@ public class ConfigBusConsumer extends ConfigBusBase {
     private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ConfigBusConsumer.class);
 
     private NodeCache cache;
+    private String zkPath;
+    public static final ObjectMapper mapper = new ObjectMapper();
 
     public ConfigBusConsumer(ZKConfig config, String topic, ConfigChangeCallback callback) {
         super(config);
-        String zkPath = zkRoot + "/" + topic;
+        this.zkPath = zkRoot + "/" + topic;
         LOG.info("monitor change for zkPath " + zkPath);
         cache = new NodeCache(curator, zkPath);
         cache.getListenable().addListener(() -> {
                 // get node value and notify callback
-                byte[] value = curator.getData().forPath(zkPath);
-                ObjectMapper mapper = new ObjectMapper();
-                ConfigValue v = mapper.readValue(value, ConfigValue.class);
+                ConfigValue v = getConfigValue();
                 callback.onNewConfig(v);
             }
         );
@@ -51,4 +51,10 @@ public class ConfigBusConsumer extends ConfigBusBase {
             throw new RuntimeException(ex);
         }
     }
+
+    public ConfigValue getConfigValue() throws Exception {
+        byte[] value = curator.getData().forPath(zkPath);
+        ConfigValue v = mapper.readValue(value, ConfigValue.class);
+        return v;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7848680d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
index 5aa754e..dab9f5a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
@@ -50,4 +50,8 @@ public interface IMetadataChangeNotifyService extends Closeable, Serializable {
     void registerListener(StreamRouterBoltSpecListener listener);
 
     void registerListener(AlertPublishSpecListener listener);
+
+    default void activateFetchMetaData() throws Exception {
+
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7848680d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
index 8dc6fdd..bd30b87 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
@@ -72,6 +72,11 @@ public class ZKMetadataChangeNotifyService extends AbstractMetadataChangeNotifyS
         LOG.info("init called for client");
     }
 
+    @Override
+    public void activateFetchMetaData() throws Exception {
+        this.onNewConfig(consumer.getConfigValue());
+    }
+
     private String getMetadataTopicSuffix() {
         switch (type) {
             case ALERT_BOLT:

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7848680d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
index 92e9c8c..ab05b48 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
@@ -82,6 +82,11 @@ public abstract class AbstractStreamBolt extends BaseRichBolt implements Seriali
         this.collector = collector;
         this.serializer = Serializers.newPartitionedEventSerializer(this);
         internalPrepare(collector, this.changeNotifyService, this.config, context);
+        try {
+            this.changeNotifyService.activateFetchMetaData();
+        } catch (Exception e) {
+            LOG.warn(e.getMessage(), e);
+        }
     }