You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/09/09 11:45:35 UTC

[2/2] incubator-eagle git commit: EAGLE-531: Dedup alerts according to state change

EAGLE-531: Dedup alerts according to state change

Author: Li, Garrett
Reviewer: ralphsu

This closes #428


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/7f41d427
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/7f41d427
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/7f41d427

Branch: refs/heads/master
Commit: 7f41d4278d47327f018d00291aaf0787aa401336
Parents: 4e7c5f2
Author: Ralph, Su <su...@gmail.com>
Authored: Fri Sep 9 19:44:43 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Fri Sep 9 19:44:43 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/coordinator/Publishment.java   |  18 ++
 .../alert/engine/coordinator/StreamColumn.java  |   1 +
 .../eagle-alert/alert-engine/pom.xml            |   9 +
 .../engine/publisher/AlertDeduplicator.java     |   4 +-
 .../engine/publisher/AlertPublishPlugin.java    |   6 +-
 .../engine/publisher/dedup/DedupCache.java      | 136 +++++++++++
 .../engine/publisher/dedup/DedupEntity.java     |  62 +++++
 .../publisher/dedup/DedupEventsStore.java       |  32 +++
 .../dedup/DedupEventsStoreFactory.java          |  55 +++++
 .../engine/publisher/dedup/DedupValue.java      |  74 ++++++
 .../publisher/dedup/MongoDedupEventsStore.java  | 138 +++++++++++
 .../publisher/dedup/TransformerUtils.java       | 107 +++++++++
 .../publisher/impl/AbstractPublishPlugin.java   |   8 +-
 .../publisher/impl/AlertEmailPublisher.java     |  12 +-
 .../publisher/impl/AlertKafkaPublisher.java     |  25 +-
 .../publisher/impl/DefaultDeduplicator.java     | 149 ++++++++----
 .../alert/engine/publisher/impl/EventUniq.java  |   7 +
 .../serialization/impl/LongSerializer.java      |  33 +--
 .../TestNoDataPolicyTimeBatchHandler.java       |   2 -
 .../dedup/DefaultDeduplicatorTest.java          | 226 +++++++++++++++++++
 .../publisher/dedup/MongoDedupStoreTest.java    |  61 +++++
 .../dedup/MongoDependencyBaseTest.java          |  67 ++++++
 .../publisher/dedup/SimpleEmbedMongo.java       |  76 +++++++
 .../engine/router/TestAlertPublisherBolt.java   |  74 ++++--
 .../resources/application-mongo-statestore.conf |  17 ++
 .../src/test/resources/application-test.conf    |   3 +-
 .../router/publishments-empty-dedup-field.json  |   2 +
 .../src/test/resources/router/publishments.json |   2 +
 .../eagle-alert-parent/eagle-alert/pom.xml      |  15 ++
 eagle-dev/checkstyle.xml                        |   2 +
 pom.xml                                         |   2 +-
 31 files changed, 1322 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
index 0bada4e..8176e03 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -34,6 +34,8 @@ public class Publishment {
     private List<String> policyIds;
     private String dedupIntervalMin;
     private List<String> dedupFields;
+    private String dedupStateField;
+    private String dedupStateCloseValue;
     private Map<String, String> properties;
     // the class name to extend the IEventSerializer interface
     private String serializer;
@@ -46,6 +48,22 @@ public class Publishment {
         this.name = name;
     }
 
+    public String getDedupStateField() {
+        return dedupStateField;
+    }
+
+    public void setDedupStateField(String dedupStateField) {
+        this.dedupStateField = dedupStateField;
+    }
+
+    public String getDedupStateCloseValue() {
+        return dedupStateCloseValue;
+    }
+
+    public void setDedupStateCloseValue(String dedupStateCloseValue) {
+        this.dedupStateCloseValue = dedupStateCloseValue;
+    }
+
     public String getSerializer() {
         return serializer;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
index 2be4936..5a5f2cc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
@@ -25,6 +25,7 @@ import javax.xml.bind.annotation.adapters.XmlAdapter;
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
 
 public class StreamColumn implements Serializable {
+
     private static final long serialVersionUID = -5457861313624389106L;
     private String name;
     private Type type;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
index 89728fe..e523fd9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
@@ -124,6 +124,15 @@
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongo-java-driver</artifactId>
+        </dependency>
+		<dependency>
+ 			<groupId>de.flapdoodle.embed</groupId>
+ 			<artifactId>de.flapdoodle.embed.mongo</artifactId>
+ 			<scope>test</scope>
+ 		</dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
index 2f71c7f..1df24b9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
@@ -17,6 +17,8 @@
  */
 package org.apache.eagle.alert.engine.publisher;
 
+import java.util.List;
+
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 
 /**
@@ -25,7 +27,7 @@ import org.apache.eagle.alert.engine.model.AlertStreamEvent;
  */
 public interface AlertDeduplicator {
 
-    AlertStreamEvent dedup(AlertStreamEvent event);
+    List<AlertStreamEvent> dedup(AlertStreamEvent event);
 
     void setDedupIntervalMin(String intervalMin);
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
index 4c3a2ad..e4d2665 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
@@ -17,6 +17,10 @@
  */
 package org.apache.eagle.alert.engine.publisher;
 
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.impl.PublishStatus;
@@ -47,7 +51,7 @@ public interface AlertPublishPlugin extends Closeable {
 
     void onAlert(AlertStreamEvent event) throws Exception;
 
-    AlertStreamEvent dedup(AlertStreamEvent event);
+    List<AlertStreamEvent> dedup(AlertStreamEvent event);
 
     PublishStatus getStatus();
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
new file mode 100644
index 0000000..2036bca
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory.DedupEventsStoreType;
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
+import com.typesafe.config.Config;
+
+/*
+ * it is not thread safe, we need to handle concurrency issue out of this class
+ */
+public class DedupCache {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DedupCache.class);
+
+    private static final long CACHE_MAX_EXPIRE_TIME_IN_DAYS = 30;
+    private static final long CACHE_MAX_EVENT_QUEUE_SIZE = 10;
+
+    private static final DedupEventsStoreType type = DedupEventsStoreType.Mongo;
+
+    private long lastUpdated = -1;
+    private Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = new ConcurrentHashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
+
+    private Config config;
+
+    private static DedupCache INSTANCE;
+
+    public static synchronized DedupCache getInstance(Config config) {
+        if (INSTANCE == null) {
+            INSTANCE = new DedupCache();
+            INSTANCE.config = config;
+        }
+        return INSTANCE;
+    }
+
+    public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents() {
+        if (lastUpdated < 0
+            || System.currentTimeMillis() - lastUpdated > CACHE_MAX_EXPIRE_TIME_IN_DAYS * DateUtils.MILLIS_PER_DAY
+            || events.size() <= 0) {
+            lastUpdated = System.currentTimeMillis();
+            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config);
+            events = accessor.getEvents();
+        }
+        return events;
+    }
+
+    public DedupValue[] add(EventUniq eventEniq, String stateFieldValue, String dedupStateCloseValue) {
+        DedupValue dedupValue = null;
+        DedupValue lastDedupValue = null;
+        if (!events.containsKey(eventEniq)) {
+            dedupValue = new DedupValue();
+            dedupValue.setFirstOccurrence(eventEniq.timestamp);
+            dedupValue.setStateFieldValue(stateFieldValue);
+            ConcurrentLinkedDeque<DedupValue> dedupValues = new ConcurrentLinkedDeque<DedupValue>();
+            dedupValues.add(dedupValue);
+            // skip the event which put failed due to concurrency
+            events.put(eventEniq, dedupValues);
+            LOG.info("Add new dedup key {}, and value {}", eventEniq, dedupValues);
+        } else if (!Objects.equal(stateFieldValue,
+            events.get(eventEniq).getLast().getStateFieldValue())) {
+            lastDedupValue = events.get(eventEniq).getLast();
+            dedupValue = new DedupValue();
+            dedupValue.setFirstOccurrence(eventEniq.timestamp);
+            dedupValue.setStateFieldValue(stateFieldValue);
+            ConcurrentLinkedDeque<DedupValue> dedupValues = events.get(eventEniq);
+            if (dedupValues.size() > CACHE_MAX_EVENT_QUEUE_SIZE) {
+                dedupValues = new ConcurrentLinkedDeque<DedupValue>();
+                dedupValues.add(lastDedupValue);
+            }
+            dedupValues.add(dedupValue);
+            LOG.info("Update dedup key {}, and value {}", eventEniq, dedupValue);
+        }
+        if (dedupValue != null) {
+            // reset the list if close state reached
+            if (StringUtils.isNotBlank(dedupStateCloseValue)
+                && Objects.equal(stateFieldValue, dedupStateCloseValue)) {
+                events.put(eventEniq, new ConcurrentLinkedDeque<DedupValue>());
+                events.get(eventEniq).add(dedupValue);
+                LOG.info("Reset dedup key {} to value {}", eventEniq, dedupValue);
+            }
+
+            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config);
+            accessor.add(eventEniq, events.get(eventEniq));
+            LOG.info("Store dedup key {}, value {} to DB", eventEniq,
+                Joiner.on(",").join(events.get(eventEniq)));
+        }
+        if (dedupValue == null) {
+            return null;
+        }
+        if (lastDedupValue != null) {
+            return new DedupValue[] {lastDedupValue, dedupValue};
+        } else {
+            return new DedupValue[] {dedupValue};
+        }
+    }
+
+    public DedupValue updateCount(EventUniq eventEniq) {
+        ConcurrentLinkedDeque<DedupValue> dedupValues = events.get(eventEniq);
+        if (dedupValues == null || dedupValues.size() <= 0) {
+            LOG.warn("No dedup values found for {}, cannot update count", eventEniq);
+            return null;
+        } else {
+            DedupValue dedupValue = dedupValues.getLast();
+            dedupValue.setCount(dedupValue.getCount() + 1);
+            LOG.info("Update count for dedup key {}, value {} and count {}", eventEniq,
+                dedupValue.getStateFieldValue(), dedupValue.getCount());
+            return dedupValue;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
new file mode 100644
index 0000000..1989c45
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+
+public class DedupEntity {
+
+    private EventUniq eventEniq;
+    private List<DedupValue> dedupValues = new ArrayList<DedupValue>();
+
+    public DedupEntity(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> dedupValues) {
+        this.eventEniq = eventEniq;
+        this.dedupValues.addAll(dedupValues);
+    }
+
+    public DedupEntity(EventUniq eventEniq, List<DedupValue> dedupValues) {
+        this.eventEniq = eventEniq;
+        this.dedupValues = dedupValues;
+    }
+
+    public EventUniq getEventEniq() {
+        return eventEniq;
+    }
+
+    public void setEventEniq(EventUniq eventEniq) {
+        this.eventEniq = eventEniq;
+    }
+
+    public List<DedupValue> getDedupValues() {
+        return dedupValues;
+    }
+
+    public void setDedupValues(List<DedupValue> dedupValues) {
+        this.dedupValues = dedupValues;
+    }
+
+    public ConcurrentLinkedDeque<DedupValue> getDedupValuesInConcurrentLinkedDeque() {
+        ConcurrentLinkedDeque<DedupValue> result = new ConcurrentLinkedDeque<DedupValue>();
+        result.addAll(this.getDedupValues());
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java
new file mode 100644
index 0000000..5918afe
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+
+public interface DedupEventsStore {
+
+    public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents();
+
+    public void add(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> dedupStateValues);
+
+    public void remove(EventUniq eventEniq);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
new file mode 100644
index 0000000..9e67f66
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import com.typesafe.config.Config;
+
+public class DedupEventsStoreFactory {
+
+    public enum DedupEventsStoreType {
+        Mongo, ElasticSearch
+    }
+
+    ;
+
+    private static DedupEventsStore customizedStore;
+
+    public static void customizeStore(DedupEventsStore store) {
+        customizedStore = store;
+    }
+
+    public static DedupEventsStore getStore(DedupEventsStoreType type, Config config) {
+        if (customizedStore != null) {
+            return customizedStore;
+        }
+        DedupEventsStore accessor = null;
+        switch (type) {
+            case Mongo:
+                accessor = new MongoDedupEventsStore(config);
+                break;
+            case ElasticSearch:
+                break;
+            default:
+                break;
+        }
+        if (accessor == null) {
+            throw new RuntimeException(String.format("Dedup events store type %s is NOT supportted", type));
+        }
+        return accessor;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupValue.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupValue.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupValue.java
new file mode 100644
index 0000000..ec1af0d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupValue.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import com.google.common.base.Objects;
+
+public class DedupValue {
+
+    private long firstOccurrence;
+    private String stateFieldValue;
+    private long count;
+
+    public DedupValue() {
+    }
+
+    public DedupValue(String stateFieldValue) {
+        this.stateFieldValue = stateFieldValue;
+    }
+
+    public long getCount() {
+        return count;
+    }
+
+    public void setCount(long count) {
+        this.count = count;
+    }
+
+    public long getFirstOccurrence() {
+        return firstOccurrence;
+    }
+
+    public void setFirstOccurrence(long firstOccurence) {
+        this.firstOccurrence = firstOccurence;
+    }
+
+    public String getStateFieldValue() {
+        return stateFieldValue;
+    }
+
+    public void setStateFieldValue(String stateFieldValue) {
+        this.stateFieldValue = stateFieldValue;
+    }
+
+    @Override
+    public boolean equals(Object dedupValue) {
+        return Objects.equal(this.getStateFieldValue(), ((DedupValue) dedupValue).getStateFieldValue());
+    }
+
+    @Override
+    public int hashCode() {
+        return this.stateFieldValue == null ? "".hashCode() : this.stateFieldValue.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("DedupValue[state: %s, count: %s, first occurrence %s]",
+            stateFieldValue, count, firstOccurrence);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
new file mode 100644
index 0000000..19cb716
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.mongodb.Block;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.IndexOptions;
+import com.mongodb.client.model.InsertOneOptions;
+import com.typesafe.config.Config;
+
+public class MongoDedupEventsStore implements DedupEventsStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDedupEventsStore.class);
+
+    public static final String DEDUP_ID = "dedupId";
+    public static final String DEDUP_STREAM_ID = "streamId";
+    public static final String DEDUP_POLICY_ID = "policyId";
+    public static final String DEDUP_CREATE_TIME = "createdTime";
+    public static final String DEDUP_TIMESTAMP = "timestamp";
+    public static final String DEDUP_CUSTOM_FIELDS_VALUES = "customFieldValues";
+    public static final String DEDUP_VALUES = "dedupValues";
+    public static final String DEDUP_STATE_FIELD_VALUE = "stateFieldValue";
+    public static final String DEDUP_COUNT = "count";
+    public static final String DEDUP_FIRST_OCCURRENCE = "firstOccurrence";
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    private Config config;
+    private String connection;
+    private MongoClient client;
+    private MongoDatabase db;
+    private MongoCollection<Document> stateCollection;
+
+    private static final String DB_NAME = "ump_alert_dedup";
+    private static final String ALERT_STATE_COLLECTION = "alert_dedup";
+
+    public MongoDedupEventsStore(Config config) {
+        this.config = config;
+        this.connection = this.config.getString("connection");
+        this.client = new MongoClient(new MongoClientURI(this.connection));
+        init();
+    }
+
+    private void init() {
+        db = client.getDatabase(DB_NAME);
+        stateCollection = db.getCollection(ALERT_STATE_COLLECTION);
+        // dedup id index
+        IndexOptions io = new IndexOptions().background(true).unique(true).name(DEDUP_ID + "_index");
+        BsonDocument doc = new BsonDocument();
+        doc.append(DEDUP_ID, new BsonInt32(1));
+        stateCollection.createIndex(doc, io);
+    }
+
+    @Override
+    public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents() {
+        try {
+            Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> result = new ConcurrentHashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
+            stateCollection.find().forEach(new Block<Document>() {
+                @Override
+                public void apply(final Document doc) {
+                    DedupEntity entity = TransformerUtils.transform(DedupEntity.class, BsonDocument.parse(doc.toJson()));
+                    result.put(entity.getEventEniq(), entity.getDedupValuesInConcurrentLinkedDeque());
+                }
+            });
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Found {} dedup events from mongoDB", result.size());
+            }
+            return result;
+        } catch (Exception e) {
+            LOG.error("find dedup state failed, but the state in memory is good, could be ingored.", e);
+        }
+        return new HashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
+    }
+
+    @Override
+    public void add(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> dedupStateValues) {
+        try {
+            BsonDocument doc = TransformerUtils.transform(new DedupEntity(eventEniq, dedupStateValues));
+            BsonDocument filter = new BsonDocument();
+            filter.append(DEDUP_ID, new BsonInt64(eventEniq.hashCode()));
+            Document returnedDoc = stateCollection.findOneAndReplace(filter, Document.parse(doc.toJson()));
+            if (returnedDoc == null) {
+                InsertOneOptions option = new InsertOneOptions();
+                stateCollection.insertOne(Document.parse(doc.toJson()), option);
+            }
+        } catch (Exception e) {
+            LOG.error("insert dedup state failed, but the state is still in memory, could be ingored.", e);
+        }
+    }
+
+    @Override
+    public void remove(EventUniq eventEniq) {
+        try {
+            BsonDocument filter = new BsonDocument();
+            filter.append(DEDUP_ID, new BsonInt64(eventEniq.hashCode()));
+            stateCollection.deleteOne(filter);
+        } catch (Exception e) {
+            LOG.error("delete dedup state failed, but the state in memory is good, could be ingored.", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
new file mode 100644
index 0000000..aaa95db
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.BsonInt64;
+import org.bson.BsonString;
+
+public class TransformerUtils {
+
+    public static final String MAP_KEY = "key";
+    public static final String MAP_VALUE = "value";
+
+    @SuppressWarnings("unchecked")
+    public static <T> T transform(Class<T> klass, BsonDocument doc) {
+        if (klass.equals(DedupEntity.class)) {
+            String streamId = doc.getString(MongoDedupEventsStore.DEDUP_STREAM_ID).getValue();
+            String policyId = doc.getString(MongoDedupEventsStore.DEDUP_POLICY_ID).getValue();
+            long timestamp = doc.getInt64(MongoDedupEventsStore.DEDUP_TIMESTAMP).getValue();
+            HashMap<String, String> customFieldValues = new HashMap<String, String>();
+            BsonArray customFieldsValuesArray = doc.getArray(
+                MongoDedupEventsStore.DEDUP_CUSTOM_FIELDS_VALUES);
+            for (int i = 0; i < customFieldsValuesArray.size(); i++) {
+                BsonDocument dedupCustomFieldValuesDoc = customFieldsValuesArray.get(i).asDocument();
+                customFieldValues.put(
+                    dedupCustomFieldValuesDoc.getString(MAP_KEY).getValue(),
+                    dedupCustomFieldValuesDoc.getString(MAP_VALUE).getValue());
+            }
+            EventUniq eventUniq = new EventUniq(streamId, policyId, timestamp, customFieldValues);
+            eventUniq.createdTime = doc.getInt64(
+                MongoDedupEventsStore.DEDUP_CREATE_TIME, new BsonInt64(0)).getValue();
+            List<DedupValue> dedupValues = new ArrayList<DedupValue>();
+            BsonArray dedupValuesArray = doc.getArray(MongoDedupEventsStore.DEDUP_VALUES);
+            for (int i = 0; i < dedupValuesArray.size(); i++) {
+                BsonDocument dedupValuesDoc = dedupValuesArray.get(i).asDocument();
+                DedupValue dedupValue = new DedupValue();
+                dedupValue.setStateFieldValue(dedupValuesDoc.getString(
+                    MongoDedupEventsStore.DEDUP_STATE_FIELD_VALUE).getValue());
+                dedupValue.setCount(dedupValuesDoc.getInt64(
+                    MongoDedupEventsStore.DEDUP_COUNT).getValue());
+                dedupValue.setFirstOccurrence(dedupValuesDoc.getInt64(
+                    MongoDedupEventsStore.DEDUP_FIRST_OCCURRENCE).getValue());
+                dedupValues.add(dedupValue);
+            }
+            return (T) new DedupEntity(eventUniq, dedupValues);
+        }
+        throw new RuntimeException(String.format("Unknow object type %s, cannot transform", klass.getName()));
+    }
+
+    public static BsonDocument transform(Object obj) {
+        if (obj instanceof DedupEntity) {
+            BsonDocument doc = new BsonDocument();
+            DedupEntity entity = (DedupEntity) obj;
+            doc.put(MongoDedupEventsStore.DEDUP_ID, new BsonInt64(entity.getEventEniq().hashCode()));
+            doc.put(MongoDedupEventsStore.DEDUP_STREAM_ID, new BsonString(entity.getEventEniq().streamId));
+            doc.put(MongoDedupEventsStore.DEDUP_POLICY_ID, new BsonString(entity.getEventEniq().policyId));
+            doc.put(MongoDedupEventsStore.DEDUP_CREATE_TIME, new BsonInt64(entity.getEventEniq().createdTime));
+            doc.put(MongoDedupEventsStore.DEDUP_TIMESTAMP, new BsonInt64(entity.getEventEniq().timestamp));
+
+            List<BsonDocument> dedupCustomFieldValues = new ArrayList<BsonDocument>();
+            for (Entry<String, String> entry : entity.getEventEniq().customFieldValues.entrySet()) {
+                BsonDocument dedupCustomFieldValuesDoc = new BsonDocument();
+                dedupCustomFieldValuesDoc.put(MAP_KEY, new BsonString(entry.getKey()));
+                dedupCustomFieldValuesDoc.put(MAP_VALUE, new BsonString(entry.getValue()));
+                dedupCustomFieldValues.add(dedupCustomFieldValuesDoc);
+            }
+            doc.put(MongoDedupEventsStore.DEDUP_CUSTOM_FIELDS_VALUES, new BsonArray(dedupCustomFieldValues));
+
+            List<BsonDocument> dedupValuesDocs = new ArrayList<BsonDocument>();
+            for (DedupValue dedupValue : entity.getDedupValues()) {
+                BsonDocument dedupValuesDoc = new BsonDocument();
+                dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_STATE_FIELD_VALUE,
+                    new BsonString(dedupValue.getStateFieldValue()));
+                dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_COUNT,
+                    new BsonInt64(dedupValue.getCount()));
+                dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_FIRST_OCCURRENCE,
+                    new BsonInt64(dedupValue.getFirstOccurrence()));
+                dedupValuesDocs.add(dedupValuesDoc);
+            }
+            doc.put(MongoDedupEventsStore.DEDUP_VALUES, new BsonArray(dedupValuesDocs));
+            return doc;
+        }
+        throw new RuntimeException(String.format("Unknow object type %s, cannot transform", obj.getClass().getName()));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index 31110ef..15e27a3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -24,6 +24,7 @@ import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
 import com.typesafe.config.Config;
 import org.slf4j.Logger;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -39,7 +40,10 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
     @SuppressWarnings("rawtypes")
     @Override
     public void init(Config config, Publishment publishment, Map conf) throws Exception {
-        this.deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin(), publishment.getDedupFields());
+        this.deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin(),
+            publishment.getDedupFields(), publishment.getDedupStateField(),
+            publishment.getDedupStateCloseValue(),
+            config);
         this.pubName = publishment.getName();
         String serializerClz = publishment.getSerializer();
         try {
@@ -63,7 +67,7 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
     }
 
     @Override
-    public AlertStreamEvent dedup(AlertStreamEvent event) {
+    public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
         return deduplicator.dedup(event);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
index 65837dd..4049af1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -64,12 +65,17 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
             LOG.warn("emailGenerator is null due to the incorrect configurations");
             return;
         }
-        event = dedup(event);
-        if (event == null) {
+        List<AlertStreamEvent> outputEvents = dedup(event);
+        if (outputEvents == null) {
             return;
         }
 
-        boolean isSuccess = emailGenerator.sendAlertEmail(event);
+        boolean isSuccess = true;
+        for (AlertStreamEvent outputEvent : outputEvents) {
+            if (!emailGenerator.sendAlertEmail(outputEvent)) {
+                isSuccess = false;
+            }
+        }
         PublishStatus status = new PublishStatus();
         if (!isSuccess) {
             status.errorMessage = "Failed to send email";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
index 048424c..27314bf 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
@@ -18,6 +18,13 @@
 
 package org.apache.eagle.alert.engine.publisher.impl;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
@@ -63,19 +70,21 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
             LOG.warn("KafkaProducer is null due to the incorrect configurations");
             return;
         }
-        event = dedup(event);
-        if (event == null) {
+        List<AlertStreamEvent> outputEvents = dedup(event);
+        if (outputEvents == null) {
             return;
         }
         PublishStatus status = new PublishStatus();
         try {
-            ProducerRecord record = createRecord(event, topic);
-            if (record == null) {
-                LOG.error(" Alert serialize return null, ignored message! ");
-                return;
+            for (AlertStreamEvent outputEvent : outputEvents) {
+                ProducerRecord record = createRecord(outputEvent, topic);
+                if (record == null) {
+                    LOG.error(" Alert serialize return null, ignored message! ");
+                    return;
+                }
+                Future<?> future = producer.send(record);
+                future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
             }
-            Future<?> future = producer.send(record);
-            future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
             status.successful = true;
             status.errorMessage = "";
             if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
index 258f613..de4ae09 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
@@ -17,31 +17,41 @@
  */
 package org.apache.eagle.alert.engine.publisher.impl;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupValue;
+import org.apache.storm.guava.base.Objects;
 import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
+import com.typesafe.config.Config;
 
 public class DefaultDeduplicator implements AlertDeduplicator {
+
+    private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
+
+    @SuppressWarnings("unused")
     private long dedupIntervalMin;
     private List<String> customDedupFields = new ArrayList<>();
-    private volatile Map<EventUniq, Long> events = new HashMap<>();
-    private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
+    private String dedupStateField;
+    private String dedupStateCloseValue;
+    private Config config;
 
-    public enum AlertDeduplicationStatus {
-        NEW,
-        DUPLICATED,
-        IGNORED
-    }
+    private static final String DEDUP_COUNT = "dedupCount";
+    private static final String DEDUP_FIRST_OCCURRENCE = "dedupFirstOccurrence";
+
+    private DedupCache dedupCache;
 
     public DefaultDeduplicator() {
         this.dedupIntervalMin = 0;
@@ -55,52 +65,96 @@ public class DefaultDeduplicator implements AlertDeduplicator {
         this.dedupIntervalMin = intervalMin;
     }
 
-    public DefaultDeduplicator(String intervalMin, List<String> customDedupFields) {
+    public DefaultDeduplicator(String intervalMin, List<String> customDedupFields,
+                               String dedupStateField, String dedupStateCloseValue, Config config) {
         setDedupIntervalMin(intervalMin);
         if (customDedupFields != null) {
             this.customDedupFields = customDedupFields;
         }
+        if (StringUtils.isNotBlank(dedupStateField)) {
+            this.dedupStateField = dedupStateField;
+        }
+        if (StringUtils.isNotBlank(dedupStateCloseValue)) {
+            this.dedupStateCloseValue = dedupStateCloseValue;
+        }
+        this.config = config;
+        this.dedupCache = DedupCache.getInstance(this.config);
     }
 
-    public void clearOldCache() {
-        List<EventUniq> removedkeys = new ArrayList<>();
-        for (Entry<EventUniq, Long> entry : events.entrySet()) {
-            EventUniq entity = entry.getKey();
-            if (System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY > entity.createdTime) {
-                removedkeys.add(entry.getKey());
-            }
+    /*
+     * @param key
+     * @return
+     */
+    public List<AlertStreamEvent> checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) {
+        if (StringUtils.isBlank(stateFiledValue)) {
+            // without state field, we cannot determine whether it is duplicated
+            return Arrays.asList(event);
         }
-        for (EventUniq alertKey : removedkeys) {
-            events.remove(alertKey);
+        synchronized (dedupCache) {
+            Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = dedupCache.getEvents();
+            if (!events.containsKey(key)
+                || (events.containsKey(key)
+                && events.get(key).size() > 0
+                && !Objects.equal(stateFiledValue,
+                        events.get(key).getLast().getStateFieldValue()))) {
+                DedupValue[] dedupValues = dedupCache.add(key, stateFiledValue, dedupStateCloseValue);
+                if (dedupValues != null) {
+                    // any of dedupValues won't be null
+                    if (dedupValues.length == 2) {
+                        // emit last event which includes count of dedup events & new state event
+                        return Arrays.asList(
+                            mergeEventWithDedupValue(event, dedupValues[0]),
+                            mergeEventWithDedupValue(event, dedupValues[1]));
+                    } else if (dedupValues.length == 1) {
+                        //populate firstOccurrenceTime & count
+                        return Arrays.asList(mergeEventWithDedupValue(event, dedupValues[0]));
+                    }
+                }
+            } else {
+                // update count
+                dedupCache.updateCount(key);
+            }
         }
+        // duplicated, will be ignored
+        return null;
     }
 
-    public AlertDeduplicationStatus checkDedup(EventUniq key) {
-        long current = key.timestamp;
-        if (!events.containsKey(key)) {
-            events.put(key, current);
-            return AlertDeduplicationStatus.NEW;
+    private AlertStreamEvent mergeEventWithDedupValue(AlertStreamEvent originalEvent, DedupValue dedupValue) {
+        AlertStreamEvent event = new AlertStreamEvent();
+        Object[] newdata = new Object[originalEvent.getData().length];
+        for (int i = 0; i < originalEvent.getData().length; i++) {
+            newdata[i] = originalEvent.getData()[i];
         }
-
-        long last = events.get(key);
-        if (current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE) {
-            events.put(key, current);
-            return AlertDeduplicationStatus.IGNORED;
+        event.setData(newdata);
+        event.setSchema(originalEvent.getSchema());
+        event.setPolicyId(originalEvent.getPolicyId());
+        event.setCreatedTime(originalEvent.getCreatedTime());
+        event.setCreatedBy(originalEvent.getCreatedBy());
+        event.setTimestamp(originalEvent.getTimestamp());
+        StreamDefinition streamDefinition = event.getSchema();
+        for (int i = 0; i < event.getData().length; i++) {
+            String colName = streamDefinition.getColumns().get(i).getName();
+            if (Objects.equal(colName, dedupStateField)) {
+                event.getData()[i] = dedupValue.getStateFieldValue();
+            }
+            if (Objects.equal(colName, DEDUP_COUNT)) {
+                event.getData()[i] = dedupValue.getCount();
+            }
+            if (Objects.equal(colName, DEDUP_FIRST_OCCURRENCE)) {
+                event.getData()[i] = dedupValue.getFirstOccurrence();
+            }
         }
-
-        return AlertDeduplicationStatus.DUPLICATED;
+        return event;
     }
 
-    public AlertStreamEvent dedup(AlertStreamEvent event) {
+    public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
         if (event == null) {
             return null;
         }
-        clearOldCache();
-        AlertStreamEvent result = null;
-
         // check custom field, and get the field values
         StreamDefinition streamDefinition = event.getSchema();
         HashMap<String, String> customFieldValues = new HashMap<>();
+        String stateFiledValue = null;
         for (int i = 0; i < event.getData().length; i++) {
             if (i > streamDefinition.getColumns().size()) {
                 if (LOG.isWarnEnabled()) {
@@ -110,6 +164,10 @@ public class DefaultDeduplicator implements AlertDeduplicator {
             }
             String colName = streamDefinition.getColumns().get(i).getName();
 
+            if (colName.equals(dedupStateField)) {
+                stateFiledValue = event.getData()[i].toString();
+            }
+
             for (String field : customDedupFields) {
                 if (colName.equals(field)) {
                     customFieldValues.put(field, event.getData()[i].toString());
@@ -118,17 +176,14 @@ public class DefaultDeduplicator implements AlertDeduplicator {
             }
         }
 
-        AlertDeduplicationStatus status = checkDedup(
-            new EventUniq(event.getStreamId(),
-                event.getPolicyId(),
-                event.getCreatedTime(),
-                customFieldValues));
-        if (!status.equals(AlertDeduplicationStatus.DUPLICATED)) {
-            result = event;
+        List<AlertStreamEvent> outputEvents = checkDedup(event, new EventUniq(event.getStreamId(),
+            event.getPolicyId(), event.getCreatedTime(), customFieldValues), stateFiledValue);
+        if (outputEvents != null && outputEvents.size() > 0) {
+            return outputEvents;
         } else if (LOG.isDebugEnabled()) {
             LOG.debug("Alert event is skipped because it's duplicated: {}", event.toString());
         }
-        return result;
+        return null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
index 1b90833..5d7c67a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
@@ -21,6 +21,7 @@
 package org.apache.eagle.alert.engine.publisher.impl;
 
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.storm.guava.base.Joiner;
 
 import java.util.HashMap;
 
@@ -71,4 +72,10 @@ public class EventUniq {
         }
         return builder.build();
     }
+
+    @Override
+    public String toString() {
+        return String.format("EventUniq[streamId: %s, policyId: %s, timestamp: %s, customFieldValues: %s]",
+            streamId, policyId, timestamp, Joiner.on(",").withKeyValueSeparator(">").join(customFieldValues));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
index 8d85c76..32c0734 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
@@ -1,31 +1,34 @@
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-public class LongSerializer implements Serializer<Long> {
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class LongSerializer implements Serializer<Object> {
     @Override
-    public void serialize(Long value, DataOutput dataOutput) throws IOException {
-        dataOutput.writeLong(value);
+    public void serialize(Object value, DataOutput dataOutput) throws IOException {
+        if (value instanceof Integer) {
+            value = ((Integer) value).longValue();
+        }
+        dataOutput.writeLong((long) value);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
index 84844e7..8821d3e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
@@ -90,8 +90,6 @@ public class TestNoDataPolicyTimeBatchHandler {
             Object[] data = e.getData();
 
             LOG.info("alert data: {}, {}", data[1], data[0]);
-
-            Assert.assertEquals("host1", data[1]);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
new file mode 100644
index 0000000..4ec9b42
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+public class DefaultDeduplicatorTest extends MongoDependencyBaseTest {
+
+	@Test
+	public void testNormal() throws Exception {
+		//String intervalMin, List<String> customDedupFields, String dedupStateField, String dedupStateCloseValue
+		// assume state: OPEN, WARN, CLOSE
+		System.setProperty("config.resource", "/application-mongo-statestore.conf");
+		Config config = ConfigFactory.load();
+		DefaultDeduplicator deduplicator = new DefaultDeduplicator(
+				"PT1M", Arrays.asList(new String[] { "alertKey" }), "state", "CLOSE", config);
+		
+		StreamDefinition stream = createStream();
+		PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
+		
+		AlertStreamEvent e1 = createEvent(stream, policy, new Object[] {
+				System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+		});
+		AlertStreamEvent e2 = createEvent(stream, policy, new Object[] {
+				System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0
+		});
+		AlertStreamEvent e3 = createEvent(stream, policy, new Object[] {
+				System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+		});
+		AlertStreamEvent e4 = createEvent(stream, policy, new Object[] {
+				System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0
+		});
+		AlertStreamEvent e5 = createEvent(stream, policy, new Object[] {
+				System.currentTimeMillis(), "host1", "testPolicy-host1-01", "CLOSE", 0, 0
+		});
+		AlertStreamEvent e6 = createEvent(stream, policy, new Object[] {
+				System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+		});
+		AlertStreamEvent e7 = createEvent(stream, policy, new Object[] {
+				System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+		});
+		AlertStreamEvent e8 = createEvent(stream, policy, new Object[] {
+				System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+		});
+		
+		List<AlertStreamEvent> allResults = new ArrayList<AlertStreamEvent>();
+		new Thread(new Runnable() {
+			@Override
+			public void run() {
+				List<AlertStreamEvent> result = deduplicator.dedup(e1);
+				if (result != null) allResults.addAll(result);
+				System.out.println("1 >>>> " + ToStringBuilder.reflectionToString(result));
+			}
+		}).start();
+		new Thread(new Runnable() {
+			@Override
+			public void run() {
+				List<AlertStreamEvent> result = deduplicator.dedup(e2);
+				if (result != null) allResults.addAll(result);
+				System.out.println("2 >>>> " + ToStringBuilder.reflectionToString(result));
+			}
+		}).start();
+		new Thread(new Runnable() {
+			@Override
+			public void run() {
+				List<AlertStreamEvent> result = deduplicator.dedup(e3);
+				if (result != null) allResults.addAll(result);
+				System.out.println("3 >>>> " + ToStringBuilder.reflectionToString(result));
+			}
+		}).start();
+		new Thread(new Runnable() {
+			@Override
+			public void run() {
+				List<AlertStreamEvent> result = deduplicator.dedup(e4);
+				if (result != null) allResults.addAll(result);
+				System.out.println("4 >>>> " + ToStringBuilder.reflectionToString(result));
+			}
+		}).start();
+		new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					Thread.sleep(500);
+				} catch (InterruptedException e) {}
+				
+				List<AlertStreamEvent> result = deduplicator.dedup(e5);
+				if (result != null) allResults.addAll(result);
+				System.out.println("5 >>>> " + ToStringBuilder.reflectionToString(result));
+			}
+		}).start();
+		new Thread(new Runnable() {
+			@Override
+			public void run() {
+				List<AlertStreamEvent> result = deduplicator.dedup(e6);
+				if (result != null) allResults.addAll(result);
+				System.out.println("6 >>>> " + ToStringBuilder.reflectionToString(result));
+			}
+		}).start();
+		new Thread(new Runnable() {
+			@Override
+			public void run() {
+				List<AlertStreamEvent> result = deduplicator.dedup(e7);
+				if (result != null) allResults.addAll(result);
+				System.out.println("7 >>>> " + ToStringBuilder.reflectionToString(result));
+			}
+		}).start();
+		new Thread(new Runnable() {
+			@Override
+			public void run() {
+				List<AlertStreamEvent> result = deduplicator.dedup(e8);
+				if (result != null) allResults.addAll(result);
+				System.out.println("8 >>>> " + ToStringBuilder.reflectionToString(result));
+			}
+		}).start();
+		
+		Thread.sleep(2000);
+		
+		long maxCount = 0;
+		for (AlertStreamEvent event : allResults) {
+			Assert.assertNotNull(event.getData()[4]);
+			Assert.assertNotNull(event.getData()[5]);
+			
+			if (((Long) event.getData()[4]) > maxCount) {
+				maxCount = (Long) event.getData()[4];
+				System.out.println(String.format(">>>>>%s: %s", event, maxCount));
+			}
+		}
+		
+	}
+	
+	private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) {
+		AlertStreamEvent event = new AlertStreamEvent();
+		event.setPolicyId(policy.getName());
+		event.setSchema(stream);
+		event.setStreamId(stream.getStreamId());
+		event.setTimestamp(System.currentTimeMillis());
+		event.setCreatedTime(System.currentTimeMillis());
+		event.setData(data);
+		return event;
+	}
+	
+	private StreamDefinition createStream() {
+		StreamDefinition sd = new StreamDefinition();
+		StreamColumn tsColumn = new StreamColumn();
+		tsColumn.setName("timestamp");
+		tsColumn.setType(StreamColumn.Type.LONG);
+		
+		StreamColumn hostColumn = new StreamColumn();
+		hostColumn.setName("host");
+		hostColumn.setType(StreamColumn.Type.STRING);
+		
+		StreamColumn alertKeyColumn = new StreamColumn();
+		alertKeyColumn.setName("alertKey");
+		alertKeyColumn.setType(StreamColumn.Type.STRING);
+
+		StreamColumn stateColumn = new StreamColumn();
+		stateColumn.setName("state");
+		stateColumn.setType(StreamColumn.Type.STRING);
+		
+		// dedupCount, dedupFirstOccurrence
+		
+		StreamColumn dedupCountColumn = new StreamColumn();
+		dedupCountColumn.setName("dedupCount");
+		dedupCountColumn.setType(StreamColumn.Type.LONG);
+		
+		StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
+		dedupFirstOccurrenceColumn.setName("dedupFirstOccurrence");
+		dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
+		
+		sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn));
+		sd.setDataSource("testDatasource");
+		sd.setStreamId("testStream");
+		sd.setDescription("test stream");
+		return sd;
+	}
+	
+	private PolicyDefinition createPolicy(String streamName, String policyName) {
+		PolicyDefinition pd = new PolicyDefinition();
+		PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+		//expression, something like "PT5S,dynamic,1,host"
+		def.setValue("test");
+		def.setType("siddhi");
+		pd.setDefinition(def);
+		pd.setInputStreams(Arrays.asList("inputStream"));
+		pd.setOutputStreams(Arrays.asList("outputStream"));
+		pd.setName(policyName);
+		pd.setDescription(String.format("Test policy for stream %s", streamName));
+		
+		StreamPartition sp = new StreamPartition();
+		sp.setStreamId(streamName);
+		sp.setColumns(Arrays.asList("host"));
+		sp.setType(StreamPartition.Type.GROUPBY);
+		pd.addPartition(sp);
+		return pd;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
new file mode 100644
index 0000000..bdd5e0b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MongoDedupStoreTest extends MongoDependencyBaseTest {
+
+	@Test
+    public void testNormal() throws Exception {
+		Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = store.getEvents();
+		Assert.assertNotNull(events);
+		Assert.assertEquals(0, events.size());
+		
+		String streamId = "testStream"; 
+		String policyId = "testPolicy"; 
+		long timestamp = System.currentTimeMillis(); 
+		HashMap<String, String> customFieldValues = new HashMap<String, String>();
+		customFieldValues.put("alertKey", "test-alert-key");
+		EventUniq eventEniq = new EventUniq(streamId, policyId, timestamp, customFieldValues);
+		
+		ConcurrentLinkedDeque<DedupValue> dedupStateValues = new ConcurrentLinkedDeque<DedupValue>();
+		DedupValue one = new DedupValue();
+		one.setStateFieldValue("OPEN");
+		one.setCount(2);
+		one.setFirstOccurrence(System.currentTimeMillis());
+		dedupStateValues.add(one);
+		store.add(eventEniq, dedupStateValues);
+		
+		events = store.getEvents();
+		Assert.assertNotNull(events);
+		Assert.assertEquals(1, events.size());
+		
+		Entry<EventUniq, ConcurrentLinkedDeque<DedupValue>> entry = events.entrySet().iterator().next();
+		Assert.assertEquals(streamId, entry.getKey().streamId);
+		Assert.assertEquals(1, entry.getValue().size());
+		Assert.assertEquals(2, entry.getValue().getLast().getCount());
+	}
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
new file mode 100644
index 0000000..75de384
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.client.MongoDatabase;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+public abstract class MongoDependencyBaseTest {
+
+	private static Logger LOG = LoggerFactory.getLogger(MongoDependencyBaseTest.class);
+	
+	private static SimpleEmbedMongo mongo;
+    @SuppressWarnings("unused")
+	private static MongoDatabase testDB;
+    private static Config config;
+
+    protected static MongoDedupEventsStore store;
+    
+    public static void before() {
+        try {
+            mongo = new SimpleEmbedMongo();
+            mongo.start();
+            testDB = mongo.getMongoClient().getDatabase("testDb");
+        } catch (Exception e) {
+            LOG.error("start embed mongod failed, assume some external mongo running. continue run test!", e);
+        }
+    }
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        before();
+
+        System.setProperty("config.resource", "/application-mongo-statestore.conf");
+        ConfigFactory.invalidateCaches();
+        config = ConfigFactory.load();
+        
+        store = new MongoDedupEventsStore(config);
+    }
+
+    @AfterClass
+    public static void teardown() {
+        if (mongo != null) {
+            mongo.shutdown();
+        }
+    }
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java
new file mode 100644
index 0000000..31f744e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.MongoClient;
+
+import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodProcess;
+import de.flapdoodle.embed.mongo.MongodStarter;
+import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
+import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.process.runtime.Network;
+
+public class SimpleEmbedMongo {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SimpleEmbedMongo.class);
+    
+    private MongoClient client;
+    private MongodExecutable mongodExe;
+    private MongodProcess mongod;
+
+    public void start() throws Exception {
+        MongodStarter starter = MongodStarter.getDefaultInstance();
+        mongodExe = starter.prepare(new MongodConfigBuilder().version(Version.V3_2_1)
+                .net(new Net(27017, Network.localhostIsIPv6())).build());
+        mongod = mongodExe.start();
+
+        client = new MongoClient("localhost");
+    }
+
+    public void shutdown() {
+
+        if (mongod != null) {
+            try {
+                mongod.stop();
+            }
+            catch (IllegalStateException e) {
+                // catch this exception for the unstable stopping mongodb
+                // reason: the exception is usually thrown out with below message format when stop() returns null value,
+                //         but actually this should have been captured in ProcessControl.stopOrDestroyProcess() by destroying
+                //         the process ultimately
+                if (e.getMessage() != null && e.getMessage().matches("^Couldn't kill.*process!.*")) {
+                    // if matches, do nothing, just ignore the exception
+                } else {
+                    LOG.warn(String.format("Ignored error for stopping mongod process, see stack trace: %s", ExceptionUtils.getStackTrace(e)));
+                }
+            }
+            mongodExe.stop();
+        }
+    }
+
+    public MongoClient getMongoClient() {
+        return client;
+    }
+
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f41d427/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 61a0aba..6ebae63 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -18,12 +18,12 @@
 
 package org.apache.eagle.alert.engine.router;
 
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
 import org.apache.eagle.alert.coordination.model.PublishSpec;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
@@ -32,24 +32,46 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
 import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStore;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupValue;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
 import org.apache.eagle.alert.engine.runner.MapComparator;
 import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 /**
  * @Since 5/14/16.
  */
 public class TestAlertPublisherBolt {
+	
+	private DedupEventsStore store;
+	
+	@Before
+	public void setUp() {
+		store = Mockito.mock(DedupEventsStore.class);
+    	DedupEventsStoreFactory.customizeStore(store);
+	}
+	
+	@After
+	public void tearDown() {
+        Mockito.reset(store);
+	}
 
     @SuppressWarnings("rawtypes")
     @Ignore
@@ -165,13 +187,13 @@ public class TestAlertPublisherBolt {
         return l;
     }
 
-    private AlertStreamEvent createWithStreamDef(String hostname, String appName) {
+    private AlertStreamEvent createWithStreamDef(String hostname, String appName, String state) {
         AlertStreamEvent alert = new AlertStreamEvent();
         PolicyDefinition policy = new PolicyDefinition();
         policy.setName("perfmon_cpu_host_check");
         alert.setPolicyId(policy.getName());
         alert.setCreatedTime(System.currentTimeMillis());
-        alert.setData(new Object[] {appName, hostname});
+        alert.setData(new Object[] {appName, hostname, state});
         alert.setStreamId("testAlertStream");
         alert.setCreatedBy(this.toString());
 
@@ -184,38 +206,46 @@ public class TestAlertPublisherBolt {
         StreamColumn hostColumn = new StreamColumn();
         hostColumn.setName("hostname");
         hostColumn.setType(StreamColumn.Type.STRING);
+        
+        StreamColumn stateColumn = new StreamColumn();
+        stateColumn.setName("state");
+        stateColumn.setType(StreamColumn.Type.STRING);
 
-        sd.setColumns(Arrays.asList(appColumn, hostColumn));
+        sd.setColumns(Arrays.asList(appColumn, hostColumn, stateColumn));
 
         alert.setSchema(sd);
         return alert;
     }
 
-    @Test
+	@SuppressWarnings("unchecked")
+	@Test
     public void testCustomFieldDedupEvent() throws Exception {
         List<Publishment> pubs = loadEntities("/router/publishments.json", Publishment.class);
 
         AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null);
-        AlertStreamEvent event1 = createWithStreamDef("host1", "testapp1");
-        AlertStreamEvent event2 = createWithStreamDef("host2", "testapp1");
-        AlertStreamEvent event3 = createWithStreamDef("host2", "testapp2");
+        AlertStreamEvent event1 = createWithStreamDef("host1", "testapp1", "OPEN");
+        AlertStreamEvent event2 = createWithStreamDef("host2", "testapp1", "OPEN");
+        AlertStreamEvent event3 = createWithStreamDef("host2", "testapp2", "CLOSE");
 
         Assert.assertNotNull(plugin.dedup(event1));
         Assert.assertNull(plugin.dedup(event2));
         Assert.assertNotNull(plugin.dedup(event3));
-
+        
+        Mockito.verify(store).getEvents();
+        Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), Mockito.anyObject());
     }
 
-    @Test
+	@Test
     public void testEmptyCustomFieldDedupEvent() throws Exception {
         List<Publishment> pubs = loadEntities("/router/publishments-empty-dedup-field.json", Publishment.class);
 
         AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null);
-        AlertStreamEvent event1 = createWithStreamDef("host1", "testapp1");
-        AlertStreamEvent event2 = createWithStreamDef("host2", "testapp2");
+        AlertStreamEvent event1 = createWithStreamDef("host1", "testapp1", "OPEN");
+        AlertStreamEvent event2 = createWithStreamDef("host2", "testapp2", "OPEN");
 
         Assert.assertNotNull(plugin.dedup(event1));
         Assert.assertNull(plugin.dedup(event2));
-
+        
+        Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), Mockito.anyObject());
     }
 }