You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/09/09 11:45:35 UTC
[2/2] incubator-eagle git commit: EAGLE-531: Dedup alerts according
to state change
EAGLE-531: Dedup alerts according to state change
Author: Li, Garrett
Reviewer: ralphsu
This closes #428
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/7f41d427
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/7f41d427
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/7f41d427
Branch: refs/heads/master
Commit: 7f41d4278d47327f018d00291aaf0787aa401336
Parents: 4e7c5f2
Author: Ralph, Su <su...@gmail.com>
Authored: Fri Sep 9 19:44:43 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Fri Sep 9 19:44:43 2016 +0800
----------------------------------------------------------------------
.../alert/engine/coordinator/Publishment.java | 18 ++
.../alert/engine/coordinator/StreamColumn.java | 1 +
.../eagle-alert/alert-engine/pom.xml | 9 +
.../engine/publisher/AlertDeduplicator.java | 4 +-
.../engine/publisher/AlertPublishPlugin.java | 6 +-
.../engine/publisher/dedup/DedupCache.java | 136 +++++++++++
.../engine/publisher/dedup/DedupEntity.java | 62 +++++
.../publisher/dedup/DedupEventsStore.java | 32 +++
.../dedup/DedupEventsStoreFactory.java | 55 +++++
.../engine/publisher/dedup/DedupValue.java | 74 ++++++
.../publisher/dedup/MongoDedupEventsStore.java | 138 +++++++++++
.../publisher/dedup/TransformerUtils.java | 107 +++++++++
.../publisher/impl/AbstractPublishPlugin.java | 8 +-
.../publisher/impl/AlertEmailPublisher.java | 12 +-
.../publisher/impl/AlertKafkaPublisher.java | 25 +-
.../publisher/impl/DefaultDeduplicator.java | 149 ++++++++----
.../alert/engine/publisher/impl/EventUniq.java | 7 +
.../serialization/impl/LongSerializer.java | 33 +--
.../TestNoDataPolicyTimeBatchHandler.java | 2 -
.../dedup/DefaultDeduplicatorTest.java | 226 +++++++++++++++++++
.../publisher/dedup/MongoDedupStoreTest.java | 61 +++++
.../dedup/MongoDependencyBaseTest.java | 67 ++++++
.../publisher/dedup/SimpleEmbedMongo.java | 76 +++++++
.../engine/router/TestAlertPublisherBolt.java | 74 ++++--
.../resources/application-mongo-statestore.conf | 17 ++
.../src/test/resources/application-test.conf | 3 +-
.../router/publishments-empty-dedup-field.json | 2 +
.../src/test/resources/router/publishments.json | 2 +
.../eagle-alert-parent/eagle-alert/pom.xml | 15 ++
eagle-dev/checkstyle.xml | 2 +
pom.xml | 2 +-
31 files changed, 1322 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
index 0bada4e..8176e03 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -34,6 +34,8 @@ public class Publishment {
private List<String> policyIds;
private String dedupIntervalMin;
private List<String> dedupFields;
+ private String dedupStateField;
+ private String dedupStateCloseValue;
private Map<String, String> properties;
// the class name to extend the IEventSerializer interface
private String serializer;
@@ -46,6 +48,22 @@ public class Publishment {
this.name = name;
}
+ /**
+ * Returns the value of the {@code dedupStateField} property of this publishment.
+ * NOTE(review): presumably the name of the alert field whose value change marks a state transition for dedup - confirm against the deduplicator.
+ */
+ public String getDedupStateField() {
+ return dedupStateField;
+ }
+
+ /**
+ * Sets the {@code dedupStateField} property of this publishment.
+ *
+ * @param dedupStateField the new value; stored as-is, no validation is performed here
+ */
+ public void setDedupStateField(String dedupStateField) {
+ this.dedupStateField = dedupStateField;
+ }
+
+ /**
+ * Returns the value of the {@code dedupStateCloseValue} property of this publishment.
+ * NOTE(review): presumably the state value that marks an alert as closed - confirm against the deduplicator.
+ */
+ public String getDedupStateCloseValue() {
+ return dedupStateCloseValue;
+ }
+
+ /**
+ * Sets the {@code dedupStateCloseValue} property of this publishment.
+ *
+ * @param dedupStateCloseValue the new value; stored as-is, no validation is performed here
+ */
+ public void setDedupStateCloseValue(String dedupStateCloseValue) {
+ this.dedupStateCloseValue = dedupStateCloseValue;
+ }
+
public String getSerializer() {
return serializer;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
index 2be4936..5a5f2cc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
@@ -25,6 +25,7 @@ import javax.xml.bind.annotation.adapters.XmlAdapter;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
public class StreamColumn implements Serializable {
+
private static final long serialVersionUID = -5457861313624389106L;
private String name;
private Type type;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
index 89728fe..e523fd9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
@@ -124,6 +124,15 @@
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>de.flapdoodle.embed</groupId>
+ <artifactId>de.flapdoodle.embed.mongo</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
index 2f71c7f..1df24b9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
@@ -17,6 +17,8 @@
*/
package org.apache.eagle.alert.engine.publisher;
+import java.util.List;
+
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
/**
@@ -25,7 +27,7 @@ import org.apache.eagle.alert.engine.model.AlertStreamEvent;
*/
public interface AlertDeduplicator {
- AlertStreamEvent dedup(AlertStreamEvent event);
+ List<AlertStreamEvent> dedup(AlertStreamEvent event);
void setDedupIntervalMin(String intervalMin);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
index 4c3a2ad..e4d2665 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
@@ -17,6 +17,10 @@
*/
package org.apache.eagle.alert.engine.publisher;
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.impl.PublishStatus;
@@ -47,7 +51,7 @@ public interface AlertPublishPlugin extends Closeable {
void onAlert(AlertStreamEvent event) throws Exception;
- AlertStreamEvent dedup(AlertStreamEvent event);
+ List<AlertStreamEvent> dedup(AlertStreamEvent event);
PublishStatus getStatus();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
new file mode 100644
index 0000000..2036bca
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory.DedupEventsStoreType;
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
+import com.typesafe.config.Config;
+
+/*
+ * it is not thread safe, we need to handle concurrency issue out of this class
+ */
+/**
+ * Process-wide cache of dedup state, keyed by {@link EventUniq}. Every key maps to a deque of
+ * {@link DedupValue}s whose last element is the most recently observed state. Changes are written
+ * through to the {@link DedupEventsStore} obtained from {@link DedupEventsStoreFactory}.
+ */
+public class DedupCache {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DedupCache.class);
+
+    private static final long CACHE_MAX_EXPIRE_TIME_IN_DAYS = 30;
+    private static final long CACHE_MAX_EVENT_QUEUE_SIZE = 10;
+
+    private static final DedupEventsStoreType type = DedupEventsStoreType.Mongo;
+
+    private long lastUpdated = -1;
+    private Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = new ConcurrentHashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
+
+    private Config config;
+
+    private static DedupCache INSTANCE;
+
+    /**
+     * Returns the singleton cache, creating it on first use.
+     *
+     * @param config configuration handed to the events store; only the value passed on the very
+     *               first call is kept, later values are ignored
+     * @return the shared cache instance
+     */
+    public static synchronized DedupCache getInstance(Config config) {
+        if (INSTANCE == null) {
+            INSTANCE = new DedupCache();
+            INSTANCE.config = config;
+        }
+        return INSTANCE;
+    }
+
+    /**
+     * Returns the cached events, first reloading them from the events store when the cache has
+     * never been loaded, was loaded more than {@code CACHE_MAX_EXPIRE_TIME_IN_DAYS} days ago, or
+     * is currently empty.
+     *
+     * @return the live map backing this cache (not a copy)
+     */
+    public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents() {
+        if (lastUpdated < 0
+            || System.currentTimeMillis() - lastUpdated > CACHE_MAX_EXPIRE_TIME_IN_DAYS * DateUtils.MILLIS_PER_DAY
+            || events.size() <= 0) {
+            lastUpdated = System.currentTimeMillis();
+            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config);
+            events = accessor.getEvents();
+        }
+        return events;
+    }
+
+    /**
+     * Records an observation of {@code stateFieldValue} for the given key and persists the key's
+     * state list when something changed.
+     *
+     * @param eventEniq            dedup key; its {@code timestamp} becomes the first occurrence of a new state
+     * @param stateFieldValue      state value carried by the incoming event, may be {@code null}
+     * @param dedupStateCloseValue state value that closes an alert; when it is not blank and equals
+     *                             {@code stateFieldValue}, the key's history is reset to the new state only
+     * @return {@code null} when the state equals the latest cached state (nothing changed);
+     *         {@code [new]} when the key had no cached state;
+     *         {@code [previous, new]} when the state changed
+     */
+    public DedupValue[] add(EventUniq eventEniq, String stateFieldValue, String dedupStateCloseValue) {
+        DedupValue dedupValue = null;
+        DedupValue lastDedupValue = null;
+        ConcurrentLinkedDeque<DedupValue> dedupValues = events.get(eventEniq);
+        if (dedupValues == null || dedupValues.isEmpty()) {
+            // unknown key, or a key restored without any state: start a fresh history
+            // (an empty deque would otherwise make getLast() throw NoSuchElementException)
+            dedupValue = createDedupValue(eventEniq, stateFieldValue);
+            dedupValues = new ConcurrentLinkedDeque<DedupValue>();
+            dedupValues.add(dedupValue);
+            events.put(eventEniq, dedupValues);
+            LOG.info("Add new dedup key {}, and value {}", eventEniq, dedupValues);
+        } else if (!Objects.equal(stateFieldValue, dedupValues.getLast().getStateFieldValue())) {
+            lastDedupValue = dedupValues.getLast();
+            dedupValue = createDedupValue(eventEniq, stateFieldValue);
+            if (dedupValues.size() > CACHE_MAX_EVENT_QUEUE_SIZE) {
+                // history grew too long: keep only the previous state plus the new one.
+                // The trimmed deque must replace the cached one, otherwise the new state is lost.
+                dedupValues = new ConcurrentLinkedDeque<DedupValue>();
+                dedupValues.add(lastDedupValue);
+                events.put(eventEniq, dedupValues);
+            }
+            dedupValues.add(dedupValue);
+            LOG.info("Update dedup key {}, and value {}", eventEniq, dedupValue);
+        }
+        if (dedupValue == null) {
+            // same state as the latest cached one: nothing to store, nothing to report
+            return null;
+        }
+
+        // reset the list if close state reached
+        if (StringUtils.isNotBlank(dedupStateCloseValue)
+            && Objects.equal(stateFieldValue, dedupStateCloseValue)) {
+            dedupValues = new ConcurrentLinkedDeque<DedupValue>();
+            dedupValues.add(dedupValue);
+            events.put(eventEniq, dedupValues);
+            LOG.info("Reset dedup key {} to value {}", eventEniq, dedupValue);
+        }
+
+        DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config);
+        accessor.add(eventEniq, dedupValues);
+        LOG.info("Store dedup key {}, value {} to DB", eventEniq,
+            Joiner.on(",").join(dedupValues));
+
+        if (lastDedupValue != null) {
+            return new DedupValue[] {lastDedupValue, dedupValue};
+        } else {
+            return new DedupValue[] {dedupValue};
+        }
+    }
+
+    /**
+     * Increments the occurrence count of the latest cached state of the given key. The new count
+     * is only changed in memory; this method does not write to the events store.
+     *
+     * @param eventEniq dedup key
+     * @return the updated latest state, or {@code null} when the key has no cached state
+     */
+    public DedupValue updateCount(EventUniq eventEniq) {
+        ConcurrentLinkedDeque<DedupValue> dedupValues = events.get(eventEniq);
+        if (dedupValues == null || dedupValues.size() <= 0) {
+            LOG.warn("No dedup values found for {}, cannot update count", eventEniq);
+            return null;
+        } else {
+            DedupValue dedupValue = dedupValues.getLast();
+            dedupValue.setCount(dedupValue.getCount() + 1);
+            LOG.info("Update count for dedup key {}, value {} and count {}", eventEniq,
+                dedupValue.getStateFieldValue(), dedupValue.getCount());
+            return dedupValue;
+        }
+    }
+
+    /** Builds the state entry for a newly observed state of the given key. */
+    private static DedupValue createDedupValue(EventUniq eventEniq, String stateFieldValue) {
+        DedupValue dedupValue = new DedupValue();
+        dedupValue.setFirstOccurrence(eventEniq.timestamp);
+        dedupValue.setStateFieldValue(stateFieldValue);
+        return dedupValue;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
new file mode 100644
index 0000000..1989c45
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+
+/**
+ * Pairs a dedup key ({@link EventUniq}) with the list of {@link DedupValue}s recorded for it.
+ * Used as the unit that is converted to and from the persisted representation.
+ */
+public class DedupEntity {
+
+    private EventUniq eventEniq;
+    private List<DedupValue> dedupValues;
+
+    /**
+     * Creates an entity holding a copy of the given deque's elements, in deque order.
+     *
+     * @param eventEniq   dedup key
+     * @param dedupValues states to copy; must not be {@code null}
+     */
+    public DedupEntity(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> dedupValues) {
+        this.eventEniq = eventEniq;
+        this.dedupValues = new ArrayList<DedupValue>(dedupValues);
+    }
+
+    /**
+     * Creates an entity that keeps a reference to the given list (no copy is made).
+     *
+     * @param eventEniq   dedup key
+     * @param dedupValues states to hold
+     */
+    public DedupEntity(EventUniq eventEniq, List<DedupValue> dedupValues) {
+        this.eventEniq = eventEniq;
+        this.dedupValues = dedupValues;
+    }
+
+    /** Returns the dedup key of this entity. */
+    public EventUniq getEventEniq() {
+        return this.eventEniq;
+    }
+
+    /** Replaces the dedup key of this entity. */
+    public void setEventEniq(EventUniq eventEniq) {
+        this.eventEniq = eventEniq;
+    }
+
+    /** Returns the list of states held by this entity (the internal list, not a copy). */
+    public List<DedupValue> getDedupValues() {
+        return this.dedupValues;
+    }
+
+    /** Replaces the list of states held by this entity; the reference is stored as-is. */
+    public void setDedupValues(List<DedupValue> dedupValues) {
+        this.dedupValues = dedupValues;
+    }
+
+    /**
+     * Returns the states as a newly created deque, preserving list order.
+     *
+     * @return a fresh {@link ConcurrentLinkedDeque} containing every element of {@link #getDedupValues()}
+     */
+    public ConcurrentLinkedDeque<DedupValue> getDedupValuesInConcurrentLinkedDeque() {
+        return new ConcurrentLinkedDeque<DedupValue>(this.getDedupValues());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java
new file mode 100644
index 0000000..5918afe
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+
+/**
+ * Storage abstraction for dedup state: a mapping from {@link EventUniq} to the
+ * {@link DedupValue}s recorded for that key.
+ */
+public interface DedupEventsStore {
+
+    /**
+     * Loads the stored dedup state.
+     *
+     * @return the stored states, keyed by dedup key
+     */
+    Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents();
+
+    /**
+     * Stores the given states for the given dedup key.
+     *
+     * @param eventEniq        dedup key
+     * @param dedupStateValues states recorded for the key
+     */
+    void add(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> dedupStateValues);
+
+    /**
+     * Removes the stored state of the given dedup key.
+     *
+     * @param eventEniq dedup key
+     */
+    void remove(EventUniq eventEniq);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
new file mode 100644
index 0000000..9e67f66
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import com.typesafe.config.Config;
+
+/**
+ * Factory for {@link DedupEventsStore} implementations. A store registered through
+ * {@link #customizeStore(DedupEventsStore)} takes precedence over the built-in ones.
+ */
+public class DedupEventsStoreFactory {
+
+    /** Known store back-ends; only {@code Mongo} currently has an implementation. */
+    public enum DedupEventsStoreType {
+        Mongo, ElasticSearch
+    }
+
+    private static DedupEventsStore customizedStore;
+
+    /**
+     * Registers a store that every later {@link #getStore} call returns instead of a built-in one.
+     *
+     * @param store the overriding store; {@code null} restores the built-in behaviour
+     */
+    public static void customizeStore(DedupEventsStore store) {
+        customizedStore = store;
+    }
+
+    /**
+     * Returns the customized store when one is registered, otherwise a store of the requested type.
+     * Note that a new {@link MongoDedupEventsStore} is constructed on every call that reaches the
+     * {@code Mongo} branch; nothing is cached here.
+     *
+     * @param type   requested back-end
+     * @param config configuration passed to the store constructor
+     * @return the store to use
+     * @throws RuntimeException when the requested type has no implementation
+     */
+    public static DedupEventsStore getStore(DedupEventsStoreType type, Config config) {
+        if (customizedStore != null) {
+            return customizedStore;
+        }
+        switch (type) {
+            case Mongo:
+                return new MongoDedupEventsStore(config);
+            default:
+                // ElasticSearch and any future type without an implementation end up here
+                throw new RuntimeException(String.format("Dedup events store type %s is NOT supportted", type));
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupValue.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupValue.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupValue.java
new file mode 100644
index 0000000..ec1af0d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupValue.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import com.google.common.base.Objects;
+
+/**
+ * One observed state of a dedup key: the state value, when it was first seen, and a counter.
+ * Equality and hash code are based on the state value only; {@code count} and
+ * {@code firstOccurrence} are deliberately left out.
+ */
+public class DedupValue {
+
+    private long firstOccurrence;
+    private String stateFieldValue;
+    private long count;
+
+    /** Creates a value with no state, a zero count and a zero first occurrence. */
+    public DedupValue() {
+    }
+
+    /**
+     * Creates a value for the given state.
+     *
+     * @param stateFieldValue the state value, may be {@code null}
+     */
+    public DedupValue(String stateFieldValue) {
+        this.stateFieldValue = stateFieldValue;
+    }
+
+    public long getCount() {
+        return count;
+    }
+
+    public void setCount(long count) {
+        this.count = count;
+    }
+
+    public long getFirstOccurrence() {
+        return firstOccurrence;
+    }
+
+    public void setFirstOccurrence(long firstOccurence) {
+        this.firstOccurrence = firstOccurence;
+    }
+
+    public String getStateFieldValue() {
+        return stateFieldValue;
+    }
+
+    public void setStateFieldValue(String stateFieldValue) {
+        this.stateFieldValue = stateFieldValue;
+    }
+
+    /**
+     * Two values are equal when their state values are equal (both {@code null} counts as equal).
+     *
+     * @param dedupValue the object to compare with
+     * @return {@code false} for {@code null} or for objects that are not a {@link DedupValue}
+     */
+    @Override
+    public boolean equals(Object dedupValue) {
+        if (this == dedupValue) {
+            return true;
+        }
+        // guard the cast: equals must return false, not throw, for null or foreign types
+        if (!(dedupValue instanceof DedupValue)) {
+            return false;
+        }
+        return Objects.equal(this.getStateFieldValue(), ((DedupValue) dedupValue).getStateFieldValue());
+    }
+
+    /** Hash of the state value; a {@code null} state hashes like the empty string. */
+    @Override
+    public int hashCode() {
+        return this.stateFieldValue == null ? "".hashCode() : this.stateFieldValue.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("DedupValue[state: %s, count: %s, first occurrence %s]",
+            stateFieldValue, count, firstOccurrence);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
new file mode 100644
index 0000000..19cb716
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.mongodb.Block;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.IndexOptions;
+import com.mongodb.client.model.InsertOneOptions;
+import com.typesafe.config.Config;
+
+/**
+ * {@link DedupEventsStore} backed by MongoDB. State is kept in collection {@code alert_dedup} of
+ * database {@code ump_alert_dedup}, one document per dedup key, with a unique index on
+ * {@code dedupId}. All operations are best effort: failures are logged and never propagated.
+ */
+public class MongoDedupEventsStore implements DedupEventsStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDedupEventsStore.class);
+
+    public static final String DEDUP_ID = "dedupId";
+    public static final String DEDUP_STREAM_ID = "streamId";
+    public static final String DEDUP_POLICY_ID = "policyId";
+    public static final String DEDUP_CREATE_TIME = "createdTime";
+    public static final String DEDUP_TIMESTAMP = "timestamp";
+    public static final String DEDUP_CUSTOM_FIELDS_VALUES = "customFieldValues";
+    public static final String DEDUP_VALUES = "dedupValues";
+    public static final String DEDUP_STATE_FIELD_VALUE = "stateFieldValue";
+    public static final String DEDUP_COUNT = "count";
+    public static final String DEDUP_FIRST_OCCURRENCE = "firstOccurrence";
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    private Config config;
+    private String connection;
+    private MongoClient client;
+    private MongoDatabase db;
+    private MongoCollection<Document> stateCollection;
+
+    private static final String DB_NAME = "ump_alert_dedup";
+    private static final String ALERT_STATE_COLLECTION = "alert_dedup";
+
+    /**
+     * Opens a Mongo client for the URI found under the {@code connection} key of the given
+     * configuration and prepares the collection and its index.
+     * NOTE(review): the client is never closed by this class - confirm the intended lifecycle.
+     *
+     * @param config configuration providing the {@code connection} string
+     */
+    public MongoDedupEventsStore(Config config) {
+        this.config = config;
+        this.connection = this.config.getString("connection");
+        this.client = new MongoClient(new MongoClientURI(this.connection));
+        init();
+    }
+
+    /** Resolves database and collection, and requests a unique background index on {@code dedupId}. */
+    private void init() {
+        db = client.getDatabase(DB_NAME);
+        stateCollection = db.getCollection(ALERT_STATE_COLLECTION);
+        // dedup id index
+        IndexOptions io = new IndexOptions().background(true).unique(true).name(DEDUP_ID + "_index");
+        BsonDocument doc = new BsonDocument();
+        doc.append(DEDUP_ID, new BsonInt32(1));
+        stateCollection.createIndex(doc, io);
+    }
+
+    /**
+     * Reads every document of the collection and converts it back into dedup state.
+     *
+     * @return the stored states keyed by dedup key; an empty map when reading fails
+     */
+    @Override
+    public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents() {
+        try {
+            Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> result = new ConcurrentHashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
+            stateCollection.find().forEach(new Block<Document>() {
+                @Override
+                public void apply(final Document doc) {
+                    DedupEntity entity = TransformerUtils.transform(DedupEntity.class, BsonDocument.parse(doc.toJson()));
+                    result.put(entity.getEventEniq(), entity.getDedupValuesInConcurrentLinkedDeque());
+                }
+            });
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Found {} dedup events from mongoDB", result.size());
+            }
+            return result;
+        } catch (Exception e) {
+            LOG.error("find dedup state failed, but the state in memory is good, could be ingored.", e);
+        }
+        // same map implementation as the success path: the caller keeps and mutates the returned map
+        return new ConcurrentHashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
+    }
+
+    /**
+     * Writes the states of one dedup key: replaces the document whose {@code dedupId} equals the
+     * key's hash code, or inserts a new document when none was replaced.
+     *
+     * @param eventEniq        dedup key
+     * @param dedupStateValues states to persist for the key
+     */
+    @Override
+    public void add(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> dedupStateValues) {
+        try {
+            BsonDocument doc = TransformerUtils.transform(new DedupEntity(eventEniq, dedupStateValues));
+            BsonDocument filter = new BsonDocument();
+            filter.append(DEDUP_ID, new BsonInt64(eventEniq.hashCode()));
+            // parse once and reuse for both the replace and the fallback insert
+            Document replacement = Document.parse(doc.toJson());
+            Document returnedDoc = stateCollection.findOneAndReplace(filter, replacement);
+            if (returnedDoc == null) {
+                // nothing matched the filter, so this key has not been stored yet
+                InsertOneOptions option = new InsertOneOptions();
+                stateCollection.insertOne(replacement, option);
+            }
+        } catch (Exception e) {
+            LOG.error("insert dedup state failed, but the state is still in memory, could be ingored.", e);
+        }
+    }
+
+    /**
+     * Deletes one document whose {@code dedupId} equals the hash code of the given key.
+     *
+     * @param eventEniq dedup key
+     */
+    @Override
+    public void remove(EventUniq eventEniq) {
+        try {
+            BsonDocument filter = new BsonDocument();
+            filter.append(DEDUP_ID, new BsonInt64(eventEniq.hashCode()));
+            stateCollection.deleteOne(filter);
+        } catch (Exception e) {
+            LOG.error("delete dedup state failed, but the state in memory is good, could be ingored.", e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
new file mode 100644
index 0000000..aaa95db
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.BsonInt64;
+import org.bson.BsonString;
+
+ /**
+  * Converts {@link DedupEntity} objects to and from the BSON documents used by the dedup state store.
+  */
+ public class TransformerUtils {
+
+     public static final String MAP_KEY = "key";
+     public static final String MAP_VALUE = "value";
+
+     /**
+      * Rebuilds an object of the requested type from its BSON form.
+      *
+      * @param klass target type; only {@code DedupEntity.class} is supported
+      * @param doc   document carrying the fields written by {@link #transform(Object)}
+      * @return the rebuilt object
+      * @throws RuntimeException if {@code klass} is {@code null} or not a supported type
+      */
+     @SuppressWarnings("unchecked")
+     public static <T> T transform(Class<T> klass, BsonDocument doc) {
+         if (!DedupEntity.class.equals(klass)) {
+             // a null class is reported like any other unsupported type rather than as a NullPointerException
+             throw new RuntimeException(String.format("Unknow object type %s, cannot transform",
+                 klass == null ? null : klass.getName()));
+         }
+
+         String streamId = doc.getString(MongoDedupEventsStore.DEDUP_STREAM_ID).getValue();
+         String policyId = doc.getString(MongoDedupEventsStore.DEDUP_POLICY_ID).getValue();
+         long timestamp = doc.getInt64(MongoDedupEventsStore.DEDUP_TIMESTAMP).getValue();
+
+         // custom field values are stored as an array of {key, value} documents
+         HashMap<String, String> customFieldValues = new HashMap<String, String>();
+         BsonArray customFieldsValuesArray = doc.getArray(
+             MongoDedupEventsStore.DEDUP_CUSTOM_FIELDS_VALUES);
+         for (int i = 0; i < customFieldsValuesArray.size(); i++) {
+             BsonDocument dedupCustomFieldValuesDoc = customFieldsValuesArray.get(i).asDocument();
+             customFieldValues.put(
+                 dedupCustomFieldValuesDoc.getString(MAP_KEY).getValue(),
+                 dedupCustomFieldValuesDoc.getString(MAP_VALUE).getValue());
+         }
+
+         EventUniq eventUniq = new EventUniq(streamId, policyId, timestamp, customFieldValues);
+         // the create time falls back to 0 when the document does not carry it
+         eventUniq.createdTime = doc.getInt64(
+             MongoDedupEventsStore.DEDUP_CREATE_TIME, new BsonInt64(0)).getValue();
+
+         List<DedupValue> dedupValues = new ArrayList<DedupValue>();
+         BsonArray dedupValuesArray = doc.getArray(MongoDedupEventsStore.DEDUP_VALUES);
+         for (int i = 0; i < dedupValuesArray.size(); i++) {
+             BsonDocument dedupValuesDoc = dedupValuesArray.get(i).asDocument();
+             DedupValue dedupValue = new DedupValue();
+             dedupValue.setStateFieldValue(dedupValuesDoc.getString(
+                 MongoDedupEventsStore.DEDUP_STATE_FIELD_VALUE).getValue());
+             dedupValue.setCount(dedupValuesDoc.getInt64(
+                 MongoDedupEventsStore.DEDUP_COUNT).getValue());
+             dedupValue.setFirstOccurrence(dedupValuesDoc.getInt64(
+                 MongoDedupEventsStore.DEDUP_FIRST_OCCURRENCE).getValue());
+             dedupValues.add(dedupValue);
+         }
+         return (T) new DedupEntity(eventUniq, dedupValues);
+     }
+
+     /**
+      * Turns a supported object into its BSON form.
+      *
+      * @param obj object to convert; only {@link DedupEntity} instances are supported
+      * @return document whose id field is the hash code of the entity's event key
+      * @throws RuntimeException if {@code obj} is {@code null} or not a supported type
+      */
+     public static BsonDocument transform(Object obj) {
+         if (!(obj instanceof DedupEntity)) {
+             // a null object is reported like any other unsupported type rather than as a NullPointerException
+             throw new RuntimeException(String.format("Unknow object type %s, cannot transform",
+                 obj == null ? null : obj.getClass().getName()));
+         }
+
+         DedupEntity entity = (DedupEntity) obj;
+         EventUniq eventUniq = entity.getEventEniq();
+
+         BsonDocument doc = new BsonDocument();
+         doc.put(MongoDedupEventsStore.DEDUP_ID, new BsonInt64(eventUniq.hashCode()));
+         doc.put(MongoDedupEventsStore.DEDUP_STREAM_ID, new BsonString(eventUniq.streamId));
+         doc.put(MongoDedupEventsStore.DEDUP_POLICY_ID, new BsonString(eventUniq.policyId));
+         doc.put(MongoDedupEventsStore.DEDUP_CREATE_TIME, new BsonInt64(eventUniq.createdTime));
+         doc.put(MongoDedupEventsStore.DEDUP_TIMESTAMP, new BsonInt64(eventUniq.timestamp));
+
+         // custom field values are written as an array of {key, value} documents
+         List<BsonDocument> dedupCustomFieldValues = new ArrayList<BsonDocument>();
+         for (Entry<String, String> entry : eventUniq.customFieldValues.entrySet()) {
+             BsonDocument dedupCustomFieldValuesDoc = new BsonDocument();
+             dedupCustomFieldValuesDoc.put(MAP_KEY, new BsonString(entry.getKey()));
+             dedupCustomFieldValuesDoc.put(MAP_VALUE, new BsonString(entry.getValue()));
+             dedupCustomFieldValues.add(dedupCustomFieldValuesDoc);
+         }
+         doc.put(MongoDedupEventsStore.DEDUP_CUSTOM_FIELDS_VALUES, new BsonArray(dedupCustomFieldValues));
+
+         List<BsonDocument> dedupValuesDocs = new ArrayList<BsonDocument>();
+         for (DedupValue dedupValue : entity.getDedupValues()) {
+             BsonDocument dedupValuesDoc = new BsonDocument();
+             dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_STATE_FIELD_VALUE,
+                 new BsonString(dedupValue.getStateFieldValue()));
+             dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_COUNT,
+                 new BsonInt64(dedupValue.getCount()));
+             dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_FIRST_OCCURRENCE,
+                 new BsonInt64(dedupValue.getFirstOccurrence()));
+             dedupValuesDocs.add(dedupValuesDoc);
+         }
+         doc.put(MongoDedupEventsStore.DEDUP_VALUES, new BsonArray(dedupValuesDocs));
+         return doc;
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index 31110ef..15e27a3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -24,6 +24,7 @@ import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
import com.typesafe.config.Config;
import org.slf4j.Logger;
+import java.util.List;
import java.util.Map;
/**
@@ -39,7 +40,10 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
@SuppressWarnings("rawtypes")
@Override
public void init(Config config, Publishment publishment, Map conf) throws Exception {
- this.deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin(), publishment.getDedupFields());
+ this.deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin(),
+ publishment.getDedupFields(), publishment.getDedupStateField(),
+ publishment.getDedupStateCloseValue(),
+ config);
this.pubName = publishment.getName();
String serializerClz = publishment.getSerializer();
try {
@@ -63,7 +67,7 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
}
+ /**
+ * Passes the event to this plugin's deduplicator and returns its result unchanged.
+ */
@Override
- public AlertStreamEvent dedup(AlertStreamEvent event) {
+ public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
return deduplicator.dedup(event);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
index 65837dd..4049af1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -64,12 +65,17 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
LOG.warn("emailGenerator is null due to the incorrect configurations");
return;
}
- event = dedup(event);
- if (event == null) {
+ List<AlertStreamEvent> outputEvents = dedup(event);
+ if (outputEvents == null) {
return;
}
- boolean isSuccess = emailGenerator.sendAlertEmail(event);
+ boolean isSuccess = true;
+ for (AlertStreamEvent outputEvent : outputEvents) {
+ if (!emailGenerator.sendAlertEmail(outputEvent)) {
+ isSuccess = false;
+ }
+ }
PublishStatus status = new PublishStatus();
if (!isSuccess) {
status.errorMessage = "Failed to send email";
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
index 048424c..27314bf 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
@@ -18,6 +18,13 @@
package org.apache.eagle.alert.engine.publisher.impl;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.PublishConstants;
@@ -63,19 +70,21 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
LOG.warn("KafkaProducer is null due to the incorrect configurations");
return;
}
- event = dedup(event);
- if (event == null) {
+ List<AlertStreamEvent> outputEvents = dedup(event);
+ if (outputEvents == null) {
return;
}
PublishStatus status = new PublishStatus();
try {
- ProducerRecord record = createRecord(event, topic);
- if (record == null) {
- LOG.error(" Alert serialize return null, ignored message! ");
- return;
+ for (AlertStreamEvent outputEvent : outputEvents) {
+ ProducerRecord record = createRecord(outputEvent, topic);
+ if (record == null) {
+ LOG.error(" Alert serialize return null, ignored message! ");
+ return;
+ }
+ Future<?> future = producer.send(record);
+ future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
- Future<?> future = producer.send(record);
- future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
status.successful = true;
status.errorMessage = "";
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
index 258f613..de4ae09 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
@@ -17,31 +17,41 @@
*/
package org.apache.eagle.alert.engine.publisher.impl;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupValue;
+import org.apache.storm.guava.base.Objects;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
+import com.typesafe.config.Config;
public class DefaultDeduplicator implements AlertDeduplicator {
+
+ private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
+
+ @SuppressWarnings("unused")
private long dedupIntervalMin;
private List<String> customDedupFields = new ArrayList<>();
- private volatile Map<EventUniq, Long> events = new HashMap<>();
- private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
+ private String dedupStateField;
+ private String dedupStateCloseValue;
+ private Config config;
- public enum AlertDeduplicationStatus {
- NEW,
- DUPLICATED,
- IGNORED
- }
+ private static final String DEDUP_COUNT = "dedupCount";
+ private static final String DEDUP_FIRST_OCCURRENCE = "dedupFirstOccurrence";
+
+ private DedupCache dedupCache;
public DefaultDeduplicator() {
this.dedupIntervalMin = 0;
@@ -55,52 +65,96 @@ public class DefaultDeduplicator implements AlertDeduplicator {
this.dedupIntervalMin = intervalMin;
}
+ /**
+ * Creates a deduplicator that keeps its state in the {@link DedupCache} obtained for the given config.
+ *
+ * @param intervalMin dedup interval, handed to {@code setDedupIntervalMin}
+ * @param customDedupFields names of the columns whose values become part of the dedup key;
+ * {@code null} keeps the default empty list
+ * @param dedupStateField name of the column that carries the alert state; left unset when blank
+ * @param dedupStateCloseValue state value handed to the cache as the close value; left unset when blank
+ * @param config configuration used to obtain the {@link DedupCache} instance
+ */
- public DefaultDeduplicator(String intervalMin, List<String> customDedupFields) {
+ public DefaultDeduplicator(String intervalMin, List<String> customDedupFields,
+ String dedupStateField, String dedupStateCloseValue, Config config) {
setDedupIntervalMin(intervalMin);
if (customDedupFields != null) {
this.customDedupFields = customDedupFields;
}
+ if (StringUtils.isNotBlank(dedupStateField)) {
+ this.dedupStateField = dedupStateField;
+ }
+ if (StringUtils.isNotBlank(dedupStateCloseValue)) {
+ this.dedupStateCloseValue = dedupStateCloseValue;
+ }
+ this.config = config;
+ this.dedupCache = DedupCache.getInstance(this.config);
}
- public void clearOldCache() {
- List<EventUniq> removedkeys = new ArrayList<>();
- for (Entry<EventUniq, Long> entry : events.entrySet()) {
- EventUniq entity = entry.getKey();
- if (System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY > entity.createdTime) {
- removedkeys.add(entry.getKey());
- }
+ /**
+ * Decides whether an alert is emitted, based on the state recorded for its key.
+ *
+ * <p>A blank state value returns the event as is. Otherwise, while holding the lock on the dedup cache:
+ * if the key is unknown, or the last state recorded for the key differs from the given one, the new state
+ * is added to the cache and the event is returned merged with each value the cache hands back (one or two
+ * values). In the remaining cases only the count kept for the key is updated.
+ *
+ * @param event alert being checked
+ * @param key dedup key of the alert
+ * @param stateFiledValue value of the state field taken from the alert, may be blank
+ * @return the events to emit, or {@code null} when nothing is to be emitted
+ */
+ public List<AlertStreamEvent> checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) {
+ if (StringUtils.isBlank(stateFiledValue)) {
+ // without state field, we cannot determine whether it is duplicated
+ return Arrays.asList(event);
}
- for (EventUniq alertKey : removedkeys) {
- events.remove(alertKey);
+ synchronized (dedupCache) {
+ Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = dedupCache.getEvents();
+ if (!events.containsKey(key)
+ || (events.containsKey(key)
+ && events.get(key).size() > 0
+ && !Objects.equal(stateFiledValue,
+ events.get(key).getLast().getStateFieldValue()))) {
+ DedupValue[] dedupValues = dedupCache.add(key, stateFiledValue, dedupStateCloseValue);
+ if (dedupValues != null) {
+ // any of dedupValues won't be null
+ if (dedupValues.length == 2) {
+ // emit last event which includes count of dedup events & new state event
+ return Arrays.asList(
+ mergeEventWithDedupValue(event, dedupValues[0]),
+ mergeEventWithDedupValue(event, dedupValues[1]));
+ } else if (dedupValues.length == 1) {
+ //populate firstOccurrenceTime & count
+ return Arrays.asList(mergeEventWithDedupValue(event, dedupValues[0]));
+ }
+ }
+ } else {
+ // update count
+ dedupCache.updateCount(key);
+ }
}
+ // duplicated, will be ignored
+ return null;
}
- public AlertDeduplicationStatus checkDedup(EventUniq key) {
- long current = key.timestamp;
- if (!events.containsKey(key)) {
- events.put(key, current);
- return AlertDeduplicationStatus.NEW;
+ /**
+ * Builds a copy of the alert in which the state, dedup count and first-occurrence columns
+ * carry the values of the given dedup value. The original event and its data array are left untouched.
+ *
+ * @param originalEvent alert to copy
+ * @param dedupValue values written into the matching columns of the copy
+ * @return the new event
+ */
+ private AlertStreamEvent mergeEventWithDedupValue(AlertStreamEvent originalEvent, DedupValue dedupValue) {
+ AlertStreamEvent event = new AlertStreamEvent();
+ Object[] newdata = new Object[originalEvent.getData().length];
+ for (int i = 0; i < originalEvent.getData().length; i++) {
+ newdata[i] = originalEvent.getData()[i];
}
-
- long last = events.get(key);
- if (current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE) {
- events.put(key, current);
- return AlertDeduplicationStatus.IGNORED;
+ event.setData(newdata);
+ event.setSchema(originalEvent.getSchema());
+ // carry the stream id over as well, otherwise the emitted copy has none
+ event.setStreamId(originalEvent.getStreamId());
+ event.setPolicyId(originalEvent.getPolicyId());
+ event.setCreatedTime(originalEvent.getCreatedTime());
+ event.setCreatedBy(originalEvent.getCreatedBy());
+ event.setTimestamp(originalEvent.getTimestamp());
+ StreamDefinition streamDefinition = event.getSchema();
+ // columns are matched by name; data[i] belongs to column i of the schema
+ for (int i = 0; i < event.getData().length; i++) {
+ String colName = streamDefinition.getColumns().get(i).getName();
+ if (Objects.equal(colName, dedupStateField)) {
+ event.getData()[i] = dedupValue.getStateFieldValue();
+ }
+ if (Objects.equal(colName, DEDUP_COUNT)) {
+ event.getData()[i] = dedupValue.getCount();
+ }
+ if (Objects.equal(colName, DEDUP_FIRST_OCCURRENCE)) {
+ event.getData()[i] = dedupValue.getFirstOccurrence();
+ }
}
-
- return AlertDeduplicationStatus.DUPLICATED;
+ return event;
}
- public AlertStreamEvent dedup(AlertStreamEvent event) {
+ public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
if (event == null) {
return null;
}
- clearOldCache();
- AlertStreamEvent result = null;
-
// check custom field, and get the field values
StreamDefinition streamDefinition = event.getSchema();
HashMap<String, String> customFieldValues = new HashMap<>();
+ String stateFiledValue = null;
for (int i = 0; i < event.getData().length; i++) {
if (i > streamDefinition.getColumns().size()) {
if (LOG.isWarnEnabled()) {
@@ -110,6 +164,10 @@ public class DefaultDeduplicator implements AlertDeduplicator {
}
String colName = streamDefinition.getColumns().get(i).getName();
+ if (colName.equals(dedupStateField)) {
+ stateFiledValue = event.getData()[i].toString();
+ }
+
for (String field : customDedupFields) {
if (colName.equals(field)) {
customFieldValues.put(field, event.getData()[i].toString());
@@ -118,17 +176,14 @@ public class DefaultDeduplicator implements AlertDeduplicator {
}
}
- AlertDeduplicationStatus status = checkDedup(
- new EventUniq(event.getStreamId(),
- event.getPolicyId(),
- event.getCreatedTime(),
- customFieldValues));
- if (!status.equals(AlertDeduplicationStatus.DUPLICATED)) {
- result = event;
+ List<AlertStreamEvent> outputEvents = checkDedup(event, new EventUniq(event.getStreamId(),
+ event.getPolicyId(), event.getCreatedTime(), customFieldValues), stateFiledValue);
+ if (outputEvents != null && outputEvents.size() > 0) {
+ return outputEvents;
} else if (LOG.isDebugEnabled()) {
LOG.debug("Alert event is skipped because it's duplicated: {}", event.toString());
}
- return result;
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
index 1b90833..5d7c67a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
@@ -21,6 +21,7 @@
package org.apache.eagle.alert.engine.publisher.impl;
import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.storm.guava.base.Joiner;
import java.util.HashMap;
@@ -71,4 +72,10 @@ public class EventUniq {
}
return builder.build();
}
+
+ /**
+  * Renders the stream id, policy id, timestamp and custom field values of this key.
+  * Custom fields are written as {@code name>value} pairs separated by commas.
+  */
+ @Override
+ public String toString() {
+     String renderedFields = Joiner.on(",").withKeyValueSeparator(">").join(customFieldValues);
+     return String.format("EventUniq[streamId: %s, policyId: %s, timestamp: %s, customFieldValues: %s]",
+         streamId, policyId, timestamp, renderedFields);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
index 8d85c76..32c0734 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
@@ -1,31 +1,34 @@
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-public class LongSerializer implements Serializer<Long> {
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class LongSerializer implements Serializer<Object> {
+ /**
+ * Writes the value to the output as a long. An {@code Integer} is widened to a long first;
+ * every other value goes through the {@code (long)} cast and therefore has to be a {@code Long}.
+ */
@Override
- public void serialize(Long value, DataOutput dataOutput) throws IOException {
- dataOutput.writeLong(value);
+ public void serialize(Object value, DataOutput dataOutput) throws IOException {
+ if (value instanceof Integer) {
+ value = ((Integer) value).longValue();
+ }
+ dataOutput.writeLong((long) value);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
index 84844e7..8821d3e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
@@ -90,8 +90,6 @@ public class TestNoDataPolicyTimeBatchHandler {
Object[] data = e.getData();
LOG.info("alert data: {}, {}", data[1], data[0]);
-
- Assert.assertEquals("host1", data[1]);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
new file mode 100644
index 0000000..4ec9b42
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+ /**
+  * Feeds {@link DefaultDeduplicator} eight alerts for the same alert key from eight threads and
+  * checks that every emitted alert has its dedup count and first-occurrence columns filled.
+  */
+ public class DefaultDeduplicatorTest extends MongoDependencyBaseTest {
+
+     /** States of the submitted alerts, in submission order. */
+     private static final String[] STATES = {"OPEN", "WARN", "OPEN", "WARN", "CLOSE", "OPEN", "OPEN", "OPEN"};
+
+     /** 1-based number of the alert that is submitted after a delay. */
+     private static final int DELAYED_EVENT_NO = 5;
+
+     /** Delay applied before the delayed alert is submitted. */
+     private static final long DELAY_MS = 500;
+
+     @Test
+     public void testNormal() throws Exception {
+         // constructor arguments: intervalMin, customDedupFields, dedupStateField, dedupStateCloseValue, config
+         // assume state: OPEN, WARN, CLOSE
+         System.setProperty("config.resource", "/application-mongo-statestore.conf");
+         Config config = ConfigFactory.load();
+         final DefaultDeduplicator deduplicator = new DefaultDeduplicator(
+             "PT1M", Arrays.asList(new String[] { "alertKey" }), "state", "CLOSE", config);
+
+         StreamDefinition stream = createStream();
+         PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
+
+         // build every alert before any worker starts
+         List<AlertStreamEvent> events = new ArrayList<AlertStreamEvent>();
+         for (String state : STATES) {
+             events.add(createEvent(stream, policy, new Object[] {
+                 System.currentTimeMillis(), "host1", "testPolicy-host1-01", state, 0, 0
+             }));
+         }
+
+         // the workers append concurrently, so the sink has to be thread-safe
+         final List<AlertStreamEvent> allResults =
+             java.util.Collections.synchronizedList(new ArrayList<AlertStreamEvent>());
+         List<Thread> workers = new ArrayList<Thread>();
+         for (int i = 0; i < events.size(); i++) {
+             final int eventNo = i + 1;
+             final AlertStreamEvent event = events.get(i);
+             Thread worker = new Thread(new Runnable() {
+                 @Override
+                 public void run() {
+                     if (eventNo == DELAYED_EVENT_NO) {
+                         try {
+                             Thread.sleep(DELAY_MS);
+                         } catch (InterruptedException e) {
+                             // keep the interrupt visible and give up on this alert
+                             Thread.currentThread().interrupt();
+                             return;
+                         }
+                     }
+                     List<AlertStreamEvent> result = deduplicator.dedup(event);
+                     if (result != null) {
+                         allResults.addAll(result);
+                     }
+                     System.out.println(eventNo + " >>>> " + ToStringBuilder.reflectionToString(result));
+                 }
+             });
+             workers.add(worker);
+             worker.start();
+         }
+
+         // wait for the workers themselves rather than for a fixed amount of time
+         for (Thread worker : workers) {
+             worker.join();
+         }
+
+         long maxCount = 0;
+         for (AlertStreamEvent event : allResults) {
+             Assert.assertNotNull(event.getData()[4]);
+             Assert.assertNotNull(event.getData()[5]);
+
+             if (((Long) event.getData()[4]) > maxCount) {
+                 maxCount = (Long) event.getData()[4];
+                 System.out.println(String.format(">>>>>%s: %s", event, maxCount));
+             }
+         }
+
+     }
+
+     /** Creates an alert for the given stream and policy carrying the given column values. */
+     private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) {
+         AlertStreamEvent event = new AlertStreamEvent();
+         event.setPolicyId(policy.getName());
+         event.setSchema(stream);
+         event.setStreamId(stream.getStreamId());
+         event.setTimestamp(System.currentTimeMillis());
+         event.setCreatedTime(System.currentTimeMillis());
+         event.setData(data);
+         return event;
+     }
+
+     /** Creates the test stream: timestamp, host, alertKey, state, dedupCount, dedupFirstOccurrence. */
+     private StreamDefinition createStream() {
+         StreamDefinition sd = new StreamDefinition();
+         sd.setColumns(Arrays.asList(
+             createColumn("timestamp", StreamColumn.Type.LONG),
+             createColumn("host", StreamColumn.Type.STRING),
+             createColumn("alertKey", StreamColumn.Type.STRING),
+             createColumn("state", StreamColumn.Type.STRING),
+             // columns filled in by the deduplicator
+             createColumn("dedupCount", StreamColumn.Type.LONG),
+             createColumn("dedupFirstOccurrence", StreamColumn.Type.LONG)));
+         sd.setDataSource("testDatasource");
+         sd.setStreamId("testStream");
+         sd.setDescription("test stream");
+         return sd;
+     }
+
+     /** Creates a stream column with the given name and type. */
+     private StreamColumn createColumn(String name, StreamColumn.Type type) {
+         StreamColumn column = new StreamColumn();
+         column.setName(name);
+         column.setType(type);
+         return column;
+     }
+
+     /** Creates a siddhi policy with the given name, partitioned by host on the given stream. */
+     private PolicyDefinition createPolicy(String streamName, String policyName) {
+         PolicyDefinition pd = new PolicyDefinition();
+         PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+         //expression, something like "PT5S,dynamic,1,host"
+         def.setValue("test");
+         def.setType("siddhi");
+         pd.setDefinition(def);
+         pd.setInputStreams(Arrays.asList("inputStream"));
+         pd.setOutputStreams(Arrays.asList("outputStream"));
+         pd.setName(policyName);
+         pd.setDescription(String.format("Test policy for stream %s", streamName));
+
+         StreamPartition sp = new StreamPartition();
+         sp.setStreamId(streamName);
+         sp.setColumns(Arrays.asList("host"));
+         sp.setType(StreamPartition.Type.GROUPBY);
+         pd.addPartition(sp);
+         return pd;
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
new file mode 100644
index 0000000..bdd5e0b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Round-trip test for the {@link MongoDedupEventsStore} instance that
+ * {@link MongoDependencyBaseTest} creates for its subclasses.
+ */
+public class MongoDedupStoreTest extends MongoDependencyBaseTest {
+
+    /**
+     * Expects the store to be empty at first, adds one event key carrying a single dedup
+     * state value, and checks that reading the store back returns exactly that entry.
+     */
+    @Test
+    public void testNormal() throws Exception {
+        // Precondition: nothing has been stored yet.
+        Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> stored = store.getEvents();
+        Assert.assertNotNull(stored);
+        Assert.assertEquals(0, stored.size());
+
+        // Key identifying the event: stream, policy, timestamp and one custom field.
+        final String streamId = "testStream";
+        final String policyId = "testPolicy";
+        final long timestamp = System.currentTimeMillis();
+        HashMap<String, String> customFieldValues = new HashMap<>();
+        customFieldValues.put("alertKey", "test-alert-key");
+        EventUniq eventUniq = new EventUniq(streamId, policyId, timestamp, customFieldValues);
+
+        // A single "OPEN" state value that has been seen twice.
+        DedupValue openState = new DedupValue();
+        openState.setStateFieldValue("OPEN");
+        openState.setCount(2);
+        openState.setFirstOccurrence(System.currentTimeMillis());
+        ConcurrentLinkedDeque<DedupValue> stateValues = new ConcurrentLinkedDeque<>();
+        stateValues.add(openState);
+
+        store.add(eventUniq, stateValues);
+
+        // Reading back must yield exactly the entry written above.
+        stored = store.getEvents();
+        Assert.assertNotNull(stored);
+        Assert.assertEquals(1, stored.size());
+
+        Entry<EventUniq, ConcurrentLinkedDeque<DedupValue>> onlyEntry = stored.entrySet().iterator().next();
+        EventUniq storedKey = onlyEntry.getKey();
+        ConcurrentLinkedDeque<DedupValue> storedValues = onlyEntry.getValue();
+        Assert.assertEquals(streamId, storedKey.streamId);
+        Assert.assertEquals(1, storedValues.size());
+        Assert.assertEquals(2, storedValues.getLast().getCount());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
new file mode 100644
index 0000000..75de384
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.client.MongoDatabase;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Base class for tests that need a {@link MongoDedupEventsStore}.
+ *
+ * <p>Once per test class it tries to start an embedded mongod, points the Typesafe Config
+ * loader at {@code /application-mongo-statestore.conf} through the {@code config.resource}
+ * system property, and builds the shared {@link #store} from the loaded configuration.
+ * After the class finishes, the embedded mongod is shut down and the system property is put
+ * back to its previous value so other test classes in the same JVM are not affected.
+ */
+public abstract class MongoDependencyBaseTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDependencyBaseTest.class);
+
+    /** System property consulted by {@code ConfigFactory.load()} to pick the config resource. */
+    private static final String CONFIG_RESOURCE_KEY = "config.resource";
+
+    private static SimpleEmbedMongo mongo;
+    @SuppressWarnings("unused")
+    private static MongoDatabase testDB;
+    private static Config config;
+
+    /** Value of {@code config.resource} before {@link #setup()} changed it; null if unset. */
+    private static String previousConfigResource;
+
+    /** Store under test, shared by all test methods of the subclass. */
+    protected static MongoDedupEventsStore store;
+
+    /**
+     * Starts the embedded mongod and opens the {@code testDb} database.
+     *
+     * <p>A start failure is logged and deliberately not rethrown: the tests then proceed on
+     * the assumption that an external mongod is already running.
+     */
+    public static void before() {
+        try {
+            mongo = new SimpleEmbedMongo();
+            mongo.start();
+            testDB = mongo.getMongoClient().getDatabase("testDb");
+        } catch (Exception e) {
+            LOG.error("start embed mongod failed, assume some external mongo running. continue run test!", e);
+        }
+    }
+
+    /**
+     * Starts mongod, loads the mongo state-store configuration and creates the store.
+     *
+     * @throws Exception if the store cannot be created from the loaded configuration
+     */
+    @BeforeClass
+    public static void setup() throws Exception {
+        before();
+
+        // Remember the current value so teardown() can restore it.
+        previousConfigResource = System.getProperty(CONFIG_RESOURCE_KEY);
+        System.setProperty(CONFIG_RESOURCE_KEY, "/application-mongo-statestore.conf");
+        ConfigFactory.invalidateCaches();
+        config = ConfigFactory.load();
+
+        store = new MongoDedupEventsStore(config);
+    }
+
+    /**
+     * Stops the embedded mongod (if one was created) and restores the {@code config.resource}
+     * system property, invalidating the config caches so later loads see the restored value.
+     */
+    @AfterClass
+    public static void teardown() {
+        try {
+            if (mongo != null) {
+                mongo.shutdown();
+                mongo = null;
+            }
+        } finally {
+            // Always undo the JVM-wide property change, even if shutdown failed.
+            if (previousConfigResource == null) {
+                System.clearProperty(CONFIG_RESOURCE_KEY);
+            } else {
+                System.setProperty(CONFIG_RESOURCE_KEY, previousConfigResource);
+            }
+            ConfigFactory.invalidateCaches();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java
new file mode 100644
index 0000000..31f744e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.MongoClient;
+
+import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodProcess;
+import de.flapdoodle.embed.mongo.MongodStarter;
+import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
+import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.process.runtime.Network;
+
+public class SimpleEmbedMongo {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleEmbedMongo.class);
+
+ private MongoClient client;
+ private MongodExecutable mongodExe;
+ private MongodProcess mongod;
+
+ public void start() throws Exception {
+ MongodStarter starter = MongodStarter.getDefaultInstance();
+ mongodExe = starter.prepare(new MongodConfigBuilder().version(Version.V3_2_1)
+ .net(new Net(27017, Network.localhostIsIPv6())).build());
+ mongod = mongodExe.start();
+
+ client = new MongoClient("localhost");
+ }
+
+ public void shutdown() {
+
+ if (mongod != null) {
+ try {
+ mongod.stop();
+ }
+ catch (IllegalStateException e) {
+ // catch this exception for the unstable stopping mongodb
+ // reason: the exception is usually thrown out with below message format when stop() returns null value,
+ // but actually this should have been captured in ProcessControl.stopOrDestroyProcess() by destroying
+ // the process ultimately
+ if (e.getMessage() != null && e.getMessage().matches("^Couldn't kill.*process!.*")) {
+ // if matches, do nothing, just ignore the exception
+ } else {
+ LOG.warn(String.format("Ignored error for stopping mongod process, see stack trace: %s", ExceptionUtils.getStackTrace(e)));
+ }
+ }
+ mongodExe.stop();
+ }
+ }
+
+ public MongoClient getMongoClient() {
+ return client;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 61a0aba..6ebae63 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -18,12 +18,12 @@
package org.apache.eagle.alert.engine.router;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
import org.apache.eagle.alert.coordination.model.PublishSpec;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.Publishment;
@@ -32,24 +32,46 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStore;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupValue;
import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory;
import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
import org.apache.eagle.alert.engine.runner.MapComparator;
import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mockito;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
/**
* @Since 5/14/16.
*/
public class TestAlertPublisherBolt {
+
+ private DedupEventsStore store;
+
+ /**
+ * Creates a fresh Mockito mock of DedupEventsStore before each test and hands it to
+ * DedupEventsStoreFactory.customizeStore.
+ * NOTE(review): presumably this makes plugins created in the tests use the mock instead of
+ * a real store; confirm in DedupEventsStoreFactory.
+ */
+ @Before
+ public void setUp() {
+ store = Mockito.mock(DedupEventsStore.class);
+ DedupEventsStoreFactory.customizeStore(store);
+ }
+
+ /**
+ * Resets the mock store after each test.
+ * NOTE(review): the mock handed to DedupEventsStoreFactory in setUp is not withdrawn here;
+ * confirm whether the factory keeps it and whether that can affect other test classes.
+ */
+ @After
+ public void tearDown() {
+ Mockito.reset(store);
+ }
@SuppressWarnings("rawtypes")
@Ignore
@@ -165,13 +187,13 @@ public class TestAlertPublisherBolt {
return l;
}
- private AlertStreamEvent createWithStreamDef(String hostname, String appName) {
+ private AlertStreamEvent createWithStreamDef(String hostname, String appName, String state) {
AlertStreamEvent alert = new AlertStreamEvent();
PolicyDefinition policy = new PolicyDefinition();
policy.setName("perfmon_cpu_host_check");
alert.setPolicyId(policy.getName());
alert.setCreatedTime(System.currentTimeMillis());
- alert.setData(new Object[] {appName, hostname});
+ alert.setData(new Object[] {appName, hostname, state});
alert.setStreamId("testAlertStream");
alert.setCreatedBy(this.toString());
@@ -184,38 +206,46 @@ public class TestAlertPublisherBolt {
StreamColumn hostColumn = new StreamColumn();
hostColumn.setName("hostname");
hostColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn stateColumn = new StreamColumn();
+ stateColumn.setName("state");
+ stateColumn.setType(StreamColumn.Type.STRING);
- sd.setColumns(Arrays.asList(appColumn, hostColumn));
+ sd.setColumns(Arrays.asList(appColumn, hostColumn, stateColumn));
alert.setSchema(sd);
return alert;
}
+ /**
+ * Runs three alerts through the plugin built from the first entry of
+ * /router/publishments.json and checks which ones survive dedup: the first and third are
+ * returned, the second is suppressed (dedup returns null).
+ * NOTE(review): which fields drive the dedup decision is defined in publishments.json,
+ * not visible here; confirm there.
+ * Finally verifies that the mocked store was read once via getEvents() and written at
+ * least once via add().
+ */
- @Test
+ @SuppressWarnings("unchecked")
+ @Test
public void testCustomFieldDedupEvent() throws Exception {
List<Publishment> pubs = loadEntities("/router/publishments.json", Publishment.class);
AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null);
- AlertStreamEvent event1 = createWithStreamDef("host1", "testapp1");
- AlertStreamEvent event2 = createWithStreamDef("host2", "testapp1");
- AlertStreamEvent event3 = createWithStreamDef("host2", "testapp2");
+ AlertStreamEvent event1 = createWithStreamDef("host1", "testapp1", "OPEN");
+ AlertStreamEvent event2 = createWithStreamDef("host2", "testapp1", "OPEN");
+ AlertStreamEvent event3 = createWithStreamDef("host2", "testapp2", "CLOSE");
Assert.assertNotNull(plugin.dedup(event1));
Assert.assertNull(plugin.dedup(event2));
Assert.assertNotNull(plugin.dedup(event3));
-
+
+ Mockito.verify(store).getEvents();
+ Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), Mockito.anyObject());
}
+ /**
+ * Uses the first publishment of /router/publishments-empty-dedup-field.json and checks
+ * that the first alert passes dedup while a second alert with a different host and app
+ * but the same "OPEN" state is suppressed (dedup returns null).
+ * NOTE(review): the dedup configuration lives in the JSON resource, not visible here;
+ * confirm there why the second alert counts as a duplicate.
+ * Also verifies that the mocked store received at least one add() call.
+ */
- @Test
+ @Test
public void testEmptyCustomFieldDedupEvent() throws Exception {
List<Publishment> pubs = loadEntities("/router/publishments-empty-dedup-field.json", Publishment.class);
AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null);
- AlertStreamEvent event1 = createWithStreamDef("host1", "testapp1");
- AlertStreamEvent event2 = createWithStreamDef("host2", "testapp2");
+ AlertStreamEvent event1 = createWithStreamDef("host1", "testapp1", "OPEN");
+ AlertStreamEvent event2 = createWithStreamDef("host2", "testapp2", "OPEN");
Assert.assertNotNull(plugin.dedup(event1));
Assert.assertNull(plugin.dedup(event2));
-
+
+ Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), Mockito.anyObject());
}
}