You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/09/28 15:40:36 UTC

incubator-eagle git commit: EAGLE-576: change dedup db store to per publishment rather than global

Repository: incubator-eagle
Updated Branches:
  refs/heads/master d61d34698 -> 89d8e3a09


EAGLE-576: change dedup db store to per publishment rather than global

Author: Li, Garrett
Reviewer: ralphsu

This closes #463


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/89d8e3a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/89d8e3a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/89d8e3a0

Branch: refs/heads/master
Commit: 89d8e3a0935ac9d6dd04d8b9d9ccf6e3410c732e
Parents: d61d346
Author: Xiancheng Li <xi...@ebay.com>
Authored: Wed Sep 28 18:31:55 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Wed Sep 28 08:38:32 2016 -0700

----------------------------------------------------------------------
 .../alert/engine/publisher/dedup/DedupCache.java   | 17 ++++++++++-------
 .../alert/engine/publisher/dedup/DedupEntity.java  | 15 +++++++++++++--
 .../publisher/dedup/DedupEventsStoreFactory.java   |  4 ++--
 .../publisher/dedup/MongoDedupEventsStore.java     | 16 +++++++++++-----
 .../engine/publisher/dedup/TransformerUtils.java   | 12 ++++++++++--
 .../publisher/impl/AbstractPublishPlugin.java      |  2 +-
 .../publisher/dedup/DedupCacheStoreTest.java       | 13 +++++++++++--
 .../engine/publisher/dedup/DedupCacheTest.java     |  2 +-
 .../dedup/DefaultDedupWithoutStateTest.java        |  2 +-
 .../publisher/dedup/DefaultDeduplicatorTest.java   |  2 +-
 .../publisher/dedup/MongoDependencyBaseTest.java   |  2 +-
 11 files changed, 62 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
index 503a1ce..b15f93c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
@@ -52,8 +52,11 @@ public class DedupCache {
 
     private Config config;
 
-    public DedupCache(Config config) {
+    private String publishName;
+    
+    public DedupCache(Config config, String publishName) {
         this.config = config;
+        this.publishName = publishName;
         // only happens during startup, won't introduce perf issue here
         synchronized (caches) {
             if (caches.size() == 0) {
@@ -94,7 +97,7 @@ public class DedupCache {
             || System.currentTimeMillis() - lastUpdated > CACHE_MAX_EXPIRE_TIME_IN_DAYS * DateUtils.MILLIS_PER_DAY
             || events.size() <= 0) {
             lastUpdated = System.currentTimeMillis();
-            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config);
+            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
             events = accessor.getEvents();
         }
         return events;
@@ -108,7 +111,7 @@ public class DedupCache {
         if (this.contains(eventEniq)) {
             this.events.remove(eventEniq);
 
-            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config);
+            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
             accessor.remove(eventEniq);
         }
     }
@@ -177,7 +180,7 @@ public class DedupCache {
             LOG.info("Update dedup key {}, and value {}", eventEniq, dedupValue);
         }
         if (dedupValue != null) {
-            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config);
+            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
             accessor.add(eventEniq, events.get(eventEniq));
             LOG.info("Store dedup key {}, value {} to DB", eventEniq,
                 Joiner.on(",").join(events.get(eventEniq)));
@@ -193,11 +196,11 @@ public class DedupCache {
     }
 
     public void persistUpdatedEventUniq(EventUniq eventEniq) {
-        DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config);
+        DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
         accessor.add(eventEniq, events.get(eventEniq));
         LOG.info("Store dedup key {}, value {} to DB", eventEniq,
             Joiner.on(",").join(events.get(eventEniq)));
-    }
+    } 
 
     private DedupValue updateCount(EventUniq eventEniq) {
         ConcurrentLinkedDeque<DedupValue> dedupValues = events.get(eventEniq);
@@ -212,7 +215,7 @@ public class DedupCache {
                 dedupValue.getStateFieldValue(), dedupValue.getCount());
             if (dedupValue.getCount() > 0 && dedupValue.getCount() % 100 == 0) {
                 LOG.info(updateMsg);
-                DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config);
+                DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
                 accessor.add(eventEniq, dedupValues);
             } else if (LOG.isDebugEnabled()) {
                 LOG.debug(updateMsg);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
index 1989c45..86bc9b3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
@@ -24,19 +24,30 @@ import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 
 public class DedupEntity {
 
+    private String publishName;
     private EventUniq eventEniq;
     private List<DedupValue> dedupValues = new ArrayList<DedupValue>();
 
-    public DedupEntity(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> dedupValues) {
+    public DedupEntity(String publishName, EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> dedupValues) {
+        this.publishName = publishName;
         this.eventEniq = eventEniq;
         this.dedupValues.addAll(dedupValues);
     }
 
-    public DedupEntity(EventUniq eventEniq, List<DedupValue> dedupValues) {
+    public DedupEntity(String publishName, EventUniq eventEniq, List<DedupValue> dedupValues) {
+        this.publishName = publishName;
         this.eventEniq = eventEniq;
         this.dedupValues = dedupValues;
     }
 
+    public String getPublishName() {
+        return publishName;
+    }
+
+    public void setPublishName(String publishName) {
+        this.publishName = publishName;
+    }
+
     public EventUniq getEventEniq() {
         return eventEniq;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
index 0a3b206..75d8e53 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
@@ -34,14 +34,14 @@ public class DedupEventsStoreFactory {
         customizedStore = store;
     }
 
-    public static DedupEventsStore getStore(DedupEventsStoreType type, Config config) {
+    public static DedupEventsStore getStore(DedupEventsStoreType type, Config config, String publishName) {
         if (customizedStore != null) {
             return customizedStore;
         }
         switch (type) {
             case Mongo:
                 if (accessor == null) {
-                    accessor = new MongoDedupEventsStore(config);
+                    accessor = new MongoDedupEventsStore(config, publishName);
                 }
                 break;
             case ElasticSearch:

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
index 46a01b0..4140793 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
@@ -25,6 +25,7 @@ import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 import org.bson.BsonDocument;
 import org.bson.BsonInt32;
 import org.bson.BsonInt64;
+import org.bson.BsonString;
 import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ public class MongoDedupEventsStore implements DedupEventsStore {
     public static final String DEDUP_STATE_FIELD_VALUE = "stateFieldValue";
     public static final String DEDUP_COUNT = "count";
     public static final String DEDUP_FIRST_OCCURRENCE = "firstOccurrence";
+    public static final String DEDUP_PUBLISH_ID = "publishId";
 
     private static final ObjectMapper mapper = new ObjectMapper();
 
@@ -67,12 +69,14 @@ public class MongoDedupEventsStore implements DedupEventsStore {
     private MongoClient client;
     private MongoDatabase db;
     private MongoCollection<Document> stateCollection;
+    private String publishName;
 
     private static final String DB_NAME = "ump_alert_dedup";
     private static final String ALERT_STATE_COLLECTION = "alert_dedup";
 
-    public MongoDedupEventsStore(Config config) {
+    public MongoDedupEventsStore(Config config, String publishName) {
         this.config = config;
+        this.publishName = publishName;
         this.connection = this.config.getString("connection");
         try {
             this.client = new MongoClient(new MongoClientURI(this.connection));
@@ -96,7 +100,9 @@ public class MongoDedupEventsStore implements DedupEventsStore {
     public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents() {
         try {
             Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> result = new ConcurrentHashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
-            stateCollection.find().forEach(new Block<Document>() {
+            BsonDocument filter = new BsonDocument();
+            filter.append(DEDUP_PUBLISH_ID, new BsonString(this.publishName));
+            stateCollection.find(filter).forEach(new Block<Document>() {
                 @Override
                 public void apply(final Document doc) {
                     DedupEntity entity = TransformerUtils.transform(DedupEntity.class, BsonDocument.parse(doc.toJson()));
@@ -116,9 +122,9 @@ public class MongoDedupEventsStore implements DedupEventsStore {
     @Override
     public void add(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> dedupStateValues) {
         try {
-            BsonDocument doc = TransformerUtils.transform(new DedupEntity(eventEniq, dedupStateValues));
+            BsonDocument doc = TransformerUtils.transform(new DedupEntity(this.publishName, eventEniq, dedupStateValues));
             BsonDocument filter = new BsonDocument();
-            filter.append(DEDUP_ID, new BsonInt64(eventEniq.hashCode()));
+            filter.append(DEDUP_ID, new BsonInt64(TransformerUtils.getUniqueId(this.publishName, eventEniq)));
             Document returnedDoc = stateCollection.findOneAndReplace(filter, Document.parse(doc.toJson()));
             if (returnedDoc == null) {
                 InsertOneOptions option = new InsertOneOptions();
@@ -133,7 +139,7 @@ public class MongoDedupEventsStore implements DedupEventsStore {
     public void remove(EventUniq eventEniq) {
         try {
             BsonDocument filter = new BsonDocument();
-            filter.append(DEDUP_ID, new BsonInt64(eventEniq.hashCode()));
+            filter.append(DEDUP_ID, new BsonInt64(TransformerUtils.getUniqueId(this.publishName, eventEniq)));
             stateCollection.deleteOne(filter);
         } catch (Exception e) {
             LOG.error("delete dedup state failed, but the state in memory is good, could be ingored.", e);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
index e339240..5c18867 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 import org.bson.BsonArray;
 import org.bson.BsonBoolean;
@@ -65,7 +66,8 @@ public class TransformerUtils {
                     MongoDedupEventsStore.DEDUP_FIRST_OCCURRENCE).getValue());
                 dedupValues.add(dedupValue);
             }
-            return (T) new DedupEntity(eventUniq, dedupValues);
+            String publishId = doc.getString(MongoDedupEventsStore.DEDUP_PUBLISH_ID).getValue();
+            return (T) new DedupEntity(publishId, eventUniq, dedupValues);
         }
         throw new RuntimeException(String.format("Unknow object type %s, cannot transform", klass.getName()));
     }
@@ -74,8 +76,9 @@ public class TransformerUtils {
         if (obj instanceof DedupEntity) {
             BsonDocument doc = new BsonDocument();
             DedupEntity entity = (DedupEntity) obj;
-            doc.put(MongoDedupEventsStore.DEDUP_ID, new BsonInt64(entity.getEventEniq().hashCode()));
+            doc.put(MongoDedupEventsStore.DEDUP_ID, new BsonInt64(getUniqueId(entity.getPublishName(), entity.getEventEniq())));
             doc.put(MongoDedupEventsStore.DEDUP_STREAM_ID, new BsonString(entity.getEventEniq().streamId));
+            doc.put(MongoDedupEventsStore.DEDUP_PUBLISH_ID, new BsonString(entity.getPublishName()));
             doc.put(MongoDedupEventsStore.DEDUP_POLICY_ID, new BsonString(entity.getEventEniq().policyId));
             doc.put(MongoDedupEventsStore.DEDUP_CREATE_TIME, new BsonInt64(entity.getEventEniq().createdTime));
             doc.put(MongoDedupEventsStore.DEDUP_TIMESTAMP, new BsonInt64(entity.getEventEniq().timestamp));
@@ -107,4 +110,9 @@ public class TransformerUtils {
         throw new RuntimeException(String.format("Unknow object type %s, cannot transform", obj.getClass().getName()));
     }
 
+    public static int getUniqueId(String publishName, EventUniq eventEniq) {
+        HashCodeBuilder builder = new HashCodeBuilder().append(eventEniq).append(publishName);
+        return builder.build();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index 3e293cd..87aac29 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -47,7 +47,7 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
     @SuppressWarnings("rawtypes")
     @Override
     public void init(Config config, Publishment publishment, Map conf) throws Exception {
-        DedupCache dedupCache = new DedupCache(config);
+        DedupCache dedupCache = new DedupCache(config, publishment.getName());
         OverrideDeduplicatorSpec spec = publishment.getOverrideDeduplicator();
         if (spec != null && StringUtils.isNotBlank(spec.getClassName())) {
             try {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
index 5e56bb7..54aedb8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
@@ -25,12 +25,14 @@ import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory.DedupEventsStoreType;
 import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.apache.storm.guava.base.Joiner;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentLinkedDeque;
 
 public class DedupCacheStoreTest extends MongoDependencyBaseTest {
@@ -50,16 +52,19 @@ public class DedupCacheStoreTest extends MongoDependencyBaseTest {
 		
 		System.setProperty("config.resource", "/application-mongo-statestore.conf");
 		Config config = ConfigFactory.load();
-		DedupCache cache = new DedupCache(config);
+		DedupCache cache = new DedupCache(config, "testPublishment");
 		cache.addOrUpdate(eventUniq, (String) event.getData()[event.getSchema().getColumnIndex("state")]);
 		
-		DedupEventsStore accessor = DedupEventsStoreFactory.getStore(DedupEventsStoreType.Mongo, config);
+		DedupEventsStore accessor = DedupEventsStoreFactory.getStore(DedupEventsStoreType.Mongo, config, "testPublishment");
 		Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = accessor.getEvents();
 		for (EventUniq one : events.keySet()) {
 			if (one.equals(eventUniq)) {
 				Assert.assertEquals(false, one.removable);
 			}
 		}
+		for (Entry<EventUniq, ConcurrentLinkedDeque<DedupValue>> entry : events.entrySet()) {
+			System.out.println(entry.getKey() + " >>> " + Joiner.on("\n\t").join(entry.getValue()));
+		}
 		
 		eventUniq.removable = true;
 		cache.persistUpdatedEventUniq(eventUniq);
@@ -70,6 +75,10 @@ public class DedupCacheStoreTest extends MongoDependencyBaseTest {
 				Assert.assertEquals(true, one.removable);
 			}
 		}
+		
+		for (Entry<EventUniq, ConcurrentLinkedDeque<DedupValue>> entry : events.entrySet()) {
+			System.out.println(entry.getKey() + " >>> " + Joiner.on("\n\t").join(entry.getValue()));
+		}
 	}
 	
 	private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
index e996376..d3dc717 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
@@ -55,7 +55,7 @@ public class DedupCacheTest {
 	@Test
 	public void testNormal() throws Exception {
 		Config config = ConfigFactory.load();
-		DedupCache dedupCache = new DedupCache(config);
+		DedupCache dedupCache = new DedupCache(config, "testPublishment");
 		
 		StreamDefinition stream = createStream();
 		PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
index 57c113b..456b1ea 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
@@ -42,7 +42,7 @@ public class DefaultDedupWithoutStateTest {
 		// assume state: OPEN, WARN, CLOSE
 		System.setProperty("config.resource", "/application-mongo-statestore.conf");
 		Config config = ConfigFactory.load();
-		DedupCache dedupCache = new DedupCache(config);
+		DedupCache dedupCache = new DedupCache(config, "testPublishment");
 		DefaultDeduplicator deduplicator = new DefaultDeduplicator(
 				"PT10S", Arrays.asList(new String[] { "alertKey" }), null, dedupCache);
 		

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
index 3a17e53..0556e3d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
@@ -40,7 +40,7 @@ public class DefaultDeduplicatorTest extends MongoDependencyBaseTest {
 		// assume state: OPEN, WARN, CLOSE
 		System.setProperty("config.resource", "/application-mongo-statestore.conf");
 		Config config = ConfigFactory.load();
-		DedupCache dedupCache = new DedupCache(config);
+		DedupCache dedupCache = new DedupCache(config, "testPublishment");
 		DefaultDeduplicator deduplicator = new DefaultDeduplicator(
 				"PT1M", Arrays.asList(new String[] { "alertKey" }), "state", dedupCache);
 		

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
index 75de384..b7d7613 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
@@ -54,7 +54,7 @@ public abstract class MongoDependencyBaseTest {
         ConfigFactory.invalidateCaches();
         config = ConfigFactory.load();
         
-        store = new MongoDedupEventsStore(config);
+        store = new MongoDedupEventsStore(config, "testPublishment");
     }
 
     @AfterClass