You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/10/15 20:22:15 UTC

[incubator-pinot] branch master updated: [TE] [notification] Multi-dimension based conditional alerting (#4703)

This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new eed7c1a  [TE] [notification] Multi-dimension based conditional alerting (#4703)
eed7c1a is described below

commit eed7c1a7a18602b2eabd711eb01610d0ec6ab752
Author: Akshay Rai <ak...@linkedin.com>
AuthorDate: Tue Oct 15 13:22:07 2019 -0700

    [TE] [notification] Multi-dimension based conditional alerting (#4703)
    
    * [TE][notification] Multi-dimension based conditional alerting
    
    * Addressed comments from Jihao
    
    * fixed the tests
---
 .../thirdeye/common/dimension/DimensionMap.java    |  16 ++
 .../datalayer/pojo/MergedAnomalyResultBean.java    |   7 +
 .../filter/DimensionsRecipientAlertFilter.java     | 134 ++++++++++
 .../alert/scheme/DetectionEmailAlerter.java        |   8 +-
 .../alert/scheme/DetectionJiraAlerter.java         |   2 +-
 .../validators/SubscriptionConfigValidator.java    |   6 -
 .../thirdeye/detection/yaml/YamlResource.java      |   2 +-
 .../pinot/thirdeye/rootcause/util/EntityUtils.java |   2 +-
 .../thirdeye/detection/DetectionTestUtils.java     |   9 +
 .../filter/DimensionsDetectionAlertFilterTest.java | 274 +++++++++++++++++++++
 10 files changed, 448 insertions(+), 12 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/common/dimension/DimensionMap.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/common/dimension/DimensionMap.java
index 88d1576..4317e68 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/common/dimension/DimensionMap.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/common/dimension/DimensionMap.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
  * converting to/from Json string in Map format, i.e., instead of storing {"sortedDimensionMap":{"country":"US",
  * "page_name":"front_page"}}, we only need to store {"country":"US","page_name":"front_page"}.
  */
+@Deprecated
 public class DimensionMap implements SortedMap<String, String>, Comparable<DimensionMap>, Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(DimensionMap.class);
   private static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -114,6 +115,21 @@ public class DimensionMap implements SortedMap<String, String>, Comparable<Dimen
     return dimensionMap;
   }
 
+  // Checks whether this dimension map contains every filter configured in 'that' (values are compared ignoring case)
+  public boolean contains(Map<String, String> that) {
+    if (that == null || that.size() == 0) {
+      return this.size() == 0;
+    }
+
+    for (Entry<String, String> filter : that.entrySet()) {
+      if (this.get(filter.getKey()) == null || !filter.getValue().equalsIgnoreCase(this.get(filter.getKey()))) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
   /**
    * Returns if this dimension map equals or is a child of the given dimension map, i.e., the given dimension map is
    * a subset of this dimension map.
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java
index f78bba1..ea87766 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java
@@ -136,10 +136,17 @@ public class MergedAnomalyResultBean extends AbstractBean implements Comparable<
     this.endTime = endTime;
   }
 
+  /**
+   * DimensionMap class is deprecated.
+   *
+   * Please use MetricEntity.fromURN(anomaly.getMetricUrn()).getFilters()
+   */
+  @Deprecated
   public DimensionMap getDimensions() {
     return dimensions;
   }
 
+  @Deprecated
   public void setDimensions(DimensionMap dimensions) {
     this.dimensions = dimensions;
   }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionsRecipientAlertFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionsRecipientAlertFilter.java
new file mode 100644
index 0000000..05a5692
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionsRecipientAlertFilter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.alert.filter;
+
+import com.google.common.collect.Multimap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterNotification;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterResult;
+import org.apache.pinot.thirdeye.detection.alert.StatefulDetectionAlertFilter;
+import org.apache.pinot.thirdeye.detection.annotation.AlertFilter;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+
+
+/**
+ * The detection alert filter that can send notifications through multiple channels
+ * to a set of unconditional and another set of conditional recipients, based on the
+ * values of the specified anomaly dimension combinations.
+ *
+ * <pre>
+ * dimensionRecipients:
+ *   - dimensions:
+ *       country: IN
+ *       device: Android
+ *     notify:
+ *       jiraScheme:
+ *         project: ANDROID
+ *         assignee: android-oncall
+ *       emailScheme:
+ *         recipients:
+ *           to:
+ *           - "android-team@company.com"
+ *   - dimensions:
+ *       country: US
+ *       device: IOS
+ *     notify:
+ *       jiraScheme:
+ *         project: IOS
+ *         assignee: ios-oncall
+ * </pre>
+ */
+@AlertFilter(type = "DIMENSIONS_ALERTER_PIPELINE")
+public class DimensionsRecipientAlertFilter extends StatefulDetectionAlertFilter {
+  protected static final String PROP_DETECTION_CONFIG_IDS = "detectionConfigIds";
+  protected static final String PROP_DIMENSION = "dimensions";
+  protected static final String PROP_NOTIFY = "notify";
+  protected static final String PROP_DIMENSION_RECIPIENTS = "dimensionRecipients";
+  private static final String PROP_SEND_ONCE = "sendOnce";
+
+  private Map<String, Object> defaultNotificationSchemeProps = new HashMap<>();
+  final List<Map<String, Object>> dimensionRecipients;
+  final List<Long> detectionConfigIds;
+  final boolean sendOnce;
+
+  public DimensionsRecipientAlertFilter(DataProvider provider, DetectionAlertConfigDTO config, long endTime) {
+    super(provider, config, endTime);
+    for (Map.Entry<String, Map<String, Object>> schemeProps : this.config.getAlertSchemes().entrySet()) {
+      defaultNotificationSchemeProps.put(schemeProps.getKey(), new HashMap<>(schemeProps.getValue()));
+    }
+    this.dimensionRecipients = ConfigUtils.getList(this.config.getProperties().get(PROP_DIMENSION_RECIPIENTS));
+    this.detectionConfigIds = ConfigUtils.getLongs(this.config.getProperties().get(PROP_DETECTION_CONFIG_IDS));
+    this.sendOnce = MapUtils.getBoolean(this.config.getProperties(), PROP_SEND_ONCE, true);
+  }
+
+  @Override
+  public DetectionAlertFilterResult run(Map<Long, Long> vectorClocks, long highWaterMark) {
+    DetectionAlertFilterResult result = new DetectionAlertFilterResult();
+    final long minId = getMinId(highWaterMark);
+
+    Set<MergedAnomalyResultDTO> anomalies = this.filter(this.makeVectorClocks(this.detectionConfigIds), minId);
+
+    for (Map<String, Object> dimensionRecipient : this.dimensionRecipients) {
+      Map<String, String> dimensionFilters = ConfigUtils.getMap(dimensionRecipient.get(PROP_DIMENSION));
+      Set<MergedAnomalyResultDTO> notifyAnomalies = new HashSet<>();
+      for (MergedAnomalyResultDTO anomaly : anomalies) {
+        Multimap<String, String> anamolousDims = MetricEntity.fromURN(anomaly.getMetricUrn()).getFilters();
+        if (anamolousDims.entries().containsAll(dimensionFilters.entrySet())) {
+          notifyAnomalies.add(anomaly);
+        }
+      }
+
+      if (!notifyAnomalies.isEmpty()) {
+        result.addMapping(new DetectionAlertFilterNotification(ConfigUtils.getMap(dimensionRecipient.get(PROP_NOTIFY))),
+            notifyAnomalies);
+      }
+    }
+
+    Set<MergedAnomalyResultDTO> notifiedAnomalies = new HashSet<>(result.getAllAnomalies());
+    Set<MergedAnomalyResultDTO> defaultAnomalies = new HashSet<>();
+    for (MergedAnomalyResultDTO anomaly : anomalies) {
+      if (!notifiedAnomalies.contains(anomaly)) {
+        defaultAnomalies.add(anomaly);
+      }
+    }
+    if (!defaultAnomalies.isEmpty()) {
+      result.addMapping(new DetectionAlertFilterNotification(defaultNotificationSchemeProps), defaultAnomalies);
+    }
+
+    return result;
+  }
+
+  private long getMinId(long highWaterMark) {
+    if (this.sendOnce) {
+      return highWaterMark + 1;
+    } else {
+      return 0;
+    }
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
index 5682bc3..b6dbbf0 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
@@ -202,14 +202,16 @@ public class DetectionEmailAlerter extends DetectionAlertScheme {
         anomalyResultListOfGroup.sort(COMPARATOR_DESC);
 
         if (emailClientConfigs.get(PROP_RECIPIENTS) != null) {
-          Map<String, Set<String>> emailRecipients = (Map<String, Set<String>>) emailClientConfigs.get(PROP_RECIPIENTS);
+          Map<String, List<String>> emailRecipients = (Map<String, List<String>>) emailClientConfigs.get(PROP_RECIPIENTS);
           if (emailRecipients.get(PROP_TO) == null || emailRecipients.get(PROP_TO).isEmpty()) {
             LOG.warn("Skipping! No email recipients found for alert {}.", this.adContext.getNotificationConfig().getId());
             return;
           }
 
-          DetectionAlertFilterRecipients recipients = new DetectionAlertFilterRecipients(emailRecipients.get(PROP_TO),
-              emailRecipients.get(PROP_CC), emailRecipients.get(PROP_BCC));
+          DetectionAlertFilterRecipients recipients = new DetectionAlertFilterRecipients(
+              new HashSet<>(emailRecipients.get(PROP_TO)),
+              new HashSet<>(emailRecipients.get(PROP_CC)),
+              new HashSet<>(emailRecipients.get(PROP_BCC)));
           sendEmail(emailClientConfigs, anomalyResultListOfGroup, recipients);
         }
       } catch (IllegalArgumentException e) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionJiraAlerter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionJiraAlerter.java
index 99a8b44..efd2ea0 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionJiraAlerter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/scheme/DetectionJiraAlerter.java
@@ -112,7 +112,7 @@ public class DetectionJiraAlerter extends DetectionAlertScheme {
             .getJiraEntity(anomalyResultListOfGroup);
 
         String issueKey = createIssue(jiraIssueInput);
-        LOG.info("Jira created/updated with issue key {}", issueKey);
+        LOG.info("Jira ticket created with issue key {} : anomalies reported = {}", issueKey, result.getValue().size());
       } catch (IllegalArgumentException e) {
         LOG.warn("Skipping! Found illegal arguments while sending {} anomalies for alert {}."
             + " Exception message: ", result.getValue().size(), this.adContext.getNotificationConfig().getId(), e);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/SubscriptionConfigValidator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/SubscriptionConfigValidator.java
index 11be01e..ea0bea0 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/SubscriptionConfigValidator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/SubscriptionConfigValidator.java
@@ -68,12 +68,6 @@ public class SubscriptionConfigValidator implements ConfigValidator<DetectionAle
     Preconditions.checkArgument((alertConfig.getAlertSchemes() != null && !alertConfig.getAlertSchemes().isEmpty()),
         "Alert scheme cannot be left empty");
 
-    // At least one recipient must be specified
-    Map<String, Object> recipients = ConfigUtils.getMap(alertConfig.getProperties().get(PROP_RECIPIENTS));
-    Preconditions.checkArgument((!recipients.isEmpty() && !ConfigUtils.getList(recipients.get("to")).isEmpty()),
-        "Please specify at least one recipient in the notification group. If you wish to unsubscribe, set"
-            + " active to false.");
-
     // Properties cannot be empty
     Preconditions.checkArgument((alertConfig.getProperties() != null && !alertConfig.getProperties().isEmpty()),
         "Alert properties cannot be left empty. Please specify the recipients, subscribed detections, and"
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java
index 76f069e..2ffdb70 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java
@@ -562,7 +562,7 @@ public class YamlResource {
     return Response.ok().entity(responseMessage).build();
   }
 
-  long createSubscriptionGroup(String yamlConfig) throws IllegalArgumentException {
+  public long createSubscriptionGroup(String yamlConfig) throws IllegalArgumentException {
     Preconditions.checkArgument(StringUtils.isNotBlank(yamlConfig),
         "The Yaml Payload in the request is empty.");
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/rootcause/util/EntityUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/rootcause/util/EntityUtils.java
index 44c72e9..befb1c6 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/rootcause/util/EntityUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/rootcause/util/EntityUtils.java
@@ -511,7 +511,7 @@ public class EntityUtils {
    * @return ParsedUrn
    */
   public static ParsedUrn parseUrnString(String urn, EntityType type, int filterOffset) {
-    if (!type.isType(urn)) {
+    if (urn == null || !type.isType(urn)) {
       throw new IllegalArgumentException(String.format("Expected type '%s' but got '%s'", type.getPrefix(), urn));
     }
     return parseUrnString(urn, filterOffset);
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java
index 0e3999a..745ba95 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java
@@ -19,12 +19,15 @@
 
 package org.apache.pinot.thirdeye.detection;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import java.util.HashMap;
 import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
 
 
 public class DetectionTestUtils {
@@ -39,6 +42,12 @@ public class DetectionTestUtils {
     anomaly.setCollection(dataset);
     anomaly.setFunctionId(legacyFunctionId);
 
+    Multimap<String, String> filters = HashMultimap.create();
+    for (Map.Entry<String, String> dimension : dimensions.entrySet()) {
+      filters.put(dimension.getKey(), dimension.getValue());
+    }
+    anomaly.setMetricUrn(MetricEntity.fromMetric(1.0, 1l, filters).getUrn());
+
     DimensionMap dimMap = new DimensionMap();
     dimMap.putAll(dimensions);
     anomaly.setDimensions(dimMap);
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionsDetectionAlertFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionsDetectionAlertFilterTest.java
new file mode 100644
index 0000000..102e1bf
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/DimensionsDetectionAlertFilterTest.java
@@ -0,0 +1,274 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.alert.filter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
+import org.apache.pinot.thirdeye.dashboard.DetectionPreviewConfiguration;
+import org.apache.pinot.thirdeye.datalayer.bao.ApplicationManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
+import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFeedbackDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.ApplicationDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.MockDataProvider;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilter;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterNotification;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterResult;
+import org.apache.pinot.thirdeye.detection.annotation.registry.DetectionAlertRegistry;
+import org.apache.pinot.thirdeye.detection.yaml.YamlResource;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.thirdeye.detection.DetectionTestUtils.*;
+import static org.apache.pinot.thirdeye.detection.alert.filter.DimensionsRecipientAlertFilter.*;
+
+
+public class DimensionsDetectionAlertFilterTest {
+
+  private static final String PROP_RECIPIENTS = "recipients";
+  private static final String PROP_TO = "to";
+  private static final String PROP_CC = "cc";
+  private static final String PROP_BCC = "bcc";
+  private static final Set<String> PROP_TO_VALUE = new HashSet<>(Arrays.asList("test@example.com", "test@example.org"));
+  private static final Set<String> PROP_CC_VALUE = new HashSet<>(Arrays.asList("cctest@example.com", "cctest@example.org"));
+  private static final Set<String> PROP_BCC_VALUE = new HashSet<>(Arrays.asList("bcctest@example.com", "bcctest@example.org"));
+  private static final Set<String> PROP_TO_FOR_VALUE = new HashSet<>(Arrays.asList("myTest@example.com", "myTest@example.org"));
+  private static final Set<String> PROP_TO_FOR_ANOTHER_VALUE = Collections.singleton("myTest@example.net");
+  //private static final List<Long> PROP_ID_VALUE = Arrays.asList(1001l, 1002l);
+  private static final List<Long> PROP_ID_VALUE = new ArrayList<>();
+  private static final List<Map<String, Object>> PROP_DIMENSION_RECIPIENTS_VALUE = new ArrayList<>();
+
+  private DetectionAlertFilter alertFilter;
+  private List<MergedAnomalyResultDTO> detectedAnomalies;
+  private YamlResource yamlResource;
+  private MockDataProvider provider;
+  private DetectionAlertConfigDTO alertConfig;
+  private DAOTestBase testDAOProvider;
+
+  private long detectionId1;
+  private long detectionId2;
+  private long detectionId3;
+
+  @BeforeMethod
+  public void beforeMethod() throws IOException {
+    testDAOProvider = DAOTestBase.getInstance();
+
+    DetectionAlertRegistry.getInstance().registerAlertFilter("DIMENSIONS_ALERTER_PIPELINE",
+        DimensionsRecipientAlertFilter.class.getName());
+
+    ApplicationManager appDAO = DAORegistry.getInstance().getApplicationDAO();
+    ApplicationDTO app = new ApplicationDTO();
+    app.setApplication("test_application");
+    app.setRecipients("test@thirdeye.com");
+    appDAO.save(app);
+
+    DetectionConfigManager detDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    DetectionConfigDTO detection1 = new DetectionConfigDTO();
+    detection1.setName("test_detection_1");
+    this.detectionId1 = detDAO.save(detection1);
+
+    DetectionConfigDTO detection2 = new DetectionConfigDTO();
+    detection2.setName("test_detection_2");
+    this.detectionId2 = detDAO.save(detection2);
+
+    DetectionConfigDTO detection3 = new DetectionConfigDTO();
+    detection3.setName("test_detection_3");
+    this.detectionId3 = detDAO.save(detection3);
+
+    PROP_ID_VALUE.add(detectionId1);
+    PROP_ID_VALUE.add(detectionId2);
+
+    Map<String, Object> dimensionRecipient1 = new HashMap<>();
+    Map<String, Object> dimensionKeys1 = new HashMap<>();
+    dimensionKeys1.put("key", "value");
+    dimensionRecipient1.put(PROP_DIMENSION, dimensionKeys1);
+    Map<String, Object> notificationValues1 = new HashMap<>();
+    Map<String, Object> notificationEmailParams1 = new HashMap<>();
+    Map<String, Object> recipients1 = new HashMap<>();
+    recipients1.put("to", PROP_TO_FOR_VALUE);
+    recipients1.put("cc", PROP_CC_VALUE);
+    recipients1.put("bcc", PROP_BCC_VALUE);
+    notificationEmailParams1.put(PROP_RECIPIENTS, recipients1);
+    notificationValues1.put("emailScheme", notificationEmailParams1);
+    dimensionRecipient1.put(PROP_NOTIFY, notificationValues1);
+
+    Map<String, Object> dimensionRecipient2 = new HashMap<>();
+    Map<String, Object> dimensionKeys2 = new HashMap<>();
+    dimensionKeys2.put("key1", "anotherValue1");
+    dimensionKeys2.put("key2", "anotherValue2");
+    dimensionRecipient2.put(PROP_DIMENSION, dimensionKeys2);
+    Map<String, Object> notificationValues2 = new HashMap<>();
+    Map<String, Object> notificationEmailParams2 = new HashMap<>();
+    Map<String, Object> recipients2 = new HashMap<>();
+    recipients2.put("to", PROP_TO_FOR_ANOTHER_VALUE);
+    recipients2.put("cc", PROP_CC_VALUE);
+    recipients2.put("bcc", PROP_BCC_VALUE);
+    notificationEmailParams2.put(PROP_RECIPIENTS, recipients2);
+    notificationValues2.put("emailScheme", notificationEmailParams2);
+    dimensionRecipient2.put(PROP_NOTIFY, notificationValues2);
+
+    PROP_DIMENSION_RECIPIENTS_VALUE.add(dimensionRecipient1);
+    PROP_DIMENSION_RECIPIENTS_VALUE.add(dimensionRecipient2);
+
+    Map<String, String> anomalousDimensions = new HashMap<>();
+    anomalousDimensions.put("key1", "anotherValue1");
+    anomalousDimensions.put("key2", "anotherValue2");
+    anomalousDimensions.put("key3", "anotherValue3");
+
+    this.detectedAnomalies = new ArrayList<>();
+    this.detectedAnomalies.add(makeAnomaly(detectionId1, 1500, 2000, Collections.<String, String>emptyMap()));
+    this.detectedAnomalies.add(makeAnomaly(detectionId1,0, 1000, Collections.singletonMap("key", "value")));
+    this.detectedAnomalies.add(makeAnomaly(detectionId1,0, 1100, anomalousDimensions));
+    this.detectedAnomalies.add(makeAnomaly(detectionId1,0, 1200, Collections.singletonMap("key", "unknownValue")));
+    this.detectedAnomalies.add(makeAnomaly(detectionId2,1100, 1500, Collections.singletonMap("unknownKey", "value")));
+    this.detectedAnomalies.add(makeAnomaly(detectionId2,1200, 1600, Collections.singletonMap("key", "value")));
+    this.detectedAnomalies.add(makeAnomaly(detectionId2,3333, 9999, Collections.singletonMap("key", "value")));
+    this.detectedAnomalies.add(makeAnomaly(detectionId3,1111, 9999, Collections.singletonMap("key", "value")));
+
+    this.provider = new MockDataProvider()
+        .setAnomalies(this.detectedAnomalies);
+
+    this.alertConfig = createDetectionAlertConfig();
+  }
+
+  private DetectionAlertConfigDTO createDetectionAlertConfig() {
+    DetectionAlertConfigDTO alertConfig = new DetectionAlertConfigDTO();
+
+    Map<String, Set<String>> recipients = new HashMap<>();
+    recipients.put(PROP_TO, PROP_TO_VALUE);
+    recipients.put(PROP_CC, PROP_CC_VALUE);
+    recipients.put(PROP_BCC, PROP_BCC_VALUE);
+
+    Map<String, Object> emailScheme = new HashMap<>();
+    emailScheme.put(PROP_RECIPIENTS, recipients);
+    alertConfig.setAlertSchemes(Collections.singletonMap("emailScheme", emailScheme));
+
+    Map<String, Object> properties = new HashMap<>();
+
+    properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
+    properties.put(PROP_DIMENSION_RECIPIENTS, PROP_DIMENSION_RECIPIENTS_VALUE);
+    alertConfig.setProperties(properties);
+
+    Map<Long, Long> vectorClocks = new HashMap<>();
+    vectorClocks.put(PROP_ID_VALUE.get(0), 0L);
+    alertConfig.setVectorClocks(vectorClocks);
+
+    return alertConfig;
+  }
+
+  @Test
+  public void testAlertFilterRecipients() throws Exception {
+    this.alertFilter = new DimensionsRecipientAlertFilter(provider, alertConfig,2500L);
+
+    DetectionAlertFilterResult result = this.alertFilter.run();
+
+    // Send anomalies on un-configured dimensions to default recipients
+    // Anomalies 0, 3 and 4 do not fall into any of the dimensionRecipients buckets. Send them to default recipients
+    DetectionAlertFilterNotification recDefault = AlertFilterUtils.makeEmailNotifications();
+    Assert.assertEquals(result.getResult().get(recDefault), makeSet(0, 3, 4));
+
+    // Send anomalies whose dimensions are configured to appropriate recipients
+    DetectionAlertFilterNotification recValue = AlertFilterUtils.makeEmailNotifications(PROP_TO_FOR_VALUE, PROP_CC_VALUE, PROP_BCC_VALUE);
+    Assert.assertEquals(result.getResult().get(recValue), makeSet(1, 5));
+
+    // Send alert when the configured dimensions are a subset of the anomaly dimensions
+    // Anomaly 2 occurs on 3 dimensions (key 1, 2 & 3), dimensionRecipients is configured on (key 1 & 2) - send alert
+    DetectionAlertFilterNotification recAnotherValue = AlertFilterUtils.makeEmailNotifications(PROP_TO_FOR_ANOTHER_VALUE, PROP_CC_VALUE, PROP_BCC_VALUE);
+    Assert.assertEquals(result.getResult().get(recAnotherValue), makeSet(2));
+  }
+
+  @Test
+  public void testAlertFilterNoChildren() throws Exception {
+    this.alertConfig.getProperties().put(PROP_DETECTION_CONFIG_IDS, Collections.singletonList(detectionId3));
+    this.alertFilter = new DimensionsRecipientAlertFilter(provider, alertConfig,2500L);
+
+    MergedAnomalyResultDTO child = makeAnomaly(detectionId3, 1234, 9999);
+    child.setChild(true);
+
+    this.detectedAnomalies.add(child);
+
+    DetectionAlertFilterNotification recValue = AlertFilterUtils.makeEmailNotifications(PROP_TO_FOR_VALUE, PROP_CC_VALUE, PROP_BCC_VALUE);
+
+    DetectionAlertFilterResult result = this.alertFilter.run();
+
+    Assert.assertEquals(result.getResult().size(), 1);
+    Assert.assertTrue(result.getResult().containsKey(recValue));
+  }
+
+  @Test
+  public void testAlertFilterFeedback() throws Exception {
+    this.alertConfig.getProperties().put(PROP_DETECTION_CONFIG_IDS, Collections.singletonList(detectionId3));
+    this.alertFilter = new DimensionsRecipientAlertFilter(provider, alertConfig,2500L);
+
+    AnomalyFeedbackDTO feedbackAnomaly = new AnomalyFeedbackDTO();
+    feedbackAnomaly.setFeedbackType(AnomalyFeedbackType.ANOMALY);
+
+    AnomalyFeedbackDTO feedbackNoFeedback = new AnomalyFeedbackDTO();
+    feedbackNoFeedback.setFeedbackType(AnomalyFeedbackType.NO_FEEDBACK);
+
+    MergedAnomalyResultDTO anomalyWithFeedback = makeAnomaly(detectionId3, 1234, 9999);
+    anomalyWithFeedback.setFeedback(feedbackAnomaly);
+
+    MergedAnomalyResultDTO anomalyWithoutFeedback = makeAnomaly(detectionId3, 1235, 9999);
+    anomalyWithoutFeedback.setFeedback(feedbackNoFeedback);
+
+    MergedAnomalyResultDTO anomalyWithNull = makeAnomaly(detectionId3, 1236, 9999);
+    anomalyWithNull.setFeedback(null);
+
+    this.detectedAnomalies.add(anomalyWithFeedback);
+    this.detectedAnomalies.add(anomalyWithoutFeedback);
+    this.detectedAnomalies.add(anomalyWithNull);
+
+    DetectionAlertFilterResult result = this.alertFilter.run();
+    Assert.assertEquals(result.getResult().size(), 2);
+
+    DetectionAlertFilterNotification recDefault = AlertFilterUtils.makeEmailNotifications(PROP_TO_VALUE, PROP_CC_VALUE, PROP_BCC_VALUE);
+    Assert.assertTrue(result.getResult().containsKey(recDefault));
+    Assert.assertEquals(result.getResult().get(recDefault).size(), 2);
+    Assert.assertTrue(result.getResult().get(recDefault).contains(anomalyWithoutFeedback));
+    Assert.assertTrue(result.getResult().get(recDefault).contains(anomalyWithNull));
+
+    DetectionAlertFilterNotification recValue = AlertFilterUtils.makeEmailNotifications(PROP_TO_FOR_VALUE, PROP_CC_VALUE, PROP_BCC_VALUE);
+    Assert.assertTrue(result.getResult().containsKey(recValue));
+    Assert.assertEquals(result.getResult().get(recValue).size(), 1);
+  }
+
+  private Set<MergedAnomalyResultDTO> makeSet(int... anomalyIndices) {
+    Set<MergedAnomalyResultDTO> output = new HashSet<>();
+    for (int anomalyIndex : anomalyIndices) {
+      output.add(this.detectedAnomalies.get(anomalyIndex));
+    }
+    return output;
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org