You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/10/24 06:18:31 UTC
incubator-eagle git commit: EAGLE-672: remove mongo state store for dedup
Repository: incubator-eagle
Updated Branches:
refs/heads/master e520e4011 -> c15e7f814
EAGLE-672: remove mongo state store for dedup
Author: Li, Garrett
Reviewer: ralphsu
This closes #553
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c15e7f81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c15e7f81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c15e7f81
Branch: refs/heads/master
Commit: c15e7f81497f6873dee25421c69f4c1351fdcb9c
Parents: e520e40
Author: Xiancheng Li <xi...@ebay.com>
Authored: Mon Oct 24 09:32:21 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Mon Oct 24 14:18:13 2016 +0800
----------------------------------------------------------------------
.../engine/publisher/dedup/DedupCache.java | 93 +----
.../publisher/dedup/DedupEventsStore.java | 32 --
.../dedup/DedupEventsStoreFactory.java | 57 ---
.../publisher/dedup/MongoDedupEventsStore.java | 146 -------
.../publisher/dedup/TransformerUtils.java | 117 ------
.../publisher/dedup/DedupCacheStoreTest.java | 150 -------
.../engine/publisher/dedup/DedupCacheTest.java | 202 +++++-----
.../dedup/DefaultDeduplicatorTest.java | 399 ++++++++++---------
.../dedup/ExtendedDeduplicatorTest.java | 35 +-
.../publisher/dedup/MongoDedupStoreTest.java | 68 ----
.../dedup/MongoDependencyBaseTest.java | 67 ----
.../engine/router/TestAlertPublisherBolt.java | 54 +--
12 files changed, 344 insertions(+), 1076 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
index 2080441..abb83d6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
@@ -16,42 +16,37 @@
*/
package org.apache.eagle.alert.engine.publisher.dedup;
-import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
-import com.typesafe.config.Config;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory.DedupEventsStoreType;
import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.*;
+import com.google.common.base.Objects;
+import com.typesafe.config.Config;
public class DedupCache {
private static final Logger LOG = LoggerFactory.getLogger(DedupCache.class);
private static final long CACHE_MAX_EXPIRE_TIME_IN_DAYS = 30;
- private static final long CACHE_MAX_EVENT_QUEUE_SIZE = 10;
public static final String DEDUP_COUNT = "dedupCount";
public static final String DOC_ID = "docId";
public static final String DEDUP_FIRST_OCCURRENCE = "dedupFirstOccurrenceTime";
- private static final DedupEventsStoreType type = DedupEventsStoreType.Mongo;
-
private long lastUpdated = -1;
private Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = new ConcurrentHashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
- private static final ConcurrentLinkedDeque<DedupCache> caches = new ConcurrentLinkedDeque<DedupCache>();
-
+ @SuppressWarnings("unused")
private Config config;
private String publishName;
@@ -59,39 +54,6 @@ public class DedupCache {
public DedupCache(Config config, String publishName) {
this.config = config;
this.publishName = publishName;
- // only happens during startup, won't introduce perf issue here
- synchronized (caches) {
- if (caches.size() == 0) {
- // create daemon to clean up old removable events periodically
- ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(1, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setDaemon(true);
- return t;
- }
- });
- scheduleSrv.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- for (DedupCache cache : caches) {
- if (cache == null || cache.getEvents() == null) {
- continue;
- }
- HashSet<EventUniq> eventUniqs = new HashSet<EventUniq>(cache.getEvents().keySet());
- for (EventUniq one : eventUniqs) {
- if (one.removable && one.createdTime < System.currentTimeMillis() - 3600000 * 24) {
- cache.removeEvent(one);
- LOG.info("Remove dedup key {} from cache & db", one);
- }
- }
- }
- }
- }, 5, 60, TimeUnit.MINUTES);
- LOG.info("Create daemon to clean up old removable events periodically");
- }
- caches.add(this);
- }
}
public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents() {
@@ -99,8 +61,6 @@ public class DedupCache {
|| System.currentTimeMillis() - lastUpdated > CACHE_MAX_EXPIRE_TIME_IN_DAYS * DateUtils.MILLIS_PER_DAY
|| events.size() <= 0) {
lastUpdated = System.currentTimeMillis();
- DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
- events = accessor.getEvents();
}
return events;
}
@@ -112,9 +72,6 @@ public class DedupCache {
public void removeEvent(EventUniq eventEniq) {
if (this.contains(eventEniq)) {
this.events.remove(eventEniq);
-
- DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
- accessor.remove(eventEniq);
}
}
@@ -142,9 +99,9 @@ public class DedupCache {
Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = this.getEvents();
if (!events.containsKey(eventEniq)
|| (events.containsKey(eventEniq)
- && events.get(eventEniq).size() > 0
- && !StringUtils.equalsIgnoreCase(stateFieldValue,
- events.get(eventEniq).getLast().getStateFieldValue()))) {
+ && events.get(eventEniq).size() > 0
+ && !StringUtils.equalsIgnoreCase(stateFieldValue,
+ events.get(eventEniq).getLast().getStateFieldValue()))) {
DedupValue[] dedupValues = this.add(eventEniq, event, stateFieldValue, stateCloseValue);
return dedupValues;
} else {
@@ -164,19 +121,12 @@ public class DedupCache {
events.put(eventEniq, dedupValues);
LOG.info("{} Add new dedup key {}, and value {}", this.publishName, eventEniq, dedupValues);
} else if (!StringUtils.equalsIgnoreCase(stateFieldValue,
- events.get(eventEniq).getLast().getStateFieldValue())) {
+ events.get(eventEniq).getLast().getStateFieldValue())) {
// existing a de-dup value, try update or reset
DedupValue lastDedupValue = events.get(eventEniq).getLast();
dedupValue = updateDedupValue(lastDedupValue, eventEniq, event, stateFieldValue, stateCloseValue);
LOG.info("{} Update dedup key {}, and value {}", this.publishName, eventEniq, dedupValue);
}
-
- if (dedupValue != null) {
- DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
- accessor.add(eventEniq, events.get(eventEniq));
- LOG.info("{} Store dedup key {}, value {} to DB", this.publishName, eventEniq,
- Joiner.on(",").join(events.get(eventEniq)));
- }
if (dedupValue == null) {
return null;
}
@@ -190,7 +140,7 @@ public class DedupCache {
}
if (lastDedupValue.getStateFieldValue().equals(stateCloseValue)
- && eventEniq.timestamp < lastDedupValue.getCloseTime()) {
+ && eventEniq.timestamp < lastDedupValue.getCloseTime()) {
DedupValue dv = createDedupValue(eventEniq, event, stateFieldValue);
lastDedupValue.resetTo(dv);
} else {
@@ -208,7 +158,7 @@ public class DedupCache {
dedupValue = new DedupValue();
dedupValue.setFirstOccurrence(eventEniq.timestamp);
int idx = event.getSchema().getColumnIndex(DOC_ID);
- if (idx >= 0 ) {
+ if (idx >= 0) {
dedupValue.setDocId(event.getData()[idx].toString());
} else {
dedupValue.setDocId("");
@@ -219,13 +169,6 @@ public class DedupCache {
return dedupValue;
}
- public void persistUpdatedEventUniq(EventUniq eventEniq) {
- DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
- accessor.add(eventEniq, events.get(eventEniq));
- LOG.info("{} Store dedup key {}, value {} to DB", this.publishName, eventEniq,
- Joiner.on(",").join(events.get(eventEniq)));
- }
-
private DedupValue updateCount(EventUniq eventEniq) {
ConcurrentLinkedDeque<DedupValue> dedupValues = events.get(eventEniq);
if (dedupValues == null || dedupValues.size() <= 0) {
@@ -237,11 +180,7 @@ public class DedupCache {
String updateMsg = String.format(
"%s Update count for dedup key %s, value %s and count %s", this.publishName, eventEniq,
dedupValue.getStateFieldValue(), dedupValue.getCount());
- if (dedupValue.getCount() > 0 && dedupValue.getCount() % 100 == 0) {
- LOG.info(updateMsg);
- DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
- accessor.add(eventEniq, dedupValues);
- } else if (LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug(updateMsg);
}
return dedupValue;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java
deleted file mode 100644
index 5918afe..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedDeque;
-
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-
-public interface DedupEventsStore {
-
- public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents();
-
- public void add(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> dedupStateValues);
-
- public void remove(EventUniq eventEniq);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
deleted file mode 100644
index 75d8e53..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import com.typesafe.config.Config;
-
-public class DedupEventsStoreFactory {
-
- public enum DedupEventsStoreType {
- Mongo, ElasticSearch
- }
-
- ;
-
- private static DedupEventsStore customizedStore;
-
- private static MongoDedupEventsStore accessor;
-
- public static void customizeStore(DedupEventsStore store) {
- customizedStore = store;
- }
-
- public static DedupEventsStore getStore(DedupEventsStoreType type, Config config, String publishName) {
- if (customizedStore != null) {
- return customizedStore;
- }
- switch (type) {
- case Mongo:
- if (accessor == null) {
- accessor = new MongoDedupEventsStore(config, publishName);
- }
- break;
- case ElasticSearch:
- default:
- break;
- }
- if (accessor == null) {
- throw new RuntimeException(String.format("Dedup events store type %s is NOT supportted", type));
- }
- return accessor;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
deleted file mode 100644
index 35281bf..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.mongodb.Block;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.IndexOptions;
-import com.mongodb.client.model.InsertOneOptions;
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-import org.bson.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-
-public class MongoDedupEventsStore implements DedupEventsStore {
-
- private static final Logger LOG = LoggerFactory.getLogger(MongoDedupEventsStore.class);
-
- public static final String DEDUP_ID = "dedupId";
- public static final String DEDUP_STREAM_ID = "streamId";
- public static final String DOC_ID = "docId";
- public static final String DEDUP_POLICY_ID = "policyId";
- public static final String DEDUP_CREATE_TIME = "createdTime";
- public static final String DEDUP_TIMESTAMP = "timestamp";
- public static final String DEDUP_REMOVABLE = "removable";
- public static final String DEDUP_CUSTOM_FIELDS_VALUES = "customFieldValues";
- public static final String DEDUP_VALUES = "dedupValues";
- public static final String DEDUP_STATE_FIELD_VALUE = "stateFieldValue";
- public static final String DEDUP_COUNT = "count";
- public static final String DEDUP_FIRST_OCCURRENCE = "firstOccurrence";
- public static final String DEDUP_CLOSE_TIME = "closeTime";
- public static final String DEDUP_PUBLISH_ID = "publishId";
-
- private static final ObjectMapper mapper = new ObjectMapper();
-
- static {
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- }
-
- private Config config;
- private String connection;
- private MongoClient client;
- private MongoDatabase db;
- private MongoCollection<Document> stateCollection;
- private String publishName;
-
- private static final String DB_NAME = "ump_alert_dedup";
- private static final String ALERT_STATE_COLLECTION = "alert_dedup";
-
- public MongoDedupEventsStore(Config config, String publishName) {
- this.config = config;
- this.publishName = publishName;
- this.connection = this.config.getString("connection");
- try {
- this.client = new MongoClient(new MongoClientURI(this.connection));
- init();
- } catch (Throwable t) {
- LOG.error(String.format("initialize mongodb %s client failed", this.connection), t);
- }
- }
-
- private void init() {
- db = client.getDatabase(DB_NAME);
- stateCollection = db.getCollection(ALERT_STATE_COLLECTION);
- // dedup id index
- IndexOptions io = new IndexOptions().background(true).unique(true).name(DEDUP_ID + "_index");
- BsonDocument doc = new BsonDocument();
- doc.append(DEDUP_ID, new BsonInt32(1));
- stateCollection.createIndex(doc, io);
- }
-
- @Override
- public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents() {
- try {
- Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> result = new ConcurrentHashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
- BsonDocument filter = new BsonDocument();
- filter.append(DEDUP_PUBLISH_ID, new BsonString(this.publishName));
- stateCollection.find(filter).forEach(new Block<Document>() {
- @Override
- public void apply(final Document doc) {
- DedupEntity entity = TransformerUtils.transform(DedupEntity.class, BsonDocument.parse(doc.toJson()));
- result.put(entity.getEventEniq(), entity.getDedupValuesInConcurrentLinkedDeque());
- }
- });
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found {} dedup events from mongoDB", result.size());
- }
- return result;
- } catch (Exception e) {
- LOG.error("find dedup state failed, but the state in memory is good, could be ingored.", e);
- }
- return new HashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
- }
-
- @Override
- public void add(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> dedupStateValues) {
- try {
- BsonDocument doc = TransformerUtils.transform(new DedupEntity(this.publishName, eventEniq, dedupStateValues));
- BsonDocument filter = new BsonDocument();
- filter.append(DEDUP_ID, new BsonInt64(TransformerUtils.getUniqueId(this.publishName, eventEniq)));
- Document returnedDoc = stateCollection.findOneAndReplace(filter, Document.parse(doc.toJson()));
- if (returnedDoc == null) {
- InsertOneOptions option = new InsertOneOptions();
- stateCollection.insertOne(Document.parse(doc.toJson()), option);
- }
- } catch (Exception e) {
- LOG.error("insert dedup state failed, but the state is still in memory, could be ingored.", e);
- }
- }
-
- @Override
- public void remove(EventUniq eventEniq) {
- try {
- BsonDocument filter = new BsonDocument();
- filter.append(DEDUP_ID, new BsonInt64(TransformerUtils.getUniqueId(this.publishName, eventEniq)));
- stateCollection.deleteOne(filter);
- } catch (Exception e) {
- LOG.error("delete dedup state failed, but the state in memory is good, could be ingored.", e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
deleted file mode 100644
index 1ce6898..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-import org.bson.*;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map.Entry;
-
-public class TransformerUtils {
-
- public static final String MAP_KEY = "key";
- public static final String MAP_VALUE = "value";
-
- @SuppressWarnings("unchecked")
- public static <T> T transform(Class<T> klass, BsonDocument doc) {
- if (klass.equals(DedupEntity.class)) {
- String streamId = doc.getString(MongoDedupEventsStore.DEDUP_STREAM_ID).getValue();
- String policyId = doc.getString(MongoDedupEventsStore.DEDUP_POLICY_ID).getValue();
- long timestamp = doc.getInt64(MongoDedupEventsStore.DEDUP_TIMESTAMP).getValue();
- HashMap<String, String> customFieldValues = new HashMap<String, String>();
- BsonArray customFieldsValuesArray = doc.getArray(
- MongoDedupEventsStore.DEDUP_CUSTOM_FIELDS_VALUES);
- for (int i = 0; i < customFieldsValuesArray.size(); i++) {
- BsonDocument dedupCustomFieldValuesDoc = customFieldsValuesArray.get(i).asDocument();
- customFieldValues.put(
- dedupCustomFieldValuesDoc.getString(MAP_KEY).getValue(),
- dedupCustomFieldValuesDoc.getString(MAP_VALUE).getValue());
- }
- EventUniq eventUniq = new EventUniq(streamId, policyId, timestamp, customFieldValues);
- eventUniq.removable = doc.getBoolean(MongoDedupEventsStore.DEDUP_REMOVABLE).getValue();
- eventUniq.createdTime = doc.getInt64(
- MongoDedupEventsStore.DEDUP_CREATE_TIME, new BsonInt64(0)).getValue();
- List<DedupValue> dedupValues = new ArrayList<DedupValue>();
- BsonArray dedupValuesArray = doc.getArray(MongoDedupEventsStore.DEDUP_VALUES);
- for (int i = 0; i < dedupValuesArray.size(); i++) {
- BsonDocument dedupValuesDoc = dedupValuesArray.get(i).asDocument();
- DedupValue dedupValue = new DedupValue();
- dedupValue.setStateFieldValue(dedupValuesDoc.getString(
- MongoDedupEventsStore.DEDUP_STATE_FIELD_VALUE).getValue());
- dedupValue.setCount(dedupValuesDoc.getInt64(
- MongoDedupEventsStore.DEDUP_COUNT).getValue());
- dedupValue.setFirstOccurrence(dedupValuesDoc.getInt64(
- MongoDedupEventsStore.DEDUP_FIRST_OCCURRENCE).getValue());
- dedupValue.setCloseTime(dedupValuesDoc.getInt64(
- MongoDedupEventsStore.DEDUP_CLOSE_TIME).getValue());
- dedupValue.setDocId(dedupValuesDoc.getString(
- MongoDedupEventsStore.DOC_ID).getValue());
- dedupValues.add(dedupValue);
- }
- String publishId = doc.getString(MongoDedupEventsStore.DEDUP_PUBLISH_ID).getValue();
- return (T) new DedupEntity(publishId, eventUniq, dedupValues);
- }
- throw new RuntimeException(String.format("Unknow object type %s, cannot transform", klass.getName()));
- }
-
- public static BsonDocument transform(Object obj) {
- if (obj instanceof DedupEntity) {
- BsonDocument doc = new BsonDocument();
- DedupEntity entity = (DedupEntity) obj;
- doc.put(MongoDedupEventsStore.DEDUP_ID, new BsonInt64(getUniqueId(entity.getPublishName(), entity.getEventEniq())));
- doc.put(MongoDedupEventsStore.DEDUP_STREAM_ID, new BsonString(entity.getEventEniq().streamId));
- doc.put(MongoDedupEventsStore.DEDUP_PUBLISH_ID, new BsonString(entity.getPublishName()));
- doc.put(MongoDedupEventsStore.DEDUP_POLICY_ID, new BsonString(entity.getEventEniq().policyId));
- doc.put(MongoDedupEventsStore.DEDUP_CREATE_TIME, new BsonInt64(entity.getEventEniq().createdTime));
- doc.put(MongoDedupEventsStore.DEDUP_TIMESTAMP, new BsonInt64(entity.getEventEniq().timestamp));
- doc.put(MongoDedupEventsStore.DEDUP_REMOVABLE, new BsonBoolean(entity.getEventEniq().removable));
-
- List<BsonDocument> dedupCustomFieldValues = new ArrayList<BsonDocument>();
- for (Entry<String, String> entry : entity.getEventEniq().customFieldValues.entrySet()) {
- BsonDocument dedupCustomFieldValuesDoc = new BsonDocument();
- dedupCustomFieldValuesDoc.put(MAP_KEY, new BsonString(entry.getKey()));
- dedupCustomFieldValuesDoc.put(MAP_VALUE, new BsonString(entry.getValue()));
- dedupCustomFieldValues.add(dedupCustomFieldValuesDoc);
- }
- doc.put(MongoDedupEventsStore.DEDUP_CUSTOM_FIELDS_VALUES, new BsonArray(dedupCustomFieldValues));
-
- List<BsonDocument> dedupValuesDocs = new ArrayList<BsonDocument>();
- for (DedupValue dedupValue : entity.getDedupValues()) {
- BsonDocument dedupValuesDoc = new BsonDocument();
- dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_STATE_FIELD_VALUE, new BsonString(dedupValue.getStateFieldValue()));
- dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_COUNT, new BsonInt64(dedupValue.getCount()));
- dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_FIRST_OCCURRENCE,new BsonInt64(dedupValue.getFirstOccurrence()));
- dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_CLOSE_TIME, new BsonInt64(dedupValue.getCloseTime()));
- dedupValuesDoc.put(MongoDedupEventsStore.DOC_ID, new BsonString(dedupValue.getDocId()));
- dedupValuesDocs.add(dedupValuesDoc);
- }
- doc.put(MongoDedupEventsStore.DEDUP_VALUES, new BsonArray(dedupValuesDocs));
- return doc;
- }
- throw new RuntimeException(String.format("Unknow object type %s, cannot transform", obj.getClass().getName()));
- }
-
- public static int getUniqueId(String publishName, EventUniq eventEniq) {
- HashCodeBuilder builder = new HashCodeBuilder().append(eventEniq).append(publishName);
- return builder.build();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
deleted file mode 100644
index 25518de..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory.DedupEventsStoreType;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-import org.apache.storm.guava.base.Joiner;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentLinkedDeque;
-
-public class DedupCacheStoreTest extends MongoDependencyBaseTest {
-
- @Test
- public void testPersistUpdatedEventUniq() throws Exception {
- StreamDefinition stream = createStream();
- PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
-
- AlertStreamEvent event = createEvent(stream, policy, new Object[] {
- System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
- });
-
- HashMap<String, String> dedupFieldValues = new HashMap<String, String>();
- dedupFieldValues.put("alertKey", (String) event.getData()[event.getSchema().getColumnIndex("alertKey")]);
- EventUniq eventUniq = new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime(), dedupFieldValues);
-
- System.setProperty("config.resource", "/application-mongo-statestore.conf");
- Config config = ConfigFactory.load();
- DedupCache cache = new DedupCache(config, "testPublishment");
- cache.addOrUpdate(eventUniq, event, (String) event.getData()[event.getSchema().getColumnIndex("state")], "closed");
-
- DedupEventsStore accessor = DedupEventsStoreFactory.getStore(DedupEventsStoreType.Mongo, config, "testPublishment");
- Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = accessor.getEvents();
- for (EventUniq one : events.keySet()) {
- if (one.equals(eventUniq)) {
- Assert.assertEquals(false, one.removable);
- }
- }
- for (Entry<EventUniq, ConcurrentLinkedDeque<DedupValue>> entry : events.entrySet()) {
- System.out.println(entry.getKey() + " >>> " + Joiner.on("\n\t").join(entry.getValue()));
- }
-
- eventUniq.removable = true;
- cache.persistUpdatedEventUniq(eventUniq);
-
- events = accessor.getEvents();
- for (EventUniq one : events.keySet()) {
- if (one.equals(eventUniq)) {
- Assert.assertEquals(true, one.removable);
- }
- }
-
- for (Entry<EventUniq, ConcurrentLinkedDeque<DedupValue>> entry : events.entrySet()) {
- System.out.println(entry.getKey() + " >>> " + Joiner.on("\n\t").join(entry.getValue()));
- }
- }
-
- private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) {
- AlertStreamEvent event = new AlertStreamEvent();
- event.setPolicyId(policy.getName());
- event.setSchema(stream);
- event.setStreamId(stream.getStreamId());
- event.setTimestamp(System.currentTimeMillis());
- event.setCreatedTime(System.currentTimeMillis());
- event.setData(data);
- return event;
- }
-
- private StreamDefinition createStream() {
- StreamDefinition sd = new StreamDefinition();
- StreamColumn tsColumn = new StreamColumn();
- tsColumn.setName("timestamp");
- tsColumn.setType(StreamColumn.Type.LONG);
-
- StreamColumn hostColumn = new StreamColumn();
- hostColumn.setName("host");
- hostColumn.setType(StreamColumn.Type.STRING);
-
- StreamColumn alertKeyColumn = new StreamColumn();
- alertKeyColumn.setName("alertKey");
- alertKeyColumn.setType(StreamColumn.Type.STRING);
-
- StreamColumn stateColumn = new StreamColumn();
- stateColumn.setName("state");
- stateColumn.setType(StreamColumn.Type.STRING);
-
- // dedupCount, dedupFirstOccurrence
-
- StreamColumn dedupCountColumn = new StreamColumn();
- dedupCountColumn.setName("dedupCount");
- dedupCountColumn.setType(StreamColumn.Type.LONG);
-
- StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
- dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
- dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
-
- sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn));
- sd.setDataSource("testDatasource");
- sd.setStreamId("testStream");
- sd.setDescription("test stream");
- return sd;
- }
-
- private PolicyDefinition createPolicy(String streamName, String policyName) {
- PolicyDefinition pd = new PolicyDefinition();
- PolicyDefinition.Definition def = new PolicyDefinition.Definition();
- //expression, something like "PT5S,dynamic,1,host"
- def.setValue("test");
- def.setType("siddhi");
- pd.setDefinition(def);
- pd.setInputStreams(Arrays.asList("inputStream"));
- pd.setOutputStreams(Arrays.asList("outputStream"));
- pd.setName(policyName);
- pd.setDescription(String.format("Test policy for stream %s", streamName));
-
- StreamPartition sp = new StreamPartition();
- sp.setStreamId(streamName);
- sp.setColumns(Arrays.asList("host"));
- sp.setType(StreamPartition.Type.GROUPBY);
- pd.addPartition(sp);
- return pd;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
index d1c8457..5bf0410 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
@@ -28,120 +28,104 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
public class DedupCacheTest {
-
- private DedupEventsStore store;
-
- @Before
- public void setUp() {
- store = Mockito.mock(DedupEventsStore.class);
- DedupEventsStoreFactory.customizeStore(store);
- }
-
- @After
- public void tearDown() {
- Mockito.reset(store);
- }
-
- @Test
- public void testNormal() throws Exception {
- Config config = ConfigFactory.load();
- DedupCache dedupCache = new DedupCache(config, "testPublishment");
-
- StreamDefinition stream = createStream();
- PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
-
- String[] states = new String[] { "OPEN", "WARN", "CLOSE" };
- Random random = new Random();
- for (int i = 0; i < 20; i ++) {
- AlertStreamEvent event = createEvent(stream, policy, new Object[] {
- System.currentTimeMillis(), "host1", "testPolicy-host1-01", states[random.nextInt(3)], 0, 0
- });
- HashMap<String, String> dedupFieldValues = new HashMap<String, String>();
- dedupFieldValues.put("alertKey", (String) event.getData()[event.getSchema().getColumnIndex("alertKey")]);
- List<AlertStreamEvent> result = dedupCache.dedup(event,
- new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime(), dedupFieldValues),
- "state",
- (String) event.getData()[event.getSchema().getColumnIndex("state")], "closed");
- System.out.println((i + 1) + " >>>> " + ToStringBuilder.reflectionToString(result));
- }
-
- Assert.assertTrue(true);
- }
-
- private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) {
- AlertStreamEvent event = new AlertStreamEvent();
- event.setPolicyId(policy.getName());
- event.setSchema(stream);
- event.setStreamId(stream.getStreamId());
- event.setTimestamp(System.currentTimeMillis());
- event.setCreatedTime(System.currentTimeMillis());
- event.setData(data);
- return event;
- }
-
- private StreamDefinition createStream() {
- StreamDefinition sd = new StreamDefinition();
- StreamColumn tsColumn = new StreamColumn();
- tsColumn.setName("timestamp");
- tsColumn.setType(StreamColumn.Type.LONG);
-
- StreamColumn hostColumn = new StreamColumn();
- hostColumn.setName("host");
- hostColumn.setType(StreamColumn.Type.STRING);
-
- StreamColumn alertKeyColumn = new StreamColumn();
- alertKeyColumn.setName("alertKey");
- alertKeyColumn.setType(StreamColumn.Type.STRING);
-
- StreamColumn stateColumn = new StreamColumn();
- stateColumn.setName("state");
- stateColumn.setType(StreamColumn.Type.STRING);
-
- // dedupCount, dedupFirstOccurrence
-
- StreamColumn dedupCountColumn = new StreamColumn();
- dedupCountColumn.setName("dedupCount");
- dedupCountColumn.setType(StreamColumn.Type.LONG);
-
- StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
- dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
- dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
-
- sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn));
- sd.setDataSource("testDatasource");
- sd.setStreamId("testStream");
- sd.setDescription("test stream");
- return sd;
- }
-
- private PolicyDefinition createPolicy(String streamName, String policyName) {
- PolicyDefinition pd = new PolicyDefinition();
- PolicyDefinition.Definition def = new PolicyDefinition.Definition();
- //expression, something like "PT5S,dynamic,1,host"
- def.setValue("test");
- def.setType("siddhi");
- pd.setDefinition(def);
- pd.setInputStreams(Arrays.asList("inputStream"));
- pd.setOutputStreams(Arrays.asList("outputStream"));
- pd.setName(policyName);
- pd.setDescription(String.format("Test policy for stream %s", streamName));
-
- StreamPartition sp = new StreamPartition();
- sp.setStreamId(streamName);
- sp.setColumns(Arrays.asList("host"));
- sp.setType(StreamPartition.Type.GROUPBY);
- pd.addPartition(sp);
- return pd;
- }
-
+
+ @Test
+ public void testNormal() throws Exception {
+ Config config = ConfigFactory.load();
+ DedupCache dedupCache = new DedupCache(config, "testPublishment");
+
+ StreamDefinition stream = createStream();
+ PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
+
+ String[] states = new String[] {"OPEN", "WARN", "CLOSE"};
+ Random random = new Random();
+ for (int i = 0; i < 20; i++) {
+ AlertStreamEvent event = createEvent(stream, policy, new Object[] {
+ System.currentTimeMillis(), "host1", "testPolicy-host1-01", states[random.nextInt(3)], 0, 0
+ });
+ HashMap<String, String> dedupFieldValues = new HashMap<String, String>();
+ dedupFieldValues.put("alertKey", (String) event.getData()[event.getSchema().getColumnIndex("alertKey")]);
+ List<AlertStreamEvent> result = dedupCache.dedup(event,
+ new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime(), dedupFieldValues),
+ "state",
+ (String) event.getData()[event.getSchema().getColumnIndex("state")], "closed");
+ System.out.println((i + 1) + " >>>> " + ToStringBuilder.reflectionToString(result));
+ }
+
+ Assert.assertTrue(true);
+ }
+
+ private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) {
+ AlertStreamEvent event = new AlertStreamEvent();
+ event.setPolicyId(policy.getName());
+ event.setSchema(stream);
+ event.setStreamId(stream.getStreamId());
+ event.setTimestamp(System.currentTimeMillis());
+ event.setCreatedTime(System.currentTimeMillis());
+ event.setData(data);
+ return event;
+ }
+
+ private StreamDefinition createStream() {
+ StreamDefinition sd = new StreamDefinition();
+ StreamColumn tsColumn = new StreamColumn();
+ tsColumn.setName("timestamp");
+ tsColumn.setType(StreamColumn.Type.LONG);
+
+ StreamColumn hostColumn = new StreamColumn();
+ hostColumn.setName("host");
+ hostColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn alertKeyColumn = new StreamColumn();
+ alertKeyColumn.setName("alertKey");
+ alertKeyColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn stateColumn = new StreamColumn();
+ stateColumn.setName("state");
+ stateColumn.setType(StreamColumn.Type.STRING);
+
+ // dedupCount, dedupFirstOccurrence
+
+ StreamColumn dedupCountColumn = new StreamColumn();
+ dedupCountColumn.setName("dedupCount");
+ dedupCountColumn.setType(StreamColumn.Type.LONG);
+
+ StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
+ dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
+ dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
+
+ sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn));
+ sd.setDataSource("testDatasource");
+ sd.setStreamId("testStream");
+ sd.setDescription("test stream");
+ return sd;
+ }
+
+ private PolicyDefinition createPolicy(String streamName, String policyName) {
+ PolicyDefinition pd = new PolicyDefinition();
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ //expression, something like "PT5S,dynamic,1,host"
+ def.setValue("test");
+ def.setType("siddhi");
+ pd.setDefinition(def);
+ pd.setInputStreams(Arrays.asList("inputStream"));
+ pd.setOutputStreams(Arrays.asList("outputStream"));
+ pd.setName(policyName);
+ pd.setDescription(String.format("Test policy for stream %s", streamName));
+
+ StreamPartition sp = new StreamPartition();
+ sp.setStreamId(streamName);
+ sp.setColumns(Arrays.asList("host"));
+ sp.setType(StreamPartition.Type.GROUPBY);
+ pd.addPartition(sp);
+ return pd;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
index 72aef16..e6cbe6c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
@@ -32,195 +32,212 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-public class DefaultDeduplicatorTest extends MongoDependencyBaseTest {
-
- @Test
- public void testNormal() throws Exception {
- //String intervalMin, List<String> customDedupFields, String dedupStateField, String dedupStateCloseValue
- // assume state: OPEN, WARN, CLOSE
- System.setProperty("config.resource", "/application-mongo-statestore.conf");
- Config config = ConfigFactory.load();
- DedupCache dedupCache = new DedupCache(config, "testPublishment");
- DefaultDeduplicator deduplicator = new DefaultDeduplicator(
- "PT1M", Arrays.asList(new String[] { "alertKey" }), "state", "close", dedupCache);
-
- StreamDefinition stream = createStream();
- PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
-
- AlertStreamEvent e1 = createEvent(stream, policy, new Object[] {
- System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
- });
- AlertStreamEvent e2 = createEvent(stream, policy, new Object[] {
- System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0
- });
- AlertStreamEvent e3 = createEvent(stream, policy, new Object[] {
- System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
- });
- AlertStreamEvent e4 = createEvent(stream, policy, new Object[] {
- System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0
- });
- AlertStreamEvent e5 = createEvent(stream, policy, new Object[] {
- System.currentTimeMillis(), "host1", "testPolicy-host1-01", "CLOSE", 0, 0
- });
- AlertStreamEvent e6 = createEvent(stream, policy, new Object[] {
- System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
- });
- AlertStreamEvent e7 = createEvent(stream, policy, new Object[] {
- System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
- });
- AlertStreamEvent e8 = createEvent(stream, policy, new Object[] {
- System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
- });
-
- List<AlertStreamEvent> allResults = new ArrayList<AlertStreamEvent>();
- new Thread(new Runnable() {
- @Override
- public void run() {
- List<AlertStreamEvent> result = deduplicator.dedup(e1);
- if (result != null) allResults.addAll(result);
- System.out.println("1 >>>> " + ToStringBuilder.reflectionToString(result));
- }
- }).start();
- new Thread(new Runnable() {
- @Override
- public void run() {
- List<AlertStreamEvent> result = deduplicator.dedup(e2);
- if (result != null) allResults.addAll(result);
- System.out.println("2 >>>> " + ToStringBuilder.reflectionToString(result));
- }
- }).start();
- new Thread(new Runnable() {
- @Override
- public void run() {
- List<AlertStreamEvent> result = deduplicator.dedup(e3);
- if (result != null) allResults.addAll(result);
- System.out.println("3 >>>> " + ToStringBuilder.reflectionToString(result));
- }
- }).start();
- new Thread(new Runnable() {
- @Override
- public void run() {
- List<AlertStreamEvent> result = deduplicator.dedup(e4);
- if (result != null) allResults.addAll(result);
- System.out.println("4 >>>> " + ToStringBuilder.reflectionToString(result));
- }
- }).start();
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {}
-
- List<AlertStreamEvent> result = deduplicator.dedup(e5);
- if (result != null) allResults.addAll(result);
- System.out.println("5 >>>> " + ToStringBuilder.reflectionToString(result));
- }
- }).start();
- new Thread(new Runnable() {
- @Override
- public void run() {
- List<AlertStreamEvent> result = deduplicator.dedup(e6);
- if (result != null) allResults.addAll(result);
- System.out.println("6 >>>> " + ToStringBuilder.reflectionToString(result));
- }
- }).start();
- new Thread(new Runnable() {
- @Override
- public void run() {
- List<AlertStreamEvent> result = deduplicator.dedup(e7);
- if (result != null) allResults.addAll(result);
- System.out.println("7 >>>> " + ToStringBuilder.reflectionToString(result));
- }
- }).start();
- new Thread(new Runnable() {
- @Override
- public void run() {
- List<AlertStreamEvent> result = deduplicator.dedup(e8);
- if (result != null) allResults.addAll(result);
- System.out.println("8 >>>> " + ToStringBuilder.reflectionToString(result));
- }
- }).start();
-
- Thread.sleep(2000);
-
- long maxCount = 0;
- for (AlertStreamEvent event : allResults) {
- Assert.assertNotNull(event.getData()[4]);
- Assert.assertNotNull(event.getData()[5]);
-
- if (((Long) event.getData()[4]) > maxCount) {
- maxCount = (Long) event.getData()[4];
- System.out.println(String.format(">>>>>%s: %s", event, maxCount));
- }
- }
-
- }
-
- private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) {
- AlertStreamEvent event = new AlertStreamEvent();
- event.setPolicyId(policy.getName());
- event.setSchema(stream);
- event.setStreamId(stream.getStreamId());
- event.setTimestamp(System.currentTimeMillis());
- event.setCreatedTime(System.currentTimeMillis());
- event.setData(data);
- return event;
- }
-
- private StreamDefinition createStream() {
- StreamDefinition sd = new StreamDefinition();
- StreamColumn tsColumn = new StreamColumn();
- tsColumn.setName("timestamp");
- tsColumn.setType(StreamColumn.Type.LONG);
-
- StreamColumn hostColumn = new StreamColumn();
- hostColumn.setName("host");
- hostColumn.setType(StreamColumn.Type.STRING);
-
- StreamColumn alertKeyColumn = new StreamColumn();
- alertKeyColumn.setName("alertKey");
- alertKeyColumn.setType(StreamColumn.Type.STRING);
-
- StreamColumn stateColumn = new StreamColumn();
- stateColumn.setName("state");
- stateColumn.setType(StreamColumn.Type.STRING);
-
- // dedupCount, dedupFirstOccurrence
-
- StreamColumn dedupCountColumn = new StreamColumn();
- dedupCountColumn.setName("dedupCount");
- dedupCountColumn.setType(StreamColumn.Type.LONG);
-
- StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
- dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
- dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
-
- sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn));
- sd.setDataSource("testDatasource");
- sd.setStreamId("testStream");
- sd.setDescription("test stream");
- return sd;
- }
-
- private PolicyDefinition createPolicy(String streamName, String policyName) {
- PolicyDefinition pd = new PolicyDefinition();
- PolicyDefinition.Definition def = new PolicyDefinition.Definition();
- //expression, something like "PT5S,dynamic,1,host"
- def.setValue("test");
- def.setType("siddhi");
- pd.setDefinition(def);
- pd.setInputStreams(Arrays.asList("inputStream"));
- pd.setOutputStreams(Arrays.asList("outputStream"));
- pd.setName(policyName);
- pd.setDescription(String.format("Test policy for stream %s", streamName));
-
- StreamPartition sp = new StreamPartition();
- sp.setStreamId(streamName);
- sp.setColumns(Arrays.asList("host"));
- sp.setType(StreamPartition.Type.GROUPBY);
- pd.addPartition(sp);
- return pd;
- }
-
+public class DefaultDeduplicatorTest {
+
+ @Test
+ public void testNormal() throws Exception {
+ //String intervalMin, List<String> customDedupFields, String dedupStateField, String dedupStateCloseValue
+ // assume state: OPEN, WARN, CLOSE
+ System.setProperty("config.resource", "/application-mongo-statestore.conf");
+ Config config = ConfigFactory.load();
+ DedupCache dedupCache = new DedupCache(config, "testPublishment");
+ DefaultDeduplicator deduplicator = new DefaultDeduplicator(
+ "PT1M", Arrays.asList(new String[] {"alertKey"}), "state", "close", dedupCache);
+
+ StreamDefinition stream = createStream();
+ PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
+
+ AlertStreamEvent e1 = createEvent(stream, policy, new Object[] {
+ System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+ });
+ AlertStreamEvent e2 = createEvent(stream, policy, new Object[] {
+ System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0
+ });
+ AlertStreamEvent e3 = createEvent(stream, policy, new Object[] {
+ System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+ });
+ AlertStreamEvent e4 = createEvent(stream, policy, new Object[] {
+ System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0
+ });
+ AlertStreamEvent e5 = createEvent(stream, policy, new Object[] {
+ System.currentTimeMillis(), "host1", "testPolicy-host1-01", "CLOSE", 0, 0
+ });
+ AlertStreamEvent e6 = createEvent(stream, policy, new Object[] {
+ System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+ });
+ AlertStreamEvent e7 = createEvent(stream, policy, new Object[] {
+ System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+ });
+ AlertStreamEvent e8 = createEvent(stream, policy, new Object[] {
+ System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+ });
+
+ List<AlertStreamEvent> allResults = new ArrayList<AlertStreamEvent>();
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ List<AlertStreamEvent> result = deduplicator.dedup(e1);
+ if (result != null) {
+ allResults.addAll(result);
+ }
+ System.out.println("1 >>>> " + ToStringBuilder.reflectionToString(result));
+ }
+ }).start();
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ List<AlertStreamEvent> result = deduplicator.dedup(e2);
+ if (result != null) {
+ allResults.addAll(result);
+ }
+ System.out.println("2 >>>> " + ToStringBuilder.reflectionToString(result));
+ }
+ }).start();
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ List<AlertStreamEvent> result = deduplicator.dedup(e3);
+ if (result != null) {
+ allResults.addAll(result);
+ }
+ System.out.println("3 >>>> " + ToStringBuilder.reflectionToString(result));
+ }
+ }).start();
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ List<AlertStreamEvent> result = deduplicator.dedup(e4);
+ if (result != null) {
+ allResults.addAll(result);
+ }
+ System.out.println("4 >>>> " + ToStringBuilder.reflectionToString(result));
+ }
+ }).start();
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ }
+
+ List<AlertStreamEvent> result = deduplicator.dedup(e5);
+ if (result != null) {
+ allResults.addAll(result);
+ }
+ System.out.println("5 >>>> " + ToStringBuilder.reflectionToString(result));
+ }
+ }).start();
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ List<AlertStreamEvent> result = deduplicator.dedup(e6);
+ if (result != null) {
+ allResults.addAll(result);
+ }
+ System.out.println("6 >>>> " + ToStringBuilder.reflectionToString(result));
+ }
+ }).start();
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ List<AlertStreamEvent> result = deduplicator.dedup(e7);
+ if (result != null) {
+ allResults.addAll(result);
+ }
+ System.out.println("7 >>>> " + ToStringBuilder.reflectionToString(result));
+ }
+ }).start();
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ List<AlertStreamEvent> result = deduplicator.dedup(e8);
+ if (result != null) {
+ allResults.addAll(result);
+ }
+ System.out.println("8 >>>> " + ToStringBuilder.reflectionToString(result));
+ }
+ }).start();
+
+ Thread.sleep(2000);
+
+ long maxCount = 0;
+ for (AlertStreamEvent event : allResults) {
+ Assert.assertNotNull(event.getData()[4]);
+ Assert.assertNotNull(event.getData()[5]);
+
+ if (((Long) event.getData()[4]) > maxCount) {
+ maxCount = (Long) event.getData()[4];
+ System.out.println(String.format(">>>>>%s: %s", event, maxCount));
+ }
+ }
+
+ }
+
+ private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) {
+ AlertStreamEvent event = new AlertStreamEvent();
+ event.setPolicyId(policy.getName());
+ event.setSchema(stream);
+ event.setStreamId(stream.getStreamId());
+ event.setTimestamp(System.currentTimeMillis());
+ event.setCreatedTime(System.currentTimeMillis());
+ event.setData(data);
+ return event;
+ }
+
+ private StreamDefinition createStream() {
+ StreamDefinition sd = new StreamDefinition();
+ StreamColumn tsColumn = new StreamColumn();
+ tsColumn.setName("timestamp");
+ tsColumn.setType(StreamColumn.Type.LONG);
+
+ StreamColumn hostColumn = new StreamColumn();
+ hostColumn.setName("host");
+ hostColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn alertKeyColumn = new StreamColumn();
+ alertKeyColumn.setName("alertKey");
+ alertKeyColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn stateColumn = new StreamColumn();
+ stateColumn.setName("state");
+ stateColumn.setType(StreamColumn.Type.STRING);
+
+ // dedupCount, dedupFirstOccurrence
+
+ StreamColumn dedupCountColumn = new StreamColumn();
+ dedupCountColumn.setName("dedupCount");
+ dedupCountColumn.setType(StreamColumn.Type.LONG);
+
+ StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
+ dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
+ dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
+
+ sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn));
+ sd.setDataSource("testDatasource");
+ sd.setStreamId("testStream");
+ sd.setDescription("test stream");
+ return sd;
+ }
+
+ private PolicyDefinition createPolicy(String streamName, String policyName) {
+ PolicyDefinition pd = new PolicyDefinition();
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ //expression, something like "PT5S,dynamic,1,host"
+ def.setValue("test");
+ def.setType("siddhi");
+ pd.setDefinition(def);
+ pd.setInputStreams(Arrays.asList("inputStream"));
+ pd.setOutputStreams(Arrays.asList("outputStream"));
+ pd.setName(policyName);
+ pd.setDescription(String.format("Test policy for stream %s", streamName));
+
+ StreamPartition sp = new StreamPartition();
+ sp.setStreamId(streamName);
+ sp.setColumns(Arrays.asList("host"));
+ sp.setType(StreamPartition.Type.GROUPBY);
+ pd.addPartition(sp);
+ return pd;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java
index 52d4460..a788646 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java
@@ -27,11 +27,8 @@ import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory;
import org.apache.eagle.alert.engine.router.TestAlertPublisherBolt;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -40,22 +37,9 @@ import com.fasterxml.jackson.databind.type.SimpleType;
public class ExtendedDeduplicatorTest {
- private DedupEventsStore store;
-
- @Before
- public void setUp() {
- store = Mockito.mock(DedupEventsStore.class);
- DedupEventsStoreFactory.customizeStore(store);
- }
-
- @After
- public void tearDown() {
- Mockito.reset(store);
- }
-
- @Test
- public void testNormal() throws Exception {
- List<Publishment> pubs = loadEntities("/router/publishments-extended-deduplicator.json", Publishment.class);
+ @Test
+ public void testNormal() throws Exception {
+ List<Publishment> pubs = loadEntities("/router/publishments-extended-deduplicator.json", Publishment.class);
AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null);
AlertStreamEvent event1 = createWithStreamDef("extended_dedup_host1", "extended_dedup_testapp1", "OPEN");
@@ -65,11 +49,10 @@ public class ExtendedDeduplicatorTest {
Assert.assertNotNull(plugin.dedup(event1));
Assert.assertNull(plugin.dedup(event2));
Assert.assertNotNull(plugin.dedup(event3));
-
- Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), Mockito.anyObject());
- }
-
- private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
+
+ }
+
+ private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz));
List<T> l = objectMapper.readValue(TestAlertPublisherBolt.class.getResourceAsStream(path), type);
@@ -95,7 +78,7 @@ public class ExtendedDeduplicatorTest {
StreamColumn hostColumn = new StreamColumn();
hostColumn.setName("hostname");
hostColumn.setType(StreamColumn.Type.STRING);
-
+
StreamColumn stateColumn = new StreamColumn();
stateColumn.setName("state");
stateColumn.setType(StreamColumn.Type.STRING);
@@ -105,5 +88,5 @@ public class ExtendedDeduplicatorTest {
alert.setSchema(sd);
return alert;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
deleted file mode 100644
index ed44267..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentLinkedDeque;
-
-public class MongoDedupStoreTest extends MongoDependencyBaseTest {
-
- @Test
- public void testNormal() throws Exception {
- Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = store.getEvents();
- Assert.assertNotNull(events);
- Assert.assertEquals(0, events.size());
-
- String streamId = "testStream";
- String policyId = "testPolicy";
- long timestamp = System.currentTimeMillis();
- HashMap<String, String> customFieldValues = new HashMap<String, String>();
- customFieldValues.put("alertKey", "test-alert-key");
- EventUniq eventEniq = new EventUniq(streamId, policyId, timestamp, customFieldValues);
-
- ConcurrentLinkedDeque<DedupValue> dedupStateValues = new ConcurrentLinkedDeque<DedupValue>();
- DedupValue one = new DedupValue();
- one.setStateFieldValue("OPEN");
- one.setCount(2);
- one.setCloseTime(0);
- one.setDocId("doc-id-...");
- one.setFirstOccurrence(System.currentTimeMillis());
- dedupStateValues.add(one);
- store.add(eventEniq, dedupStateValues);
-
- events = store.getEvents();
- Assert.assertNotNull(events);
- Assert.assertEquals(1, events.size());
-
- Entry<EventUniq, ConcurrentLinkedDeque<DedupValue>> entry = events.entrySet().iterator().next();
- Assert.assertEquals(streamId, entry.getKey().streamId);
- Assert.assertEquals(1, entry.getValue().size());
- Assert.assertEquals(2, entry.getValue().getLast().getCount());
-
- store.remove(events.keySet().iterator().next());
- events = store.getEvents();
- Assert.assertNotNull(events);
- Assert.assertEquals(0, events.size());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
deleted file mode 100644
index b7d7613..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.mongodb.client.MongoDatabase;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-public abstract class MongoDependencyBaseTest {
-
- private static Logger LOG = LoggerFactory.getLogger(MongoDependencyBaseTest.class);
-
- private static SimpleEmbedMongo mongo;
- @SuppressWarnings("unused")
- private static MongoDatabase testDB;
- private static Config config;
-
- protected static MongoDedupEventsStore store;
-
- public static void before() {
- try {
- mongo = new SimpleEmbedMongo();
- mongo.start();
- testDB = mongo.getMongoClient().getDatabase("testDb");
- } catch (Exception e) {
- LOG.error("start embed mongod failed, assume some external mongo running. continue run test!", e);
- }
- }
-
- @BeforeClass
- public static void setup() throws Exception {
- before();
-
- System.setProperty("config.resource", "/application-mongo-statestore.conf");
- ConfigFactory.invalidateCaches();
- config = ConfigFactory.load();
-
- store = new MongoDedupEventsStore(config, "testPublishment");
- }
-
- @AfterClass
- public static void teardown() {
- if (mongo != null) {
- mongo.shutdown();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 79c03bc..5cdb6f1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -18,12 +18,11 @@
package org.apache.eagle.alert.engine.router;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.eagle.alert.coordination.model.PublishSpec;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.Publishment;
@@ -32,38 +31,26 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
import org.apache.eagle.alert.engine.publisher.AlertPublisher;
-import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStore;
-import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory;
import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory;
import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
import org.apache.eagle.alert.engine.runner.MapComparator;
import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
-import org.junit.*;
-import org.mockito.Mockito;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
/**
* @Since 5/14/16.
*/
public class TestAlertPublisherBolt {
-
- private DedupEventsStore store;
-
- @Before
- public void setUp() {
- store = Mockito.mock(DedupEventsStore.class);
- DedupEventsStoreFactory.customizeStore(store);
- }
-
- @After
- public void tearDown() {
- Mockito.reset(store);
- }
@SuppressWarnings("rawtypes")
@Ignore
@@ -86,7 +73,7 @@ public class TestAlertPublisherBolt {
policy.setName("policy1");
alert.setPolicyId(policy.getName());
alert.setCreatedTime(System.currentTimeMillis());
- alert.setData(new Object[]{"field_1", 2, "field_3"});
+ alert.setData(new Object[] {"field_1", 2, "field_3"});
alert.setStreamId(streamId);
alert.setCreatedBy(this.toString());
return alert;
@@ -198,7 +185,7 @@ public class TestAlertPublisherBolt {
StreamColumn hostColumn = new StreamColumn();
hostColumn.setName("hostname");
hostColumn.setType(StreamColumn.Type.STRING);
-
+
StreamColumn stateColumn = new StreamColumn();
stateColumn.setName("state");
stateColumn.setType(StreamColumn.Type.STRING);
@@ -209,7 +196,7 @@ public class TestAlertPublisherBolt {
return alert;
}
- @Test
+ @Test
public void testCustomFieldDedupEvent() throws Exception {
List<Publishment> pubs = loadEntities("/router/publishments.json", Publishment.class);
@@ -221,12 +208,9 @@ public class TestAlertPublisherBolt {
Assert.assertNotNull(plugin.dedup(event1));
Assert.assertNull(plugin.dedup(event2));
Assert.assertNotNull(plugin.dedup(event3));
-
- Mockito.verify(store).getEvents();
- Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), Mockito.anyObject());
}
- @Test
+ @Test
public void testEmptyCustomFieldDedupEvent() throws Exception {
List<Publishment> pubs = loadEntities("/router/publishments-empty-dedup-field.json", Publishment.class);
@@ -236,8 +220,6 @@ public class TestAlertPublisherBolt {
Assert.assertNotNull(plugin.dedup(event1));
Assert.assertNull(plugin.dedup(event2));
-
- Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), Mockito.anyObject());
}
private AlertStreamEvent createSeverityWithStreamDef(String hostname, String appName, String message, String severity, String docId, String df_device, String df_type, String colo) {