You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/10/31 09:06:43 UTC
incubator-eagle git commit: [EAGLE-701] AlertEngine: bolt should check ZK for latest spec when start
Repository: incubator-eagle
Updated Branches:
refs/heads/master e71d6d852 -> 7848680d3
[EAGLE-701] AlertEngine: bolt should check ZK for latest spec when start
Author: wujinhu <wu...@126.com>
Closes #586 from wujinhu/EAGLE-701.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/7848680d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/7848680d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/7848680d
Branch: refs/heads/master
Commit: 7848680d362e3c14480149e8c100efdfcd6782dc
Parents: e71d6d8
Author: wujinhu <wu...@126.com>
Authored: Mon Oct 31 17:06:33 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Mon Oct 31 17:06:33 2016 +0800
----------------------------------------------------------------------
.../apache/eagle/alert/config/ConfigBusConsumer.java | 14 ++++++++++----
.../coordinator/IMetadataChangeNotifyService.java | 4 ++++
.../impl/ZKMetadataChangeNotifyService.java | 5 +++++
.../eagle/alert/engine/runner/AbstractStreamBolt.java | 5 +++++
4 files changed, 24 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7848680d/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java
index c2849a2..d5e6b4e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java
@@ -30,17 +30,17 @@ public class ConfigBusConsumer extends ConfigBusBase {
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ConfigBusConsumer.class);
private NodeCache cache;
+ private String zkPath;
+ public static final ObjectMapper mapper = new ObjectMapper();
public ConfigBusConsumer(ZKConfig config, String topic, ConfigChangeCallback callback) {
super(config);
- String zkPath = zkRoot + "/" + topic;
+ this.zkPath = zkRoot + "/" + topic;
LOG.info("monitor change for zkPath " + zkPath);
cache = new NodeCache(curator, zkPath);
cache.getListenable().addListener(() -> {
// get node value and notify callback
- byte[] value = curator.getData().forPath(zkPath);
- ObjectMapper mapper = new ObjectMapper();
- ConfigValue v = mapper.readValue(value, ConfigValue.class);
+ ConfigValue v = getConfigValue();
callback.onNewConfig(v);
}
);
@@ -51,4 +51,10 @@ public class ConfigBusConsumer extends ConfigBusBase {
throw new RuntimeException(ex);
}
}
+
+ public ConfigValue getConfigValue() throws Exception {
+ byte[] value = curator.getData().forPath(zkPath);
+ ConfigValue v = mapper.readValue(value, ConfigValue.class);
+ return v;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7848680d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
index 5aa754e..dab9f5a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
@@ -50,4 +50,8 @@ public interface IMetadataChangeNotifyService extends Closeable, Serializable {
void registerListener(StreamRouterBoltSpecListener listener);
void registerListener(AlertPublishSpecListener listener);
+
+ default void activateFetchMetaData() throws Exception {
+
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7848680d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
index 8dc6fdd..bd30b87 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
@@ -72,6 +72,11 @@ public class ZKMetadataChangeNotifyService extends AbstractMetadataChangeNotifyS
LOG.info("init called for client");
}
+ @Override
+ public void activateFetchMetaData() throws Exception {
+ this.onNewConfig(consumer.getConfigValue());
+ }
+
private String getMetadataTopicSuffix() {
switch (type) {
case ALERT_BOLT:
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7848680d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
index 92e9c8c..ab05b48 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
@@ -82,6 +82,11 @@ public abstract class AbstractStreamBolt extends BaseRichBolt implements Seriali
this.collector = collector;
this.serializer = Serializers.newPartitionedEventSerializer(this);
internalPrepare(collector, this.changeNotifyService, this.config, context);
+ try {
+ this.changeNotifyService.activateFetchMetaData();
+ } catch (Exception e) {
+ LOG.warn(e.getMessage(), e);
+ }
}