You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2020/09/02 17:08:28 UTC

[incubator-pinot] 02/03: severity alerter & unit tests

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch severity-alerter
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 45d34f050ac628588634e17102c598b7588fd723
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Tue Sep 1 13:54:04 2020 -0700

    severity alerter & unit tests
---
 .../anomaly/monitor/MonitorTaskRunner.java         |  10 +
 .../pinot/thirdeye/constant/AnomalySeverity.java   |  35 ++++
 .../datalayer/dto/MergedAnomalyResultDTO.java      |  10 +
 .../datalayer/pojo/MergedAnomalyResultBean.java    |  10 +
 .../detection/DetectionPipelineTaskRunner.java     |  30 +++
 .../thirdeye/detection/DetectionResource.java      |  22 +-
 .../detection/alert/DetectionAlertJob.java         |  14 +-
 .../alert/filter/AnomalySeverityAlertFilter.java   | 163 +++++++++++++++
 .../translator/SubscriptionConfigTranslator.java   |   3 +-
 .../detection/alert/filter/AlertFilterUtils.java   |   9 +-
 .../filter/AnomalySeverityAlertFilterTest.java     | 232 +++++++++++++++++++++
 11 files changed, 526 insertions(+), 12 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java
index 763e75b..33fafee 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java
@@ -281,6 +281,16 @@ public class MonitorTaskRunner implements TaskRunner {
     } catch (Exception e) {
       LOG.error("Exception when deleting old evaluations.", e);
     }
+
+    // Delete old anomaly subscription notifications.
+    try {
+      int deletedRecords = DAO_REGISTRY.getAnomalySubscriptionGroupNotificationManager()
+          .deleteRecordsOlderThanDays(monitorTaskInfo.getDefaultRetentionDays());
+      LOG.info("Deleted {} anomaly subscription notifications that are older than {} days.", deletedRecords,
+          monitorTaskInfo.getDefaultRetentionDays());
+    } catch (Exception e) {
+      LOG.error("Exception when deleting old anomaly subscription notifications.", e);
+    }
   }
 
   private Map<Long, JobDTO> findScheduledJobsWithinDays(int days) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalySeverity.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalySeverity.java
new file mode 100644
index 0000000..48e542e
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalySeverity.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.pinot.thirdeye.constant;
+
+/**
+ * Severity levels that can be attached to an anomaly.
+ *
+ * <p>Each constant carries a lower-case, human-readable name that is exposed through
+ * {@link #getUserReadableName()}.
+ */
+public enum AnomalySeverity {
+  LOW("low"), MODERATE("moderate"), HIGH("high"), CRITICAL("critical"), DEFAULT("default");
+
+  // Set once per constant; private and final so it can only be read, never reassigned.
+  private final String userReadableName;
+
+  AnomalySeverity(String userReadableName) {
+    this.userReadableName = userReadableName;
+  }
+
+  /**
+   * @return the human-readable name of this severity level
+   */
+  public String getUserReadableName() {
+    return this.userReadableName;
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/dto/MergedAnomalyResultDTO.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/dto/MergedAnomalyResultDTO.java
index 415a252..620b0de 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/dto/MergedAnomalyResultDTO.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/dto/MergedAnomalyResultDTO.java
@@ -44,6 +44,8 @@ public class MergedAnomalyResultDTO extends MergedAnomalyResultBean implements A
 
   private Set<MergedAnomalyResultDTO> children = new HashSet<>();
 
+  private boolean renotify;
+
   public MergedAnomalyResultDTO() {
     setCreatedTime(System.currentTimeMillis());
   }
@@ -94,6 +96,14 @@ public class MergedAnomalyResultDTO extends MergedAnomalyResultBean implements A
     return function;
   }
 
+  /**
+   * Tells whether this anomaly has been flagged for re-notification.
+   *
+   * @return the current value of the {@code renotify} flag
+   */
+  public boolean shouldRenotify() {
+    return renotify;
+  }
+
+  /**
+   * Flags or un-flags this anomaly for re-notification.
+   *
+   * @param renotify {@code true} if the anomaly should be notified again
+   */
+  public void setRenotify(boolean renotify) {
+    this.renotify = renotify;
+  }
+
   @Deprecated
   public void setFunction(AnomalyFunctionDTO function) {
     this.function = function;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java
index 834356d..e29de67 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import org.apache.commons.lang3.ObjectUtils;
+import org.apache.pinot.thirdeye.constant.AnomalySeverity;
 
 
 public class MergedAnomalyResultBean extends AbstractBean implements Comparable<MergedAnomalyResultBean>, Serializable {
@@ -60,6 +61,7 @@ public class MergedAnomalyResultBean extends AbstractBean implements Comparable<
   private Set<Long> childIds; // ids of the anomalies this anomaly merged from
   private boolean isChild;
   private AnomalyType type;
+  private AnomalySeverity severity = AnomalySeverity.DEFAULT;
 
   public Set<Long> getChildIds() {
     return childIds;
@@ -264,6 +266,14 @@ public class MergedAnomalyResultBean extends AbstractBean implements Comparable<
     this.type = type;
   }
 
+  /**
+   * @return the severity of this anomaly; the field is initialised to {@link AnomalySeverity#DEFAULT}
+   */
+  public AnomalySeverity getSeverity() {
+    return severity;
+  }
+
+  /**
+   * Sets the severity of this anomaly. The value is stored as given; no null check is performed.
+   *
+   * @param severity the severity to assign
+   */
+  public void setSeverity(AnomalySeverity severity) {
+    this.severity = severity;
+  }
+
   @Override
   public int hashCode() {
     return Objects.hash(getId(), startTime, endTime, collection, metric, dimensions, score, impactToGlobal, avgBaselineVal, avgCurrentVal, anomalyResultSource, metricUrn, detectionConfigId, childIds, isChild);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineTaskRunner.java
index 86126c7..183be6f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineTaskRunner.java
@@ -27,6 +27,7 @@ import org.apache.pinot.thirdeye.anomaly.task.TaskInfo;
 import org.apache.pinot.thirdeye.anomaly.task.TaskResult;
 import org.apache.pinot.thirdeye.anomaly.task.TaskRunner;
 import org.apache.pinot.thirdeye.anomaly.utils.ThirdeyeMetricsUtil;
+import org.apache.pinot.thirdeye.datalayer.bao.AnomalySubscriptionGroupNotificationManager;
 import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.EvaluationManager;
@@ -34,9 +35,11 @@ import org.apache.pinot.thirdeye.datalayer.bao.EventManager;
 import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
 import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
+import org.apache.pinot.thirdeye.datalayer.dto.AnomalySubscriptionGroupNotificationDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.EvaluationDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
 import org.apache.pinot.thirdeye.datasource.DAORegistry;
 import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
 import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
@@ -57,6 +60,7 @@ public class DetectionPipelineTaskRunner implements TaskRunner {
   private final MergedAnomalyResultManager anomalyDAO;
   private final EvaluationManager evaluationDAO;
   private final TaskManager taskDAO;
+  private final AnomalySubscriptionGroupNotificationManager anomalySubscriptionGroupNotificationDAO;
   private final DetectionPipelineLoader loader;
   private final DataProvider provider;
   private final ModelMaintenanceFlow maintenanceFlow;
@@ -76,6 +80,8 @@ public class DetectionPipelineTaskRunner implements TaskRunner {
     this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
     this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
     this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.anomalySubscriptionGroupNotificationDAO =
+        DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager();
     MetricConfigManager metricDAO = DAORegistry.getInstance().getMetricConfigDAO();
     DatasetConfigManager datasetDAO = DAORegistry.getInstance().getDatasetConfigDAO();
     EventManager eventDAO = DAORegistry.getInstance().getEventDAO();
@@ -112,6 +118,8 @@ public class DetectionPipelineTaskRunner implements TaskRunner {
     this.loader = loader;
     this.provider = provider;
     this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.anomalySubscriptionGroupNotificationDAO =
+        DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager();
     this.maintenanceFlow = new ModelRetuneFlow(this.provider, DetectionRegistry.getInstance());
   }
 
@@ -167,6 +175,28 @@ public class DetectionPipelineTaskRunner implements TaskRunner {
       }
       this.detectionDAO.update(config);
 
+      // Re-notify the anomalies that the detection result flagged, if any.
+      for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
+        // Only anomalies flagged for re-notification touch the notification lookup table.
+        if (anomaly.shouldRenotify()) {
+          List<AnomalySubscriptionGroupNotificationDTO> subscriptionGroupNotificationDTOs =
+              this.anomalySubscriptionGroupNotificationDAO.findByPredicate(Predicate.EQ("anomalyId", anomaly.getId()));
+          AnomalySubscriptionGroupNotificationDTO anomalyNotificationDTO;
+          if (subscriptionGroupNotificationDTOs.isEmpty()) {
+            // No record exists for this anomaly yet: create one.
+            anomalyNotificationDTO = new AnomalySubscriptionGroupNotificationDTO();
+            anomalyNotificationDTO.setAnomalyId(anomaly.getId());
+            anomalyNotificationDTO.setDetectionConfigId(anomaly.getDetectionConfigId());
+          } else {
+            // A record exists: reuse the first match and clear its notified subscription groups,
+            // so that every subscription group is treated as not yet notified.
+            anomalyNotificationDTO = subscriptionGroupNotificationDTOs.get(0);
+            anomalyNotificationDTO.setNotifiedSubscriptionGroupIds(Collections.emptyList());
+          }
+          this.anomalySubscriptionGroupNotificationDAO.save(anomalyNotificationDTO);
+        }
+      }
+
       ThirdeyeMetricsUtil.detectionTaskSuccessCounter.inc();
       LOG.info("End detection for config {} between {} and {}. Detected {} anomalies.", config.getId(), info.start,
           info.end, result.getAnomalies());
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
index c0fdb51..f94f700 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
@@ -123,6 +123,7 @@ public class DetectionResource {
   private final DetectionAlertConfigFormatter subscriptionConfigFormatter;
   private final AggregationLoader aggregationLoader;
   private final DetectionConfigurationResource detectionConfigurationResource;
+  private final AnomalySubscriptionGroupNotificationManager anomalySubscriptionGroupNotificationManager;
 
   @Inject
   public DetectionResource(
@@ -137,6 +138,7 @@ public class DetectionResource {
     this.detectionAlertConfigDAO = DAORegistry.getInstance().getDetectionAlertConfigManager();
     this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
     this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.anomalySubscriptionGroupNotificationManager = DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager();
 
     TimeSeriesLoader timeseriesLoader =
         new DefaultTimeSeriesLoader(metricDAO, datasetDAO, ThirdEyeCacheRegistry.getInstance().getQueryCache(), ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
@@ -573,14 +575,18 @@ public class DetectionResource {
     return Response.ok(health).build();
   }
 
-  @GET
-  @Path(value = "/alert")
-  public Response alert() {
-    AnomalySubscriptionGroupNotificationManager anomalySubscriptionGroupNotificationManager = DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager();
-    AnomalySubscriptionGroupNotificationDTO anomalySubscriptionGroupNotificationDTO = new AnomalySubscriptionGroupNotificationDTO();
-    anomalySubscriptionGroupNotificationDTO.setAnomalyId(1L);
-    anomalySubscriptionGroupNotificationDTO.setDetectionConfigId(2L);
-    anomalySubscriptionGroupNotificationManager.save(anomalySubscriptionGroupNotificationDTO);
+  /**
+   * Registers anomalies for re-notification by saving one new anomaly subscription group
+   * notification record per anomaly returned by {@code findByIds}.
+   *
+   * @param anomalyIds ids of the anomalies to notify again, passed as repeated {@code id} query parameters
+   * @return an empty 200 OK response
+   */
+  @POST
+  @Path(value = "/re-notify")
+  @ApiOperation("Resend the notification for the anomalies to the subscribed notification groups, if the subscription group supports re-notify")
+  public Response alert(@QueryParam("id") List<Long> anomalyIds) {
+    for (MergedAnomalyResultDTO anomaly : this.anomalyDAO.findByIds(anomalyIds)) {
+      // A fresh record carries no notified subscription groups.
+      AnomalySubscriptionGroupNotificationDTO notification = new AnomalySubscriptionGroupNotificationDTO();
+      notification.setAnomalyId(anomaly.getId());
+      notification.setDetectionConfigId(anomaly.getDetectionConfigId());
+      this.anomalySubscriptionGroupNotificationManager.save(notification);
+    }
     return Response.ok().build();
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
index 70b318c..4026869 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
@@ -24,9 +24,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.datalayer.bao.AnomalySubscriptionGroupNotificationManager;
 import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
 import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
+import org.apache.pinot.thirdeye.datalayer.dto.AnomalySubscriptionGroupNotificationDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
 import org.apache.pinot.thirdeye.datalayer.util.Predicate;
@@ -48,12 +50,14 @@ public class DetectionAlertJob implements Job {
   private DetectionAlertConfigManager alertConfigDAO;
   private TaskManager taskDAO;
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-  private MergedAnomalyResultManager anomalyDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final AnomalySubscriptionGroupNotificationManager anomalySubscriptionGroupNotificationDAO;
 
   public DetectionAlertJob() {
     this.alertConfigDAO = DAORegistry.getInstance().getDetectionAlertConfigManager();
     this.taskDAO = DAORegistry.getInstance().getTaskDAO();
     this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.anomalySubscriptionGroupNotificationDAO = DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager();
   }
 
   @Override
@@ -110,6 +114,8 @@ public class DetectionAlertJob implements Job {
    * For example, if previous anomaly is from t1 to t2 generated at t3, then the timestamp in vectorLock is t3.
    * If there is a new anomaly from t2 to t4 generated at t5, then we can still get this anomaly as t5 > t3.
    *
+   * Also, check if there is any anomaly that needs re-notifying
+   *
    * @param configDTO The Subscription Configuration.
    * @return true if it needs notification task. false otherwise.
    */
@@ -123,6 +129,10 @@ public class DetectionAlertJob implements Job {
         return true;
       }
     }
-    return false;
+    // in addition to checking the watermarks, check if any anomalies need to be re-notified by querying the anomaly subscription group notification table
+    List<AnomalySubscriptionGroupNotificationDTO> anomalySubscriptionGroupNotifications =
+        this.anomalySubscriptionGroupNotificationDAO.findByPredicate(
+            Predicate.IN("detectionConfigId", vectorLocks.keySet().toArray()));
+    return !anomalySubscriptionGroupNotifications.isEmpty();
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilter.java
new file mode 100644
index 0000000..1042ac7
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilter.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.alert.filter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.pinot.thirdeye.constant.AnomalySeverity;
+import org.apache.pinot.thirdeye.datalayer.bao.AnomalySubscriptionGroupNotificationManager;
+import org.apache.pinot.thirdeye.datalayer.dto.AnomalySubscriptionGroupNotificationDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterNotification;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterResult;
+import org.apache.pinot.thirdeye.detection.alert.StatefulDetectionAlertFilter;
+import org.apache.pinot.thirdeye.detection.annotation.AlertFilter;
+
+
+/**
+ * A detection alert filter that routes anomalies to recipients according to anomaly severity.
+ *
+ * <p>Each entry under {@code severityRecipients} lists one or more severities together with the
+ * notification channels (and, optionally, reference links) to use for anomalies of those
+ * severities. Anomalies that match no entry are sent using the subscription group's own
+ * configuration.
+ *
+ * <p>This alert pipeline is also able to re-notify an anomaly, for example when the anomaly's
+ * severity changes after it has been created.
+ *
+ * <pre>
+ * severityRecipients:
+ *   - severity:
+ *       - LOW
+ *     notify:
+ *       jiraScheme:
+ *         project: PROJECT
+ *         assignee: oncall
+ *       emailScheme:
+ *         recipients:
+ *           to:
+ *           - "oncall@company.com"
+ *   - severity:
+ *       - HIGH
+ *       - CRITICAL
+ *     notify:
+ *       jiraScheme:
+ *         project: PROJECT
+ *         assignee: manager
+ * </pre>
+ */
+@AlertFilter(type = "SEVERITY_ALERTER_PIPELINE")
+public class AnomalySeverityAlertFilter extends StatefulDetectionAlertFilter {
+  public static final String PROP_DETECTION_CONFIG_IDS = "detectionConfigIds";
+  public static final String PROP_SEVERITY = "severity";
+  public static final String PROP_NOTIFY = "notify";
+  public static final String PROP_REF_LINKS = "referenceLinks";
+  public static final String PROP_SEVERITY_RECIPIENTS = "severityRecipients";
+
+  private final List<Map<String, Object>> severityRecipients;
+  private final List<Long> detectionConfigIds;
+
+  private final AnomalySubscriptionGroupNotificationManager anomalySubscriptionGroupNotificationDAO;
+
+  /**
+   * Reads the severity-recipient entries and the detection config ids from the subscription
+   * config's properties and looks up the notification DAO from the {@link DAORegistry}.
+   */
+  public AnomalySeverityAlertFilter(DataProvider provider, DetectionAlertConfigDTO config, long endTime) {
+    super(provider, config, endTime);
+    this.severityRecipients = ConfigUtils.getList(this.config.getProperties().get(PROP_SEVERITY_RECIPIENTS));
+    this.detectionConfigIds = ConfigUtils.getLongs(this.config.getProperties().get(PROP_DETECTION_CONFIG_IDS));
+    this.anomalySubscriptionGroupNotificationDAO =
+        DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager();
+  }
+
+  /**
+   * Collects the anomalies to notify and maps them to notification configurations by severity.
+   *
+   * @return the mapping from notification configuration to the anomalies it should send
+   */
+  @Override
+  public DetectionAlertFilterResult run() {
+    DetectionAlertFilterResult result = new DetectionAlertFilterResult();
+
+    // Candidates: anomalies selected through the vector clocks, plus those awaiting re-notification.
+    Set<MergedAnomalyResultDTO> candidates = this.filter(this.makeVectorClocks(this.detectionConfigIds));
+    candidates.addAll(this.retrieveRenotifyAnomalies(this.detectionConfigIds));
+
+    // One mapping per severity-recipient entry that matches at least one candidate.
+    for (Map<String, Object> recipientEntry : this.severityRecipients) {
+      List<AnomalySeverity> wantedSeverities = new ArrayList<>();
+      for (Object severityName : ConfigUtils.getList(recipientEntry.get(PROP_SEVERITY))) {
+        wantedSeverities.add(AnomalySeverity.valueOf((String) severityName));
+      }
+
+      Set<MergedAnomalyResultDTO> matched = candidates.stream()
+          .filter(anomaly -> wantedSeverities.contains(anomaly.getSeverity()))
+          .collect(Collectors.toCollection(HashSet::new));
+      if (matched.isEmpty()) {
+        continue;
+      }
+
+      DetectionAlertConfigDTO childConfig = SubscriptionUtils.makeChildSubscriptionConfig(config,
+          ConfigUtils.getMap(recipientEntry.get(PROP_NOTIFY)),
+          ConfigUtils.getMap(recipientEntry.get(PROP_REF_LINKS)));
+      result.addMapping(new DetectionAlertFilterNotification(childConfig), matched);
+    }
+
+    // Whatever no entry claimed goes out with the subscription group's own configuration.
+    Set<MergedAnomalyResultDTO> alreadyMapped = new HashSet<>(result.getAllAnomalies());
+    Set<MergedAnomalyResultDTO> unmatched = candidates.stream()
+        .filter(anomaly -> !alreadyMapped.contains(anomaly))
+        .collect(Collectors.toCollection(HashSet::new));
+    if (!unmatched.isEmpty()) {
+      result.addMapping(new DetectionAlertFilterNotification(config), unmatched);
+    }
+
+    return result;
+  }
+
+
+  /**
+   * Finds the anomalies recorded for re-notification that this subscription group has not sent yet.
+   *
+   * <p>Every record that is picked up is updated right away: this subscription group's id is added
+   * to the record's notified list and the record is saved.
+   *
+   * @param detectionConfigIds ids of the detection configs whose notification records are queried
+   * @return the anomalies to re-notify, or an empty collection if there are none
+   */
+  protected Collection<MergedAnomalyResultDTO> retrieveRenotifyAnomalies(Collection<Long> detectionConfigIds) {
+    List<AnomalySubscriptionGroupNotificationDTO> notificationRecords =
+        this.anomalySubscriptionGroupNotificationDAO.findByPredicate(
+            Predicate.IN("detectionConfigId", detectionConfigIds.toArray()));
+
+    List<Long> renotifyIds = new ArrayList<>();
+    for (AnomalySubscriptionGroupNotificationDTO notificationRecord : notificationRecords) {
+      // Skip records this subscription group has already handled.
+      if (notificationRecord.getNotifiedSubscriptionGroupIds().contains(this.config.getId())) {
+        continue;
+      }
+      renotifyIds.add(notificationRecord.getAnomalyId());
+      // Mark this subscription group as notified on the record and persist the change.
+      notificationRecord.getNotifiedSubscriptionGroupIds().add(this.config.getId());
+      this.anomalySubscriptionGroupNotificationDAO.save(notificationRecord);
+    }
+
+    if (renotifyIds.isEmpty()) {
+      return Collections.emptyList();
+    }
+    return DAORegistry.getInstance().getMergedAnomalyResultDAO().findByIds(renotifyIds);
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/SubscriptionConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/SubscriptionConfigTranslator.java
index 9d4044f..a3fec01 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/SubscriptionConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/SubscriptionConfigTranslator.java
@@ -68,10 +68,11 @@ public class SubscriptionConfigTranslator extends ConfigTranslator<DetectionAler
 
   private static final String PROP_DIMENSION = "dimension";
   private static final String PROP_DIMENSION_RECIPIENTS = "dimensionRecipients";
+  private static final String PROP_SEVERITY_RECIPIENTS = "severityRecipients";
 
   private static final DetectionAlertRegistry DETECTION_ALERT_REGISTRY = DetectionAlertRegistry.getInstance();
   private static final Set<String> PROPERTY_KEYS = new HashSet<>(
-      Arrays.asList(PROP_RECIPIENTS, PROP_DIMENSION, PROP_DIMENSION_RECIPIENTS));
+      Arrays.asList(PROP_RECIPIENTS, PROP_DIMENSION, PROP_DIMENSION_RECIPIENTS, PROP_SEVERITY_RECIPIENTS));
   private final DetectionConfigManager detectionConfigDAO;
 
   public SubscriptionConfigTranslator(DetectionConfigManager detectionConfigDAO, String yamlConfig) {
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AlertFilterUtils.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AlertFilterUtils.java
index a1091cf..72f64db 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AlertFilterUtils.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AlertFilterUtils.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.thirdeye.anomaly.AnomalyType;
 import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
+import org.apache.pinot.thirdeye.constant.AnomalySeverity;
 import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFeedbackDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
@@ -89,9 +90,14 @@ public class AlertFilterUtils {
 
     return new DetectionAlertFilterNotification(subsConfig);
   }
-
   static MergedAnomalyResultDTO makeAnomaly(Long configId, long baseTime, long start, long end,
       Map<String, String> dimensions, AnomalyFeedbackDTO feedback) {
+    return makeAnomaly(configId, baseTime, start, end, dimensions, feedback, AnomalySeverity.DEFAULT);
+  }
+
+
+  static MergedAnomalyResultDTO makeAnomaly(Long configId, long baseTime, long start, long end,
+      Map<String, String> dimensions, AnomalyFeedbackDTO feedback, AnomalySeverity severity) {
     MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(configId, baseTime + start, baseTime + end);
     anomaly.setType(AnomalyType.DEVIATION);
     anomaly.setChildIds(Collections.emptySet());
@@ -108,6 +114,7 @@ public class AlertFilterUtils {
 
     anomaly.setCreatedBy("no-auth-user");
     anomaly.setUpdatedBy("no-auth-user");
+    anomaly.setSeverity(severity);
     anomaly.setId(DAORegistry.getInstance().getMergedAnomalyResultDAO().save(anomaly));
 
     if (feedback != null) {
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilterTest.java
new file mode 100644
index 0000000..38f9189
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilterTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.pinot.thirdeye.detection.alert.filter;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.thirdeye.constant.AnomalySeverity;
+import org.apache.pinot.thirdeye.datalayer.bao.AnomalySubscriptionGroupNotificationManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
+import org.apache.pinot.thirdeye.datalayer.dto.AnomalySubscriptionGroupNotificationDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.detection.MockDataProvider;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilter;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterNotification;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterResult;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.thirdeye.detection.alert.filter.AlertFilterUtils.*;
+
+
+public class AnomalySeverityAlertFilterTest {
+  private static final String PROP_RECIPIENTS = "recipients";
+  private static final String PROP_EMAIL_SCHEME = "emailScheme";
+  private static final String PROP_TO = "to";
+  private static final String PROP_CC = "cc";
+  private static final String PROP_BCC = "bcc";
+  private static final Set<String> PROP_TO_FOR_VALUE =
+      new HashSet<>(Arrays.asList("myTest@example.com", "myTest@example.org"));
+  private static final Set<String> PROP_TO_FOR_ANOTHER_VALUE =
+      new HashSet<>(Arrays.asList("myTest@example.net", "myTest@example.com"));
+  private static final String PROP_DETECTION_CONFIG_IDS = "detectionConfigIds";
+  private static final String PROP_SEVERITY_TO = "severityRecipients";
+  private static final List<Object> severityProperty = new ArrayList<>();
+
+  private DetectionAlertFilter alertFilter;
+
+  private DAOTestBase testDAOProvider;
+  private DetectionAlertConfigDTO alertConfig;
+  private List<MergedAnomalyResultDTO> detectionAnomalies;
+  private long baseTime;
+  private List<Long> detectionConfigIds;
+  private MergedAnomalyResultDTO renotifyAnomaly;
+  private final MockDataProvider provider = new MockDataProvider();
+  private final Map<String, Object> notify1 = new HashMap<>();
+  private final Map<String, Object> notify2 = new HashMap<>();
+  private final Map<String, Object> defaultScheme = new HashMap<>();
+
+  @BeforeMethod
+  public void beforeMethod() throws InterruptedException {
+    testDAOProvider = DAOTestBase.getInstance(); // NOTE(review): this class has no teardown for the instance
+    // Fixture, part 1: two active detection configs, saved to obtain the ids referenced below.
+    DetectionConfigDTO detectionConfig1 = new DetectionConfigDTO();
+    detectionConfig1.setName("test detection 1");
+    detectionConfig1.setActive(true);
+    long detectionConfigId1 = DAORegistry.getInstance().getDetectionConfigManager().save(detectionConfig1);
+    // Every anomaly created for the second config below goes through the six-argument makeAnomaly overload.
+    DetectionConfigDTO detectionConfig2 = new DetectionConfigDTO();
+    detectionConfig2.setName("test detection 2");
+    detectionConfig2.setActive(true);
+    long detectionConfigId2 = DAORegistry.getInstance().getDetectionConfigManager().save(detectionConfig2);
+    // Position 0 is config 1 and position 1 is config 2; createDetectionAlertConfig() reads this list.
+    detectionConfigIds = Arrays.asList(detectionConfigId1, detectionConfigId2);
+    // renotifyAnomaly is saved before baseTime is captured and is kept out of detectionAnomalies.
+    // Anomaly notification is tracked through create time. Start and end times don't matter here.
+    this.detectionAnomalies = new ArrayList<>();
+    renotifyAnomaly =
+        makeAnomaly(detectionConfigId1, System.currentTimeMillis(), 0, 50, Collections.singletonMap("key", "value"),
+            null, AnomalySeverity.LOW);
+    Thread.sleep(100); // NOTE(review): the sleeps presumably keep create times distinct and ordered - confirm
+    this.baseTime = System.currentTimeMillis();
+    Thread.sleep(100);
+    this.detectionAnomalies.add(
+        makeAnomaly(detectionConfigId1, this.baseTime, 0, 100, Collections.singletonMap("key", "value"), null,
+            AnomalySeverity.LOW));
+    Thread.sleep(10);
+    this.detectionAnomalies.add(
+        makeAnomaly(detectionConfigId1, this.baseTime, 0, 110, Collections.singletonMap("key", "anotherValue"), null,
+            AnomalySeverity.MODERATE));
+    Thread.sleep(20);
+    this.detectionAnomalies.add(
+        makeAnomaly(detectionConfigId1, this.baseTime, 0, 120, Collections.singletonMap("key", "unknownValue"), null,
+            AnomalySeverity.HIGH));
+    Thread.sleep(30);
+    this.detectionAnomalies.add(
+        makeAnomaly(detectionConfigId2, this.baseTime, 110, 150, Collections.singletonMap("unknownKey", "value"),
+            null));
+    Thread.sleep(10);
+    this.detectionAnomalies.add(
+        makeAnomaly(detectionConfigId2, this.baseTime, 120, 160, Collections.singletonMap("key", "value"), null));
+    Thread.sleep(40);
+    this.detectionAnomalies.add(
+        makeAnomaly(detectionConfigId1, this.baseTime, 150, 200, Collections.<String, String>emptyMap(), null));
+    Thread.sleep(200);
+    this.detectionAnomalies.add(
+        makeAnomaly(detectionConfigId2, this.baseTime, 300, 400, Collections.singletonMap("key", "value"), null));
+    Thread.sleep(100);
+    // Built last: createDetectionAlertConfig() copies baseTime into the vector clocks of both configs.
+    this.alertConfig = createDetectionAlertConfig();
+  }
+
+  /** Builds the subscription config: per-severity recipients, the default email scheme and vector clocks. */
+  private DetectionAlertConfigDTO createDetectionAlertConfig() {
+    DetectionAlertConfigDTO alertConfig = new DetectionAlertConfigDTO();
+    // severityProperty is static and this runs before every test method; reset it so rules never pile up.
+    severityProperty.clear();
+    notify1.put("severity", Arrays.asList("LOW", "MODERATE"));
+    notify1.put("notify", ImmutableMap.of("emailScheme", ImmutableMap.of("recipients", PROP_TO_FOR_VALUE)));
+    notify2.put("severity", Collections.singleton("HIGH"));
+    notify2.put("notify", ImmutableMap.of("emailScheme", ImmutableMap.of("recipients", PROP_TO_FOR_ANOTHER_VALUE)));
+    severityProperty.addAll(Arrays.asList(notify1, notify2));
+
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(PROP_DETECTION_CONFIG_IDS, detectionConfigIds);
+    properties.put(PROP_SEVERITY_TO, severityProperty);
+    alertConfig.setProperties(properties);
+
+    Map<String, Object> emailScheme = new HashMap<>();
+    Map<String, Set<String>> recipients = new HashMap<>();
+    recipients.put(PROP_TO, AlertFilterUtils.PROP_TO_VALUE);
+    recipients.put(PROP_CC, AlertFilterUtils.PROP_CC_VALUE);
+    recipients.put(PROP_BCC, AlertFilterUtils.PROP_BCC_VALUE);
+    emailScheme.put(PROP_RECIPIENTS, recipients);
+    defaultScheme.put(PROP_EMAIL_SCHEME, emailScheme);
+    alertConfig.setAlertSchemes(defaultScheme);
+
+    Map<Long, Long> vectorClocks = new HashMap<>();
+    vectorClocks.put(detectionConfigIds.get(0), this.baseTime);
+    vectorClocks.put(detectionConfigIds.get(1), this.baseTime);
+    alertConfig.setVectorClocks(vectorClocks);
+    return alertConfig;
+  }
+
+  /** Anomalies must be grouped by severity rule, with unmatched severities falling back to the default scheme. */
+  @Test
+  public void testAlertFilterRecipients() throws Exception {
+    this.alertFilter = new AnomalySeverityAlertFilter(provider, alertConfig, this.baseTime + 350L);
+    DetectionAlertFilterResult filterResult = this.alertFilter.run();
+    Assert.assertEquals(filterResult.getResult().size(), 3);
+    int matchedGroups = 0;
+    for (Map.Entry<DetectionAlertFilterNotification, Set<MergedAnomalyResultDTO>> group : filterResult.getResult()
+        .entrySet()) {
+      Set<MergedAnomalyResultDTO> anomalies = group.getValue();
+      if (anomalies.equals(makeSet(0, 1))) {
+        Assert.assertEquals(group.getKey().getSubscriptionConfig().getAlertSchemes(), notify1.get("notify"));
+      } else if (anomalies.equals(makeSet(2))) {
+        Assert.assertEquals(group.getKey().getSubscriptionConfig().getAlertSchemes(), notify2.get("notify"));
+      } else if (anomalies.equals(makeSet(3, 4, 5))) {
+        Assert.assertEquals(group.getKey().getSubscriptionConfig().getAlertSchemes(), defaultScheme);
+      } else {
+        continue; // unexpected group: left uncounted so the final count check fails
+      }
+      matchedGroups++;
+    }
+    Assert.assertEquals(matchedGroups, 3);
+  }
+
+  /** A saved re-notification entry must add renotifyAnomaly to the group of its severity rule. */
+  @Test
+  public void testRenotifyAnomaly() throws Exception {
+    AnomalySubscriptionGroupNotificationDTO renotification = new AnomalySubscriptionGroupNotificationDTO();
+    renotification.setAnomalyId(renotifyAnomaly.getId());
+    renotification.setDetectionConfigId(renotifyAnomaly.getDetectionConfigId());
+    AnomalySubscriptionGroupNotificationManager notificationManager =
+        DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager();
+    notificationManager.save(renotification);
+
+    this.alertFilter = new AnomalySeverityAlertFilter(provider, alertConfig, this.baseTime + 350L);
+    DetectionAlertFilterResult filterResult = this.alertFilter.run();
+    Assert.assertEquals(filterResult.getResult().size(), 3);
+
+    int matchedGroups = 0;
+    for (Map.Entry<DetectionAlertFilterNotification, Set<MergedAnomalyResultDTO>> group : filterResult.getResult()
+        .entrySet()) {
+      Set<MergedAnomalyResultDTO> anomalies = group.getValue();
+      if (anomalies.equals(makeSet(renotifyAnomaly, 0, 1))) {
+        Assert.assertEquals(group.getKey().getSubscriptionConfig().getAlertSchemes(), notify1.get("notify"));
+      } else if (anomalies.equals(makeSet(2))) {
+        Assert.assertEquals(group.getKey().getSubscriptionConfig().getAlertSchemes(), notify2.get("notify"));
+      } else if (anomalies.equals(makeSet(3, 4, 5))) {
+        Assert.assertEquals(group.getKey().getSubscriptionConfig().getAlertSchemes(), defaultScheme);
+      } else {
+        continue; // unexpected group: left uncounted so the final count check fails
+      }
+      matchedGroups++;
+    }
+    Assert.assertEquals(matchedGroups, 3);
+  }
+
+  private Set<MergedAnomalyResultDTO> makeSet(MergedAnomalyResultDTO anomaly, int... anomalyIndices) {
+    Set<MergedAnomalyResultDTO> expected = new HashSet<>(Collections.singleton(anomaly)); // the extra anomaly
+    expected.addAll(makeSet(anomalyIndices)); // plus the fixture anomalies at the given positions
+    return expected;
+  }
+
+  private Set<MergedAnomalyResultDTO> makeSet(int... anomalyIndices) {
+    Set<MergedAnomalyResultDTO> selected = new HashSet<>();
+    for (int i = 0; i < anomalyIndices.length; i++) {
+      selected.add(this.detectionAnomalies.get(anomalyIndices[i])); // positions refer to detectionAnomalies
+    }
+    return selected;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org