You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/08/17 09:12:49 UTC
incubator-eagle git commit: EAGLE-462: Alert Dedup
Repository: incubator-eagle
Updated Branches:
refs/heads/develop 2cd17cbfa -> 48bb082e7
EAGLE-462: Alert Dedup
Author: Zeng, Bryant
Reviewer: Su, Ralph (ralphsu)
This closes #352
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/48bb082e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/48bb082e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/48bb082e
Branch: refs/heads/develop
Commit: 48bb082e709b029aee61684fff7a4eb2522aa153
Parents: 2cd17cb
Author: Ralph, Su <su...@gmail.com>
Authored: Wed Aug 17 17:12:55 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Wed Aug 17 17:12:55 2016 +0800
----------------------------------------------------------------------
.../engine/coordinator/PolicyDefinition.java | 2 +-
.../alert/engine/coordinator/Publishment.java | 13 +++-
.../alert/engine/model/AlertStreamEvent.java | 1 -
.../publisher/impl/AbstractPublishPlugin.java | 7 +-
.../publisher/impl/AlertEmailPublisher.java | 14 ++--
.../publisher/impl/DefaultDeduplicator.java | 55 +++++++++++++---
.../alert/engine/publisher/impl/EventUniq.java | 27 ++++++--
.../engine/router/TestAlertPublisherBolt.java | 67 +++++++++++++++++---
.../src/test/resources/router/publishments.json | 23 +++++++
9 files changed, 170 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/48bb082e/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 37e17a7..363264e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -240,6 +240,6 @@ public class PolicyDefinition implements Serializable{
@Override
public String toString() {
- return String.format("{name=\"%s\",definition=%s}",this.getName(),this.getDefinition().toString());
+ return String.format("{name=\"%s\",definition=%s}",this.getName(),this.getDefinition()==null?"null": this.getDefinition().toString());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/48bb082e/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
index 3ba2dcf..b16714f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -16,12 +16,12 @@
*/
package org.apache.eagle.alert.engine.coordinator;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
/**
* @since Apr 11, 2016
*
@@ -32,6 +32,7 @@ public class Publishment {
private String type;
private List<String> policyIds;
private String dedupIntervalMin;
+ private List<String> dedupFields;
private Map<String, String> properties;
// the class name to extend the IEventSerializer interface
private String serializer;
@@ -76,6 +77,14 @@ public class Publishment {
this.dedupIntervalMin = dedupIntervalMin;
}
+ public List<String> getDedupFields() {
+ return dedupFields;
+ }
+
+ public void setDedupFields(List<String> dedupFields) {
+ this.dedupFields = dedupFields;
+ }
+
public Map<String, String> getProperties() {
return properties;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/48bb082e/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index a73eccd..6e784c5 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -44,7 +44,6 @@ public class AlertStreamEvent extends StreamEvent {
this.policy = policy;
}
-
public String getPolicyId() {
return policy.getName();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/48bb082e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index bd21415..b71bf4a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -16,8 +16,7 @@
*/
package org.apache.eagle.alert.engine.publisher.impl;
-import java.util.Map;
-
+import com.typesafe.config.Config;
import org.apache.eagle.alert.engine.codec.IEventSerializer;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
@@ -25,7 +24,7 @@ import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
import org.slf4j.Logger;
-import com.typesafe.config.Config;
+import java.util.Map;
/**
* @since Jun 3, 2016
@@ -41,7 +40,7 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
@SuppressWarnings("rawtypes")
@Override
public void init(Config config, Publishment publishment, Map conf) throws Exception {
- this.deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
+ this.deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin(), publishment.getDedupFields());
this.pubName = publishment.getName();
String serializerClz = publishment.getSerializer();
try {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/48bb082e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
index 9d191c0..a87fc7d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -18,12 +18,7 @@
package org.apache.eagle.alert.engine.publisher.impl;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
+import com.typesafe.config.Config;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
@@ -33,7 +28,11 @@ import org.apache.eagle.alert.engine.publisher.email.AlertEmailGeneratorBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.typesafe.config.Config;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
public class AlertEmailPublisher extends AbstractPublishPlugin {
@@ -69,6 +68,7 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
if(event == null) {
return;
}
+
boolean isSuccess = emailGenerator.sendAlertEmail(event);
PublishStatus status = new PublishStatus();
if(!isSuccess) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/48bb082e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
index 054d679..a5e7b5a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
@@ -17,21 +17,23 @@
*/
package org.apache.eagle.alert.engine.publisher.impl;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
public class DefaultDeduplicator implements AlertDeduplicator {
private long dedupIntervalMin;
+ private List<String> customDedupFields = new ArrayList<>();
private volatile Map<EventUniq, Long> events = new HashMap<>();
private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
@@ -52,6 +54,11 @@ public class DefaultDeduplicator implements AlertDeduplicator {
public DefaultDeduplicator(long intervalMin) {
this.dedupIntervalMin = intervalMin;
}
+
+ /**
+ * Creates a deduplicator whose dedup key also includes the values of the given custom fields.
+ *
+ * @param intervalMin dedup interval, passed to setDedupIntervalMin
+ * @param customDedupFields names of the stream columns whose values take part in the dedup key;
+ * may be null when the publishment declares no dedupFields
+ */
+ public DefaultDeduplicator(String intervalMin, List<String> customDedupFields) {
+ setDedupIntervalMin(intervalMin);
+ // Publishment.getDedupFields() is null when "dedupFields" is absent; keep the field non-null
+ // so that iterating over it never throws.
+ this.customDedupFields = (customDedupFields == null) ? new ArrayList<String>() : customDedupFields;
+ }
public void clearOldCache() {
List<EventUniq> removedkeys = new ArrayList<>();
@@ -65,7 +72,12 @@ public class DefaultDeduplicator implements AlertDeduplicator {
events.remove(alertKey);
}
}
-
+
+ /***
+ *
+ * @param key
+ * @return
+ */
public AlertDeduplicationStatus checkDedup(EventUniq key) {
long current = key.timestamp;
if(!events.containsKey(key)) {
@@ -78,7 +90,7 @@ public class DefaultDeduplicator implements AlertDeduplicator {
events.put(key, current);
return AlertDeduplicationStatus.IGNORED;
}
-
+
return AlertDeduplicationStatus.DUPLICATED;
}
@@ -86,7 +98,32 @@ public class DefaultDeduplicator implements AlertDeduplicator {
if (event == null) return null;
clearOldCache();
AlertStreamEvent result = null;
- AlertDeduplicationStatus status = checkDedup(new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime()));
+
+ // Collect the values of the configured custom dedup fields: data[i] is matched against
+ // column i of the event schema by column name.
+ StreamDefinition streamDefinition = event.getSchema();
+ HashMap<String, String> customFieldValues = new HashMap<>();
+ for (int i = 0; i < event.getData().length; i++){
+ // Valid column indexes are 0..size-1, so i == size is already out of range.
+ if (i >= streamDefinition.getColumns().size()) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("output column does not found for event data, this indicate code error!");
+ }
+ continue;
+ }
+ String colName = streamDefinition.getColumns().get(i).getName();
+
+ for (String field : customDedupFields){
+ if (colName.equals(field)){
+ // String.valueOf keeps a null data value from throwing; it is recorded as "null".
+ customFieldValues.put(field, String.valueOf(event.getData()[i]));
+ break;
+ }
+ }
+ }
+
+ AlertDeduplicationStatus status = checkDedup(
+ new EventUniq(event.getStreamId(),
+ event.getPolicyId(),
+ event.getCreatedTime(),
+ customFieldValues));
if (!status.equals(AlertDeduplicationStatus.DUPLICATED)) {
result = event;
} else if(LOG.isDebugEnabled()){
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/48bb082e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
index df472d0..79fa6cc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
@@ -22,6 +22,8 @@ package org.apache.eagle.alert.engine.publisher.impl;
import org.apache.commons.lang3.builder.HashCodeBuilder;
+import java.util.HashMap;
+
/**
* @since Mar 19, 2015
*/
@@ -30,6 +32,7 @@ public class EventUniq {
public String policyId;
public Long timestamp; // event's createTimestamp
public long createdTime; // created time, for cache removal;
+ public HashMap<String, String> customFieldValues;
public EventUniq(String streamId, String policyId, long timestamp) {
this.streamId = streamId;
@@ -38,20 +41,34 @@ public class EventUniq {
this.createdTime = System.currentTimeMillis();
}
+ public EventUniq(String streamId, String policyId, long timestamp, HashMap<String, String> customFieldValues) {
+ this.streamId = streamId;
+ this.timestamp = timestamp;
+ this.policyId = policyId;
+ this.createdTime = System.currentTimeMillis();
+ this.customFieldValues = customFieldValues;
+ }
+
@Override
public boolean equals(Object obj) {
if (obj instanceof EventUniq) {
EventUniq au = (EventUniq) obj;
- return (this.streamId.equalsIgnoreCase(au.streamId) & this.policyId.equalsIgnoreCase(au.policyId));
+ // A null customFieldValues means "no custom dedup fields" and is compared like an empty map,
+ // so equals() stays symmetric and transitive and agrees with hashCode().
+ HashMap<String, String> mine = this.customFieldValues == null ? new HashMap<String, String>() : this.customFieldValues;
+ HashMap<String, String> theirs = au.customFieldValues == null ? new HashMap<String, String>() : au.customFieldValues;
+ boolean sameKey = this.streamId.equalsIgnoreCase(au.streamId) & this.policyId.equalsIgnoreCase(au.policyId);
+ return sameKey && mine.equals(theirs);
}
return false;
}
@Override
public int hashCode() {
- return new HashCodeBuilder()
- .append(streamId)
- .append(policyId)
- .build();
+ // Always fold customFieldValues in: HashCodeBuilder.append(Object) accepts null, and a null
+ // value contributes the same as an empty map (hash 0), matching the equals() rule above.
+ return new HashCodeBuilder()
+ .append(streamId)
+ .append(policyId)
+ .append(customFieldValues)
+ .build();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/48bb082e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 01f6de7..9afa21d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -18,15 +18,21 @@
package org.apache.eagle.alert.engine.router;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.eagle.alert.coordination.model.PublishSpec;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory;
import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
import org.apache.eagle.alert.engine.runner.MapComparator;
@@ -35,12 +41,10 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
* @Since 5/14/16.
@@ -108,4 +112,47 @@ public class TestAlertPublisherBolt {
return l;
}
+ private AlertStreamEvent createWithStreamDef(String hostname, String appName){
+ AlertStreamEvent alert = new AlertStreamEvent();
+ PolicyDefinition policy = new PolicyDefinition();
+ policy.setName("perfmon_cpu_host_check");
+ alert.setPolicy(policy);
+ alert.setCreatedTime(System.currentTimeMillis());
+ alert.setData(new Object[]{appName, hostname});
+ alert.setStreamId("testAlertStream");
+ alert.setCreatedBy(this.toString());
+
+ // build stream definition
+ StreamDefinition sd = new StreamDefinition();
+ StreamColumn appColumn = new StreamColumn();
+ appColumn.setName("appname");
+ appColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn hostColumn = new StreamColumn();
+ hostColumn.setName("hostname");
+ hostColumn.setType(StreamColumn.Type.STRING);
+
+ sd.setColumns(Arrays.asList(appColumn, hostColumn));
+
+ alert.setSchema(sd);
+ return alert;
+ }
+
+ @Test
+ public void testCustomFieldDedupEvent() throws Exception {
+ List<Publishment> pubs = loadEntities("/router/publishments.json", Publishment.class);
+
+ AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null);
+ AlertStreamEvent event1 = createWithStreamDef("host1", "testapp1");
+ AlertStreamEvent event2 = createWithStreamDef("host2", "testapp1");
+ AlertStreamEvent event3 = createWithStreamDef("host2", "testapp2");
+
+ Assert.assertNotNull(plugin.dedup(event1));
+ Assert.assertNull(plugin.dedup(event2));
+ Assert.assertNotNull(plugin.dedup(event3));
+
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/48bb082e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments.json
new file mode 100644
index 0000000..4db6590
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments.json
@@ -0,0 +1,23 @@
+[
+{
+ "name":"test-stream-output",
+ "type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+ "policyIds": [
+ "perfmon_cpu_host_check", "perfmon_cpu_pool_check"
+ ],
+ "properties": {
+ "subject":"Eagle Test Alert",
+ "template":"",
+ "sender": "sender@corp.com",
+ "recipients": "receiver@corp.com",
+ "smtp.server":"mailhost.com",
+ "connection": "plaintext",
+ "smtp.port": "25"
+ },
+ "dedupIntervalMin" : "PT1M",
+ "dedupFields": [
+ "appname"
+ ],
+ "serializer" : "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+}
+]
\ No newline at end of file