You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/09/28 08:05:00 UTC
[1/2] incubator-eagle git commit: EAGLE-576: dedup enhancements
Repository: incubator-eagle
Updated Branches:
refs/heads/master afb897940 -> a1285351d
EAGLE-576: dedup enhancements
1. one dedup cache per publishment, replacing the single global cache
2. support dedup of alerts that have no state field value, using an interval-based expiring cache
3. Kafka publisher also emits the raw alert, tagged with the configured namespace value, alongside the dedup results
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/eb262861
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/eb262861
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/eb262861
Branch: refs/heads/master
Commit: eb26286126c34272967d1159d07b0d645157c61a
Parents: 1fa490e
Author: Xiancheng Li <xi...@ebay.com>
Authored: Wed Sep 28 08:54:43 2016 +0800
Committer: Xiancheng Li <xi...@ebay.com>
Committed: Wed Sep 28 14:25:01 2016 +0800
----------------------------------------------------------------------
.../engine/publisher/PublishConstants.java | 3 +
.../engine/publisher/dedup/DedupCache.java | 63 ++++---
.../publisher/impl/AbstractPublishPlugin.java | 3 +-
.../publisher/impl/AlertKafkaPublisher.java | 29 ++-
.../publisher/impl/DefaultDeduplicator.java | 32 +++-
.../publisher/dedup/DedupCacheStoreTest.java | 2 +-
.../engine/publisher/dedup/DedupCacheTest.java | 2 +-
.../dedup/DefaultDedupWithoutStateTest.java | 183 +++++++++++++++++++
.../dedup/DefaultDeduplicatorTest.java | 2 +-
9 files changed, 270 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
index ce57a6e..7408779 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
@@ -50,5 +50,8 @@ public class PublishConstants {
public static final String ALERT_EMAIL_TIMESTAMP = "alertTime";
public static final String ALERT_EMAIL_POLICY = "policyId";
public static final String ALERT_EMAIL_CREATOR = "creator";
+
+ public static final String RAW_ALERT_NAMESPACE_LABEL = "rawAlertNamespaceLabel";
+ public static final String RAW_ALERT_NAMESPACE_VALUE = "rawAlertNamespaceValue";
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
index fc2d6e6..503a1ce 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
@@ -48,38 +48,45 @@ public class DedupCache {
private long lastUpdated = -1;
private Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = new ConcurrentHashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
+ private static final ConcurrentLinkedDeque<DedupCache> caches = new ConcurrentLinkedDeque<DedupCache>();
+
private Config config;
- private static DedupCache INSTANCE;
-
- public static synchronized DedupCache getInstance(Config config) {
- if (INSTANCE == null) {
- INSTANCE = new DedupCache();
- INSTANCE.config = config;
-
- // create daemon to clean up old removable events periodically
- ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(1, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setDaemon(true);
- return t;
- }
- });
- scheduleSrv.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- HashSet<EventUniq> eventUniqs = new HashSet<EventUniq>(INSTANCE.getEvents().keySet());
- for (EventUniq one : eventUniqs) {
- if (one.removable && one.createdTime < System.currentTimeMillis() - 3600000 * 24) {
- INSTANCE.removeEvent(one);
- LOG.info("Remove dedup key {} from cache & db", one);
+ public DedupCache(Config config) {
+ this.config = config;
+ // only happens during startup, won't introduce perf issue here
+ synchronized (caches) {
+ if (caches.size() == 0) {
+ // create daemon to clean up old removable events periodically
+ ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ scheduleSrv.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ for (DedupCache cache : caches) {
+ if (cache == null || cache.getEvents() == null) {
+ continue;
+ }
+ HashSet<EventUniq> eventUniqs = new HashSet<EventUniq>(cache.getEvents().keySet());
+ for (EventUniq one : eventUniqs) {
+ if (one.removable && one.createdTime < System.currentTimeMillis() - 3600000 * 24) {
+ cache.removeEvent(one);
+ LOG.info("Remove dedup key {} from cache & db", one);
+ }
+ }
}
}
- }
- }, 5, 60, TimeUnit.MINUTES);
+ }, 5, 60, TimeUnit.MINUTES);
+ }
+ caches.add(this);
}
- return INSTANCE;
+ LOG.info("Create daemon to clean up old removable events periodicall");
}
public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents() {
@@ -201,7 +208,7 @@ public class DedupCache {
DedupValue dedupValue = dedupValues.getLast();
dedupValue.setCount(dedupValue.getCount() + 1);
String updateMsg = String.format(
- "Update count for dedup key {}, value %s and count %s", eventEniq,
+ "Update count for dedup key %s, value %s and count %s", eventEniq,
dedupValue.getStateFieldValue(), dedupValue.getCount());
if (dedupValue.getCount() > 0 && dedupValue.getCount() % 100 == 0) {
LOG.info(updateMsg);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index 743ce91..3e293cd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -47,8 +47,7 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
@SuppressWarnings("rawtypes")
@Override
public void init(Config config, Publishment publishment, Map conf) throws Exception {
- DedupCache dedupCache = DedupCache.getInstance(config);
-
+ DedupCache dedupCache = new DedupCache(config);
OverrideDeduplicatorSpec spec = publishment.getOverrideDeduplicator();
if (spec != null && StringUtils.isNotBlank(spec.getClassName())) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
index 27314bf..e5c351b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
@@ -18,6 +18,7 @@
package org.apache.eagle.alert.engine.publisher.impl;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -28,17 +29,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.PublishConstants;
-import com.typesafe.config.Config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import com.typesafe.config.Config;
public class AlertKafkaPublisher extends AbstractPublishPlugin {
@@ -49,6 +45,8 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
private KafkaProducer producer;
private String brokerList;
private String topic;
+ private String namespaceLabel;
+ private String namespaceValue;
@Override
@SuppressWarnings("rawtypes")
@@ -60,6 +58,8 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
brokerList = kafkaConfig.get(PublishConstants.BROKER_LIST).trim();
producer = KafkaProducerManager.INSTANCE.getProducer(brokerList, kafkaConfig);
topic = kafkaConfig.get(PublishConstants.TOPIC).trim();
+ namespaceLabel = kafkaConfig.getOrDefault(PublishConstants.RAW_ALERT_NAMESPACE_LABEL, "namespace");
+ namespaceValue = kafkaConfig.getOrDefault(PublishConstants.RAW_ALERT_NAMESPACE_VALUE, "network");
}
}
@@ -70,9 +70,20 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
LOG.warn("KafkaProducer is null due to the incorrect configurations");
return;
}
- List<AlertStreamEvent> outputEvents = dedup(event);
- if (outputEvents == null) {
- return;
+ List<AlertStreamEvent> outputEvents = new ArrayList<AlertStreamEvent>();
+
+ int namespaceColumnIndex = event.getSchema().getColumnIndex(namespaceLabel);
+ if (namespaceColumnIndex < 0 || namespaceColumnIndex >= event.getData().length) {
+ LOG.warn("Namespace column {} is not found, the found index {} is invalid",
+ namespaceLabel, namespaceColumnIndex);
+ } else {
+ event.getData()[namespaceColumnIndex] = namespaceValue;
+ outputEvents.add(event);
+ }
+
+ List<AlertStreamEvent> dedupResults = dedup(event);
+ if (dedupResults != null) {
+ outputEvents.addAll(dedupResults);
}
PublishStatus status = new PublishStatus();
try {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
index d284da5..54ff346 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
@@ -31,19 +32,23 @@ import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
public class DefaultDeduplicator implements AlertDeduplicator {
private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
- @SuppressWarnings("unused")
- private long dedupIntervalMin;
+ private long dedupIntervalSec;
private List<String> customDedupFields = new ArrayList<>();
private String dedupStateField;
private DedupCache dedupCache;
+ private Cache<EventUniq, String> withoutStatesCache;
+
public DefaultDeduplicator() {
- this.dedupIntervalMin = 0;
+ this.dedupIntervalSec = 0;
}
public DefaultDeduplicator(String intervalMin) {
@@ -51,7 +56,7 @@ public class DefaultDeduplicator implements AlertDeduplicator {
}
public DefaultDeduplicator(long intervalMin) {
- this.dedupIntervalMin = intervalMin;
+ this.dedupIntervalSec = intervalMin;
}
public DefaultDeduplicator(String intervalMin, List<String> customDedupFields,
@@ -64,6 +69,9 @@ public class DefaultDeduplicator implements AlertDeduplicator {
this.dedupStateField = dedupStateField;
}
this.dedupCache = dedupCache;
+
+ withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
+ this.dedupIntervalSec, TimeUnit.SECONDS).build();
}
/*
@@ -74,6 +82,16 @@ public class DefaultDeduplicator implements AlertDeduplicator {
if (StringUtils.isBlank(stateFiledValue)) {
// without state field, we cannot determine whether it is duplicated
// without custom filed values, we cannot determine whether it is duplicated
+ synchronized (withoutStatesCache) {
+ if (withoutStatesCache != null && withoutStatesCache.getIfPresent(key) != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Alert event {} with key {} is skipped since it is duplicated", event, key);
+ }
+ return null;
+ } else if (withoutStatesCache != null) {
+ withoutStatesCache.put(key, "");
+ }
+ }
return Arrays.asList(event);
}
return dedupCache.dedup(event, key, dedupStateField, stateFiledValue);
@@ -126,15 +144,15 @@ public class DefaultDeduplicator implements AlertDeduplicator {
@Override
public void setDedupIntervalMin(String newDedupIntervalMin) {
if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) {
- dedupIntervalMin = 0;
+ dedupIntervalSec = 0;
return;
}
try {
Period period = Period.parse(newDedupIntervalMin);
- this.dedupIntervalMin = period.toStandardMinutes().getMinutes();
+ this.dedupIntervalSec = period.toStandardSeconds().getSeconds();
} catch (Exception e) {
LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead", e);
- this.dedupIntervalMin = 0;
+ this.dedupIntervalSec = 0;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
index c830aca..5e56bb7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
@@ -50,7 +50,7 @@ public class DedupCacheStoreTest extends MongoDependencyBaseTest {
System.setProperty("config.resource", "/application-mongo-statestore.conf");
Config config = ConfigFactory.load();
- DedupCache cache = DedupCache.getInstance(config);
+ DedupCache cache = new DedupCache(config);
cache.addOrUpdate(eventUniq, (String) event.getData()[event.getSchema().getColumnIndex("state")]);
DedupEventsStore accessor = DedupEventsStoreFactory.getStore(DedupEventsStoreType.Mongo, config);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
index 779abb0..e996376 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
@@ -55,7 +55,7 @@ public class DedupCacheTest {
@Test
public void testNormal() throws Exception {
Config config = ConfigFactory.load();
- DedupCache dedupCache = DedupCache.getInstance(config);
+ DedupCache dedupCache = new DedupCache(config);
StreamDefinition stream = createStream();
PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
new file mode 100644
index 0000000..57c113b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+public class DefaultDedupWithoutStateTest {
+
+ @Test
+ public void testNormal() throws Exception {
+ //String intervalMin, List<String> customDedupFields, String dedupStateField, String dedupStateCloseValue
+ // assume state: OPEN, WARN, CLOSE
+ System.setProperty("config.resource", "/application-mongo-statestore.conf");
+ Config config = ConfigFactory.load();
+ DedupCache dedupCache = new DedupCache(config);
+ DefaultDeduplicator deduplicator = new DefaultDeduplicator(
+ "PT10S", Arrays.asList(new String[] { "alertKey" }), null, dedupCache);
+
+ StreamDefinition stream = createStream();
+ PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
+
+ int[] hostIndex = new int[] { 1, 2, 3 };
+ String[] states = new String[] { "OPEN", "WARN", "CLOSE" };
+ Random random = new Random();
+
+ final ConcurrentLinkedDeque<AlertStreamEvent> nonDedupResult = new ConcurrentLinkedDeque<AlertStreamEvent>();
+
+ for (int i = 0; i < 100; i ++) {
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ int index = hostIndex[random.nextInt(3)];
+ AlertStreamEvent e1 = createEvent(stream, policy, new Object[] {
+ System.currentTimeMillis(), "host" + index,
+ String.format("testPolicy-host%s-01", index),
+ states[random.nextInt(3)], 0, 0
+ });
+ List<AlertStreamEvent> result = deduplicator.dedup(e1);
+ if (result != null) {
+ System.out.println(">>>" + Joiner.on(",").join(result));
+ nonDedupResult.addAll(result);
+ } else {
+ System.out.println(">>>" + result);
+ }
+ }
+
+ }).start();
+ }
+
+ Thread.sleep(1000);
+
+ System.out.println("old size: " + nonDedupResult.size());
+ Assert.assertTrue(nonDedupResult.size() > 0 && nonDedupResult.size() <= 3);
+
+ Thread.sleep(15000);
+
+ for (int i = 0; i < 100; i ++) {
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ int index = hostIndex[random.nextInt(3)];
+ AlertStreamEvent e1 = createEvent(stream, policy, new Object[] {
+ System.currentTimeMillis(), "host" + index,
+ String.format("testPolicy-host%s-01", index),
+ states[random.nextInt(3)], 0, 0
+ });
+ List<AlertStreamEvent> result = deduplicator.dedup(e1);
+ if (result != null) {
+ System.out.println(">>>" + Joiner.on(",").join(result));
+ nonDedupResult.addAll(result);
+ } else {
+ System.out.println(">>>" + result);
+ }
+ }
+
+ }).start();
+ }
+
+ Thread.sleep(1000);
+
+ System.out.println("new size: " + nonDedupResult.size());
+ Assert.assertTrue(nonDedupResult.size() > 3 && nonDedupResult.size() <= 6);
+ }
+
+ private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) {
+ AlertStreamEvent event = new AlertStreamEvent();
+ event.setPolicyId(policy.getName());
+ event.setSchema(stream);
+ event.setStreamId(stream.getStreamId());
+ event.setTimestamp(System.currentTimeMillis());
+ event.setCreatedTime(System.currentTimeMillis());
+ event.setData(data);
+ return event;
+ }
+
+ private StreamDefinition createStream() {
+ StreamDefinition sd = new StreamDefinition();
+ StreamColumn tsColumn = new StreamColumn();
+ tsColumn.setName("timestamp");
+ tsColumn.setType(StreamColumn.Type.LONG);
+
+ StreamColumn hostColumn = new StreamColumn();
+ hostColumn.setName("host");
+ hostColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn alertKeyColumn = new StreamColumn();
+ alertKeyColumn.setName("alertKey");
+ alertKeyColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn stateColumn = new StreamColumn();
+ stateColumn.setName("state");
+ stateColumn.setType(StreamColumn.Type.STRING);
+
+ // dedupCount, dedupFirstOccurrence
+
+ StreamColumn dedupCountColumn = new StreamColumn();
+ dedupCountColumn.setName("dedupCount");
+ dedupCountColumn.setType(StreamColumn.Type.LONG);
+
+ StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
+ dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
+ dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
+
+ sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn));
+ sd.setDataSource("testDatasource");
+ sd.setStreamId("testStream");
+ sd.setDescription("test stream");
+ return sd;
+ }
+
+ private PolicyDefinition createPolicy(String streamName, String policyName) {
+ PolicyDefinition pd = new PolicyDefinition();
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ //expression, something like "PT5S,dynamic,1,host"
+ def.setValue("test");
+ def.setType("siddhi");
+ pd.setDefinition(def);
+ pd.setInputStreams(Arrays.asList("inputStream"));
+ pd.setOutputStreams(Arrays.asList("outputStream"));
+ pd.setName(policyName);
+ pd.setDescription(String.format("Test policy for stream %s", streamName));
+
+ StreamPartition sp = new StreamPartition();
+ sp.setStreamId(streamName);
+ sp.setColumns(Arrays.asList("host"));
+ sp.setType(StreamPartition.Type.GROUPBY);
+ pd.addPartition(sp);
+ return pd;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
index 81ba35e..3a17e53 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
@@ -40,7 +40,7 @@ public class DefaultDeduplicatorTest extends MongoDependencyBaseTest {
// assume state: OPEN, WARN, CLOSE
System.setProperty("config.resource", "/application-mongo-statestore.conf");
Config config = ConfigFactory.load();
- DedupCache dedupCache = DedupCache.getInstance(config);
+ DedupCache dedupCache = new DedupCache(config);
DefaultDeduplicator deduplicator = new DefaultDeduplicator(
"PT1M", Arrays.asList(new String[] { "alertKey" }), "state", dedupCache);
[2/2] incubator-eagle git commit: Merge branch 'master' of
https://github.com/garrettlish/incubator-eagle
Posted by ra...@apache.org.
Merge branch 'master' of https://github.com/garrettlish/incubator-eagle
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/a1285351
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/a1285351
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/a1285351
Branch: refs/heads/master
Commit: a1285351d82c86b27d12bf0a4872f5e3e745c72e
Parents: afb8979 eb26286
Author: Ralph, Su <su...@gmail.com>
Authored: Wed Sep 28 00:57:44 2016 -0700
Committer: Ralph, Su <su...@gmail.com>
Committed: Wed Sep 28 00:57:44 2016 -0700
----------------------------------------------------------------------
.../engine/publisher/PublishConstants.java | 3 +
.../engine/publisher/dedup/DedupCache.java | 63 ++++---
.../publisher/impl/AbstractPublishPlugin.java | 3 +-
.../publisher/impl/AlertKafkaPublisher.java | 29 ++-
.../publisher/impl/DefaultDeduplicator.java | 32 +++-
.../publisher/dedup/DedupCacheStoreTest.java | 2 +-
.../engine/publisher/dedup/DedupCacheTest.java | 2 +-
.../dedup/DefaultDedupWithoutStateTest.java | 183 +++++++++++++++++++
.../dedup/DefaultDeduplicatorTest.java | 2 +-
9 files changed, 270 insertions(+), 49 deletions(-)
----------------------------------------------------------------------