You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ak...@apache.org on 2019/04/18 21:31:44 UTC

[incubator-pinot] branch master updated: Per User Dimension Alerter - consolidate across dimensions and send per user (#4135)

This is an automated email from the ASF dual-hosted git repository.

akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 044413f  Per User Dimension Alerter - consolidate across dimensions and send per user (#4135)
044413f is described below

commit 044413f12332028a3bcb9509743876342e879e3b
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Thu Apr 18 14:31:39 2019 -0700

    Per User Dimension Alerter - consolidate across dimensions and send per user (#4135)
---
 .../alert/filter/PerUserDimensionAlertFilter.java  | 146 +++++++++++++++++++++
 .../filter/DimensionDetectionAlertFilterTest.java  |   2 -
 ...t.java => PerUserDimensionAlertFilterTest.java} |  94 +++----------
 3 files changed, 162 insertions(+), 80 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/PerUserDimensionAlertFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/PerUserDimensionAlertFilter.java
new file mode 100644
index 0000000..37f69f3
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/PerUserDimensionAlertFilter.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.alert.filter;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.collections.MapUtils;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterRecipients;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterResult;
+import org.apache.pinot.thirdeye.detection.alert.StatefulDetectionAlertFilter;
+import org.apache.pinot.thirdeye.detection.annotation.AlertFilter;
+
+
+/**
+ * The detection alert filter that sends the anomaly email to a set of unconditional and another
+ * set of conditional recipients, based on the value of a specified anomaly dimension
+ *
+ * However, unlike the {$link {@link DimensionDetectionAlertFilter}}, this filter consolidates
+ * anomalies across dimensions and sends alert per user. This is used in scenarios where there
+ * is an overlap of recipients across dimensions, to reduce the number of alerts sent out to a
+ * specific user.
+ */
+@AlertFilter(type = "PER_USER_DIMENSION_ALERTER_PIPELINE")
+public class PerUserDimensionAlertFilter extends StatefulDetectionAlertFilter {
+  private static final String PROP_DETECTION_CONFIG_IDS = "detectionConfigIds";
+  private static final String PROP_TO = "to";
+  private static final String PROP_CC = "cc";
+  private static final String PROP_BCC = "bcc";
+  private static final String PROP_RECIPIENTS = "recipients";
+  private static final String PROP_DIMENSION = "dimension";
+  private static final String PROP_DIMENSION_RECIPIENTS = "dimensionRecipients";
+  private static final String PROP_SEND_ONCE = "sendOnce";
+
+  final String dimension;
+  final Map<String, Set<String>> recipients;
+  final SetMultimap<String, String> dimensionRecipients;
+  final List<Long> detectionConfigIds;
+  final boolean sendOnce;
+
+  public PerUserDimensionAlertFilter(DataProvider provider, DetectionAlertConfigDTO config, long endTime) {
+    super(provider, config, endTime);
+    Preconditions.checkNotNull(config.getProperties().get(PROP_DIMENSION), "Dimension name not specified");
+
+    this.recipients = ConfigUtils.getMap(this.config.getProperties().get(PROP_RECIPIENTS));
+    this.dimension = MapUtils.getString(this.config.getProperties(), PROP_DIMENSION);
+    this.dimensionRecipients = HashMultimap.create(ConfigUtils.<String, String>getMultimap(this.config.getProperties().get(PROP_DIMENSION_RECIPIENTS)));
+    this.detectionConfigIds = ConfigUtils.getLongs(this.config.getProperties().get(PROP_DETECTION_CONFIG_IDS));
+    this.sendOnce = MapUtils.getBoolean(this.config.getProperties(), PROP_SEND_ONCE, true);
+  }
+
+  @Override
+  public DetectionAlertFilterResult run(Map<Long, Long> vectorClocks, long highWaterMark) {
+    DetectionAlertFilterResult result = new DetectionAlertFilterResult();
+
+    final long minId = getMinId(highWaterMark);
+
+    Set<MergedAnomalyResultDTO> anomalies = this.filter(this.makeVectorClocks(this.detectionConfigIds), minId);
+
+    // group anomalies by dimensions value
+    Multimap<String, MergedAnomalyResultDTO> grouped = Multimaps.index(anomalies, new Function<MergedAnomalyResultDTO, String>() {
+      @Override
+      public String apply(MergedAnomalyResultDTO mergedAnomalyResultDTO) {
+        return MapUtils.getString(mergedAnomalyResultDTO.getDimensions(), PerUserDimensionAlertFilter.this.dimension, "");
+      }
+    });
+
+    // generate recipients-anomalies mapping
+    Map<String, List<MergedAnomalyResultDTO>> perUserAnomalies = new HashMap<>();
+    for (Map.Entry<String, Collection<MergedAnomalyResultDTO>> entry : grouped.asMap().entrySet()) {
+      Set<String> recipients = getRecipients(entry.getKey());
+      for (String recipient : recipients) {
+        if (!perUserAnomalies.containsKey(recipient)) {
+          List<MergedAnomalyResultDTO> userAnomalies = new ArrayList<>();
+          perUserAnomalies.put(recipient, userAnomalies);
+        }
+        perUserAnomalies.get(recipient).addAll(entry.getValue());
+      }
+    }
+
+    for (Map.Entry<String, List<MergedAnomalyResultDTO>> userAnomalyMapping : perUserAnomalies.entrySet()) {
+      result.addMapping(
+          new DetectionAlertFilterRecipients(
+              this.makeGroupRecipients(userAnomalyMapping.getKey()),
+              this.recipients.get(PROP_CC),
+              this.recipients.get(PROP_BCC)),
+          new HashSet<>(userAnomalyMapping.getValue())
+      );
+    }
+
+    return result;
+  }
+
+  private Set<String> getRecipients(String key) {
+    Set<String> recipients = new HashSet<>();
+    if (this.dimensionRecipients.containsKey(key)) {
+      recipients.addAll(this.dimensionRecipients.get(key));
+    }
+    return recipients;
+  }
+
+  private Set<String> makeGroupRecipients(String key) {
+    Set<String> recipients = new HashSet<>(this.recipients.get(PROP_TO));
+    recipients.add(key);
+    return recipients;
+  }
+
+  private long getMinId(long highWaterMark) {
+    if (this.sendOnce) {
+      return highWaterMark + 1;
+    } else {
+      return 0;
+    }
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
index 1c9ec6f..58e78d9 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
@@ -67,7 +67,6 @@ public class DimensionDetectionAlertFilterTest {
 
   private MockDataProvider provider;
   private DetectionAlertConfigDTO alertConfig;
-  private DetectionAlertConfigDTO alertConfigForLegacyAnomalies;
 
   @BeforeMethod
   public void beforeMethod() {
@@ -90,7 +89,6 @@ public class DimensionDetectionAlertFilterTest {
         .setAnomalies(this.detectedAnomalies);
 
     this.alertConfig = createDetectionAlertConfig();
-    this.alertConfigForLegacyAnomalies = createDetectionAlertConfig();
   }
 
   private DetectionAlertConfigDTO createDetectionAlertConfig() {
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/PerUserDimensionAlertFilterTest.java
similarity index 61%
copy from thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
copy to thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/PerUserDimensionAlertFilterTest.java
index 1c9ec6f..24b4f11 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/PerUserDimensionAlertFilterTest.java
@@ -16,14 +16,6 @@
 
 package org.apache.pinot.thirdeye.detection.alert.filter;
 
-import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
-import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFeedbackDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.detection.MockDataProvider;
-import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilter;
-import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterRecipients;
-import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterResult;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -33,6 +25,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.MockDataProvider;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilter;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterRecipients;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterResult;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -40,7 +38,7 @@ import org.testng.annotations.Test;
 import static org.apache.pinot.thirdeye.detection.DetectionTestUtils.*;
 
 
-public class DimensionDetectionAlertFilterTest {
+public class PerUserDimensionAlertFilterTest {
 
   private static final String PROP_RECIPIENTS = "recipients";
   private static final String PROP_TO = "to";
@@ -50,7 +48,7 @@ public class DimensionDetectionAlertFilterTest {
   private static final Set<String> PROP_CC_VALUE = new HashSet<>(Arrays.asList("cctest@example.com", "cctest@example.org"));
   private static final Set<String> PROP_BCC_VALUE = new HashSet<>(Arrays.asList("bcctest@example.com", "bcctest@example.org"));
   private static final Set<String> PROP_TO_FOR_VALUE = new HashSet<>(Arrays.asList("myTest@example.com", "myTest@example.org"));
-  private static final Set<String> PROP_TO_FOR_ANOTHER_VALUE = Collections.singleton("myTest@example.net");
+  private static final Set<String> PROP_TO_FOR_ANOTHER_VALUE = new HashSet<>(Arrays.asList("myTest@example.net", "myTest@example.com"));
   private static final String PROP_DETECTION_CONFIG_IDS = "detectionConfigIds";
   private static final List<Long> PROP_ID_VALUE = Arrays.asList(1001L, 1002L);
   private static final String PROP_DIMENSION = "dimension";
@@ -67,7 +65,6 @@ public class DimensionDetectionAlertFilterTest {
 
   private MockDataProvider provider;
   private DetectionAlertConfigDTO alertConfig;
-  private DetectionAlertConfigDTO alertConfigForLegacyAnomalies;
 
   @BeforeMethod
   public void beforeMethod() {
@@ -81,16 +78,10 @@ public class DimensionDetectionAlertFilterTest {
     this.detectedAnomalies.add(makeAnomaly(1002L,3333, 9999, Collections.singletonMap("key", "value")));
     this.detectedAnomalies.add(makeAnomaly(1003L,1111, 9999, Collections.singletonMap("key", "value")));
 
-    // Anomalies generated by legacy pipeline
-    this.detectedAnomalies.add(makeAnomaly(null, 1000L, 1100, 1500));
-    this.detectedAnomalies.add(makeAnomaly(null, 1002L, 0, 1000));
-    this.detectedAnomalies.add(makeAnomaly(null, 1002L, 1100, 2000));
-
     this.provider = new MockDataProvider()
         .setAnomalies(this.detectedAnomalies);
 
     this.alertConfig = createDetectionAlertConfig();
-    this.alertConfigForLegacyAnomalies = createDetectionAlertConfig();
   }
 
   private DetectionAlertConfigDTO createDetectionAlertConfig() {
@@ -117,70 +108,17 @@ public class DimensionDetectionAlertFilterTest {
 
   @Test
   public void testAlertFilterRecipients() throws Exception {
-    this.alertFilter = new DimensionDetectionAlertFilter(provider, alertConfig,2500L);
-
-    DetectionAlertFilterRecipients recDefault = makeRecipients();
-    DetectionAlertFilterRecipients recValue = makeRecipients(PROP_TO_FOR_VALUE);
-    DetectionAlertFilterRecipients recAnotherValue = makeRecipients(PROP_TO_FOR_ANOTHER_VALUE);
-
-    DetectionAlertFilterResult result = this.alertFilter.run();
-    Assert.assertEquals(result.getResult().get(recDefault), makeSet(0, 3, 4));
-    Assert.assertEquals(result.getResult().get(recValue), makeSet(1, 5));
-    Assert.assertEquals(result.getResult().get(recAnotherValue), makeSet(2));
-    // 6, 7 are out of search range
-  }
-
-  @Test
-  public void testAlertFilterNoChildren() throws Exception {
-    this.alertConfig.getProperties().put(PROP_DETECTION_CONFIG_IDS, Collections.singletonList(1003L));
-    this.alertFilter = new DimensionDetectionAlertFilter(provider, alertConfig,2500L);
-
-    MergedAnomalyResultDTO child = makeAnomaly(1003L, 1234, 9999);
-    child.setChild(true);
-
-    this.detectedAnomalies.add(child);
-
-    DetectionAlertFilterRecipients recValue = makeRecipients(PROP_TO_FOR_VALUE);
-
-    DetectionAlertFilterResult result = this.alertFilter.run();
-    Assert.assertEquals(result.getResult().size(), 1);
-    Assert.assertTrue(result.getResult().containsKey(recValue));
-  }
-
-  @Test
-  public void testAlertFilterFeedback() throws Exception {
-    this.alertConfig.getProperties().put(PROP_DETECTION_CONFIG_IDS, Collections.singletonList(1003L));
-    this.alertFilter = new DimensionDetectionAlertFilter(provider, alertConfig,2500L);
-
-    AnomalyFeedbackDTO feedbackAnomaly = new AnomalyFeedbackDTO();
-    feedbackAnomaly.setFeedbackType(AnomalyFeedbackType.ANOMALY);
-
-    AnomalyFeedbackDTO feedbackNoFeedback = new AnomalyFeedbackDTO();
-    feedbackNoFeedback.setFeedbackType(AnomalyFeedbackType.NO_FEEDBACK);
-
-    MergedAnomalyResultDTO anomalyWithFeedback = makeAnomaly(1003L, 1234, 9999);
-    anomalyWithFeedback.setFeedback(feedbackAnomaly);
-
-    MergedAnomalyResultDTO anomalyWithoutFeedback = makeAnomaly(1003L, 1235, 9999);
-    anomalyWithoutFeedback.setFeedback(feedbackNoFeedback);
-
-    MergedAnomalyResultDTO anomalyWithNull = makeAnomaly(1003L, 1236, 9999);
-    anomalyWithNull.setFeedback(null);
-
-    this.detectedAnomalies.add(anomalyWithFeedback);
-    this.detectedAnomalies.add(anomalyWithoutFeedback);
-    this.detectedAnomalies.add(anomalyWithNull);
+    this.alertFilter = new PerUserDimensionAlertFilter(provider, alertConfig,2500L);
 
-    DetectionAlertFilterRecipients recDefault = makeRecipients();
-    DetectionAlertFilterRecipients recValue = makeRecipients(PROP_TO_FOR_VALUE);
+    DetectionAlertFilterRecipients recipient1 = makeRecipients(Collections.singleton("myTest@example.com"));
+    DetectionAlertFilterRecipients recipient2 = makeRecipients(Collections.singleton("myTest@example.org"));
+    DetectionAlertFilterRecipients recipient3 = makeRecipients(Collections.singleton("myTest@example.net"));
 
     DetectionAlertFilterResult result = this.alertFilter.run();
-    Assert.assertEquals(result.getResult().size(), 2);
-    Assert.assertTrue(result.getResult().containsKey(recDefault));
-    Assert.assertEquals(result.getResult().get(recDefault).size(), 2);
-    Assert.assertTrue(result.getResult().get(recDefault).contains(anomalyWithoutFeedback));
-    Assert.assertTrue(result.getResult().get(recDefault).contains(anomalyWithNull));
-    Assert.assertTrue(result.getResult().containsKey(recValue));
+    Assert.assertEquals(result.getResult().size(), 3);
+    Assert.assertEquals(result.getResult().get(recipient1), makeSet(1, 2, 5));
+    Assert.assertEquals(result.getResult().get(recipient2), makeSet(1, 5));
+    Assert.assertEquals(result.getResult().get(recipient3), makeSet(2));
   }
 
   private Set<MergedAnomalyResultDTO> makeSet(int... anomalyIndices) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org