You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/11/08 03:46:31 UTC
[1/2] incubator-eagle git commit: EAGLE-741: Make publishment
settings both policy & stream awareness
Repository: incubator-eagle
Updated Branches:
refs/heads/master eaab4a9e0 -> a1c5eca05
EAGLE-741: Make publishment settings both policy & stream awareness
Currently a publishment is defined per policy only, so the alerts of one policy cannot be published to different destinations even when that policy has multiple output streams.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/ccc5ffb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/ccc5ffb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/ccc5ffb5
Branch: refs/heads/master
Commit: ccc5ffb528f8ffd2c750ad57d5bbb3ad5bcbdab8
Parents: ca0fae4
Author: Xiancheng Li <xi...@ebay.com>
Authored: Mon Nov 7 13:55:20 2016 +0800
Committer: Xiancheng Li <xi...@ebay.com>
Committed: Mon Nov 7 16:31:07 2016 +0800
----------------------------------------------------------------------
.../alert/engine/coordinator/Publishment.java | 13 ++-
.../publisher/impl/AbstractPublishPlugin.java | 1 -
.../publisher/impl/AlertPublisherImpl.java | 101 +++++++++++++------
.../engine/router/TestAlertPublisherBolt.java | 2 +
.../src/test/resources/testPublishSpec.json | 6 +-
.../src/test/resources/testPublishSpec2.json | 6 +-
.../src/test/resources/testPublishSpec3.json | 42 ++++++++
7 files changed, 135 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
index b224a4b..dbb1844 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -32,6 +32,7 @@ public class Publishment {
private String name;
private String type;
private List<String> policyIds;
+ private List<String> streamIds;
private String dedupIntervalMin;
private List<String> dedupFields;
private String dedupStateField;
@@ -97,6 +98,14 @@ public class Publishment {
this.policyIds = policyIds;
}
+ public List<String> getStreamIds() {
+ return streamIds;
+ }
+
+ public void setStreamIds(List<String> streamIds) {
+ this.streamIds = streamIds;
+ }
+
public String getDedupIntervalMin() {
return dedupIntervalMin;
}
@@ -130,7 +139,9 @@ public class Publishment {
&& Objects.equals(dedupFields, p.getDedupFields())
&& Objects.equals(dedupStateField, p.getDedupStateField())
&& Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator())
- && Objects.equals(policyIds, p.getPolicyIds()) && properties.equals(p.getProperties()));
+ && Objects.equals(policyIds, p.getPolicyIds())
+ && Objects.equals(streamIds, p.getStreamIds())
+ && properties.equals(p.getProperties()));
}
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index f0f048d..f68ae52 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.engine.codec.IEventSerializer;
import org.apache.eagle.alert.engine.coordinator.OverrideDeduplicatorSpec;
import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
index 87ac30f..210fd1b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- * <p/>
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,8 +17,16 @@
package org.apache.eagle.alert.engine.publisher.impl;
-import com.typesafe.config.Config;
-import org.apache.commons.collections.ListUtils;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
@@ -27,20 +35,19 @@ import org.apache.eagle.alert.engine.publisher.AlertPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import com.typesafe.config.Config;
@SuppressWarnings("rawtypes")
public class AlertPublisherImpl implements AlertPublisher {
+
private static final long serialVersionUID = 4809983246198138865L;
private static final Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class);
+
+ private static final String STREAM_NAME_DEFAULT = "default";
+
private final String name;
- private volatile Map<String, List<String>> policyPublishPluginMapping = new ConcurrentHashMap<>(1);
+ private volatile Map<String, Set<String>> psPublishPluginMapping = new ConcurrentHashMap<>(1);
private volatile Map<String, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1);
private Config config;
private Map conf;
@@ -74,13 +81,18 @@ public class AlertPublisherImpl implements AlertPublisher {
LOG.warn("policyId cannot be null for event to be published");
return;
}
- List<String> pubIds = policyPublishPluginMapping.get(policyId);
+ // use default stream name if specified stream publisher is not found
+ Set<String> pubIds = psPublishPluginMapping.get(getPolicyStreamUniqueId(policyId, event.getStreamId()));
if (pubIds == null) {
- LOG.warn("Policy {} does *NOT* subscribe any publishment!", policyId);
+ pubIds = psPublishPluginMapping.get(getPolicyStreamUniqueId(policyId));
+ }
+ if (pubIds == null) {
+ LOG.warn("Policy {} Stream {} does *NOT* subscribe any publishment!", policyId, event.getStreamId());
return;
}
for (String pubId : pubIds) {
+ @SuppressWarnings("resource")
AlertPublishPlugin plugin = pubId != null ? publishPluginMapping.get(pubId) : null;
if (plugin == null) {
LOG.warn("Policy {} does *NOT* subscribe any publishment!", policyId);
@@ -100,12 +112,11 @@ public class AlertPublisherImpl implements AlertPublisher {
publishPluginMapping.values().forEach(plugin -> plugin.close());
}
- @SuppressWarnings("unchecked")
@Override
public synchronized void onPublishChange(List<Publishment> added,
- List<Publishment> removed,
- List<Publishment> afterModified,
- List<Publishment> beforeModified) {
+ List<Publishment> removed,
+ List<Publishment> afterModified,
+ List<Publishment> beforeModified) {
if (added == null) {
added = new ArrayList<>();
}
@@ -125,7 +136,7 @@ public class AlertPublisherImpl implements AlertPublisher {
}
// copy and swap to avoid concurrency issue
- Map<String, List<String>> newPolicyPublishPluginMapping = new HashMap<>(policyPublishPluginMapping);
+ Map<String, Set<String>> newPSPublishPluginMapping = new HashMap<>(psPublishPluginMapping);
Map<String, AlertPublishPlugin> newPublishMap = new HashMap<>(publishPluginMapping);
// added
@@ -135,7 +146,7 @@ public class AlertPublisherImpl implements AlertPublisher {
AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf);
if (plugin != null) {
newPublishMap.put(publishment.getName(), plugin);
- addPublishmentPolicies(newPolicyPublishPluginMapping, publishment.getPolicyIds(), publishment.getName());
+ addPublishmentPoliciesStreams(newPSPublishPluginMapping, publishment.getPolicyIds(), publishment.getStreamIds(), publishment.getName());
} else {
LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment);
}
@@ -144,7 +155,7 @@ public class AlertPublisherImpl implements AlertPublisher {
List<AlertPublishPlugin> toBeClosed = new ArrayList<>();
for (Publishment publishment : removed) {
String pubName = publishment.getName();
- removePublihsPolicies(newPolicyPublishPluginMapping, publishment.getPolicyIds(), pubName);
+ removePublihsPoliciesStreams(newPSPublishPluginMapping, publishment.getPolicyIds(), pubName);
toBeClosed.add(newPublishMap.get(pubName));
newPublishMap.remove(publishment.getName());
}
@@ -152,13 +163,14 @@ public class AlertPublisherImpl implements AlertPublisher {
for (int i = 0; i < afterModified.size(); i++) {
String pubName = afterModified.get(i).getName();
List<String> newPolicies = afterModified.get(i).getPolicyIds();
+ List<String> newStreams = afterModified.get(i).getStreamIds();
List<String> oldPolicies = beforeModified.get(i).getPolicyIds();
+ List<String> oldStreams = beforeModified.get(i).getStreamIds();
- if (!newPolicies.equals(oldPolicies)) {
- List<String> deletedPolicies = ListUtils.subtract(oldPolicies, newPolicies);
- removePublihsPolicies(newPolicyPublishPluginMapping, deletedPolicies, pubName);
- List<String> addedPolicies = ListUtils.subtract(newPolicies, oldPolicies);
- addPublishmentPolicies(newPolicyPublishPluginMapping, addedPolicies, pubName);
+ if (!newPolicies.equals(oldPolicies) || !newStreams.equals(oldStreams)) {
+ // since both policy & stream may change, skip the compare and difference update
+ removePublihsPoliciesStreams(newPSPublishPluginMapping, oldPolicies, pubName);
+ addPublishmentPoliciesStreams(newPSPublishPluginMapping, newPolicies, newStreams, pubName);
}
Publishment newPub = afterModified.get(i);
newPublishMap.get(pubName).update(newPub.getDedupIntervalMin(), newPub.getProperties());
@@ -166,7 +178,7 @@ public class AlertPublisherImpl implements AlertPublisher {
// now do the swap
publishPluginMapping = newPublishMap;
- policyPublishPluginMapping = newPolicyPublishPluginMapping;
+ psPublishPluginMapping = newPSPublishPluginMapping;
// safely close : it depend on plugin to check if want to wait all data to be flushed.
closePlugins(toBeClosed);
@@ -182,26 +194,51 @@ public class AlertPublisherImpl implements AlertPublisher {
}
}
- private void addPublishmentPolicies(Map<String, List<String>> newPolicyPublishPluginMapping, List<String> addedPolicyIds, String pubName) {
+ private void addPublishmentPoliciesStreams(Map<String, Set<String>> newPSPublishPluginMapping,
+ List<String> addedPolicyIds, List<String> addedStreamIds, String pubName) {
if (addedPolicyIds == null || pubName == null) {
return;
}
+ if (addedStreamIds == null || addedStreamIds.size() <= 0) {
+ addedStreamIds = new ArrayList<String>();
+ addedStreamIds.add(STREAM_NAME_DEFAULT);
+ }
+
for (String policyId : addedPolicyIds) {
- newPolicyPublishPluginMapping.putIfAbsent(policyId, new ArrayList<>());
- newPolicyPublishPluginMapping.get(policyId).add(pubName);
+ for (String streamId : addedStreamIds) {
+ String psUniqueId = getPolicyStreamUniqueId(policyId, streamId);
+ newPSPublishPluginMapping.putIfAbsent(psUniqueId, new HashSet<>());
+ newPSPublishPluginMapping.get(psUniqueId).add(pubName);
+ }
}
}
- private synchronized void removePublihsPolicies(Map<String, List<String>> newPolicyPublishPluginMapping, List<String> deletedPolicyIds, String pubName) {
+ private synchronized void removePublihsPoliciesStreams(Map<String, Set<String>> newPSPublishPluginMapping,
+ List<String> deletedPolicyIds, String pubName) {
if (deletedPolicyIds == null || pubName == null) {
return;
}
for (String policyId : deletedPolicyIds) {
- List<String> publishIds = newPolicyPublishPluginMapping.get(policyId);
- publishIds.remove(pubName);
+ for (Entry<String, Set<String>> entry : newPSPublishPluginMapping.entrySet()) {
+ if (entry.getKey().startsWith("policyId:" + policyId)) {
+ entry.getValue().remove(pubName);
+ break;
+ }
+ }
+ }
+ }
+
+ private String getPolicyStreamUniqueId(String policyId) {
+ return getPolicyStreamUniqueId(policyId, STREAM_NAME_DEFAULT);
+ }
+
+ private String getPolicyStreamUniqueId(String policyId, String streamId) {
+ if (StringUtils.isBlank(streamId)) {
+ streamId = STREAM_NAME_DEFAULT;
}
+ return String.format("policyId:%s,streamId:%s", policyId, streamId);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 5cdb6f1..c95cab1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -136,6 +136,7 @@ public class TestAlertPublisherBolt {
public void testMapComparator() {
PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec2.json"), PublishSpec.class);
+ PublishSpec spec3 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec3.json"), PublishSpec.class);
Map<String, Publishment> map1 = new HashMap<>();
Map<String, Publishment> map2 = new HashMap<>();
spec1.getPublishments().forEach(p -> map1.put(p.getName(), p));
@@ -148,6 +149,7 @@ public class TestAlertPublisherBolt {
AlertPublisherBolt publisherBolt = new AlertPublisherBolt("alert-publisher-test", null, null);
publisherBolt.onAlertPublishSpecChange(spec1, null);
publisherBolt.onAlertPublishSpecChange(spec2, null);
+ publisherBolt.onAlertPublishSpecChange(spec3, null);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec.json
index 70ea6b3..a8f4105 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec.json
@@ -11,6 +11,9 @@
"policy2",
"policy3"
],
+ "streamIds": [
+ "stream1"
+ ],
"dedupIntervalMin": "PT1M",
"properties": {
"subject": "Test Alert",
@@ -24,7 +27,8 @@
"mail.debug": "false",
"mail.connection": "tls",
"mail.smtp.port": "587"
- }
+ },
+ "serializer": "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer"
}
/* {
"type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec2.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec2.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec2.json
index e14db43..e31e1f4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec2.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec2.json
@@ -9,6 +9,9 @@
"policyIds": [
"policy1"
],
+ "streamIds": [
+ "stream1"
+ ],
"dedupIntervalMin": "PT2M",
"properties": {
"subject": "Test Alert",
@@ -22,7 +25,8 @@
"mail.debug": "false",
"mail.connection": "tls",
"mail.smtp.port": "587"
- }
+ },
+ "serializer": "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer"
}
// {
// "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec3.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec3.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec3.json
new file mode 100644
index 0000000..0bf0e2a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec3.json
@@ -0,0 +1,42 @@
+{
+ "version": "version1",
+ "topologyName": "testTopology",
+ "boltId": "alertPublishBolt",
+ "publishments": [
+ {
+ "type": "org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+ "name": "email-testAlertStream",
+ "policyIds": [
+ "policy1"
+ ],
+ "streamIds": [
+ "stream2"
+ ],
+ "dedupIntervalMin": "PT2M",
+ "properties": {
+ "subject": "Test Alert",
+ "template": "",
+ "sender": "sender@corp.com",
+ "recipients": "receiver@corp.com",
+ "mail.smtp.host": "smtp.mailhost.com",
+ "mail.smtp.auth": "true",
+ "mail.username": "username",
+ "mail.password": "password",
+ "mail.debug": "false",
+ "mail.connection": "tls",
+ "mail.smtp.port": "587"
+ },
+ "serializer": "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer"
+ }
+ // {
+ // "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+ // "name":"kafka-testAlertStream",
+ // "policyIds": ["testPolicy"],
+ // "dedupIntervalMin": "PT1M",
+ // "properties":{
+ // "kafka_broker":"sandbox.hortonworks.com:6667",
+ // "topic":"test_kafka"
+ // }
+ // }
+ ]
+}
\ No newline at end of file
[2/2] incubator-eagle git commit: Merge branch 'eagle741' of
https://github.com/garrettlish/incubator-eagle
Posted by ra...@apache.org.
Merge branch 'eagle741' of https://github.com/garrettlish/incubator-eagle
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/a1c5eca0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/a1c5eca0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/a1c5eca0
Branch: refs/heads/master
Commit: a1c5eca057f7f77663189264800205ff44e9e4f7
Parents: eaab4a9 ccc5ffb
Author: Ralph, Su <su...@gmail.com>
Authored: Tue Nov 8 10:25:32 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Tue Nov 8 10:25:32 2016 +0800
----------------------------------------------------------------------
.../alert/engine/coordinator/Publishment.java | 13 ++-
.../publisher/impl/AbstractPublishPlugin.java | 1 -
.../publisher/impl/AlertPublisherImpl.java | 101 +++++++++++++------
.../engine/router/TestAlertPublisherBolt.java | 2 +
.../src/test/resources/testPublishSpec.json | 6 +-
.../src/test/resources/testPublishSpec2.json | 6 +-
.../src/test/resources/testPublishSpec3.json | 42 ++++++++
7 files changed, 135 insertions(+), 36 deletions(-)
----------------------------------------------------------------------