You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ak...@apache.org on 2019/04/18 21:31:44 UTC
[incubator-pinot] branch master updated: Per User Dimension Alerter
- consolidate across dimensions and send per user (#4135)
This is an automated email from the ASF dual-hosted git repository.
akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 044413f Per User Dimension Alerter - consolidate across dimensions and send per user (#4135)
044413f is described below
commit 044413f12332028a3bcb9509743876342e879e3b
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Thu Apr 18 14:31:39 2019 -0700
Per User Dimension Alerter - consolidate across dimensions and send per user (#4135)
---
.../alert/filter/PerUserDimensionAlertFilter.java | 146 +++++++++++++++++++++
.../filter/DimensionDetectionAlertFilterTest.java | 2 -
...t.java => PerUserDimensionAlertFilterTest.java} | 94 +++----------
3 files changed, 162 insertions(+), 80 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/PerUserDimensionAlertFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/PerUserDimensionAlertFilter.java
new file mode 100644
index 0000000..37f69f3
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/PerUserDimensionAlertFilter.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.alert.filter;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.collections.MapUtils;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterRecipients;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterResult;
+import org.apache.pinot.thirdeye.detection.alert.StatefulDetectionAlertFilter;
+import org.apache.pinot.thirdeye.detection.annotation.AlertFilter;
+
+
+/**
+ * The detection alert filter that sends the anomaly email to a set of unconditional and another
+ * set of conditional recipients, based on the value of a specified anomaly dimension.
+ *
+ * <p>However, unlike the {@link DimensionDetectionAlertFilter}, this filter consolidates
+ * anomalies across dimensions and sends one alert per user. This is used in scenarios where
+ * there is an overlap of recipients across dimensions, to reduce the number of alerts sent out
+ * to a specific user.
+ *
+ * <p>Anomalies whose dimension value has no entry in {@code dimensionRecipients} are not mapped
+ * to any recipient and are therefore absent from the result.
+ */
+@AlertFilter(type = "PER_USER_DIMENSION_ALERTER_PIPELINE")
+public class PerUserDimensionAlertFilter extends StatefulDetectionAlertFilter {
+  private static final String PROP_DETECTION_CONFIG_IDS = "detectionConfigIds";
+  private static final String PROP_TO = "to";
+  private static final String PROP_CC = "cc";
+  private static final String PROP_BCC = "bcc";
+  private static final String PROP_RECIPIENTS = "recipients";
+  private static final String PROP_DIMENSION = "dimension";
+  private static final String PROP_DIMENSION_RECIPIENTS = "dimensionRecipients";
+  private static final String PROP_SEND_ONCE = "sendOnce";
+
+  /** Name of the anomaly dimension whose value selects the conditional recipients. */
+  final String dimension;
+  /** Default recipients, keyed by "to", "cc" and "bcc". */
+  final Map<String, Set<String>> recipients;
+  /** Conditional recipients, keyed by the value of {@link #dimension}. */
+  final SetMultimap<String, String> dimensionRecipients;
+  /** Ids of the detection configs whose anomalies are considered by this filter. */
+  final List<Long> detectionConfigIds;
+  /** When true (the default), only anomalies with an id above the high water mark are sent. */
+  final boolean sendOnce;
+
+  /**
+   * Reads the filter settings from the properties of the alert config.
+   *
+   * @param provider data provider handed to the parent filter
+   * @param config alert config; its properties must contain the "dimension" key
+   * @param endTime end time handed to the parent filter
+   * @throws NullPointerException if the "dimension" property is not specified
+   */
+  public PerUserDimensionAlertFilter(DataProvider provider, DetectionAlertConfigDTO config, long endTime) {
+    super(provider, config, endTime);
+    Preconditions.checkNotNull(config.getProperties().get(PROP_DIMENSION), "Dimension name not specified");
+
+    this.recipients = ConfigUtils.getMap(this.config.getProperties().get(PROP_RECIPIENTS));
+    this.dimension = MapUtils.getString(this.config.getProperties(), PROP_DIMENSION);
+    this.dimensionRecipients = HashMultimap.create(ConfigUtils.<String, String>getMultimap(this.config.getProperties().get(PROP_DIMENSION_RECIPIENTS)));
+    this.detectionConfigIds = ConfigUtils.getLongs(this.config.getProperties().get(PROP_DETECTION_CONFIG_IDS));
+    this.sendOnce = MapUtils.getBoolean(this.config.getProperties(), PROP_SEND_ONCE, true);
+  }
+
+  /**
+   * Groups the filtered anomalies by the value of the configured dimension, re-groups them per
+   * dimension recipient, and emits one recipients-to-anomalies mapping per user.
+   *
+   * @param vectorClocks not consulted by this implementation; the clocks are rebuilt from the
+   *     configured detection config ids instead
+   * @param highWaterMark highest anomaly id already notified; used only when sendOnce is set
+   * @return the per-user mapping of recipients to the anomalies they should be alerted about
+   */
+  @Override
+  public DetectionAlertFilterResult run(Map<Long, Long> vectorClocks, long highWaterMark) {
+    DetectionAlertFilterResult result = new DetectionAlertFilterResult();
+
+    final long minId = getMinId(highWaterMark);
+
+    Set<MergedAnomalyResultDTO> anomalies = this.filter(this.makeVectorClocks(this.detectionConfigIds), minId);
+
+    // group anomalies by dimension value; anomalies without the dimension fall under ""
+    Multimap<String, MergedAnomalyResultDTO> grouped = Multimaps.index(anomalies, new Function<MergedAnomalyResultDTO, String>() {
+      @Override
+      public String apply(MergedAnomalyResultDTO mergedAnomalyResultDTO) {
+        return MapUtils.getString(mergedAnomalyResultDTO.getDimensions(), PerUserDimensionAlertFilter.this.dimension, "");
+      }
+    });
+
+    // generate recipients-anomalies mapping: every recipient of a dimension value collects
+    // all anomalies of that value, so a user listed under several values gets the union
+    Map<String, List<MergedAnomalyResultDTO>> perUserAnomalies = new HashMap<>();
+    for (Map.Entry<String, Collection<MergedAnomalyResultDTO>> entry : grouped.asMap().entrySet()) {
+      for (String recipient : getRecipients(entry.getKey())) {
+        List<MergedAnomalyResultDTO> userAnomalies = perUserAnomalies.get(recipient);
+        if (userAnomalies == null) {
+          userAnomalies = new ArrayList<>();
+          perUserAnomalies.put(recipient, userAnomalies);
+        }
+        userAnomalies.addAll(entry.getValue());
+      }
+    }
+
+    for (Map.Entry<String, List<MergedAnomalyResultDTO>> userAnomalyMapping : perUserAnomalies.entrySet()) {
+      result.addMapping(
+          new DetectionAlertFilterRecipients(
+              this.makeGroupRecipients(userAnomalyMapping.getKey()),
+              this.recipients.get(PROP_CC),
+              this.recipients.get(PROP_BCC)),
+          new HashSet<>(userAnomalyMapping.getValue())
+      );
+    }
+
+    return result;
+  }
+
+  /**
+   * Returns a mutable copy of the recipients configured for the given dimension value.
+   *
+   * @param key dimension value
+   * @return the configured recipients, or an empty set if none are configured for the value
+   */
+  private Set<String> getRecipients(String key) {
+    // SetMultimap#get yields an empty set for an absent key, so no containsKey guard is needed
+    return new HashSet<>(this.dimensionRecipients.get(key));
+  }
+
+  /**
+   * Builds the "to" list for a single user: the default "to" recipients plus the user.
+   *
+   * @param key the per-user recipient
+   * @return a new set holding the default "to" recipients (if any) and the given recipient
+   */
+  private Set<String> makeGroupRecipients(String key) {
+    Set<String> recipients = new HashSet<>();
+    // a config without default "to" recipients must not fail with a NullPointerException
+    if (this.recipients.get(PROP_TO) != null) {
+      recipients.addAll(this.recipients.get(PROP_TO));
+    }
+    recipients.add(key);
+    return recipients;
+  }
+
+  /**
+   * Returns the lowest anomaly id to consider.
+   *
+   * @param highWaterMark highest anomaly id already notified
+   * @return {@code highWaterMark + 1} when sendOnce is set, otherwise 0
+   */
+  private long getMinId(long highWaterMark) {
+    return this.sendOnce ? highWaterMark + 1 : 0;
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
index 1c9ec6f..58e78d9 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
@@ -67,7 +67,6 @@ public class DimensionDetectionAlertFilterTest {
private MockDataProvider provider;
private DetectionAlertConfigDTO alertConfig;
- private DetectionAlertConfigDTO alertConfigForLegacyAnomalies;
@BeforeMethod
public void beforeMethod() {
@@ -90,7 +89,6 @@ public class DimensionDetectionAlertFilterTest {
.setAnomalies(this.detectedAnomalies);
this.alertConfig = createDetectionAlertConfig();
- this.alertConfigForLegacyAnomalies = createDetectionAlertConfig();
}
private DetectionAlertConfigDTO createDetectionAlertConfig() {
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/PerUserDimensionAlertFilterTest.java
similarity index 61%
copy from thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
copy to thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/PerUserDimensionAlertFilterTest.java
index 1c9ec6f..24b4f11 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/PerUserDimensionAlertFilterTest.java
@@ -16,14 +16,6 @@
package org.apache.pinot.thirdeye.detection.alert.filter;
-import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
-import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFeedbackDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.detection.MockDataProvider;
-import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilter;
-import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterRecipients;
-import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterResult;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -33,6 +25,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.MockDataProvider;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilter;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterRecipients;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterResult;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -40,7 +38,7 @@ import org.testng.annotations.Test;
import static org.apache.pinot.thirdeye.detection.DetectionTestUtils.*;
-public class DimensionDetectionAlertFilterTest {
+public class PerUserDimensionAlertFilterTest {
private static final String PROP_RECIPIENTS = "recipients";
private static final String PROP_TO = "to";
@@ -50,7 +48,7 @@ public class DimensionDetectionAlertFilterTest {
private static final Set<String> PROP_CC_VALUE = new HashSet<>(Arrays.asList("cctest@example.com", "cctest@example.org"));
private static final Set<String> PROP_BCC_VALUE = new HashSet<>(Arrays.asList("bcctest@example.com", "bcctest@example.org"));
private static final Set<String> PROP_TO_FOR_VALUE = new HashSet<>(Arrays.asList("myTest@example.com", "myTest@example.org"));
- private static final Set<String> PROP_TO_FOR_ANOTHER_VALUE = Collections.singleton("myTest@example.net");
+ private static final Set<String> PROP_TO_FOR_ANOTHER_VALUE = new HashSet<>(Arrays.asList("myTest@example.net", "myTest@example.com"));
private static final String PROP_DETECTION_CONFIG_IDS = "detectionConfigIds";
private static final List<Long> PROP_ID_VALUE = Arrays.asList(1001L, 1002L);
private static final String PROP_DIMENSION = "dimension";
@@ -67,7 +65,6 @@ public class DimensionDetectionAlertFilterTest {
private MockDataProvider provider;
private DetectionAlertConfigDTO alertConfig;
- private DetectionAlertConfigDTO alertConfigForLegacyAnomalies;
@BeforeMethod
public void beforeMethod() {
@@ -81,16 +78,10 @@ public class DimensionDetectionAlertFilterTest {
this.detectedAnomalies.add(makeAnomaly(1002L,3333, 9999, Collections.singletonMap("key", "value")));
this.detectedAnomalies.add(makeAnomaly(1003L,1111, 9999, Collections.singletonMap("key", "value")));
- // Anomalies generated by legacy pipeline
- this.detectedAnomalies.add(makeAnomaly(null, 1000L, 1100, 1500));
- this.detectedAnomalies.add(makeAnomaly(null, 1002L, 0, 1000));
- this.detectedAnomalies.add(makeAnomaly(null, 1002L, 1100, 2000));
-
this.provider = new MockDataProvider()
.setAnomalies(this.detectedAnomalies);
this.alertConfig = createDetectionAlertConfig();
- this.alertConfigForLegacyAnomalies = createDetectionAlertConfig();
}
private DetectionAlertConfigDTO createDetectionAlertConfig() {
@@ -117,70 +108,17 @@ public class DimensionDetectionAlertFilterTest {
@Test
public void testAlertFilterRecipients() throws Exception {
- this.alertFilter = new DimensionDetectionAlertFilter(provider, alertConfig,2500L);
-
- DetectionAlertFilterRecipients recDefault = makeRecipients();
- DetectionAlertFilterRecipients recValue = makeRecipients(PROP_TO_FOR_VALUE);
- DetectionAlertFilterRecipients recAnotherValue = makeRecipients(PROP_TO_FOR_ANOTHER_VALUE);
-
- DetectionAlertFilterResult result = this.alertFilter.run();
- Assert.assertEquals(result.getResult().get(recDefault), makeSet(0, 3, 4));
- Assert.assertEquals(result.getResult().get(recValue), makeSet(1, 5));
- Assert.assertEquals(result.getResult().get(recAnotherValue), makeSet(2));
- // 6, 7 are out of search range
- }
-
- @Test
- public void testAlertFilterNoChildren() throws Exception {
- this.alertConfig.getProperties().put(PROP_DETECTION_CONFIG_IDS, Collections.singletonList(1003L));
- this.alertFilter = new DimensionDetectionAlertFilter(provider, alertConfig,2500L);
-
- MergedAnomalyResultDTO child = makeAnomaly(1003L, 1234, 9999);
- child.setChild(true);
-
- this.detectedAnomalies.add(child);
-
- DetectionAlertFilterRecipients recValue = makeRecipients(PROP_TO_FOR_VALUE);
-
- DetectionAlertFilterResult result = this.alertFilter.run();
- Assert.assertEquals(result.getResult().size(), 1);
- Assert.assertTrue(result.getResult().containsKey(recValue));
- }
-
- @Test
- public void testAlertFilterFeedback() throws Exception {
- this.alertConfig.getProperties().put(PROP_DETECTION_CONFIG_IDS, Collections.singletonList(1003L));
- this.alertFilter = new DimensionDetectionAlertFilter(provider, alertConfig,2500L);
-
- AnomalyFeedbackDTO feedbackAnomaly = new AnomalyFeedbackDTO();
- feedbackAnomaly.setFeedbackType(AnomalyFeedbackType.ANOMALY);
-
- AnomalyFeedbackDTO feedbackNoFeedback = new AnomalyFeedbackDTO();
- feedbackNoFeedback.setFeedbackType(AnomalyFeedbackType.NO_FEEDBACK);
-
- MergedAnomalyResultDTO anomalyWithFeedback = makeAnomaly(1003L, 1234, 9999);
- anomalyWithFeedback.setFeedback(feedbackAnomaly);
-
- MergedAnomalyResultDTO anomalyWithoutFeedback = makeAnomaly(1003L, 1235, 9999);
- anomalyWithoutFeedback.setFeedback(feedbackNoFeedback);
-
- MergedAnomalyResultDTO anomalyWithNull = makeAnomaly(1003L, 1236, 9999);
- anomalyWithNull.setFeedback(null);
-
- this.detectedAnomalies.add(anomalyWithFeedback);
- this.detectedAnomalies.add(anomalyWithoutFeedback);
- this.detectedAnomalies.add(anomalyWithNull);
+ this.alertFilter = new PerUserDimensionAlertFilter(provider, alertConfig,2500L);
- DetectionAlertFilterRecipients recDefault = makeRecipients();
- DetectionAlertFilterRecipients recValue = makeRecipients(PROP_TO_FOR_VALUE);
+ DetectionAlertFilterRecipients recipient1 = makeRecipients(Collections.singleton("myTest@example.com"));
+ DetectionAlertFilterRecipients recipient2 = makeRecipients(Collections.singleton("myTest@example.org"));
+ DetectionAlertFilterRecipients recipient3 = makeRecipients(Collections.singleton("myTest@example.net"));
DetectionAlertFilterResult result = this.alertFilter.run();
- Assert.assertEquals(result.getResult().size(), 2);
- Assert.assertTrue(result.getResult().containsKey(recDefault));
- Assert.assertEquals(result.getResult().get(recDefault).size(), 2);
- Assert.assertTrue(result.getResult().get(recDefault).contains(anomalyWithoutFeedback));
- Assert.assertTrue(result.getResult().get(recDefault).contains(anomalyWithNull));
- Assert.assertTrue(result.getResult().containsKey(recValue));
+ Assert.assertEquals(result.getResult().size(), 3);
+ Assert.assertEquals(result.getResult().get(recipient1), makeSet(1, 2, 5));
+ Assert.assertEquals(result.getResult().get(recipient2), makeSet(1, 5));
+ Assert.assertEquals(result.getResult().get(recipient3), makeSet(2));
}
private Set<MergedAnomalyResultDTO> makeSet(int... anomalyIndices) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org